77 lines
2.7 KiB
Python
77 lines
2.7 KiB
Python
from datetime import datetime
|
|
from typing import Optional
|
|
from uuid import uuid4
|
|
from repositories import SessionRepository, UserRepository
|
|
from models.entities import SessionEntity
|
|
from helpers import DatetimeUtil
|
|
|
|
|
|
class SessionService:
|
|
def __init__(self, repository: SessionRepository, user_repository: UserRepository):
|
|
self.repository = repository
|
|
self.user_repository = user_repository
|
|
|
|
def create_session(
|
|
self, quiz_id: str, host_id: str, limit_participan: int
|
|
) -> SessionEntity:
|
|
session = SessionEntity(
|
|
session_code=uuid4().hex[:6].upper(),
|
|
quiz_id=quiz_id,
|
|
host_id=host_id,
|
|
created_at=DatetimeUtil.now_iso(),
|
|
limit_participan=limit_participan,
|
|
participants=[],
|
|
current_question_index=0,
|
|
is_active=True,
|
|
)
|
|
self.repository.insert(session)
|
|
return session
|
|
|
|
def join_session(self, session_code: str, user_id: str) -> Optional[SessionEntity]:
|
|
user = self.user_repository.get_user_by_id(user_id)
|
|
session = self.repository.find_by_session_code(session_code=session_code)
|
|
|
|
if session is None or not session["is_active"]:
|
|
return None
|
|
|
|
if user_id not in session["participants"]:
|
|
session["participants"].append(user_id)
|
|
self.repository.update(
|
|
session.id, {"participants": session["participants"]}
|
|
)
|
|
response = {
|
|
"user_id": user.id,
|
|
"username": user.id,
|
|
"user_pic": user.pic_url,
|
|
"session_id": session.id,
|
|
}
|
|
return response
|
|
|
|
def start_session(self, session_id: str) -> bool:
|
|
now = DatetimeUtil.now_iso()
|
|
return self.repository.update(session_id, {"started_at": now})
|
|
|
|
def end_session(self, session_id: str) -> bool:
|
|
now = DatetimeUtil.now_iso()
|
|
return self.repository.update(session_id, {"ended_at": now, "is_active": False})
|
|
|
|
def advance_question(self, session_id: str) -> Optional[int]:
|
|
session = self.repository.find_by_session_id(session_id)
|
|
if not session:
|
|
return None
|
|
|
|
current_index = session.get("current_question_index", 0)
|
|
total = session.get("total_questions", 0)
|
|
|
|
if current_index + 1 >= total:
|
|
self.end_session(session_id)
|
|
return None
|
|
|
|
new_index = current_index + 1
|
|
self.repository.update(session_id, {"current_question_index": new_index})
|
|
return new_index
|
|
|
|
def get_session(self, session_id: str) -> Optional[SessionEntity]:
|
|
session = self.repository.find_by_session_id(session_id)
|
|
return SessionEntity(**session) if session else None
|