from datetime import datetime from collections import Counter import io import json import logging import re import threading import time import traceback from urllib.parse import urlparse, urlunparse import uuid from Sastrawi.Stemmer.StemmerFactory import StemmerFactory from flask import Flask, Response, jsonify, request, send_file from flask_cors import CORS import joblib import matplotlib import matplotlib.pyplot as plt import nltk from nltk.corpus import stopwords from nltk.tokenize import word_tokenize import numpy as np from reportlab.lib import colors from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import ( Image as RLImage, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle, ) from scipy.sparse import csr_matrix, hstack from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from webdriver_manager.chrome import ChromeDriverManager matplotlib.use('Agg') # Setup Logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def get_reviews_and_category(shortlink: str, on_progress=None): chrome_options = Options() chrome_options.add_argument("--start-maximized") chrome_options.add_argument("--headless=new") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_argument("--window-size=1366,768") chrome_options.add_argument( "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0 Safari/537.36" ) service = Service(ChromeDriverManager().install()) driver = webdriver.Chrome(service=service, options=chrome_options) try: driver.get(shortlink) # Wait for redirect off the shortlink and into tokopedia domain WebDriverWait(driver, 40).until(lambda d: d.current_url != shortlink) WebDriverWait(driver, 40).until(lambda d: "tokopedia.com" in d.current_url) time.sleep(2) original_url = driver.current_url if not original_url or original_url == shortlink: raise RuntimeError(f"Redirect did not resolve. current_url='{driver.current_url}'") parsed_url = urlparse(original_url) review_path = parsed_url.path.rstrip('/') + "/review" review_url = urlunparse((parsed_url.scheme, parsed_url.netloc, review_path, "", "", "")) # Capture product name on the product page (most reliable) try: product_name = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.XPATH, "//h1[@data-testid='lblPDPDetailProductName']")) ).text.strip() except Exception: product_name = "" driver.get(review_url) if not WebDriverWait(driver, 40).until(lambda d: d.title.strip() != ""): raise RuntimeError("Halaman review tidak memuat title") time.sleep(5) all_reviews = [] category = "" productName = product_name while True: try: try: review_feed = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.ID, "review-feed"))) except Exception: raise RuntimeError("Elemen review-feed tidak ditemukan") # Ensure the reviews container is in view so lazy content loads try: driver.execute_script("arguments[0].scrollIntoView({block: 'start'});", review_feed) time.sleep(1) except Exception: pass # Wait for at least one review to appear with retries reviews = review_feed.find_elements(By.XPATH, ".//span[@data-testid='lblItemUlasan']") retries = 3 while not reviews and retries > 0: try: driver.execute_script("window.scrollBy(0, 600);") except Exception: pass time.sleep(1) reviews = review_feed.find_elements(By.XPATH, ".//span[@data-testid='lblItemUlasan']") retries -= 1 category = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, "//nav[@aria-label='Breadcrumb']//ol/li[2]//a")) ).text.strip() # productName already captured from product page; keep as-is if not productName: # Fallbacks on review page try: pn = WebDriverWait(driver, 5).until( EC.visibility_of_element_located((By.XPATH, "//h1[@data-testid='lblPDPDetailProductName']")) ).text.strip() if pn: productName = pn except Exception: try: og = driver.find_element(By.XPATH, "//meta[@property='og:title']").get_attribute("content") if og: productName = og.strip() except Exception: try: # As a last resort, use last breadcrumb item text bc = driver.find_element(By.XPATH, "//nav[@aria-label='Breadcrumb']//ol/li[last()]//a").text if bc: productName = bc.strip() except Exception: pass for review in reviews: try: parent_container = review.find_element(By.XPATH, "./ancestor::*[self::div or self::li][1]") more_button = parent_container.find_element(By.XPATH, ".//button[contains(normalize-space(.), 'Selengkapnya')]") driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", more_button) driver.execute_script("arguments[0].click();", more_button) try: WebDriverWait(driver, 5).until(EC.staleness_of(more_button)) except Exception: time.sleep(0.5) except Exception: pass review_text = review.text.strip() if review_text and review_text not in all_reviews: all_reviews.append(review_text) if on_progress: try: on_progress(len(all_reviews)) except Exception: pass try: next_button = WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.XPATH, "//button[contains(@aria-label, 'Laman berikutnya')]") ) ) if next_button.get_attribute("disabled") is not None: break driver.execute_script("arguments[0].click();", next_button) time.sleep(3) except Exception: break except Exception as e: logging.error(f"Error saat mengambil review: {type(e).__name__}: {e!r}") break return all_reviews, category, productName finally: driver.quit() # Mapping kategori breadcrumb (ID) ke encoding yang Anda sediakan (EN) category_map = { "Perawatan Hewan": 0, # Animal Care "Otomotif": 1, # Automotive "Kecantikan": 2, # Beauty "Perawatan Tubuh": 3, # Body Care "Buku": 4, # Books "Audio, Kamera & Elektronik Lainnya": 5, # Camera (atau gunakan 20 jika ingin 'Other Products') "Pertukangan": 6, # Carpentry "Komputer & Laptop": 7, # Computers and Laptops "Elektronik": 8, # Electronics "Makanan & Minuman": 9, # Food and Drink "Gaming": 10, # Gaming "Kesehatan": 11, # Health "Rumah Tangga": 12, # Household "Fashion Anak & Bayi": 13, # Kids and Baby Fashion "Dapur": 14, # Kitchen "Fashion Pria": 15, # Men's Fashion "Ibu & Bayi": 16, # Mother and Baby "Film & Musik": 17, # Movies and Music "Fashion Muslim": 18, # Muslim Fashion "Office & Stationery": 19, # Office & Stationery "Lainnya": 20, # Other Products (fallback label jika ada) "Perlengkapan Pesta": 21, # Party Supplies and Craft "Handphone & Tablet": 22, # Phones and Tablets "Logam Mulia": 23, # Precious Metal "Properti": 24, # Property "Olahraga": 25, # Sport "Tiket, Travel, Voucher": 26, # Tour and Travel "Mainan & Hobi": 27, # Toys and Hobbies "Fashion Wanita": 28, # Women's Fashion } stemmer = StemmerFactory().create_stemmer() try: nltk.data.find('corpora/stopwords') except LookupError: nltk.download('stopwords') try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt') stop_words = set(stopwords.words('indonesian')) def preprocess_text(text): text = text.lower() text = re.sub(r'[^a-zA-Z\s]', '', text) words = word_tokenize(text) words = [stemmer.stem(word) for word in words if word not in stop_words] return ' '.join(words) def _preprocess_for_bigrams(text): text = str(text or "").lower() text = re.sub(r'[^a-zA-Z\s]', ' ', text) text = re.sub(r'\s+', ' ', text).strip() if not text: return [] toks = [t for t in text.split(' ') if t] # Keep common negations even if they exist in stopwords; they are important for bigram meaning. keep = {"tidak", "tak", "bukan", "belum", "jangan", "ga", "gak", "nggak", "tdk"} toks = [stemmer.stem(t) for t in toks if (t in keep) or (t not in stop_words)] return toks def summarize_top_terms_by_sentiment(X_text, preds, vectorizer, texts_preprocessed=None, reviews=None, top_n=15): feature_names = None try: feature_names = vectorizer.get_feature_names_out() except Exception: try: feature_names = vectorizer.get_feature_names() except Exception: feature_names = None def _bigram_counter_for_label(label_value): idx = np.where(np.asarray(preds) == label_value)[0] if idx.size == 0: return Counter() c = Counter() for i in idx: if reviews is not None and i < len(reviews): toks = _preprocess_for_bigrams(reviews[i]) elif texts_preprocessed is not None and i < len(texts_preprocessed): toks = [t for t in str(texts_preprocessed[i]).split() if t] else: toks = [] if len(toks) < 2: continue for a, b in zip(toks, toks[1:]): c[f"{a} {b}"] += 1 return c def _top_bigrams_distinctive(label_value): own = c_pos if label_value == 1 else c_neg other = c_neg if label_value == 1 else c_pos if not own: return [] scored = [] for bg, cnt in own.items(): # Distinctiveness score to avoid generic phrases leaking into the other class. score = (cnt + 1.0) / (other.get(bg, 0) + 1.0) scored.append((score, cnt, bg)) scored.sort(reverse=True) return [bg for _, __, bg in scored[:top_n]] preds_arr = np.asarray(preds) pos_count = int(np.sum(preds_arr == 1)) neg_count = int(np.sum(preds_arr == 0)) dominant = "Positif" if pos_count >= neg_count else "Negatif" c_pos = _bigram_counter_for_label(1) c_neg = _bigram_counter_for_label(0) return { "dominant_sentiment": dominant, "counts": {"Positif": pos_count, "Negatif": neg_count}, "top_bigrams": { "Positif": _top_bigrams_distinctive(1), "Negatif": _top_bigrams_distinctive(0), }, } model = joblib.load('naive_bayes_model.pkl') vectorizer = joblib.load('tfidf_vectorizer.pkl') app = Flask(__name__) CORS(app, resources={r"/analyze": {"origins": "*"}, r"/stream/*": {"origins": "*"}, r"/progress/*": {"origins": "*"}, r"/download/*": {"origins": "*"}}) # In-memory job progress store PROGRESS = {} PROGRESS_LOCK = threading.Lock() def set_progress(job_id, status=None, percent=None, message=None, data=None): with PROGRESS_LOCK: st = PROGRESS.get(job_id, {"status": "pending", "percent": 0, "message": "", "data": None, "ts": None}) if status is not None: st["status"] = status if percent is not None: st["percent"] = percent if message is not None: st["message"] = message if data is not None: st["data"] = data st["ts"] = datetime.now().isoformat(timespec='seconds') PROGRESS[job_id] = st def _run_analysis_job(job_id, shortlink): try: set_progress(job_id, status="running", percent=5, message="Resolving shortlink...") def _scrape_progress(n): set_progress(job_id, message=f"Sudah berhasil mengambil {n} data") reviews, category_name, product_name = get_reviews_and_category(shortlink, on_progress=_scrape_progress) set_progress(job_id, percent=30, message=f"Scraped {len(reviews)} reviews; preprocessing...") texts_preprocessed = [preprocess_text(r) for r in reviews] if not texts_preprocessed: set_progress(job_id, status="completed", percent=100, message="No reviews found", data={"category": category_name, "product_name": product_name, "items": []}) return set_progress(job_id, percent=55, message="Vectorizing...") X_text = vectorizer.transform(texts_preprocessed) encoded_category = category_map.get(category_name, 20) X_cat = csr_matrix(np.full((len(reviews), 1), encoded_category)) set_progress(job_id, percent=75, message="Predicting sentiments...") X_final = hstack([X_text, X_cat]) preds = model.predict(X_final) set_progress(job_id, percent=85, message="Meringkas kata dominan per sentimen...") summary = summarize_top_terms_by_sentiment( X_text, preds, vectorizer, texts_preprocessed=texts_preprocessed, reviews=reviews, top_n=15, ) items = [] for i, (review, p) in enumerate(zip(reviews, preds), start=1): label = "Positif" if p == 1 else ("Negatif" if p == 0 else str(p)) items.append({"sentiment": label, "review": review}) # Optional fine-grained progress set_progress(job_id, percent=75 + int(20 * (i/len(reviews))), message=f"Sudah berhasil mengambil {i} data") result = { "category": category_name, "category_encoded": encoded_category, "product_name": product_name, "count": len(items), "items": items, "summary": summary, } set_progress(job_id, status="completed", percent=100, message="Done", data=result) except Exception as e: tb = traceback.format_exc() set_progress(job_id, status="failed", percent=100, message=str(e) or "Unhandled error", data={"error": str(e), "traceback": tb}) @app.post('/analyze') def analyze(): data = request.get_json(silent=True) or {} shortlink = data.get('shortlink') if not shortlink: return jsonify({"error": "shortlink is required"}), 400 job_id = uuid.uuid4().hex set_progress(job_id, status="queued", percent=0, message="Job queued") t = threading.Thread(target=_run_analysis_job, args=(job_id, shortlink), daemon=True) t.start() base = request.host_url.rstrip('/') return jsonify({ "job_id": job_id, "progress_url": f"{base}/progress/{job_id}", "stream_url": f"{base}/stream/{job_id}" }), 202 @app.get('/progress/') def progress(job_id): with PROGRESS_LOCK: st = PROGRESS.get(job_id) if not st: return jsonify({"error": "unknown job_id"}), 404 payload = {"status": st.get("status"), "percent": st.get("percent"), "message": st.get("message"), "ts": st.get("ts")} if st.get("status") in ("completed", "failed") and st.get("data") is not None: payload["result"] = st["data"] return jsonify(payload) @app.get('/stream/') def stream(job_id): def gen(): last_payload = None while True: with PROGRESS_LOCK: st = PROGRESS.get(job_id) if not st: yield f"data: {json.dumps({'error': 'unknown job_id'})}\n\n" break payload = {"status": st.get("status"), "percent": st.get("percent"), "message": st.get("message"), "ts": st.get("ts")} if payload != last_payload: yield f"data: {json.dumps(payload)}\n\n" last_payload = payload if st.get("status") in ("completed", "failed"): # send final data when completed if st.get("data") is not None: yield f"data: {json.dumps({'status': st['status'], 'percent': st['percent'], 'message': st['message'], 'ts': st.get('ts'), 'result': st['data']})}\n\n" break time.sleep(0.1) return Response( gen(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*' } ) @app.get('/download//pdf') def download_pdf(job_id): with PROGRESS_LOCK: st = PROGRESS.get(job_id) if not st: return jsonify({"error": "unknown job_id"}), 404 if st.get("status") != "completed" or not st.get("data"): return jsonify({"error": "job not completed"}), 400 data = st["data"] items = data.get("items", []) summary = data.get("summary") or {} pos = sum(1 for it in items if it.get("sentiment") == "Positif") neg = sum(1 for it in items if it.get("sentiment") == "Negatif") chart_bytes = io.BytesIO() labels = ["Positif", "Negatif"] sizes = [pos, neg] chart_colors = ['#34D399', '#F87171'] explode = (0.04, 0.04) if pos and neg else (0, 0) fig, ax = plt.subplots(figsize=(4.5, 4.5)) ax.pie(sizes, explode=explode, labels=labels, colors=chart_colors, autopct='%1.1f%%', startangle=140) ax.axis('equal') plt.tight_layout() fig.savefig(chart_bytes, format='png', dpi=150, bbox_inches='tight') plt.close(fig) chart_bytes.seek(0) pdf_buf = io.BytesIO() doc = SimpleDocTemplate(pdf_buf, pagesize=A4) styles = getSampleStyleSheet() elements = [] title = f"Hasil Analisis Sentimen" elements.append(Paragraph(title, styles['Title'])) elements.append(Spacer(1, 12)) meta_data = [ ["Category", data.get("category", "")], ["Product Name", data.get("product_name", "")], ["Total Reviews", str(data.get("count", 0))], ["Generated At", st.get("ts", "")], ["Positif", str(pos)], ["Negatif", str(neg)], ] meta_table = Table(meta_data, hAlign='LEFT') meta_table.setStyle(TableStyle([ ('BACKGROUND', (0,0), (0,-1), colors.whitesmoke), ('TEXTCOLOR', (0,0), (0,-1), colors.black), ('GRID', (0,0), (-1,-1), 0.25, colors.grey), ('VALIGN', (0,0), (-1,-1), 'TOP'), ('FONTNAME', (0,0), (-1,-1), 'Helvetica'), ('FONTSIZE', (0,0), (-1,-1), 9), ('BACKGROUND', (0,0), (-1,0), colors.whitesmoke), ])) elements.append(meta_table) elements.append(Spacer(1, 12)) top_bi = summary.get("top_bigrams") or {} pos_bi = top_bi.get("Positif") or [] neg_bi = top_bi.get("Negatif") or [] if pos_bi or neg_bi: elements.append(Paragraph("Ringkasan frasa dominan (bigram, frekuensi)", styles['Heading3'])) bi_rows = [[Paragraph("Positif (top)", styles['BodyText']), Paragraph("Negatif (top)", styles['BodyText'])]] max_rows = max(len(pos_bi), len(neg_bi), 1) for i in range(max_rows): p = pos_bi[i] if i < len(pos_bi) else "" n = neg_bi[i] if i < len(neg_bi) else "" bi_rows.append([Paragraph(p, styles['BodyText']), Paragraph(n, styles['BodyText'])]) bi_table = Table(bi_rows, repeatRows=1, hAlign='LEFT', colWidths=[240, 240]) bi_table.setStyle(TableStyle([ ('GRID', (0,0), (-1,-1), 0.25, colors.grey), ('BACKGROUND', (0,0), (-1,0), colors.whitesmoke), ('VALIGN', (0,0), (-1,-1), 'TOP'), ('FONTNAME', (0,0), (-1,-1), 'Helvetica'), ('FONTSIZE', (0,0), (-1,-1), 9), ])) elements.append(bi_table) elements.append(Spacer(1, 12)) rl_img = RLImage(chart_bytes, width=250, height=250) elements.append(rl_img) elements.append(Spacer(1, 12)) data_rows = [[Paragraph("sentiment", styles['BodyText']), Paragraph("review", styles['BodyText'])]] for it in items: s = it.get("sentiment", "") r = it.get("review", "") data_rows.append([Paragraph(s, styles['BodyText']), Paragraph(r, styles['BodyText'])]) table = Table(data_rows, repeatRows=1, hAlign='LEFT', colWidths=[80, 400]) table.setStyle(TableStyle([ ('GRID', (0,0), (-1,-1), 0.25, colors.grey), ('BACKGROUND', (0,0), (-1,0), colors.whitesmoke), ('VALIGN', (0,0), (-1,-1), 'TOP'), ('FONTNAME', (0,0), (-1,-1), 'Helvetica'), ('FONTSIZE', (0,0), (-1,-1), 9), ])) elements.append(table) doc.build(elements) pdf_buf.seek(0) filename = f"sentiment_{job_id}.pdf" return send_file(pdf_buf, as_attachment=True, download_name=filename, mimetype='application/pdf') if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=False)