fix: logic quiz multiplayer

This commit is contained in:
akhdanre 2025-05-17 21:25:24 +07:00
parent bd8353b6d6
commit a6ef847403
12 changed files with 361 additions and 365 deletions

View File

@ -75,7 +75,7 @@ class QuizController:
def get_answer(self, quiz_id, user_id, session_id):
try:
# self.answer_service.
print("yps")
pass
except Exception as e:
return make_error_response(e)

View File

@ -83,62 +83,6 @@ class SocketController:
skip_sid=request.sid,
)
# @self.socketio.on("submit_answer")
# def handle_submit_answer(data):
# session_code = data.get("session_id")
# user_id = data.get("user_id")
# question_index = data.get("question_index")
# user_answer = data.get("answer")
# if not all(
# [
# session_code,
# user_id,
# question_index is not None,
# user_answer is not None,
# ]
# ):
# emit(
# "error",
# {
# "message": "session_id, user_id, question_index, and answer are required"
# },
# )
# return
# quiz_service = QuizService(self.redis_repo)
# question = quiz_service.get_question(session_code, question_index)
# if question is None:
# emit("error", {"message": "Question not found"})
# return
# is_correct = quiz_service.is_correct(user_answer, question)
# print(
# f"User {user_id} answered Q{question_index} with '{user_answer}' -> {'✔' if is_correct else '✖'}"
# )
# quiz_service.save_answer(
# session_code, user_id, question_index, user_answer, is_correct
# )
# scores = quiz_service.update_score(session_code, user_id, is_correct)
# emit(
# "answer_submitted",
# {
# "user_id": user_id,
# "question_index": question_index,
# "answer": user_answer,
# "correct": is_correct,
# },
# room=request.sid,
# )
# admin_sid = self.admin_sids.get(session_code)
# if admin_sid:
# emit("score_update", scores, room=admin_sid)
@self.socketio.on("leave_room")
def handle_leave_room(data):
session_id = data.get("session_id")
@ -223,7 +167,63 @@ class SocketController:
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread(
target=self.session_service.simulate_quiz_flow,
target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start()
@self.socketio.on("submit_answer")
def handle_submit_answer(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
try:
result = self.session_service.submit_answer(
session_id=session_id,
user_id=user_id,
question_index=question_index,
answer=user_answer,
time_spent=time_spent,
)
except ValueError as exc:
emit("error", {"message": str(exc)})
return
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)

View File

@ -7,6 +7,9 @@ from app.repositories import (
SessionRepository,
NERSRLRepository,
SessionMemoryRepository,
QuizMemoryRepository,
AnswerMemoryRepository,
ScoreMemoryRepository,
)
from app.services import (
@ -46,6 +49,9 @@ class Container(containers.DeclarativeContainer):
session_repository = providers.Factory(SessionRepository, mongo.provided.db)
ner_srl_repository = providers.Factory(NERSRLRepository)
session_memory_repository = providers.Factory(SessionMemoryRepository, redis)
quiz_memory_repository = providers.Factory(QuizMemoryRepository, redis)
answer_memory_repository = providers.Factory(AnswerMemoryRepository, redis)
score_memory_repository = providers.Factory(ScoreMemoryRepository, redis)
# services
auth_service = providers.Factory(
@ -86,6 +92,9 @@ class Container(containers.DeclarativeContainer):
SessionService,
session_repository,
session_memory_repository,
quiz_memory_repository,
answer_memory_repository,
score_memory_repository,
user_repository,
quiz_repository,
answer_repository,

View File

@ -5,6 +5,9 @@ from .subject_repository import SubjectRepository
from .session_repostory import SessionRepository
from .ner_srl_repository import NERSRLRepository
from .session_memory_repository import SessionMemoryRepository
from .quiz_memory_repository import QuizMemoryRepository
from .answer_memory_repository import AnswerMemoryRepository
from .score_memory_repository import ScoreMemoryRepository
__all__ = [
"UserRepository",
@ -14,4 +17,7 @@ __all__ = [
"SessionRepository",
"NERSRLRepository",
"SessionMemoryRepository",
"QuizMemoryRepository",
"AnswerMemoryRepository",
"ScoreMemoryRepository",
]

View File

@ -0,0 +1,106 @@
import json
from typing import Any, Dict, List
from redis import Redis
class AnswerMemoryRepository:
KEY_TEMPLATE = "answer:{session_id}:{user_id}"
KEY_PATTERN_TEMPLATE = "answer:{session_id}:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str, user_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id, user_id=user_id)
def _build_pattern_key(self, session_id: str) -> str:
return self.KEY_PATTERN_TEMPLATE.format(session_id=session_id)
def save_user_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
correct: bool,
time_spent: float,
):
key = self._build_key(session_id, user_id)
answers = self.get_data(key) or []
for ans in answers:
if ans["question_index"] == question_index:
ans.update(
{
"answer": answer,
"is_true": correct,
"time_spent": time_spent,
}
)
break
else:
answers.append(
{
"question_index": question_index,
"answer": answer,
"is_true": correct,
"time_spent": time_spent,
}
)
self.set_data(key, answers)
def get_user_answers(self, session_id: str, user_id: str) -> List[Dict[str, Any]]:
key = self._build_key(session_id, user_id)
return self.get_data(key) or []
def get_all_user_answers(self, session_id: str) -> Dict[str, List[Dict[str, Any]]]:
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
all_answers = {}
for key in keys:
user_id = key.decode().split(":")[-1]
all_answers[user_id] = self.get_data(key) or []
return all_answers
def delete_all_answers(self, session_id: str):
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
for key in keys:
self.redis.delete(key)
def set_data(self, key: str, value: Any):
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Any:
data = self.redis.get(key)
return json.loads(data) if data else None
def auto_fill_incorrect_answers(
self,
session_id: str,
question_index: int,
default_time_spent: float = 0.0,
):
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
for key in keys:
answers = self.get_data(key) or []
has_answered = any(
ans["question_index"] == question_index for ans in answers
)
if not has_answered:
answers.append(
{
"question_index": question_index,
"answer": "",
"is_true": False,
"time_spent": default_time_spent,
}
)
self.set_data(key, answers)

View File

@ -0,0 +1,32 @@
import json
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import QuizEntity
class QuizMemoryRepository:
KEY_TEMPLATE = "quiz:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_quiz_for_session(self, session_id: str, quiz_data: QuizEntity):
data = quiz_data.model_dump()
data["id"] = str(data["id"])
data["date"] = DatetimeUtil.to_string(data["date"])
self.redis.set(self._build_key(session_id), json.dumps(data))
def get_quiz_for_session(self, session_id: str) -> Optional[QuizEntity]:
data = self.redis.get(self._build_key(session_id))
if data:
data = json.loads(data)
data["date"] = DatetimeUtil.from_string(data["date"])
return QuizEntity(**data)
return None
def delete_quiz_for_session(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -86,7 +86,7 @@ class QuizRepository:
object_ids = [ObjectId(qid) for qid in quiz_ids]
cursor = self.collection.find({"_id": {"$in": object_ids}})
datas = list(cursor)
print(datas)
if not datas:
return None

View File

@ -0,0 +1,28 @@
from typing import Dict
from redis import Redis
class ScoreMemoryRepository:
KEY_TEMPLATE = "score:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def update_user_score(self, session_id: str, user_id: str, correct: bool):
hkey = self._build_key(session_id)
field = f"{user_id}:{'correct' if correct else 'incorrect'}"
self.redis.hincrby(hkey, field, 1)
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
raw = self.redis.hgetall(self._build_key(session_id))
scores = {}
for k, v in raw.items():
uid, category = k.decode().split(":")
scores.setdefault(uid, {"correct": 0, "incorrect": 0})[category] = int(v)
return scores
def delete_scores(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -1,114 +1,60 @@
import json
from typing import Dict, Any, List, Optional
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import SessionEntity, QuizEntity
from app.models.entities import SessionEntity
class SessionMemoryRepository:
KEY_TEMPLATE = "session:{session_id}"
KEY_PATTERN = "session:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_data(self, key: str, value: Any):
"""
Set data in Redis
:param key: Redis key
:param value: Value to store
"""
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Optional[Any]:
"""
Get data from Redis
:param key: Redis key
:return: Decoded JSON data or None
"""
data = self.redis.get(key)
return json.loads(data) if data else None
def delete_key(self, key: str):
"""
Delete a key from Redis
:param key: Redis key to delete
"""
self.redis.delete(key)
def create_session(self, session_id: str, initial_data: SessionEntity) -> str:
"""
Create a new session
:param session_id: ID for the session
:param initial_data: Initial data for the session
:return: Session ID
"""
data = initial_data.model_dump()
data["id"] = data["id"]
data["created_at"] = str(data["created_at"])
self.set_data(f"session:{session_id}", data)
self.set_data(self._build_key(session_id), data)
return session_id
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve a session by its ID
:param session_id: ID of the session
:return: Session data or None if not found
"""
return self.get_data(f"session:{session_id}")
return self.get_data(self._build_key(session_id))
def close_session(
self, session_id: str, additional_data: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
"""
Close/end a session and return its data before deleting
:param session_id: ID of the session to close
:param additional_data: Optional extra data to add when closing
:return: Session data if found and closed, None otherwise
"""
# Ambil data session
session = self.get_data(f"session:{session_id}")
def close_session(self, session_id: str) -> Optional[Dict[str, Any]]:
session = self.get_data(self._build_key(session_id))
if not session:
return None
session["status"] = "closed"
session["closed_at"] = DatetimeUtil.now_iso
if additional_data:
session.update(additional_data)
try:
self.delete_key(f"session:{session_id}")
return session
except Exception as e:
print(f"Error closing session {session_id}: {e}")
session["closed_at"] = DatetimeUtil.now_iso()
self.delete_key(self._build_key(session_id))
return session
def find_session_by_code(self, session_code: str) -> Optional[Dict[str, Any]]:
"""
Find a session by its session code.
:param session_code: Session code to search for.
:return: Session data if found, None otherwise.
"""
# Dapatkan semua session keys
session_keys = self.redis.keys("session:*")
session_keys = self.redis.keys(self.KEY_PATTERN)
for key in session_keys:
session_data = self.get_data(key)
if session_data and session_data.get("session_code") == session_code:
return session_data
return None
def add_user_to_session(
self, session_id: str, user_data: Dict[str, Any] = None
) -> bool:
"""
Add a user to an existing session.
:param session_id: ID of the session.
:param user_id: ID of the user to add.
:param user_data: Optional additional data about the user in the session.
:return: True if user was added successfully, False if session doesn't exist.
"""
session = self.get_session(session_id)
if not session:
return False
@ -118,131 +64,29 @@ class SessionMemoryRepository:
"joined_at": DatetimeUtil.now_iso(),
}
print("Final user_entry:", user_entry)
existing_users = session.get("participants", [])
existing_users.append(user_entry)
session["participants"] = existing_users
self.set_data(f"session:{session_id}", session)
self.set_data(self._build_key(session_id), session)
return True
def get_user_in_session(self, session_id: str):
session = self.get_session(session_id)
if not session:
return []
existing_users = session.get("participants", [])
return existing_users
return session.get("participants", [])
def remove_user_from_session(self, session_id: str, user_id: str) -> bool:
"""
Remove a user from a session
:param session_id: ID of the session
:param user_id: ID of the user to remove
:return: True if user was removed, False if session doesn't exist or user not in session
"""
session = self.get_session(session_id)
if not session:
return False
session["participants"] = [
user for user in session.get("participants", []) if user["id"] != user_id
user
for user in session.get("participants", [])
if user.get("id") != user_id
]
self.set_data(f"session:{session_id}", session)
self.set_data(self._build_key(session_id), session)
return True
def set_quiz_for_session(self, session_id: str, quiz_data: QuizEntity):
"""
Store quiz questions for a session.
"""
data = quiz_data.model_dump()
data["id"] = str(data["id"]) ## objectId
data["date"] = DatetimeUtil.to_string(data["date"])
self.set_data(f"session:{session_id}:quiz", data)
def get_quiz_for_session(self, session_id: str) -> QuizEntity:
"""
Retrieve quiz questions for a session.
"""
data = self.get_data(f"session:{session_id}:quiz")
print(data)
data["date"] = DatetimeUtil.from_string(data["date"])
return QuizEntity(**data)
def delete_quiz_for_session(self, session_id: str):
"""
Delete quiz data for a session.
"""
self.delete_key(f"session:{session_id}:quiz")
def save_user_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
correct: bool,
):
"""
Save user answer for a session.
"""
key = f"session:{session_id}:answers"
answers = self.get_data(key) or []
answers.append(
{
"user_id": user_id,
"question_index": question_index,
"answer": answer,
"correct": correct,
}
)
self.set_data(key, answers)
def get_all_answers(self, session_id: str) -> List[Dict[str, Any]]:
"""
Retrieve all answers for a session.
"""
return self.get_data(f"session:{session_id}:answers") or []
def delete_all_answers(self, session_id: str):
"""
Delete all answers for a session.
"""
self.delete_key(f"session:{session_id}:answers")
def update_user_score(
self, session_id: str, user_id: str, correct: bool
) -> Dict[str, Dict[str, int]]:
"""
Update the user's score based on the latest answer.
"""
key = f"session:{session_id}:scores"
scores = self.get_data(key) or {}
user_score = scores.get(str(user_id), {"correct": 0, "incorrect": 0})
if correct:
user_score["correct"] += 1
else:
user_score["incorrect"] += 1
scores[str(user_id)] = user_score
self.set_data(key, scores)
return scores
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
"""
Retrieve all user scores for a session.
"""
return self.get_data(f"session:{session_id}:scores") or {}
def delete_scores(self, session_id: str):
"""
Delete all scores for a session.
"""
self.delete_key(f"session:{session_id}:scores")

View File

@ -23,7 +23,7 @@ class HistoryService:
quiz_ids = [asn.quiz_id for asn in answer_data]
quiz_data = self.quiz_repository.get_by_ids(quiz_ids)
quiz_map = {str(quiz.id): quiz for quiz in quiz_data}
print(quiz_map)
result = []
for answer in answer_data:
quiz = quiz_map.get(answer.quiz_id)

View File

@ -63,7 +63,7 @@ class QuizService:
total_user_quiz = self.quiz_repository.count_by_user_id(user_id)
print(total_user_quiz)
user = self.user_repostory.get_user_by_id(user_id)

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, Optional
from uuid import uuid4
from app.repositories import (
SessionRepository,
@ -6,6 +6,9 @@ from app.repositories import (
SessionMemoryRepository,
QuizRepository,
UserAnswerRepository,
QuizMemoryRepository,
AnswerMemoryRepository,
ScoreMemoryRepository,
)
from app.models.entities import SessionEntity
from app.helpers import DatetimeUtil
@ -16,14 +19,20 @@ import time
class SessionService:
def __init__(
self,
repository_mongo: SessionRepository,
repository_redis: SessionMemoryRepository,
session_mongo_repository: SessionRepository,
session_redis_repository: SessionMemoryRepository,
quiz_redis_repository: QuizMemoryRepository,
answer_redis_repository: AnswerMemoryRepository,
score_redis_repostory: ScoreMemoryRepository,
user_repository: UserRepository,
quiz_repository: QuizRepository,
answer_repository: UserAnswerRepository,
):
self.repository_mongo = repository_mongo
self.repository_redis = repository_redis
self.session_mongo_repository = session_mongo_repository
self.session_redis_repository = session_redis_repository
self.quiz_redis_repository = quiz_redis_repository
self.answer_redis_repository = answer_redis_repository
self.score_redis_repository = score_redis_repostory
self.user_repository = user_repository
self.quiz_repository = quiz_repository
self.answer_repository = answer_repository
@ -40,11 +49,11 @@ class SessionService:
current_question_index=0,
is_active=True,
)
session_id = self.repository_mongo.insert(session)
session_id = self.session_mongo_repository.insert(session)
session.id = session_id
self.repository_redis.create_session(session_id, session)
self.session_redis_repository.create_session(session_id, session)
data = self.quiz_repository.get_by_id(quiz_id=quiz_id)
self.repository_redis.set_quiz_for_session(session_id, data)
self.quiz_redis_repository.set_quiz_for_session(session_id, data)
return {
"session_id": session_id,
"session_code": generateed_code,
@ -52,7 +61,7 @@ class SessionService:
def join_session(self, session_code: str, user_id: str) -> dict:
user = self.user_repository.get_user_by_id(user_id)
session = self.repository_redis.find_session_by_code(session_code)
session = self.session_redis_repository.find_session_by_code(session_code)
if session is None:
return None
@ -62,7 +71,7 @@ class SessionService:
u["id"] == user_id for u in session.get("participants", [])
)
session_quiz = self.repository_redis.get_quiz_for_session(session["id"])
session_quiz = self.quiz_redis_repository.get_quiz_for_session(session["id"])
quiz_info = {
"title": session_quiz.title,
@ -91,7 +100,7 @@ class SessionService:
"quiz_info": quiz_info,
"new_user": not is_existing_user,
}
self.repository_redis.add_user_to_session(
self.session_redis_repository.add_user_to_session(
session_id=session["id"],
user_data={
"id": str(user.id),
@ -99,7 +108,7 @@ class SessionService:
"user_pic": user.pic_url,
},
)
session = self.repository_redis.get_session(session["id"])
session = self.session_redis_repository.get_session(session["id"])
response = {
"session_id": session_id,
@ -114,46 +123,20 @@ class SessionService:
return response
def leave_session(self, session_id: str, user_id: str) -> dict:
is_success = self.repository_redis.remove_user_from_session(session_id, user_id)
is_success = self.session_redis_repository.remove_user_from_session(
session_id, user_id
)
if is_success:
participant_left = self.repository_redis.get_user_in_session(session_id)
participant_left = self.session_redis_repository.get_user_in_session(
session_id
)
return {"is_success": True, "participants": participant_left}
return {"is_success": False}
def start_session(self, session_id: str) -> bool:
now = DatetimeUtil.now_iso()
return self.repository.update(session_id, {"started_at": now})
def end_session(self, session_id: str, user_id: str):
session = self.repository.find_by_session_id(session_id)
if session and session.host_id == user_id:
session.is_active = False
self.repository.update(session_id, {"is_active": False})
def advance_question(self, session_id: str) -> Optional[int]:
session = self.repository.find_by_session_id(session_id)
if not session:
return None
current_index = session.get("current_question_index", 0)
total = session.get("total_questions", 0)
if current_index + 1 >= total:
self.end_session(session_id)
return None
new_index = current_index + 1
self.repository.update(session_id, {"current_question_index": new_index})
return new_index
def get_session(self, session_id: str) -> Optional[SessionEntity]:
session = self.repository.find_by_session_id(session_id)
return SessionEntity(**session) if session else None
def simulate_quiz_flow(self, session_id: str, socketio: SocketIO):
quiz = self.repository_redis.get_quiz_for_session(session_id)
def run_quiz_flow(self, session_id: str, socketio: SocketIO):
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
questions = quiz.question_listings
time.sleep(2)
@ -165,88 +148,76 @@ class SessionService:
socketio.emit("quiz_question", question_to_send, room=session_id)
time.sleep(q.duration)
self.answer_redis_repository.auto_fill_incorrect_answers(
session_id=session_id,
question_index=q.index,
default_time_spent=q.duration,
)
def generate_quiz_recap(self, session_id: str, socketio: SocketIO):
try:
socketio.emit("quiz_done", room=session_id)
answers = self.repository_redis.get_all_answers(session_id)
scores = self.repository_redis.get_scores(session_id)
quiz = self.repository_redis.get_quiz_for_session(session_id)
questions = quiz.question_listings
def submit_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
time_spent: int,
) -> Dict[str, Any]:
recap_data = {
"session_id": session_id,
"total_questions": len(questions),
"answers": [],
"scores": [],
"questions": [],
}
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
recap_data["questions"] = [
{
"index": q["index"],
"question": q["question"],
"type": q["type"],
}
for q in questions
]
for entry in answers:
related_question = next(
(q for q in questions if q["index"] == entry["question_index"]),
question = next(
(q for q in quiz.question_listings if q.index == question_index),
None,
)
recap_data["answers"].append(
{
"user_id": entry["user_id"],
"question_index": entry["question_index"],
"answer": entry["answer"],
"correct": entry["correct"],
"question": (
related_question["question"]
if related_question
else "Pertanyaan tidak ditemukan"
),
}
if question is None:
raise ValueError(
f"Question {question_index} not found in session {session_id}"
)
for uid, sc in scores.items():
recap_data["scores"].append(
is_correct = self._is_correct(question, answer)
self.answer_redis_repository.save_user_answer(
session_id=session_id,
user_id=user_id,
question_index=question_index,
answer=answer,
correct=is_correct,
time_spent=time_spent,
)
scores = self.score_redis_repository.update_user_score(
session_id=session_id,
user_id=user_id,
correct=is_correct,
)
return {
"user_id": user_id,
"question_index": question_index,
"answer": answer,
"correct": is_correct,
"scores": scores,
}
def get_ranked_scores(self, session_id: str):
raw = self.score_redis_repository.get_scores(session_id)
ranked = [
{
"user_id": uid,
"correct": sc.get("correct", 0),
"incorrect": sc.get("incorrect", 0),
"total_score": sc.get("correct", 0) * 10,
"correct": v.get("correct", 0),
"incorrect": v.get("incorrect", 0),
"total_score": v.get("correct", 0) * 10,
}
)
for uid, v in raw.items()
]
ranked.sort(key=lambda x: x["total_score"], reverse=True)
return ranked
# Urutkan skor tertinggi ke terendah
recap_data["scores"].sort(key=lambda x: x["total_score"], reverse=True)
# Emit recap data ke semua peserta
socketio.emit(
"quiz_recap",
{
"message": "Kuis telah selesai. Berikut adalah rekap lengkap.",
"recap": recap_data,
},
room=session_id,
)
# Emit informasi bahwa kuis telah berakhir
socketio.emit(
"quiz_done",
{"message": "Kuis telah berakhir.", "session_id": session_id},
room=session_id,
)
except Exception as e:
# Tangani error dan informasikan ke peserta
socketio.emit(
"quiz_error",
{
"message": "Terjadi kesalahan saat membuat rekap kuis.",
"error": str(e),
},
room=session_id,
)
def _is_correct(self, q, ans) -> bool:
if q.type in {"multiple_choice", "true_false"}:
return str(ans).strip().lower() == str(q.target_answer).strip().lower()
if q.type == "essay":
return str(q.target_answer).lower() in str(ans).lower()
# fallback
return False