"""Socket.IO controller for live quiz sessions whose state is kept in Redis."""

import json
# NOTE(review): `threading` and `time` are no longer referenced in this module
# now that the quiz flow runs as a Socket.IO background task; they are retained
# only so nothing importing them from here breaks. Drop once confirmed unused.
import threading
import time
from typing import Any, Optional

from flask import request
from flask_socketio import SocketIO, emit, join_room, leave_room
from redis import Redis

from app.services import SessionService

# String spellings accepted for true/false answers (compared case-insensitively).
_TRUE_TOKENS = frozenset({"true", "t", "yes", "y", "1"})
_FALSE_TOKENS = frozenset({"false", "f", "no", "n", "0"})


def _normalize_text(value: Any) -> str:
    """Return *value* as a stripped, lower-cased string for loose comparison."""
    return str(value).strip().lower()


def _parse_bool(value: Any) -> Optional[bool]:
    """Interpret a client-supplied true/false answer.

    Booleans are returned unchanged. Strings are matched against the accepted
    spellings, and ``None`` is returned for an unrecognised string. Any other
    value falls back to its Python truthiness.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        token = value.strip().lower()
        if token in _TRUE_TOKENS:
            return True
        if token in _FALSE_TOKENS:
            return False
        return None
    return bool(value)


class RedisRepository:
    """Small helper wrapper to (de)serialize python objects to Redis."""

    def __init__(self, redis: Redis):
        self.redis = redis

    def set_data(self, key: str, value) -> None:
        """Store *value* under *key* as a JSON document."""
        self.redis.set(key, json.dumps(value))

    def get_data(self, key: str):
        """Return the JSON-decoded value at *key*, or ``None`` if absent/empty."""
        raw = self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    def delete_key(self, key: str) -> None:
        """Delete *key* from Redis."""
        self.redis.delete(key)


class SocketController:
    """Registers the Socket.IO event handlers that drive a quiz session.

    Questions, submitted answers and per-user scores are stored in Redis under
    keys derived from the session code.
    """

    # Points credited per correct answer when building the recap.
    # NOTE(review): each question carries its own "points" field, but scoring
    # does not use it; all demo questions are worth 10, so results match today.
    DEFAULT_POINTS = 10

    def __init__(
        self,
        socketio: SocketIO,
        redis: Redis,
        session_service: SessionService,
    ):
        self.socketio = socketio
        self.session_service = session_service
        self.redis_repo = RedisRepository(redis)
        # Maps each session code to the Socket.IO SID of its admin.
        self.admin_sids: dict[str, str] = {}
        self._register_events()

    @staticmethod
    def _is_correct(user_answer, question: dict) -> bool:
        """Return whether *user_answer* matches the question's target answer.

        Supported question types are ``fill_the_blank`` (case-insensitive text
        match), ``true_false`` and ``option`` (numeric index, or the option's
        text). Unknown types and malformed input are reported as incorrect.
        """
        print(f"Validating answer: user={user_answer}, question={question}")
        try:
            kind = question["type"]
            target = question["target_answer"]

            if kind == "fill_the_blank":
                return _normalize_text(user_answer) == _normalize_text(target)

            if kind == "true_false":
                # bool("false") is True, so string answers are parsed
                # explicitly instead of relying on truthiness.
                parsed = _parse_bool(user_answer)
                return parsed is not None and parsed == target

            if kind == "option":
                try:
                    # First try a numeric index comparison.
                    return int(user_answer) == target
                except ValueError:
                    # Not an index: compare against the target option's text.
                    expected = question["options"][target]
                    return _normalize_text(user_answer) == _normalize_text(expected)

            return False
        except (KeyError, IndexError, TypeError, ValueError, OverflowError) as e:
            print(f"Error in answer validation: {e}")
            return False

    @staticmethod
    def _as_payload(data) -> dict:
        """Return *data* if it is a dict, else an empty dict.

        Keeps handlers from raising ``AttributeError`` on malformed payloads.
        """
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _session_code_from(payload: dict, primary: str):
        """Read the session code from *payload*.

        Handlers historically disagree on the key name (``session_code`` vs
        ``session_id``); the handler's original key is tried first and the
        other spelling is accepted as a fallback.
        """
        alternate = "session_id" if primary == "session_code" else "session_code"
        return payload.get(primary) or payload.get(alternate)

    @staticmethod
    def _same_index(stored, received) -> bool:
        """Return whether two question indexes match, tolerating e.g. ``"1"`` vs ``1``."""
        return stored == received or str(stored) == str(received)

    def _forget_admin_sid(self, sid: str) -> None:
        """Remove every admin registration that points at the connection *sid*."""
        stale = [code for code, admin_sid in self.admin_sids.items() if admin_sid == sid]
        for code in stale:
            self.admin_sids.pop(code, None)

    def _questions_key(self, session_code: str) -> str:
        """Redis key holding the session's question list."""
        return f"session:{session_code}:questions"

    def _answers_key(self, session_code: str) -> str:
        """Redis key holding the session's submitted answers."""
        return f"session:{session_code}:answers"

    def _scores_key(self, session_code: str) -> str:
        """Redis key holding the session's per-user score counters."""
        return f"session:{session_code}:scores"

    # ---------------------------------------------------------------------
    # Socket.IO event bindings
    # ---------------------------------------------------------------------
    def _register_events(self):
        """Bind all Socket.IO event handlers on ``self.socketio``."""

        @self.socketio.on("connect")
        def on_connect():
            print(f"Client connected: {request.sid}")
            emit("connection_response", {"status": "connected", "sid": request.sid})

        @self.socketio.on("disconnect")
        def on_disconnect():
            print(f"Client disconnected: {request.sid}")
            # Drop stale admin SIDs so score updates are not sent to a dead
            # connection and the mapping does not grow without bound.
            self._forget_admin_sid(request.sid)

        @self.socketio.on("join_room")
        def handle_join_room(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_code")
            user_id = payload.get("user_id")

            if not session_code or not user_id:
                emit("error", {"message": "session_code and user_id are required"})
                return

            session = self.session_service.join_session(
                session_code=session_code, user_id=user_id
            )
            if session is None:
                emit("error", {"message": "Failed to join session or session inactive"})
                return

            join_room(session_code)

            # If this user is the admin, remember their SID.
            if session["is_admin"]:
                self.admin_sids[session_code] = request.sid
                message = "Admin has joined the room."
            else:
                message = f"User {session['username']} has joined the room."

            print(message)

            emit(
                "room_message",
                {
                    "type": "join",
                    "message": message,
                    "room": session_code,
                    "argument": "adm_update",
                    "data": session if not session["is_admin"] else None,
                },
                room=session_code,
            )

        @self.socketio.on("submit_answer")
        def handle_submit_answer(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_id")
            user_id = payload.get("user_id")
            question_index = payload.get("question_index")
            user_answer = payload.get("answer")

            if (
                not session_code
                or not user_id
                or question_index is None
                or user_answer is None
            ):
                emit(
                    "error",
                    {
                        "message": "session_id, user_id, question_index, and answer are required"
                    },
                )
                return

            # ----- 1. Load the question list to obtain the answer key --------
            questions = (
                self.redis_repo.get_data(self._questions_key(session_code)) or []
            )
            question = next(
                (q for q in questions if self._same_index(q["index"], question_index)),
                None,
            )
            if question is None:
                emit("error", {"message": "Question not found"})
                return

            # Use the stored index from here on so "1" and 1 are recorded alike.
            question_index = question["index"]

            is_correct = self._is_correct(user_answer, question)

            print(
                f"User {user_id} answered Q{question_index} with '{user_answer}' -> {'✔' if is_correct else '✖'}"
            )

            # ----- 2. Persist the answer in Redis -----------------------------
            # NOTE(review): steps 2 and 3 are read-modify-write cycles, so
            # concurrent submissions can overwrite each other. Repeat answers
            # to the same question are also counted again.
            answers = self.redis_repo.get_data(self._answers_key(session_code)) or []
            answers.append(
                {
                    "user_id": user_id,
                    "question_index": question_index,
                    "answer": user_answer,
                    "correct": is_correct,
                }
            )
            self.redis_repo.set_data(self._answers_key(session_code), answers)

            # ----- 3. Update the per-user score --------------------------------
            scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
            score_key = str(user_id)
            user_score = scores.get(score_key, {"correct": 0, "incorrect": 0})
            user_score["correct" if is_correct else "incorrect"] += 1
            scores[score_key] = user_score
            self.redis_repo.set_data(self._scores_key(session_code), scores)

            # ----- 4. Acknowledge to the submitting user -----------------------
            emit(
                "answer_submitted",
                {
                    "user_id": user_id,
                    "question_index": question_index,
                    "answer": user_answer,
                    "correct": is_correct,
                },
                room=request.sid,
            )

            # ----- 5. Send the score update to the admin only ------------------
            admin_sid = self.admin_sids.get(session_code)
            if admin_sid:
                emit("score_update", scores, room=admin_sid)

        @self.socketio.on("leave_room")
        def handle_leave_room(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_id")
            user_id = payload.get("user_id")
            username = payload.get("username", "anonymous")

            if not session_code:
                # Without a room there is nothing to leave or notify.
                emit("error", {"message": "session_id is required"})
                return

            leave_room(session_code)
            emit(
                "room_message",
                {
                    "type": "leave",
                    "message": f"{username} has left the room.",
                    "room": session_code,
                    "data": user_id,
                },
                room=session_code,
            )

        @self.socketio.on("send_message")
        def on_send_message(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_id")
            message = payload.get("message")
            username = payload.get("username", "anonymous")

            if not session_code:
                # With no room, emit() would address only the sender, silently
                # turning the chat message into an echo.
                emit("error", {"message": "session_id is required"})
                return

            emit(
                "receive_message",
                {"message": message, "from": username},
                room=session_code,
            )

        @self.socketio.on("end_session")
        def handle_end_session(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_id")
            user_id = payload.get("user_id")
            if not session_code or not user_id:
                emit("error", {"message": "session_id and user_id required"})
                return

            # Ask the service to end the session on behalf of this user.
            # NOTE(review): the return value is ignored; presumably the service
            # raises when the user may not end the session - confirm.
            self.session_service.end_session(session_id=session_code, user_id=user_id)

            # Clear all of the session's data from Redis.
            for key in (
                self._answers_key(session_code),
                self._scores_key(session_code),
                self._questions_key(session_code),
            ):
                self.redis_repo.delete_key(key)

            # The session is over, so its admin registration is obsolete.
            self.admin_sids.pop(session_code, None)

            emit(
                "room_closed",
                {"message": "Session has ended.", "room": session_code},
                room=session_code,
            )

        @self.socketio.on("start_quiz")
        def handle_start_quiz(data):
            payload = self._as_payload(data)
            session_code = self._session_code_from(payload, "session_code")
            if not session_code:
                emit("error", {"message": "session_code is required"})
                return

            emit("quiz_started", {"message": "Quiz has started!"}, room=session_code)
            # Let Socket.IO pick the right concurrency primitive for the active
            # async mode instead of always spawning a raw OS thread.
            self.socketio.start_background_task(
                self._simulate_quiz_flow, session_code
            )

    def _simulate_quiz_flow(self, session_code: str):
        """Publish the demo questions one at a time, then send the recap.

        Runs as a background task: stores the question list in Redis, emits
        each question (without its answer key) to the session room, waits for
        the question's duration, and finally generates the quiz recap.
        """
        questions = [
            {
                "index": 1,
                "question": "Kerajaan Hindu tertua di Indonesia adalah?",
                "target_answer": "Kutai",
                "duration": 30,
                "type": "fill_the_blank",
                "points": 10,
                "options": None,
            },
            {
                "index": 2,
                "question": "Apakah benar Majapahit mencapai puncak kejayaan pada masa Hayam Wuruk?",
                "target_answer": True,
                "duration": 30,
                "type": "true_false",
                "points": 10,
                "options": None,
            },
            {
                "index": 3,
                "question": "Kerajaan maritim terbesar di Asia Tenggara pada abad ke‑7 adalah?",
                "target_answer": 2,
                "duration": 30,
                "type": "option",
                "points": 10,
                "options": ["Majapahit", "Tarumanegara", "Sriwijaya", "Mataram Kuno"],
            },
        ]

        # Store the questions in Redis so answers can be validated.
        self.redis_repo.set_data(self._questions_key(session_code), questions)

        # Make sure a scores document exists (existing scores are preserved).
        scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
        self.redis_repo.set_data(self._scores_key(session_code), scores)

        # Short pause before the first question. socketio.sleep cooperates
        # with eventlet/gevent, whereas time.sleep could stall the server.
        self.socketio.sleep(2)

        for q in questions:
            print(f"\n📢 Mengirim pertanyaan {q['index']} ke room {session_code}")

            # Send the question without the correct answer.
            question_to_send = {
                field: value for field, value in q.items() if field != "target_answer"
            }
            self.socketio.emit("quiz_question", question_to_send, room=session_code)

            # Wait for the question's duration.
            self.socketio.sleep(q["duration"])

        # Build and send the quiz recap.
        self._generate_quiz_recap(session_code)

    def _generate_quiz_recap(self, session_code: str):
        """Build the end-of-quiz recap and emit it to the session room.

        Emits ``quiz_recap`` followed by ``quiz_done``; on any failure emits
        ``quiz_error`` instead of letting the background task die silently.
        """
        try:
            # Load the session's data from Redis.
            answers = self.redis_repo.get_data(self._answers_key(session_code)) or []
            scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
            questions = (
                self.redis_repo.get_data(self._questions_key(session_code)) or []
            )

            # Question text by index; setdefault keeps the first occurrence.
            text_by_index: dict = {}
            for q in questions:
                text_by_index.setdefault(q["index"], q["question"])

            recap_data = {
                "session_code": session_code,
                "total_questions": len(questions),
                "answers": [
                    {
                        "user_id": entry["user_id"],
                        "question_index": entry["question_index"],
                        "answer": entry["answer"],
                        "correct": entry["correct"],
                        "question": text_by_index.get(
                            entry["question_index"], "Pertanyaan tidak ditemukan"
                        ),
                    }
                    for entry in answers
                ],
                "scores": [
                    {
                        "user_id": uid,
                        "correct": sc.get("correct", 0),
                        "incorrect": sc.get("incorrect", 0),
                        # Fixed number of points per correct answer.
                        "total_score": sc.get("correct", 0) * self.DEFAULT_POINTS,
                    }
                    for uid, sc in scores.items()
                ],
                "questions": [
                    {
                        "index": q["index"],
                        "question": q["question"],
                        "type": q["type"],
                        "points": q.get("points", self.DEFAULT_POINTS),
                    }
                    for q in questions
                ],
            }

            # Rank participants from highest to lowest score.
            recap_data["scores"].sort(key=lambda x: x["total_score"], reverse=True)

            # Send the recap to every participant.
            self.socketio.emit(
                "quiz_recap",
                {
                    "message": "Kuis telah selesai. Berikut adalah rekap lengkap.",
                    "recap": recap_data,
                },
                room=session_code,
            )

            print(recap_data)

            # Print the recap on the server console.
            print("\n🏁 Rekap Kuis Lengkap")
            print("=" * 50)
            print(f"Kode Sesi: {session_code}")
            print(f"Total Pertanyaan: {len(questions)}")
            print("\nPeringkat Peserta:")
            for i, score in enumerate(recap_data["scores"], 1):
                print(
                    f"{i}. User {score['user_id']}: Skor {score['total_score']} (Benar: {score['correct']}, Salah: {score['incorrect']})"
                )

            # Signal that the quiz has finished.
            self.socketio.emit(
                "quiz_done",
                {"message": "Kuis telah berakhir.", "session_code": session_code},
                room=session_code,
            )

        except Exception as e:
            # Top-level boundary of the background task: report and notify.
            print(f"Error generating quiz recap: {e}")
            self.socketio.emit(
                "quiz_error",
                {
                    "message": "Terjadi kesalahan saat membuat rekap kuis.",
                    "error": str(e),
                },
                room=session_code,
            )