"""FastAPI service that runs the scraper scripts and serves their Excel output.

The module exposes endpoints to:

* run a scraper script as a subprocess and download the newest matching file,
* upload, filter, store and download Excel workbooks per category,
* push a cleaned workbook to a Google Sheets worksheet.

All filesystem locations are the hard-coded ``D:`` paths defined below.
"""
import glob
import os
import re
import subprocess
from io import BytesIO
from typing import Dict, List

import pandas as pd
from fastapi import APIRouter, FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel

from google_sheets_helper import get_sheet

app = FastAPI()
router = APIRouter()

# CORS middleware: only the local frontend dev server origin is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

DATA_FOLDER = "D:\\lecturertask"
BASE_FILENAME = "data_scopus"
FILE_EXT = ".xlsx"
BASE_DIR = "D:/lecturertask/download_files_scraper"
XLSX_MEDIA_TYPE = (
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)

folder_map = {
    "cleaned_pengabdian.xlsx": "pengabdian",
    "cleaned_penelitian.xlsx": "penelitian",
    "cleaned_hki.xlsx": "hki",
    "cleaned_scholar.xlsx": "scholar",
    "cleaned_scopus.xlsx": "scopus",
}

# Category name -> path of that category's cleaned workbook under BASE_DIR.
file_map = {
    "pengabdian": os.path.join(BASE_DIR, "pengabdian", "pengabdian_cleaned.xlsx"),
    "penelitian": os.path.join(BASE_DIR, "penelitian", "penelitian_cleaned.xlsx"),
    "hki": os.path.join(BASE_DIR, "hki", "hki_cleaned.xlsx"),
    "scopus": os.path.join(BASE_DIR, "scopus", "scopus_cleaned.xlsx"),
    "scholar": os.path.join(BASE_DIR, "scholar", "scholar_cleaned.xlsx"),
}


class SaveFileRequest(BaseModel):
    """Request body for ``/save-cleaned-file``."""

    folder: str  # sub-folder of BASE_DIR to write into
    filename: str  # name of the workbook to create or overwrite
    data: List[Dict]  # rows, one dict per row


def _safe_join(base: str, *parts: str) -> str:
    """Join *parts* onto *base* and reject results that escape *base*.

    Raises:
        HTTPException: 400 when the resolved path is outside *base*
            (for example because a part contains ``..`` or is absolute).
    """
    root = os.path.abspath(base)
    target = os.path.abspath(os.path.join(root, *parts))
    try:
        common = os.path.commonpath([root, target])
    except ValueError:
        # Raised e.g. when the two paths live on different Windows drives.
        common = None
    if common is None or os.path.normcase(common) != os.path.normcase(root):
        raise HTTPException(status_code=400, detail="Path tidak valid")
    return target


def _safe_filename(name) -> str:
    """Return the final path component of a client-supplied file name.

    Raises:
        HTTPException: 400 when nothing usable remains.
    """
    # Normalise backslashes so Windows-style client paths are stripped too.
    cleaned = os.path.basename((name or "").replace("\\", "/"))
    if cleaned in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Nama file tidak valid")
    return cleaned


@router.post("/google-sheet/update/{category}")
def update_sheet_from_local(category: str):
    """Replace the worksheet named *category* with the local cleaned workbook.

    Raises:
        HTTPException: 400 for an unknown category, 404 when the workbook
            does not exist on disk.
    """
    spreadsheet_id = "SPREADSHEET_ID_KAMU"
    worksheet_name = category

    if category not in file_map:
        raise HTTPException(status_code=400, detail="Kategori tidak valid")

    file_path = file_map[category]
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File tidak ditemukan")

    df = pd.read_excel(file_path)
    # Empty cells are read as NaN, which is not valid JSON; send "" instead.
    df = df.astype(object).where(df.notna(), "")

    worksheet = get_sheet(spreadsheet_id, worksheet_name)
    worksheet.clear()

    # Header row first, then the data rows.
    worksheet.update([df.columns.values.tolist()] + df.values.tolist())

    return {"message": f"Data dari {category} berhasil dikirim ke Google Sheets."}


# Without this call the routes registered on `router` are never served.
app.include_router(router)


@app.post("/save-cleaned-file")
async def save_cleaned_file(req: SaveFileRequest):
    """Write ``req.data`` to ``BASE_DIR/<folder>/<filename>`` as an Excel file."""
    # Determine the target path; both parts are client-supplied.
    folder_path = _safe_join(BASE_DIR, req.folder)
    os.makedirs(folder_path, exist_ok=True)
    file_path = _safe_join(folder_path, req.filename)

    # Convert the list of dicts to a DataFrame.
    df = pd.DataFrame(req.data)

    # Save to Excel (overwrites an existing file).
    df.to_excel(file_path, index=False)

    return {"message": "File berhasil disimpan", "path": file_path}


@app.delete("/delete_all_excel")
def delete_all_excel_files():
    """Delete every ``*.xlsx`` file located directly inside ``BASE_DIR``."""
    for excel_path in glob.glob(os.path.join(BASE_DIR, "*.xlsx")):
        os.remove(excel_path)
    return {"message": "Semua file Excel berhasil dihapus."}


@app.get("/download/{file_type}")
def download_cleaned_by_type(file_type: str):
    """Send the cleaned workbook registered for *file_type* in ``file_map``.

    Raises:
        HTTPException: 404 for an unknown type or a missing file.
    """
    file_path = file_map.get(file_type)
    if file_path is None:
        raise HTTPException(status_code=404, detail="Invalid file type.")
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found.")
    # FileResponse opens and closes the file itself, so no handle is leaked.
    return FileResponse(
        file_path,
        media_type=XLSX_MEDIA_TYPE,
        filename=os.path.basename(file_path),
    )


@app.get("/cleaned-files/{folder}/{filename}")
def get_cleaned_file(folder: str, filename: str):
    """Send ``BASE_DIR/<folder>/<filename>``, or a 404 JSON error body."""
    file_path = _safe_join(BASE_DIR, folder, filename)
    if os.path.isfile(file_path):
        return FileResponse(file_path, media_type=XLSX_MEDIA_TYPE)
    # Same body as before, but with a status code that signals the failure.
    return JSONResponse(status_code=404, content={"error": "File not found"})


@app.get("/list-files/{category}")
def list_files(category: str):
    """List the entries of the lower-cased *category* folder under ``BASE_DIR``.

    Raises:
        HTTPException: 404 when the folder does not exist.
    """
    folder_path = _safe_join(BASE_DIR, category.lower())
    if not os.path.isdir(folder_path):
        raise HTTPException(status_code=404, detail="Folder tidak ditemukan")
    return {"files": os.listdir(folder_path)}


@app.get("/download-file/{category}/{filename}")
def download_category_file(category: str, filename: str):
    """Send *filename* from the lower-cased *category* folder under ``BASE_DIR``.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    file_path = _safe_join(BASE_DIR, category.lower(), filename)
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File tidak ditemukan")
    return FileResponse(
        path=file_path,
        filename=os.path.basename(file_path),
        media_type=XLSX_MEDIA_TYPE,
    )


def get_latest_file(file_pattern: str):
    """Return the ``DATA_FOLDER`` file matching *file_pattern* with the
    greatest ``os.path.getctime`` value, or ``None`` when nothing matches."""
    matches = glob.glob(os.path.join(DATA_FOLDER, file_pattern))
    if not matches:
        return None
    return max(matches, key=os.path.getctime)


# Home route
@app.get("/")
def home():
    """Return a static welcome message."""
    return {"message": "Welcome to Scraper penelitian API"}


def _run_scraper(script_name: str):
    """Run one scraper script with ``python`` and capture its text output."""
    return subprocess.run(
        ["python", f"D:\\lecturertask\\scraper\\{script_name}"],
        capture_output=True,
        text=True,
    )


def scrape_and_download(script_name: str, file_pattern: str):
    """Run *script_name*, then send the newest file matching *file_pattern*.

    Returns a JSON error response (500 or 404) instead of raising.
    """
    try:
        result = _run_scraper(script_name)
        if result.returncode != 0:
            return JSONResponse(content={"error": result.stderr}, status_code=500)

        latest_file = get_latest_file(file_pattern)
        if latest_file:
            return FileResponse(latest_file, filename=os.path.basename(latest_file))
        return JSONResponse(
            content={"error": "File hasil scraping tidak ditemukan."},
            status_code=404,
        )
    except Exception as e:  # endpoint boundary: report instead of crashing
        return JSONResponse(content={"error": str(e)}, status_code=500)


@app.post("/submit-cleaned/{chart_type}")
async def submit_cleaned_file(chart_type: str, file: UploadFile = File(...)):
    """Store an uploaded workbook in the folder of the given category.

    Raises:
        HTTPException: 400 for an unknown category or an unusable file name.
    """
    chart_type = chart_type.lower()
    if chart_type not in file_map:
        raise HTTPException(status_code=400, detail="Kategori tidak valid")

    # Write below BASE_DIR (where the download endpoints read from) rather
    # than a path relative to the process working directory.
    target_dir = _safe_join(BASE_DIR, chart_type)
    os.makedirs(target_dir, exist_ok=True)
    save_path = _safe_join(target_dir, _safe_filename(file.filename))

    with open(save_path, "wb") as f:
        f.write(await file.read())

    return {"message": f"File berhasil disimpan ke grafik {chart_type}"}


@app.post("/upload-excel")
async def upload_excel(file: UploadFile = File(...)):
    """Keep only the rows that contain a cleaner token and return them.

    Tokens are read one per line from ``D:/lecturertask/cleaner_tokens.txt``.
    A row is kept when any of its cells contains any token as a whole word
    (case-insensitive). The result is saved in ``DATA_FOLDER`` as
    ``cleaned_excel_file_scraping<N>.xlsx`` with the next free number and sent
    back. Any failure yields a 500 response carrying the traceback text.
    """
    try:
        file_content = await file.read()
        df = pd.read_excel(BytesIO(file_content))

        with open("D:/lecturertask/cleaner_tokens.txt", "r") as f:
            # Skip blank lines: an empty token would match every row.
            cleaner_tokens = [
                line.strip().lower() for line in f if line.strip()
            ]

        if cleaner_tokens and not df.empty:
            # One alternation compiled once instead of one pattern per token
            # per cell.
            token_re = re.compile(
                r"\b(?:" + "|".join(re.escape(t) for t in cleaner_tokens) + r")\b",
                re.IGNORECASE,
            )

            def contains_name(row):
                return any(token_re.search(str(cell)) for cell in row)

            df_cleaned = df[df.apply(contains_name, axis=1)]
        else:
            # No tokens or no rows: nothing can match.
            df_cleaned = df.iloc[0:0]

        prefix = "cleaned_excel_file_scraping"
        max_num = 0
        for existing in os.listdir(DATA_FOLDER):
            if not (existing.startswith(prefix) and existing.endswith(FILE_EXT)):
                continue
            try:
                num = int(existing.replace(prefix, "").replace(FILE_EXT, ""))
            except ValueError:
                # Name has no numeric suffix; it does not affect numbering.
                continue
            max_num = max(max_num, num)

        new_filename = f"{prefix}{max_num + 1}{FILE_EXT}"
        save_path = os.path.join(DATA_FOLDER, new_filename)

        print(f"Path penyimpanan file: {save_path}")

        df_cleaned.to_excel(save_path, index=False, engine="openpyxl")

        return FileResponse(
            save_path,
            media_type=XLSX_MEDIA_TYPE,
            filename=new_filename,
        )

    except Exception:  # endpoint boundary: report instead of crashing
        import traceback

        error_message = traceback.format_exc()
        return JSONResponse(status_code=500, content={"error": error_message})


@app.get("/scrape/scholar")
def scrape_scholar():
    """Run the Scholar scraper."""
    return scrape_data("scraper_scholar.py", "Scraping Scholar selesai.")


@app.get("/scholar/download")
def download_latest_scholar():
    """Send the newest ``data_scholar*.xlsx`` file."""
    return download_file("data_scholar*.xlsx")


@app.get("/scrape/scopus")
def scrape_scopus():
    """Run the Scopus scraper."""
    return scrape_data("scraper_scopus.py", "Scraping Scopus selesai.")


@app.get("/scopus/download")
def download_latest_file():
    """Send the newest Scopus data file."""
    return download_file(f"{BASE_FILENAME}*{FILE_EXT}")


@app.get("/scrape/pengabdian")
def scrape_pengabdian():
    """Run the Pengabdian scraper."""
    return scrape_data("scraper_pengabdian.py", "Scraping Pengabdian selesai.")


@app.get("/pengabdian/download")
def download_latest_pengabdian():
    """Send the newest ``data_pengabdian*.xlsx`` file."""
    return download_file("data_pengabdian*.xlsx")


@app.get("/scrape/hki")
def scrape_hki():
    """Run the HKI scraper."""
    return scrape_data("scraper_HKI.py", "Scraping HKI selesai.")


@app.get("/hki/download")
def download_latest_hki():
    """Send the newest ``data_hki*.xlsx`` file."""
    return download_file("data_hki*.xlsx")


@app.get("/scrape/penelitian")
def scrape_penelitian():
    """Run the Penelitian scraper."""
    return scrape_data("scraper_penelitian.py", "Scraping Penelitian selesai.")


@app.get("/penelitian/download")
def download_latest_penelitian():
    """Send the newest ``data_penelitian*.xlsx`` file."""
    return download_file("data_penelitian*.xlsx")


# Generic function to scrape data
def scrape_data(script_name: str, success_message: str):
    """Run *script_name* and return *success_message* when it exits with 0.

    Returns a 500 JSON error response instead of raising.
    """
    try:
        result = _run_scraper(script_name)
        if result.returncode == 0:
            return {"message": success_message}
        return JSONResponse(content={"error": result.stderr}, status_code=500)
    except Exception as e:  # endpoint boundary: report instead of crashing
        return JSONResponse(content={"error": str(e)}, status_code=500)


@app.get("/scrape-download/scopus")
def scrape_download_scopus():
    """Run the Scopus scraper and send its newest output file."""
    return scrape_and_download("scraper_scopus.py", "data_scopus*.xlsx")


@app.get("/scrape-download/scholar")
def scrape_download_scholar():
    """Run the Scholar scraper and send its newest output file."""
    return scrape_and_download("scraper_scholar.py", "data_scholar*.xlsx")


@app.get("/scrape-download/pengabdian")
def scrape_download_pengabdian():
    """Run the Pengabdian scraper and send its newest output file."""
    return scrape_and_download("scraper_pengabdian.py", "data_pengabdian*.xlsx")


@app.get("/scrape-download/hki")
def scrape_download_hki():
    """Run the HKI scraper and send its newest output file."""
    return scrape_and_download("scraper_HKI.py", "data_hki*.xlsx")


@app.get("/scrape-download/penelitian")
def scrape_download_penelitian():
    """Run the Penelitian scraper and send its newest output file."""
    return scrape_and_download("scraper_penelitian.py", "data_penelitian*.xlsx")


# Generic function to download file
def download_file(file_pattern: str):
    """Send the newest ``DATA_FOLDER`` file matching *file_pattern*.

    Returns a 404 JSON error response when nothing matches.
    """
    latest_file = get_latest_file(file_pattern)
    if latest_file:
        return FileResponse(latest_file, filename=os.path.basename(latest_file))
    return JSONResponse(content={"error": "File tidak ditemukan."}, status_code=404)


# Generic function to preview file
def preview_file(file_pattern: str):
    """Return the rows of the newest matching file as a list of dicts.

    Returns ``{"data": []}`` when nothing matches.
    """
    latest_file = get_latest_file(file_pattern)
    if not latest_file:
        return {"data": []}
    df = pd.read_excel(latest_file)
    return {"data": df.to_dict(orient="records")}


@app.post("/upload-pengabdian")
async def upload_pengabdian_excel(file: UploadFile = File(...)):
    """Store the upload in ``DATA_FOLDER`` as ``uploaded_pengabdian_<name>``.

    Raises:
        HTTPException: 400 when the uploaded file name is unusable.
    """
    contents = await file.read()
    # Only the final path component of the client-supplied name is used.
    safe_name = _safe_filename(file.filename)
    path = os.path.join(DATA_FOLDER, f"uploaded_pengabdian_{safe_name}")
    with open(path, "wb") as f:
        f.write(contents)
    return {"message": "File berhasil diunggah", "filename": file.filename}


@app.post("/upload-dashboard")
def upload_dashboard_file(file: UploadFile = File(...)):
    """Store an uploaded workbook and guess its category from its columns.

    The response contains the detected type (or ``None``), the column names
    and the first five rows.

    Raises:
        HTTPException: 400 when the uploaded file name is unusable.
    """
    contents = file.file.read()

    # Save the file temporarily (relative to the working directory).
    os.makedirs("uploaded_files", exist_ok=True)
    path = os.path.join("uploaded_files", _safe_filename(file.filename))
    with open(path, "wb") as f:
        f.write(contents)

    # Read the data and determine the type.
    df = pd.read_excel(path)
    # str() keeps this working when a header cell is not a string.
    columns = {str(col).lower() for col in df.columns}

    detected_type = None
    if "judul pengabdian" in columns:
        detected_type = "pengabdian"
    elif "judul penelitian" in columns:
        detected_type = "penelitian"
    elif "inventor" in columns:
        detected_type = "hki"
    elif "source title" in columns:
        detected_type = "scopus"
    elif "title" in columns and "citations" in columns:
        detected_type = "scholar"

    # NaN cannot be encoded in the JSON response; expose empty cells as null.
    preview = df.head(5)
    preview = preview.astype(object).where(preview.notna(), None)

    return {
        "status": "success",
        "detected_type": detected_type,
        "filename": file.filename,
        "columns": df.columns.tolist(),
        "rows": preview.to_dict(orient="records"),
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)