# MIF_E31222544/main.py
from fastapi import APIRouter, FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
from typing import List, Dict, Optional
import subprocess
import glob
import os
import re
from io import BytesIO

import pandas as pd

from google_sheets_helper import get_sheet

app = FastAPI()
router = APIRouter()

# CORS middleware (allow the local frontend dev server)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

DATA_FOLDER = "D:\\lecturertask"
BASE_FILENAME = "data_scopus"  # base name of the Scopus scraper output files (used by /scopus/download)
FILE_EXT = ".xlsx"
BASE_DIR = "D:/lecturertask/download_files_scraper"
BASE_FOLDER = BASE_DIR  # used by the per-category list/download endpoints (assumed to be the same folder)

folder_map = {
    "cleaned_pengabdian.xlsx": "pengabdian",
    "cleaned_penelitian.xlsx": "penelitian",
    "cleaned_hki.xlsx": "hki",
    "cleaned_scholar.xlsx": "scholar",
    "cleaned_scopus.xlsx": "scopus",
}
file_map = {
    "pengabdian": os.path.join(BASE_DIR, "pengabdian", "pengabdian_cleaned.xlsx"),
    "penelitian": os.path.join(BASE_DIR, "penelitian", "penelitian_cleaned.xlsx"),
    "hki": os.path.join(BASE_DIR, "hki", "hki_cleaned.xlsx"),
    "scopus": os.path.join(BASE_DIR, "scopus", "scopus_cleaned.xlsx"),
    "scholar": os.path.join(BASE_DIR, "scholar", "scholar_cleaned.xlsx"),
}

class SaveFileRequest(BaseModel):
    folder: str
    filename: str
    data: List[Dict]

@router.post("/google-sheet/update/{category}")
def update_sheet_from_local(category: str):
    path_map = {
        "penelitian": "D:/lecturertask/download_files_scraper/penelitian/penelitian_cleaned.xlsx",
        "pengabdian": "D:/lecturertask/download_files_scraper/pengabdian/pengabdian_cleaned.xlsx",
        "hki": "D:/lecturertask/download_files_scraper/hki/hki_cleaned.xlsx",
        "scopus": "D:/lecturertask/download_files_scraper/scopus/scopus_cleaned.xlsx",
        "scholar": "D:/lecturertask/download_files_scraper/scholar/scholar_cleaned.xlsx",
    }
    spreadsheet_id = "SPREADSHEET_ID_KAMU"  # placeholder: replace with the actual spreadsheet ID
    worksheet_name = category
    if category not in path_map:
        raise HTTPException(status_code=400, detail="Invalid category")
    file_path = path_map[category]
    df = pd.read_excel(file_path)
    worksheet = get_sheet(spreadsheet_id, worksheet_name)
    worksheet.clear()
    # Write the header row followed by the data rows
    worksheet.update([df.columns.values.tolist()] + df.values.tolist())
    return {"message": f"Data from {category} was sent to Google Sheets."}


# Register the router so the /google-sheet/update/{category} route is actually served
app.include_router(router)

@app.post("/save-cleaned-file")
async def save_cleaned_file(req: SaveFileRequest):
    # Build the target path
    base_path = r"D:\lecturertask\download_files_scraper"
    folder_path = os.path.join(base_path, req.folder)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, req.filename)
    # Convert the list of dicts to a DataFrame
    df = pd.DataFrame(req.data)
    # Save to Excel (overwrite)
    df.to_excel(file_path, index=False)
    return {"message": "File saved successfully", "path": file_path}

@app.delete("/delete_all_excel")
def delete_all_excel_files():
    folder_path = "D:/lecturertask/download_files_scraper"
    files = glob.glob(os.path.join(folder_path, "*.xlsx"))
    for file in files:
        os.remove(file)
    return {"message": "All Excel files were deleted."}

@app.get("/download/{file_type}")
def download_cleaned_file(file_type: str):
    if file_type in file_map:
        file_path = file_map[file_type]
        if os.path.exists(file_path):
            file_like = open(file_path, mode="rb")
            return StreamingResponse(
                file_like,
                media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                headers={"Content-Disposition": f"attachment; filename={os.path.basename(file_path)}"},
            )
        else:
            raise HTTPException(status_code=404, detail="File not found.")
    else:
        raise HTTPException(status_code=404, detail="Invalid file type.")

@app.get("/cleaned-files/{folder}/{filename}")
def get_cleaned_file(folder: str, filename: str):
    file_path = os.path.join(BASE_DIR, folder, filename)
    if os.path.exists(file_path):
        response = FileResponse(
            file_path,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
        response.headers["Cache-Control"] = "no-store"
        return response
    return {"error": "File not found"}

@app.get("/list-files/{category}")
def list_files(category: str):
    folder_path = os.path.join(BASE_FOLDER, category.lower())
    if not os.path.exists(folder_path):
        raise HTTPException(status_code=404, detail="Folder not found")
    files = os.listdir(folder_path)
    return {"files": files}


@app.get("/download-file/{category}/{filename}")
def download_category_file(category: str, filename: str):
    file_path = os.path.join(BASE_FOLDER, category.lower(), filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )

def get_latest_file(file_pattern: str):
    # Return the most recently created file in DATA_FOLDER matching the pattern, or None
    files = glob.glob(os.path.join(DATA_FOLDER, file_pattern))
    if not files:
        return None
    latest_file = max(files, key=os.path.getctime)
    return latest_file

# Home route
@app.get("/")
def home():
    return {"message": "Welcome to the Scraper Penelitian API"}

def scrape_and_download(script_name: str, file_pattern: str):
    # Run a scraper script, then return the newest output file matching the pattern
    try:
        result = subprocess.run(
            ["python", f"D:\\lecturertask\\scraper\\{script_name}"],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            return JSONResponse(content={"error": result.stderr}, status_code=500)
        latest_file = get_latest_file(file_pattern)
        if latest_file:
            return FileResponse(latest_file, filename=os.path.basename(latest_file))
        return JSONResponse(content={"error": "Scraped output file not found."}, status_code=404)
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/submit-cleaned/{chart_type}")
async def submit_cleaned_file(chart_type: str, file: UploadFile = File(...)):
    chart_type = chart_type.lower()
    folder_map = {
        "hki": "hki",
        "penelitian": "penelitian",
        "pengabdian": "pengabdian",
        "scopus": "scopus",
        "scholar": "scholar",
    }
    folder = folder_map.get(chart_type)
    if not folder:
        raise HTTPException(status_code=400, detail="Invalid category")
    save_path = f"download_files_scraper/{folder}/{file.filename}"
    os.makedirs(os.path.dirname(save_path), exist_ok=True)  # make sure the target folder exists
    with open(save_path, "wb") as f:
        f.write(await file.read())
    return {"message": f"File saved for the {chart_type} chart"}

@app.post("/upload-excel")
async def upload_excel(file: UploadFile = File(...)):
    try:
        file_content = await file.read()
        df = pd.read_excel(BytesIO(file_content))
        # Load the name tokens used to filter rows
        with open("D:/lecturertask/cleaner_tokens.txt", "r") as f:
            cleaner_tokens = [line.strip().lower() for line in f.readlines()]

        def contains_name(row):
            # Keep a row if any cell contains one of the tokens as a whole word
            for token in cleaner_tokens:
                pattern = r"\b" + re.escape(token) + r"\b"
                if any(re.search(pattern, str(cell), re.IGNORECASE) for cell in row):
                    return True
            return False

        df_cleaned = df[df.apply(contains_name, axis=1)]
        # Find the next available sequential output filename
        existing_files = [
            f for f in os.listdir(DATA_FOLDER)
            if f.startswith("cleaned_excel_file_scraping") and f.endswith(FILE_EXT)
        ]
        max_num = 0
        for filename in existing_files:
            try:
                num = int(filename.replace("cleaned_excel_file_scraping", "").replace(FILE_EXT, ""))
                if num > max_num:
                    max_num = num
            except ValueError:
                pass
        next_num = max_num + 1
        new_filename = f"cleaned_excel_file_scraping{next_num}{FILE_EXT}"
        save_path = os.path.join(DATA_FOLDER, new_filename)
        print(f"Saving cleaned file to: {save_path}")
        df_cleaned.to_excel(save_path, index=False, engine="openpyxl")
        return FileResponse(
            save_path,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            filename=new_filename,
        )
    except Exception:
        import traceback
        error_message = traceback.format_exc()
        return JSONResponse(status_code=500, content={"error": error_message})

@app.get("/scrape/scholar")
def scrape_scholar():
    return scrape_data("scraper_scholar.py", "Scholar scraping finished.")


@app.get("/scrape/onescholar")
def scrape_one_scholar(link: str):
    return scrape_data("scraper_onescholar.py", "One Scholar scraping finished.", extra_args=[link])


@app.get("/scholar/download")
def download_latest_scholar():
    return download_file("data_scholar*.xlsx")


@app.get("/scrape/scopus")
def scrape_scopus(affiliation_id: Optional[int] = Query(None)):
    if not affiliation_id:
        return JSONResponse(content={"error": "affiliation_id is empty"}, status_code=400)
    return scrape_data("scraper_scopus.py", "Scopus scraping finished.", extra_args=[str(affiliation_id)])


@app.get("/scopus/download")
def download_latest_file():
    return download_file(f"{BASE_FILENAME}*{FILE_EXT}")

@app.get("/scrape/pengabdian")
def scrape_pengabdian(affiliation_id: Optional[int] = Query(None)):
    if not affiliation_id:
        return JSONResponse(content={"error": "affiliation_id is empty"}, status_code=400)
    return scrape_data("scraper_pengabdian.py", "Pengabdian scraping finished.", extra_args=[str(affiliation_id)])


@app.get("/pengabdian/download")
def download_latest_pengabdian():
    return download_file("data_pengabdian*.xlsx")


@app.get("/scrape/hki")
def scrape_hki(affiliation_id: Optional[int] = Query(None)):
    if not affiliation_id:
        return JSONResponse(content={"error": "affiliation_id is empty"}, status_code=400)
    return scrape_data("scraper_HKI.py", "HKI scraping finished.", extra_args=[str(affiliation_id)])


@app.get("/hki/download")
def download_latest_hki():
    return download_file("data_hki*.xlsx")


@app.get("/scrape/penelitian")
def scrape_penelitian(affiliation_id: Optional[int] = Query(None)):
    print("📩 Affiliation ID received by FastAPI:", affiliation_id)
    if not affiliation_id:
        return JSONResponse(content={"error": "affiliation_id is empty"}, status_code=400)
    return scrape_data("scraper_penelitian.py", "Penelitian scraping finished.", extra_args=[str(affiliation_id)])


@app.get("/penelitian/download")
def download_latest_penelitian():
    return download_file("data_penelitian*.xlsx")

def scrape_data(script_name: str, success_message: str, extra_args: Optional[list] = None):
    # Run a scraper script as a subprocess and report success or the captured stderr
    try:
        args = ["python", f"D:\\lecturertask\\scraper\\{script_name}"]
        if extra_args:
            args.extend(extra_args)
        result = subprocess.run(
            args,
            capture_output=True, text=True
        )
        if result.returncode == 0:
            return {"message": success_message}
        return JSONResponse(content={"error": result.stderr}, status_code=500)
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.get("/scrape-download/scopus")
def scrape_download_scopus():
    return scrape_and_download("scraper_scopus.py", "data_scopus*.xlsx")


@app.get("/scrape-download/scholar")
def scrape_download_scholar():
    return scrape_and_download("scraper_scholar.py", "data_scholar*.xlsx")


@app.get("/scrape-download/pengabdian")
def scrape_download_pengabdian():
    return scrape_and_download("scraper_pengabdian.py", "data_pengabdian*.xlsx")


@app.get("/scrape-download/hki")
def scrape_download_hki():
    return scrape_and_download("scraper_HKI.py", "data_hki*.xlsx")


@app.get("/scrape-download/penelitian")
def scrape_download_penelitian():
    return scrape_and_download("scraper_penelitian.py", "data_penelitian*.xlsx")

# Generic helper: download the latest file matching a pattern
def download_file(file_pattern: str):
    latest_file = get_latest_file(file_pattern)
    if latest_file:
        return FileResponse(latest_file, filename=os.path.basename(latest_file))
    return JSONResponse(content={"error": "File not found."}, status_code=404)


# Generic helper: preview the latest file matching a pattern
def preview_file(file_pattern: str):
    latest_file = get_latest_file(file_pattern)
    if not latest_file:
        return {"data": []}
    df = pd.read_excel(latest_file)
    return {"data": df.to_dict(orient="records")}

@app.post("/upload-pengabdian")
async def upload_pengabdian_excel(file: UploadFile = File(...)):
    contents = await file.read()
    path = os.path.join(DATA_FOLDER, f"uploaded_pengabdian_{file.filename}")
    with open(path, "wb") as f:
        f.write(contents)
    return {"message": "File uploaded successfully", "filename": file.filename}

@app.post("/upload-dashboard")
def upload_dashboard_file(file: UploadFile = File(...)):
    contents = file.file.read()
    filename = file.filename.lower()
    # Save the file temporarily
    path = f"uploaded_files/{file.filename}"
    os.makedirs("uploaded_files", exist_ok=True)
    with open(path, "wb") as f:
        f.write(contents)
    # Read the data and detect the dataset type from its columns
    df = pd.read_excel(path)
    detected_type = None
    if "judul pengabdian" in df.columns.str.lower():
        detected_type = "pengabdian"
    elif "judul penelitian" in df.columns.str.lower():
        detected_type = "penelitian"
    elif "inventor" in df.columns.str.lower():
        detected_type = "hki"
    elif "source title" in df.columns.str.lower():
        detected_type = "scopus"
    elif "title" in df.columns.str.lower() and "citations" in df.columns.str.lower():
        detected_type = "scholar"
    return {
        "status": "success",
        "detected_type": detected_type,
        "filename": file.filename,
        "columns": df.columns.tolist(),
        "rows": df.head(5).to_dict(orient="records"),
    }

if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
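
# Example requests against the endpoints above, assuming the default host/port used
# by the __main__ block. The affiliation_id value is illustrative, not a value shipped
# with this project:
#
#   GET  http://127.0.0.1:8000/scrape/penelitian?affiliation_id=1234
#   GET  http://127.0.0.1:8000/penelitian/download
#   GET  http://127.0.0.1:8000/download/scopus
#   POST http://127.0.0.1:8000/upload-dashboard   (multipart form field "file" with an .xlsx)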