feat: working on the answer

This commit is contained in:
akhdanre 2025-05-17 17:24:10 +07:00
parent 3b6f827ae6
commit bd8353b6d6
3 changed files with 224 additions and 282 deletions

View File

@ -2,7 +2,6 @@ from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request from flask import request
from app.services import SessionService from app.services import SessionService
import threading import threading
import time
import json import json
from redis import Redis from redis import Redis
@ -17,49 +16,6 @@ class SocketController:
self.session_service = session_service self.session_service = session_service
self._register_events() self._register_events()
@staticmethod
def _is_correct(user_answer, question: dict) -> bool:
"""Advanced answer validation method."""
print(f"Validating answer: user={user_answer}, question={question}")
try:
if question["type"] == "fill_the_blank":
return (
str(user_answer).strip().lower()
== str(question["target_answer"]).strip().lower()
)
elif question["type"] == "true_false":
return bool(user_answer) == question["target_answer"]
elif question["type"] == "option":
# Handle both index and text-based answers
try:
# First try numeric index comparison
return int(user_answer) == question["target_answer"]
except ValueError:
# If not an index, compare text
return (
str(user_answer).strip().lower()
== str(question["options"][question["target_answer"]])
.strip()
.lower()
)
return False
except Exception as e:
print(f"Error in answer validation: {e}")
return False
def _questions_key(self, session_code: str) -> str:
return f"session:{session_code}:questions"
def _answers_key(self, session_code: str) -> str:
return f"session:{session_code}:answers"
def _scores_key(self, session_code: str) -> str:
return f"session:{session_code}:scores"
# ---------------------------------------------------------------------
# Socket.IO event bindings
# ---------------------------------------------------------------------
def _register_events(self): def _register_events(self):
@self.socketio.on("connect") @self.socketio.on("connect")
def on_connect(): def on_connect():
@ -127,84 +83,61 @@ class SocketController:
skip_sid=request.sid, skip_sid=request.sid,
) )
@self.socketio.on("submit_answer") # @self.socketio.on("submit_answer")
def handle_submit_answer(data): # def handle_submit_answer(data):
session_code = data.get("session_id") # session_code = data.get("session_id")
user_id = data.get("user_id") # user_id = data.get("user_id")
question_index = data.get("question_index") # question_index = data.get("question_index")
user_answer = data.get("answer") # user_answer = data.get("answer")
if not all( # if not all(
[ # [
session_code, # session_code,
user_id, # user_id,
question_index is not None, # question_index is not None,
user_answer is not None, # user_answer is not None,
] # ]
): # ):
emit( # emit(
"error", # "error",
{ # {
"message": "session_id, user_id, question_index, and answer are required" # "message": "session_id, user_id, question_index, and answer are required"
}, # },
) # )
return # return
# ----- 1. Ambil list pertanyaan untuk mendapatkan kunci jawaban ---------- # quiz_service = QuizService(self.redis_repo)
questions = ( # question = quiz_service.get_question(session_code, question_index)
self.redis_repo.get_data(self._questions_key(session_code)) or []
)
question = next(
(q for q in questions if q["index"] == question_index), None
)
if question is None:
emit("error", {"message": "Question not found"})
return
is_correct = self._is_correct(user_answer, question) # if question is None:
# emit("error", {"message": "Question not found"})
# return
print( # is_correct = quiz_service.is_correct(user_answer, question)
f"User {user_id} answered Q{question_index} with '{user_answer}' -> {'' if is_correct else ''}"
)
# ----- 2. Simpan jawaban ke Redis -------------------------------------- # print(
answers = self.redis_repo.get_data(self._answers_key(session_code)) or [] # f"User {user_id} answered Q{question_index} with '{user_answer}' -> {'✔' if is_correct else '✖'}"
answers.append( # )
{
"user_id": user_id,
"question_index": question_index,
"answer": user_answer,
"correct": is_correct,
}
)
self.redis_repo.set_data(self._answers_key(session_code), answers)
# ----- 3. Update skor peruser ----------------------------------------- # quiz_service.save_answer(
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {} # session_code, user_id, question_index, user_answer, is_correct
user_score = scores.get(str(user_id), {"correct": 0, "incorrect": 0}) # )
if is_correct: # scores = quiz_service.update_score(session_code, user_id, is_correct)
user_score["correct"] += 1
else:
user_score["incorrect"] += 1
scores[str(user_id)] = user_score
self.redis_repo.set_data(self._scores_key(session_code), scores)
# ----- 4. Beri tahu user (ack) ----------------------------------------- # emit(
emit( # "answer_submitted",
"answer_submitted", # {
{ # "user_id": user_id,
"user_id": user_id, # "question_index": question_index,
"question_index": question_index, # "answer": user_answer,
"answer": user_answer, # "correct": is_correct,
"correct": is_correct, # },
}, # room=request.sid,
room=request.sid, # )
)
# ----- 5. Kirim update skor hanya ke admin ----------------------------- # admin_sid = self.admin_sids.get(session_code)
admin_sid = self.admin_sids.get(session_code) # if admin_sid:
if admin_sid: # emit("score_update", scores, room=admin_sid)
emit("score_update", scores, room=admin_sid)
@self.socketio.on("leave_room") @self.socketio.on("leave_room")
def handle_leave_room(data): def handle_leave_room(data):
@ -283,175 +216,14 @@ class SocketController:
@self.socketio.on("start_quiz") @self.socketio.on("start_quiz")
def handle_start_quiz(data): def handle_start_quiz(data):
session_code = data.get("session_code") session_id = data.get("session_id")
if not session_code: if not session_id:
emit("error", {"message": "session_code is required"}) emit("error", {"message": "session_id is required"})
return return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_code) emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread( threading.Thread(
target=self._simulate_quiz_flow, args=(session_code,), daemon=True target=self.session_service.simulate_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start() ).start()
def _simulate_quiz_flow(self, session_code: str):
"""Enhanced quiz flow with better question management."""
questions = [
{
"index": 1,
"question": "Kerajaan Hindu tertua di Indonesia adalah?",
"target_answer": "Kutai",
"duration": 30,
"type": "fill_the_blank",
"points": 10,
"options": None,
},
{
"index": 2,
"question": "Apakah benar Majapahit mencapai puncak kejayaan pada masa Hayam Wuruk?",
"target_answer": True,
"duration": 30,
"type": "true_false",
"points": 10,
"options": None,
},
{
"index": 3,
"question": "Kerajaan maritim terbesar di Asia Tenggara pada abad ke7 adalah?",
"target_answer": 2,
"duration": 30,
"type": "option",
"points": 10,
"options": ["Majapahit", "Tarumanegara", "Sriwijaya", "Mataram Kuno"],
},
]
# Simpan ke Redis
self.redis_repo.set_data(self._questions_key(session_code), questions)
# Inisialisasi skor untuk semua peserta
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
self.redis_repo.set_data(self._scores_key(session_code), scores)
# Beri jeda sebelum pertanyaan pertama
time.sleep(2)
for q in questions:
print(f"\n📢 Mengirim pertanyaan {q['index']} ke room {session_code}")
# Kirim pertanyaan tanpa jawaban yang benar
question_to_send = q.copy()
question_to_send.pop("target_answer", None)
self.socketio.emit("quiz_question", question_to_send, room=session_code)
# Tunggu durasi pertanyaan
time.sleep(q["duration"])
# Generate dan kirim rekap kuis
self._generate_quiz_recap(session_code)
def _generate_quiz_recap(self, session_code: str):
"""Comprehensive quiz recap generation."""
try:
# Ambil data dari Redis
answers = self.redis_repo.get_data(self._answers_key(session_code)) or []
scores = self.redis_repo.get_data(self._scores_key(session_code)) or {}
questions = (
self.redis_repo.get_data(self._questions_key(session_code)) or []
)
# Persiapkan data rekap
recap_data = {
"session_code": session_code,
"total_questions": len(questions),
"answers": [],
"scores": [],
"questions": [],
}
# Tambahkan detail pertanyaan
for q in questions:
question_recap = {
"index": q["index"],
"question": q["question"],
"type": q["type"],
"points": q.get("points", 10),
}
recap_data["questions"].append(question_recap)
# Tambahkan detail jawaban
for entry in answers:
# Temukan pertanyaan terkait
related_question = next(
(q for q in questions if q["index"] == entry["question_index"]),
None,
)
answer_recap = {
"user_id": entry["user_id"],
"question_index": entry["question_index"],
"answer": entry["answer"],
"correct": entry["correct"],
"question": (
related_question["question"]
if related_question
else "Pertanyaan tidak ditemukan"
),
}
recap_data["answers"].append(answer_recap)
# Tambahkan skor per pengguna
for uid, sc in scores.items():
score_recap = {
"user_id": uid,
"correct": sc.get("correct", 0),
"incorrect": sc.get("incorrect", 0),
"total_score": sc.get("correct", 0)
* 10, # 10 poin per jawaban benar
}
recap_data["scores"].append(score_recap)
# Urutkan skor dari tertinggi ke terendah
recap_data["scores"].sort(key=lambda x: x["total_score"], reverse=True)
# Kirim rekap ke semua peserta
self.socketio.emit(
"quiz_recap",
{
"message": "Kuis telah selesai. Berikut adalah rekap lengkap.",
"recap": recap_data,
},
room=session_code,
)
print(recap_data)
# Cetak rekap di konsol server
print("\n🏁 Rekap Kuis Lengkap")
print("=" * 50)
print(f"Kode Sesi: {session_code}")
print(f"Total Pertanyaan: {len(questions)}")
print("\nPeringkat Peserta:")
for i, score in enumerate(recap_data["scores"], 1):
print(
f"{i}. User {score['user_id']}: Skor {score['total_score']} (Benar: {score['correct']}, Salah: {score['incorrect']})"
)
# Kirim tanda kuis telah berakhir
self.socketio.emit(
"quiz_done",
{"message": "Kuis telah berakhir.", "session_code": session_code},
room=session_code,
)
except Exception as e:
print(f"Error generating quiz recap: {e}")
# Kirim pesan error jika gagal membuat rekap
self.socketio.emit(
"quiz_error",
{
"message": "Terjadi kesalahan saat membuat rekap kuis.",
"error": str(e),
},
room=session_code,
)

View File

@ -169,6 +169,7 @@ class SessionMemoryRepository:
Retrieve quiz questions for a session. Retrieve quiz questions for a session.
""" """
data = self.get_data(f"session:{session_id}:quiz") data = self.get_data(f"session:{session_id}:quiz")
print(data)
data["date"] = DatetimeUtil.from_string(data["date"]) data["date"] = DatetimeUtil.from_string(data["date"])
return QuizEntity(**data) return QuizEntity(**data)
@ -177,3 +178,71 @@ class SessionMemoryRepository:
Delete quiz data for a session. Delete quiz data for a session.
""" """
self.delete_key(f"session:{session_id}:quiz") self.delete_key(f"session:{session_id}:quiz")
def save_user_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
correct: bool,
):
"""
Save user answer for a session.
"""
key = f"session:{session_id}:answers"
answers = self.get_data(key) or []
answers.append(
{
"user_id": user_id,
"question_index": question_index,
"answer": answer,
"correct": correct,
}
)
self.set_data(key, answers)
def get_all_answers(self, session_id: str) -> List[Dict[str, Any]]:
"""
Retrieve all answers for a session.
"""
return self.get_data(f"session:{session_id}:answers") or []
def delete_all_answers(self, session_id: str):
"""
Delete all answers for a session.
"""
self.delete_key(f"session:{session_id}:answers")
def update_user_score(
self, session_id: str, user_id: str, correct: bool
) -> Dict[str, Dict[str, int]]:
"""
Update the user's score based on the latest answer.
"""
key = f"session:{session_id}:scores"
scores = self.get_data(key) or {}
user_score = scores.get(str(user_id), {"correct": 0, "incorrect": 0})
if correct:
user_score["correct"] += 1
else:
user_score["incorrect"] += 1
scores[str(user_id)] = user_score
self.set_data(key, scores)
return scores
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
"""
Retrieve all user scores for a session.
"""
return self.get_data(f"session:{session_id}:scores") or {}
def delete_scores(self, session_id: str):
"""
Delete all scores for a session.
"""
self.delete_key(f"session:{session_id}:scores")

View File

@ -9,6 +9,8 @@ from app.repositories import (
) )
from app.models.entities import SessionEntity from app.models.entities import SessionEntity
from app.helpers import DatetimeUtil from app.helpers import DatetimeUtil
from flask_socketio import SocketIO
import time
class SessionService: class SessionService:
@ -149,3 +151,102 @@ class SessionService:
def get_session(self, session_id: str) -> Optional[SessionEntity]: def get_session(self, session_id: str) -> Optional[SessionEntity]:
session = self.repository.find_by_session_id(session_id) session = self.repository.find_by_session_id(session_id)
return SessionEntity(**session) if session else None return SessionEntity(**session) if session else None
def simulate_quiz_flow(self, session_id: str, socketio: SocketIO):
quiz = self.repository_redis.get_quiz_for_session(session_id)
questions = quiz.question_listings
time.sleep(2)
for q in questions:
print(f"\nMengirim pertanyaan {q.index} ke room {session_id}")
question_to_send = q.model_dump(exclude={"target_answer"})
socketio.emit("quiz_question", question_to_send, room=session_id)
time.sleep(q.duration)
def generate_quiz_recap(self, session_id: str, socketio: SocketIO):
try:
answers = self.repository_redis.get_all_answers(session_id)
scores = self.repository_redis.get_scores(session_id)
quiz = self.repository_redis.get_quiz_for_session(session_id)
questions = quiz.question_listings
recap_data = {
"session_id": session_id,
"total_questions": len(questions),
"answers": [],
"scores": [],
"questions": [],
}
recap_data["questions"] = [
{
"index": q["index"],
"question": q["question"],
"type": q["type"],
}
for q in questions
]
for entry in answers:
related_question = next(
(q for q in questions if q["index"] == entry["question_index"]),
None,
)
recap_data["answers"].append(
{
"user_id": entry["user_id"],
"question_index": entry["question_index"],
"answer": entry["answer"],
"correct": entry["correct"],
"question": (
related_question["question"]
if related_question
else "Pertanyaan tidak ditemukan"
),
}
)
for uid, sc in scores.items():
recap_data["scores"].append(
{
"user_id": uid,
"correct": sc.get("correct", 0),
"incorrect": sc.get("incorrect", 0),
"total_score": sc.get("correct", 0) * 10,
}
)
# Urutkan skor tertinggi ke terendah
recap_data["scores"].sort(key=lambda x: x["total_score"], reverse=True)
# Emit recap data ke semua peserta
socketio.emit(
"quiz_recap",
{
"message": "Kuis telah selesai. Berikut adalah rekap lengkap.",
"recap": recap_data,
},
room=session_id,
)
# Emit informasi bahwa kuis telah berakhir
socketio.emit(
"quiz_done",
{"message": "Kuis telah berakhir.", "session_id": session_id},
room=session_id,
)
except Exception as e:
# Tangani error dan informasikan ke peserta
socketio.emit(
"quiz_error",
{
"message": "Terjadi kesalahan saat membuat rekap kuis.",
"error": str(e),
},
room=session_id,
)