TIF_E41211115_Genso_quiz_ba.../app/controllers/socket_conroller.py

458 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request
from app.services import SessionService
import threading
import time
import json
from redis import Redis
class SocketController:
def __init__(
self,
socketio: SocketIO,
session_service: SessionService,
):
self.socketio = socketio
self.session_service = session_service
self._register_events()
@staticmethod
def _is_correct(user_answer, question: dict) -> bool:
"""Advanced answer validation method."""
print(f"Validating answer: user={user_answer}, question={question}")
try:
if question["type"] == "fill_the_blank":
return (
str(user_answer).strip().lower()
== str(question["target_answer"]).strip().lower()
)
elif question["type"] == "true_false":
return bool(user_answer) == question["target_answer"]
elif question["type"] == "option":
# Handle both index and text-based answers
try:
# First try numeric index comparison
return int(user_answer) == question["target_answer"]
except ValueError:
# If not an index, compare text
return (
str(user_answer).strip().lower()
== str(question["options"][question["target_answer"]])
.strip()
.lower()
)
return False
except Exception as e:
print(f"Error in answer validation: {e}")
return False
def _questions_key(self, session_code: str) -> str:
return f"session:{session_code}:questions"
def _answers_key(self, session_code: str) -> str:
return f"session:{session_code}:answers"
def _scores_key(self, session_code: str) -> str:
return f"session:{session_code}:scores"
# ---------------------------------------------------------------------
# Socket.IO event bindings
# ---------------------------------------------------------------------
def _register_events(self):
@self.socketio.on("connect")
def on_connect():
print(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
@self.socketio.on("disconnect")
def on_disconnect():
print(f"Client disconnected: {request.sid}")
@self.socketio.on("join_room")
def handle_join_room(data):
session_code = data.get("session_code")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"})
return
session = self.session_service.join_session(
session_code=session_code,
user_id=user_id,
)
if session is None:
emit("error", {"message": "Failed to join session or session inactive"})
return
session_id = session["session_id"]
join_room(session_id)
if session["is_admin"]:
message = "Admin has joined the room."
else:
message = f"User {session['username']} has joined the room."
emit(
"room_message",
{
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
@self.socketio.on("submit_answer")
def handle_submit_answer(data):
session_code = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
if not all(
[
session_code,
user_id,
question_index is not None,
user_answer is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
# ----- 1. Ambil list pertanyaan untuk mendapatkan kunci jawaban ----------
questions = (
self.redis_repo.get_data(self._questions_key(session_code)) or []
)
question = next(
(q for q in questions if q["index"] == question_index), None
)
if question is None:
emit("error", {"message": "Question not found"})
return
is_correct = self._is_correct(user_answer, question)
print(
f"User {user_id} answered Q{question_index} with '{user_answer}' -> {'' if is_correct else ''}"
)
# ----- 2. Simpan jawaban ke Redis --------------------------------------
answers = self.redis_repo.get_data(self._answers_key(session_code)) or []
answers.append(
{
"user_id": user_id,
"question_index": question_index,
"answer": user_answer,
"correct": is_correct,
}
)
self.redis_repo.set_data(self._answers_key(session_code), answers)
# ----- 3. Update skor peruser -----------------------------------------
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
user_score = scores.get(str(user_id), {"correct": 0, "incorrect": 0})
if is_correct:
user_score["correct"] += 1
else:
user_score["incorrect"] += 1
scores[str(user_id)] = user_score
self.redis_repo.set_data(self._scores_key(session_code), scores)
# ----- 4. Beri tahu user (ack) -----------------------------------------
emit(
"answer_submitted",
{
"user_id": user_id,
"question_index": question_index,
"answer": user_answer,
"correct": is_correct,
},
room=request.sid,
)
# ----- 5. Kirim update skor hanya ke admin -----------------------------
admin_sid = self.admin_sids.get(session_code)
if admin_sid:
emit("score_update", scores, room=admin_sid)
@self.socketio.on("leave_room")
def handle_leave_room(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(
session_id=session_id,
user_id=user_id,
)
leave_room(session_id)
if leave_result["is_success"]:
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
@self.socketio.on("send_message")
def on_send_message(data):
session_code = data.get("session_id")
message = data.get("message")
username = data.get("username", "anonymous")
emit(
"receive_message",
{"message": message, "from": username},
room=session_code,
)
@self.socketio.on("end_session")
def handle_end_session(data):
session_code = data.get("session_id")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_id and user_id required"})
return
# Validasi user berhak mengakhiri session
self.session_service.end_session(session_id=session_code, user_id=user_id)
# Bersihkan semua data session di Redis
for key in [
self._answers_key(session_code),
self._scores_key(session_code),
self._questions_key(session_code),
]:
self.redis_repo.delete_key(key)
emit(
"room_closed",
{"message": "Session has ended.", "room": session_code},
room=session_code,
)
@self.socketio.on("start_quiz")
def handle_start_quiz(data):
session_code = data.get("session_code")
if not session_code:
emit("error", {"message": "session_code is required"})
return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_code)
threading.Thread(
target=self._simulate_quiz_flow, args=(session_code,), daemon=True
).start()
def _simulate_quiz_flow(self, session_code: str):
"""Enhanced quiz flow with better question management."""
questions = [
{
"index": 1,
"question": "Kerajaan Hindu tertua di Indonesia adalah?",
"target_answer": "Kutai",
"duration": 30,
"type": "fill_the_blank",
"points": 10,
"options": None,
},
{
"index": 2,
"question": "Apakah benar Majapahit mencapai puncak kejayaan pada masa Hayam Wuruk?",
"target_answer": True,
"duration": 30,
"type": "true_false",
"points": 10,
"options": None,
},
{
"index": 3,
"question": "Kerajaan maritim terbesar di Asia Tenggara pada abad ke7 adalah?",
"target_answer": 2,
"duration": 30,
"type": "option",
"points": 10,
"options": ["Majapahit", "Tarumanegara", "Sriwijaya", "Mataram Kuno"],
},
]
# Simpan ke Redis
self.redis_repo.set_data(self._questions_key(session_code), questions)
# Inisialisasi skor untuk semua peserta
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
self.redis_repo.set_data(self._scores_key(session_code), scores)
# Beri jeda sebelum pertanyaan pertama
time.sleep(2)
for q in questions:
print(f"\n📢 Mengirim pertanyaan {q['index']} ke room {session_code}")
# Kirim pertanyaan tanpa jawaban yang benar
question_to_send = q.copy()
question_to_send.pop("target_answer", None)
self.socketio.emit("quiz_question", question_to_send, room=session_code)
# Tunggu durasi pertanyaan
time.sleep(q["duration"])
# Generate dan kirim rekap kuis
self._generate_quiz_recap(session_code)
def _generate_quiz_recap(self, session_code: str):
"""Comprehensive quiz recap generation."""
try:
# Ambil data dari Redis
answers = self.redis_repo.get_data(self._answers_key(session_code)) or []
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
questions = (
self.redis_repo.get_data(self._questions_key(session_code)) or []
)
# Persiapkan data rekap
recap_data = {
"session_code": session_code,
"total_questions": len(questions),
"answers": [],
"scores": [],
"questions": [],
}
# Tambahkan detail pertanyaan
for q in questions:
question_recap = {
"index": q["index"],
"question": q["question"],
"type": q["type"],
"points": q.get("points", 10),
}
recap_data["questions"].append(question_recap)
# Tambahkan detail jawaban
for entry in answers:
# Temukan pertanyaan terkait
related_question = next(
(q for q in questions if q["index"] == entry["question_index"]),
None,
)
answer_recap = {
"user_id": entry["user_id"],
"question_index": entry["question_index"],
"answer": entry["answer"],
"correct": entry["correct"],
"question": (
related_question["question"]
if related_question
else "Pertanyaan tidak ditemukan"
),
}
recap_data["answers"].append(answer_recap)
# Tambahkan skor per pengguna
for uid, sc in scores.items():
score_recap = {
"user_id": uid,
"correct": sc.get("correct", 0),
"incorrect": sc.get("incorrect", 0),
"total_score": sc.get("correct", 0)
* 10, # 10 poin per jawaban benar
}
recap_data["scores"].append(score_recap)
# Urutkan skor dari tertinggi ke terendah
recap_data["scores"].sort(key=lambda x: x["total_score"], reverse=True)
# Kirim rekap ke semua peserta
self.socketio.emit(
"quiz_recap",
{
"message": "Kuis telah selesai. Berikut adalah rekap lengkap.",
"recap": recap_data,
},
room=session_code,
)
print(recap_data)
# Cetak rekap di konsol server
print("\n🏁 Rekap Kuis Lengkap")
print("=" * 50)
print(f"Kode Sesi: {session_code}")
print(f"Total Pertanyaan: {len(questions)}")
print("\nPeringkat Peserta:")
for i, score in enumerate(recap_data["scores"], 1):
print(
f"{i}. User {score['user_id']}: Skor {score['total_score']} (Benar: {score['correct']}, Salah: {score['incorrect']})"
)
# Kirim tanda kuis telah berakhir
self.socketio.emit(
"quiz_done",
{"message": "Kuis telah berakhir.", "session_code": session_code},
room=session_code,
)
except Exception as e:
print(f"Error generating quiz recap: {e}")
# Kirim pesan error jika gagal membuat rekap
self.socketio.emit(
"quiz_error",
{
"message": "Terjadi kesalahan saat membuat rekap kuis.",
"error": str(e),
},
room=session_code,
)