from typing import Optional from uuid import uuid4 from app.repositories import ( SessionRepository, UserRepository, SessionMemoryRepository, QuizRepository, UserAnswerRepository, ) from app.models.entities import SessionEntity from app.helpers import DatetimeUtil class SessionService: def __init__( self, repository_mongo: SessionRepository, repository_redis: SessionMemoryRepository, user_repository: UserRepository, quiz_repository: QuizRepository, answer_repository: UserAnswerRepository, ): self.repository_mongo = repository_mongo self.repository_redis = repository_redis self.user_repository = user_repository self.quiz_repository = quiz_repository self.answer_repository = answer_repository def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str: generateed_code = uuid4().hex[:6].upper() session = SessionEntity( session_code=generateed_code, quiz_id=quiz_id, host_id=host_id, created_at=DatetimeUtil.now_iso(), limit_participan=limit_participan, participants=[], current_question_index=0, is_active=True, ) session_id = self.repository_mongo.insert(session) session.id = session_id self.repository_redis.create_session(session_id, session) data = self.quiz_repository.get_by_id(quiz_id=quiz_id) self.repository_redis.set_quiz_for_session(session_id, data) return { "session_id": session_id, "session_code": generateed_code, } def join_session(self, session_code: str, user_id: str) -> dict: user = self.user_repository.get_user_by_id(user_id) session = self.repository_redis.find_session_by_code(session_code) if session is None: return None is_existing_user = any( u["id"] == user_id for u in session.get("participants", []) ) session_quiz = self.repository_redis.get_quiz_for_session(session["id"]) quiz_info = { "title": session_quiz.title, "limit_duration": session_quiz.title, } if session["host_id"] == user_id: return { "is_admin": True, "message": "admin joined", "session_info": session, "quiz_info": quiz_info, } if is_existing_user: return { "is_admin": False, "user_id": str(user.id), "username": user.name, "user_pic": user.pic_url, "session_info": session, "quiz_info": quiz_info, "new_user": not is_existing_user, } self.repository_redis.add_user_to_session( session_id=session["id"], user_data={ "id": str(user.id), "username": user.name, "user_pic": user.pic_url, }, ) session = self.repository_redis.get_session(session["id"]) response = { "is_admin": False, "user_id": str(user.id), "username": user.name, "user_pic": user.pic_url, "session_info": session if not is_existing_user else None, "quiz_info": quiz_info, "new_user": not is_existing_user, } return response def leave_session(self, session_id: str, user_id: str) -> dict: session = self.repository_mongo.get_by_id(session_id) if session is None: return {"error": "Session not found"} if user_id == session.host_id: return {"message": "Host cannot leave the session"} if user_id in session.participants: session.participants.remove(user_id) self.repository.update(session.id, {"participants": session.participants}) return {"message": "User has left the session"} return {"message": "User not in session"} def start_session(self, session_id: str) -> bool: now = DatetimeUtil.now_iso() return self.repository.update(session_id, {"started_at": now}) def end_session(self, session_id: str, user_id: str): session = self.repository.find_by_session_id(session_id) if session and session.host_id == user_id: session.is_active = False self.repository.update(session_id, {"is_active": False}) def advance_question(self, session_id: str) -> Optional[int]: session = self.repository.find_by_session_id(session_id) if not session: return None current_index = session.get("current_question_index", 0) total = session.get("total_questions", 0) if current_index + 1 >= total: self.end_session(session_id) return None new_index = current_index + 1 self.repository.update(session_id, {"current_question_index": new_index}) return new_index def get_session(self, session_id: str) -> Optional[SessionEntity]: session = self.repository.find_by_session_id(session_id) return SessionEntity(**session) if session else None