from typing import Any, Dict from uuid import uuid4 from app.repositories import ( SessionRepository, UserRepository, SessionMemoryRepository, QuizRepository, UserAnswerRepository, QuizMemoryRepository, AnswerMemoryRepository, ScoreMemoryRepository, ) from app.models.entities import SessionEntity, UserAnswerEntity, AnswerItemEntity from app.helpers import DatetimeUtil from flask_socketio import SocketIO import time from bson import ObjectId class SessionService: def __init__( self, session_mongo_repository: SessionRepository, session_redis_repository: SessionMemoryRepository, quiz_redis_repository: QuizMemoryRepository, answer_redis_repository: AnswerMemoryRepository, score_redis_repostory: ScoreMemoryRepository, user_repository: UserRepository, quiz_repository: QuizRepository, answer_repository: UserAnswerRepository, ): self.session_mongo_repository = session_mongo_repository self.session_redis_repository = session_redis_repository self.quiz_redis_repository = quiz_redis_repository self.answer_redis_repository = answer_redis_repository self.score_redis_repository = score_redis_repostory self.user_repository = user_repository self.quiz_repository = quiz_repository self.answer_repository = answer_repository def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str: generateed_code = uuid4().hex[:6].upper() session = SessionEntity( session_code=generateed_code, quiz_id=quiz_id, host_id=host_id, created_at=DatetimeUtil.now_iso(), limit_participan=limit_participan, participants=[], current_question_index=0, is_active=True, ) session_id = self.session_mongo_repository.insert(session) session.id = session_id self.session_redis_repository.create_session(session_id, session) data = self.quiz_repository.get_by_id(quiz_id=quiz_id) self.quiz_redis_repository.set_quiz_for_session(session_id, data) return { "session_id": session_id, "session_code": generateed_code, } def join_session(self, session_code: str, user_id: str) -> dict: user = self.user_repository.get_user_by_id(user_id) session = self.session_redis_repository.find_session_by_code(session_code) if session is None: return None session_id = session["id"] is_existing_user = any( u["id"] == user_id for u in session.get("participants", []) ) session_quiz = self.quiz_redis_repository.get_quiz_for_session(session["id"]) quiz_info = { "title": session_quiz.title, "description": session_quiz.description, "total_quiz": session_quiz.total_quiz, "limit_duration": session_quiz.limit_duration, } if session["host_id"] == user_id: return { "session_id": session_id, "is_admin": True, "message": "admin joined", "session_info": session, "quiz_info": quiz_info, } if is_existing_user: return { "session_id": session_id, "is_admin": False, "user_id": str(user.id), "username": user.name, "user_pic": user.pic_url, "session_info": session, "quiz_info": quiz_info, "new_user": not is_existing_user, } self.session_redis_repository.add_user_to_session( session_id=session["id"], user_data={ "id": str(user.id), "username": user.name, "user_pic": user.pic_url, }, ) session = self.session_redis_repository.get_session(session["id"]) response = { "session_id": session_id, "is_admin": False, "user_id": str(user.id), "username": user.name, "user_pic": user.pic_url, "session_info": session if not is_existing_user else None, "quiz_info": quiz_info, "new_user": not is_existing_user, } return response def leave_session(self, session_id: str, user_id: str) -> dict: is_success = self.session_redis_repository.remove_user_from_session( session_id, user_id ) if is_success: participant_left = self.session_redis_repository.get_user_in_session( session_id ) return {"is_success": True, "participants": participant_left} return {"is_success": False} def run_quiz_flow(self, session_id: str, socketio: SocketIO): users = self.session_redis_repository.get_user_in_session(session_id) quiz = self.quiz_redis_repository.get_quiz_for_session(session_id) self.answer_redis_repository.initialize_empty_answers( session_id=session_id, user_ids=[u["id"] for u in users if "id" in u], total_questions=quiz.total_quiz, ) questions = quiz.question_listings start_quiz = DatetimeUtil.now_iso() time.sleep(2) for q in questions: print(f"\nMengirim pertanyaan {q.index} ke room {session_id}") question_to_send = q.model_dump(exclude={"target_answer"}) socketio.emit("quiz_question", question_to_send, room=session_id) time.sleep(q.duration) usersNotAnswer = self.answer_redis_repository.auto_fill_incorrect_answers( session_id=session_id, question_index=q.index, default_time_spent=q.duration, ) for userId in usersNotAnswer: self.score_redis_repository.update_user_score( session_id=session_id, user_id=userId, correct=False, ) socketio.emit( "score_update", { "scores": self.get_ranked_scores(session_id), }, room=session_id, ) socketio.emit("clean_up", room=session_id) self.summaryAllSessionData(session_id=session_id, start_time=start_quiz) socketio.emit("quiz_done", room=session_id) def submit_answer( self, session_id: str, user_id: str, question_index: int, answer: Any, time_spent: int, ) -> Dict[str, Any]: quiz = self.quiz_redis_repository.get_quiz_for_session(session_id) question = next( (q for q in quiz.question_listings if q.index == question_index), None, ) if question is None: raise ValueError( f"Question {question_index} not found in session {session_id}" ) is_correct = self._is_correct(question, answer) self.answer_redis_repository.save_user_answer( session_id=session_id, user_id=user_id, question_index=question_index, answer=answer, correct=is_correct, time_spent=time_spent, ) scores = self.score_redis_repository.update_user_score( session_id=session_id, user_id=user_id, correct=is_correct, ) return { "user_id": user_id, "question_index": question_index, "answer": answer, "correct": is_correct, "scores": scores, } def get_ranked_scores(self, session_id: str): raw = self.score_redis_repository.get_scores(session_id) ranked = [ { "user_id": uid, "correct": v.get("correct", 0), "incorrect": v.get("incorrect", 0), "total_score": v.get("correct", 0) * 10, } for uid, v in raw.items() ] ranked.sort(key=lambda x: x["total_score"], reverse=True) return ranked def _is_correct(self, q, ans) -> bool: if q.type in {"multiple_choice", "true_false"}: return str(ans).strip().lower() == str(q.target_answer).strip().lower() if q.type == "essay": return str(q.target_answer).lower() in str(ans).lower() return False def summaryAllSessionData(self, session_id: str, start_time): session = self.session_redis_repository.get_session(session_id=session_id) now = DatetimeUtil.now_iso() session["id"] = ObjectId(session["id"]) session["participants"] = [ {"id": user["id"], "joined_at": user["joined_at"]} for user in session["participants"] ] session["created_at"] = DatetimeUtil.from_iso(session["created_at"]) session["started_at"] = DatetimeUtil.from_iso(start_time) session["ended_at"] = DatetimeUtil.from_iso(now) newData = SessionEntity(**session) newData.is_active = False answers = self.answer_redis_repository.get_all_user_answers( session_id=session_id ) quiz = self.quiz_repository.get_by_id(newData.quiz_id) self.quiz_repository.update_user_playing( quiz_id=quiz.id, total_user=quiz.total_user_playing + len(answers) ) self.session_mongo_repository.update( session_id=session_id, update_fields=newData, ) for key, value_list in answers.items(): answer_items = [] total_correct = 0 for item in sorted(value_list, key=lambda x: x["question_index"]): is_correct = item["is_true"] if is_correct: total_correct += 1 answer_item = AnswerItemEntity( question_index=item["question_index"], answer=item["answer"], is_correct=is_correct, time_spent=item["time_spent"], ) answer_items.append(answer_item) total_questions = len(value_list) total_score = ( (total_correct / total_questions) * 100 if total_questions > 0 else 0.0 ) userAnswer = UserAnswerEntity( user_id=key, quiz_id=str(quiz.id), session_id=session_id, total_correct=total_correct, total_score=round(total_score, 2), answers=answer_items, answered_at=newData.started_at, ) self.answer_repository.create(userAnswer)