335 lines
12 KiB
Python
335 lines
12 KiB
Python
from typing import Any, Dict
|
|
from uuid import uuid4
|
|
from app.repositories import (
|
|
SessionRepository,
|
|
UserRepository,
|
|
SessionMemoryRepository,
|
|
QuizRepository,
|
|
UserAnswerRepository,
|
|
QuizMemoryRepository,
|
|
AnswerMemoryRepository,
|
|
ScoreMemoryRepository,
|
|
)
|
|
from app.models.entities import SessionEntity, UserAnswerEntity, AnswerItemEntity
|
|
from app.helpers import DatetimeUtil
|
|
from flask_socketio import SocketIO
|
|
import time
|
|
from bson import ObjectId
|
|
|
|
|
|
class SessionService:
|
|
def __init__(
|
|
self,
|
|
session_mongo_repository: SessionRepository,
|
|
session_redis_repository: SessionMemoryRepository,
|
|
quiz_redis_repository: QuizMemoryRepository,
|
|
answer_redis_repository: AnswerMemoryRepository,
|
|
score_redis_repostory: ScoreMemoryRepository,
|
|
user_repository: UserRepository,
|
|
quiz_repository: QuizRepository,
|
|
answer_repository: UserAnswerRepository,
|
|
):
|
|
self.session_mongo_repository = session_mongo_repository
|
|
self.session_redis_repository = session_redis_repository
|
|
self.quiz_redis_repository = quiz_redis_repository
|
|
self.answer_redis_repository = answer_redis_repository
|
|
self.score_redis_repository = score_redis_repostory
|
|
self.user_repository = user_repository
|
|
self.quiz_repository = quiz_repository
|
|
self.answer_repository = answer_repository
|
|
|
|
def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str:
|
|
generateed_code = uuid4().hex[:6].upper()
|
|
session = SessionEntity(
|
|
session_code=generateed_code,
|
|
quiz_id=quiz_id,
|
|
host_id=host_id,
|
|
created_at=DatetimeUtil.now_iso(),
|
|
limit_participan=limit_participan,
|
|
participants=[],
|
|
current_question_index=0,
|
|
is_active=True,
|
|
)
|
|
session_id = self.session_mongo_repository.insert(session)
|
|
session.id = session_id
|
|
self.session_redis_repository.create_session(session_id, session)
|
|
data = self.quiz_repository.get_by_id(quiz_id=quiz_id)
|
|
self.quiz_redis_repository.set_quiz_for_session(session_id, data)
|
|
return {
|
|
"session_id": session_id,
|
|
"session_code": generateed_code,
|
|
}
|
|
|
|
def join_session(self, session_code: str, user_id: str) -> dict:
|
|
user = self.user_repository.get_user_by_id(user_id)
|
|
session = self.session_redis_repository.find_session_by_code(session_code)
|
|
if session is None:
|
|
return None
|
|
|
|
session_id = session["id"]
|
|
|
|
is_existing_user = any(
|
|
u["id"] == user_id for u in session.get("participants", [])
|
|
)
|
|
|
|
session_quiz = self.quiz_redis_repository.get_quiz_for_session(session["id"])
|
|
|
|
quiz_info = {
|
|
"title": session_quiz.title,
|
|
"description": session_quiz.description,
|
|
"total_quiz": session_quiz.total_quiz,
|
|
"limit_duration": session_quiz.limit_duration,
|
|
}
|
|
|
|
if session["host_id"] == user_id:
|
|
return {
|
|
"session_id": session_id,
|
|
"is_admin": True,
|
|
"message": "admin joined",
|
|
"session_info": session,
|
|
"quiz_info": quiz_info,
|
|
}
|
|
|
|
if is_existing_user:
|
|
return {
|
|
"session_id": session_id,
|
|
"is_admin": False,
|
|
"user_id": str(user.id),
|
|
"username": user.name,
|
|
"user_pic": user.pic_url,
|
|
"session_info": session,
|
|
"quiz_info": quiz_info,
|
|
"new_user": not is_existing_user,
|
|
}
|
|
self.session_redis_repository.add_user_to_session(
|
|
session_id=session["id"],
|
|
user_data={
|
|
"id": str(user.id),
|
|
"username": user.name,
|
|
"user_pic": user.pic_url,
|
|
},
|
|
)
|
|
session = self.session_redis_repository.get_session(session["id"])
|
|
|
|
response = {
|
|
"session_id": session_id,
|
|
"is_admin": False,
|
|
"user_id": str(user.id),
|
|
"username": user.name,
|
|
"user_pic": user.pic_url,
|
|
"session_info": session if not is_existing_user else None,
|
|
"quiz_info": quiz_info,
|
|
"new_user": not is_existing_user,
|
|
}
|
|
return response
|
|
|
|
def leave_session(self, session_id: str, user_id: str) -> dict:
|
|
is_success = self.session_redis_repository.remove_user_from_session(
|
|
session_id, user_id
|
|
)
|
|
|
|
if is_success:
|
|
participant_left = self.session_redis_repository.get_user_in_session(
|
|
session_id
|
|
)
|
|
return {"is_success": True, "participants": participant_left}
|
|
|
|
return {"is_success": False}
|
|
|
|
def run_quiz_flow(self, session_id: str, socketio: SocketIO):
|
|
users = self.session_redis_repository.get_user_in_session(session_id)
|
|
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
|
|
self.answer_redis_repository.initialize_empty_answers(
|
|
session_id=session_id,
|
|
user_ids=[u["id"] for u in users if "id" in u],
|
|
total_questions=quiz.total_quiz,
|
|
)
|
|
questions = quiz.question_listings
|
|
start_quiz = DatetimeUtil.now_iso()
|
|
time.sleep(2)
|
|
|
|
for q in questions:
|
|
print(f"\nMengirim pertanyaan {q.index} ke room {session_id}")
|
|
|
|
question_to_send = q.model_dump(exclude={"target_answer"})
|
|
|
|
socketio.emit("quiz_question", question_to_send, room=session_id)
|
|
|
|
time.sleep(q.duration)
|
|
usersNotAnswer = self.answer_redis_repository.auto_fill_incorrect_answers(
|
|
session_id=session_id,
|
|
question_index=q.index,
|
|
default_time_spent=q.duration,
|
|
)
|
|
|
|
for userId in usersNotAnswer:
|
|
self.score_redis_repository.update_user_score(
|
|
session_id=session_id,
|
|
user_id=userId,
|
|
correct=False,
|
|
)
|
|
socketio.emit(
|
|
"score_update",
|
|
{
|
|
"scores": self.get_ranked_scores(session_id),
|
|
},
|
|
room=session_id,
|
|
)
|
|
|
|
socketio.emit("clean_up", room=session_id)
|
|
self.summaryAllSessionData(session_id=session_id, start_time=start_quiz)
|
|
socketio.emit("quiz_done", room=session_id)
|
|
|
|
def submit_answer(
|
|
self,
|
|
session_id: str,
|
|
user_id: str,
|
|
question_index: int,
|
|
answer: Any,
|
|
time_spent: int,
|
|
) -> Dict[str, Any]:
|
|
|
|
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
|
|
|
|
question = next(
|
|
(q for q in quiz.question_listings if q.index == question_index),
|
|
None,
|
|
)
|
|
if question is None:
|
|
raise ValueError(
|
|
f"Question {question_index} not found in session {session_id}"
|
|
)
|
|
|
|
is_correct = self._is_correct(question, answer)
|
|
|
|
print(answer)
|
|
self.answer_redis_repository.save_user_answer(
|
|
session_id=session_id,
|
|
user_id=user_id,
|
|
question_index=question_index,
|
|
answer=answer,
|
|
correct=is_correct,
|
|
time_spent=time_spent,
|
|
)
|
|
scores = self.score_redis_repository.update_user_score(
|
|
session_id=session_id,
|
|
user_id=user_id,
|
|
correct=is_correct,
|
|
)
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"question_index": question_index,
|
|
"answer": answer,
|
|
"correct": is_correct,
|
|
"scores": scores,
|
|
}
|
|
|
|
def get_ranked_scores(self, session_id: str):
|
|
raw = self.score_redis_repository.get_scores(session_id)
|
|
ranked = [
|
|
{
|
|
"user_id": uid,
|
|
"correct": v.get("correct", 0),
|
|
"incorrect": v.get("incorrect", 0),
|
|
"total_score": v.get("correct", 0) * 10,
|
|
}
|
|
for uid, v in raw.items()
|
|
]
|
|
ranked.sort(key=lambda x: x["total_score"], reverse=True)
|
|
return ranked
|
|
|
|
def _is_correct(self, q, ans) -> bool:
|
|
result = False
|
|
|
|
if q.type == "true_false":
|
|
result = str(ans).strip().lower() == str(q.target_answer).strip().lower()
|
|
|
|
elif q.type in ["multiple_choice", "option"]:
|
|
try:
|
|
result = int(ans) == int(q.target_answer)
|
|
except (ValueError, TypeError):
|
|
result = False
|
|
|
|
elif q.type == "fill_the_blank":
|
|
result = str(q.target_answer).strip().lower() in str(ans).strip().lower()
|
|
|
|
else:
|
|
result = False
|
|
|
|
# Print informasi evaluasi
|
|
print(f"Tipe Soal: {q.type}")
|
|
print(f"Jawaban User: {ans}")
|
|
print(f"Jawaban Benar: {q.target_answer}")
|
|
print(f"Hasil: {'Benar' if result else 'Salah'}\n")
|
|
|
|
return result
|
|
|
|
def summaryAllSessionData(self, session_id: str, start_time):
|
|
session = self.session_redis_repository.get_session(session_id=session_id)
|
|
now = DatetimeUtil.now_iso()
|
|
session["id"] = ObjectId(session["id"])
|
|
session["participants"] = [
|
|
{"id": user["id"], "joined_at": user["joined_at"]}
|
|
for user in session["participants"]
|
|
]
|
|
session["created_at"] = DatetimeUtil.from_iso(session["created_at"])
|
|
session["started_at"] = DatetimeUtil.from_iso(start_time)
|
|
session["ended_at"] = DatetimeUtil.from_iso(now)
|
|
|
|
newData = SessionEntity(**session)
|
|
newData.is_active = False
|
|
|
|
answers = self.answer_redis_repository.get_all_user_answers(
|
|
session_id=session_id
|
|
)
|
|
|
|
quiz = self.quiz_repository.get_by_id(newData.quiz_id)
|
|
self.quiz_repository.update_user_playing(
|
|
quiz_id=quiz.id, total_user=quiz.total_user_playing + len(answers)
|
|
)
|
|
|
|
self.session_mongo_repository.update(
|
|
session_id=session_id,
|
|
update_fields=newData,
|
|
)
|
|
|
|
for key, value_list in answers.items():
|
|
answer_items = []
|
|
total_correct = 0
|
|
|
|
for item in sorted(value_list, key=lambda x: x["question_index"]):
|
|
is_correct = item["is_true"]
|
|
if is_correct:
|
|
total_correct += 1
|
|
|
|
answer_item = AnswerItemEntity(
|
|
question_index=item["question_index"],
|
|
answer=item["answer"],
|
|
is_correct=is_correct,
|
|
time_spent=item["time_spent"],
|
|
)
|
|
answer_items.append(answer_item)
|
|
|
|
total_questions = len(value_list)
|
|
total_score = (
|
|
(total_correct / total_questions) * 100 if total_questions > 0 else 0.0
|
|
)
|
|
|
|
userAnswer = UserAnswerEntity(
|
|
user_id=key,
|
|
quiz_id=str(quiz.id),
|
|
session_id=session_id,
|
|
total_correct=total_correct,
|
|
total_score=round(total_score, 2),
|
|
answers=answer_items,
|
|
answered_at=newData.started_at,
|
|
)
|
|
|
|
self.answer_repository.create(userAnswer)
|
|
|
|
self.session_redis_repository.delete_session(session_id=session_id)
|
|
self.quiz_redis_repository.delete_quiz_for_session(session_id=session_id)
|
|
self.answer_redis_repository.delete_all_answers(session_id=session_id)
|
|
self.score_redis_repository.delete_scores(session_id=session_id)
|