Merge pull request #4 from Akhdanre/feat/quiz

Feat/quiz
This commit is contained in:
Akhdan Robbani 2025-05-21 16:03:21 +07:00 committed by GitHub
commit ff58f79bc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
126 changed files with 4880 additions and 250 deletions

View File

@ -1,6 +1,7 @@
# Existing Configurations
MONGO_URI= MONGO_URI=
FLASK_ENV= FLASK_ENV=development
DEBUG= DEBUG=True
SECRET_KEY= SECRET_KEY=
@ -10,3 +11,9 @@ GOOGLE_CLIENT_SECRET=
GOOGLE_AUHT_URI= GOOGLE_AUHT_URI=
GOOGLE_TOKEN_URI= GOOGLE_TOKEN_URI=
GOOGLE_AUTH_PROVIDER_X509_CERT_URL= GOOGLE_AUTH_PROVIDER_X509_CERT_URL=
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

Binary file not shown.

Binary file not shown.

2
app/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# from flask import Flask
from app.main import createApp

View File

@ -3,12 +3,18 @@ from .default import default_blueprint
from .auth import auth_blueprint from .auth import auth_blueprint
from .user import user_blueprint from .user import user_blueprint
from .quiz import quiz_bp from .quiz import quiz_bp
from .history import history_blueprint
from .subject import subject_blueprint
from .session import session_bp
__all__ = [ __all__ = [
"default_blueprint", "default_blueprint",
"auth_blueprint", "auth_blueprint",
"user_blueprint", "user_blueprint",
"quiz_bp", "quiz_bp",
"history_blueprint",
"subject_blueprint",
"session_bp",
] ]

View File

@ -1,6 +1,6 @@
from flask import Blueprint from flask import Blueprint
from controllers import AuthController from app.controllers import AuthController
from di_container import Container from app.di_container import Container
from dependency_injector.wiring import inject, Provide from dependency_injector.wiring import inject, Provide

31
app/blueprints/history.py Normal file
View File

@ -0,0 +1,31 @@
from flask import Blueprint
from app.controllers import HistoryController
from app.di_container import Container
from dependency_injector.wiring import inject, Provide
history_blueprint = Blueprint("history", __name__)
@history_blueprint.route("/<user_id>", methods=["GET"])
@inject
def user_history(
user_id: str, controller: HistoryController = Provide[Container.history_controller]
):
return controller.get_quiz_by_user(user_id)
@history_blueprint.route("/detail/<answer_id>", methods=["GET"])
@inject
def user_detail_history(
answer_id, controller: HistoryController = Provide[Container.history_controller]
):
return controller.get_detail_quiz_history(answer_id)
@history_blueprint.route("/session/<session_id>", methods=["GET"])
@inject
def session_history(
session_id: str,
controller: HistoryController = Provide[Container.history_controller],
):
return controller.get_session_history(session_id)

View File

@ -1,7 +1,7 @@
from flask import Blueprint, request from flask import Blueprint, request
from di_container import Container from app.di_container import Container
from dependency_injector.wiring import inject, Provide from dependency_injector.wiring import inject, Provide
from controllers import QuizController from app.controllers import QuizController
quiz_bp = Blueprint("quiz", __name__) quiz_bp = Blueprint("quiz", __name__)
@ -14,6 +14,14 @@ def create_quiz(controller: QuizController = Provide[Container.quiz_controller])
return controller.create_quiz(reqBody) return controller.create_quiz(reqBody)
@quiz_bp.route("/ai", methods=["POST"])
@inject
def create_quiz_auto(controller: QuizController = Provide[Container.quiz_controller]):
reqBody = request.get_json()
return controller.create_quiz_auto(reqBody)
@quiz_bp.route("/<quiz_id>", methods=["GET"]) @quiz_bp.route("/<quiz_id>", methods=["GET"])
@inject @inject
def get_quiz( def get_quiz(
@ -29,6 +37,13 @@ def submit_answer(controller: QuizController = Provide[Container.quiz_controller
return controller.submit_answer(req_body) return controller.submit_answer(req_body)
@quiz_bp.route("/answer/session", methods=["POST"])
@inject
def get_answer_session(controller: QuizController = Provide[Container.quiz_controller]):
req_body = request.get_json()
return controller.get_user_ans_session(req_body)
@quiz_bp.route("/answer", methods=["GET"]) @quiz_bp.route("/answer", methods=["GET"])
@inject @inject
def get_answer(controller: QuizController = Provide[Container.quiz_controller]): def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
@ -46,14 +61,29 @@ def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
def get_quiz_recommendation( def get_quiz_recommendation(
controller: QuizController = Provide[Container.quiz_controller], controller: QuizController = Provide[Container.quiz_controller],
): ):
return controller.get_quiz_recommendation() page = request.args.get("page")
limit = request.args.get("limit")
return controller.get_quiz_recommendation(page=page, limit=limit)
@quiz_bp.route("/user/<user_id>", methods=["GET"]) @quiz_bp.route("/user/<user_id>", methods=["GET"])
@inject @inject
def get_user_quiz(controller: QuizController = Provide[Container.quiz_controller]): def get_user_quiz(
user_id: str, controller: QuizController = Provide[Container.quiz_controller]
):
page = request.args.get("page", default=1, type=int) page = request.args.get("page", default=1, type=int)
page_size = request.args.get("page_size", default=10, type=int) page_size = request.args.get("page_size", default=10, type=int)
return controller.get_user_quiz( return controller.get_user_quiz(user_id=user_id, page=page, page_size=page_size)
user_id=request.view_args["user_id"], page=page, page_size=page_size
@quiz_bp.route("/search", methods=["GET"])
@inject
def search_quiz(controller: QuizController = Provide[Container.quiz_controller]):
keyword = request.args.get("keyword", "")
subject_id = request.args.get("subject_id")
page = int(request.args.get("page", 1))
limit = int(request.args.get("limit", 10))
return controller.search_quiz(
keyword=keyword, subject_id=subject_id, page=page, limit=limit
) )

18
app/blueprints/session.py Normal file
View File

@ -0,0 +1,18 @@
from flask import Blueprint, request
from dependency_injector.wiring import inject, Provide
from app.di_container import Container
from app.controllers import SessionController
session_bp = Blueprint("session", __name__)
@session_bp.route("", methods=["POST"])
@inject
def sessionGet(controller: SessionController = Provide[Container.session_controller]):
return controller.createRoom(request.get_json())
@session_bp.route("/summary", methods=["POST"])
@inject
def summary(controller: SessionController = Provide[Container.session_controller]):
return controller.summaryall(request.get_json())

50
app/blueprints/subject.py Normal file
View File

@ -0,0 +1,50 @@
from flask import Blueprint, request
from dependency_injector.wiring import inject, Provide
from app.di_container import Container
from app.controllers import SubjectController
subject_blueprint = Blueprint("subject", __name__)
@subject_blueprint.route("", methods=["POST"])
@inject
def create_subject(
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.create(request.get_json())
@subject_blueprint.route("", methods=["GET"])
@inject
def get_all_subjects(
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.get_all()
@subject_blueprint.route("/<subject_id>", methods=["GET"])
@inject
def get_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.get_by_id(subject_id)
@subject_blueprint.route("/<subject_id>", methods=["PUT"])
@inject
def update_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.update(subject_id, request.get_json())
@subject_blueprint.route("/<subject_id>", methods=["DELETE"])
@inject
def delete_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.delete(subject_id)

View File

@ -1,6 +1,6 @@
from flask import Blueprint from flask import Blueprint
from controllers import UserController from app.di_container import Container
from di_container import Container from app.controllers import UserController
from dependency_injector.wiring import inject, Provide from dependency_injector.wiring import inject, Provide
user_blueprint = Blueprint("user", __name__) user_blueprint = Blueprint("user", __name__)
@ -16,3 +16,25 @@ def get_users(user_controller: UserController = Provide[Container.user_controlle
@inject @inject
def register(user_controller: UserController = Provide[Container.user_controller]): def register(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.register() return user_controller.register()
@user_blueprint.route("/user/update", methods=["POST"])
@inject
def update_user(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.update_profile()
@user_blueprint.route("/user/change-password", methods=["POST"])
@inject
def change_password(
user_controller: UserController = Provide[Container.user_controller],
):
return user_controller.change_password()
@user_blueprint.route("/user/<string:user_id>", methods=["GET"])
@inject
def get_user(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.get_user_by_id(user_id)

View File

@ -1,17 +1,20 @@
from dotenv import load_dotenv from dotenv import load_dotenv
import os import os
# Load variabel dari file .env # Load variables from .env
load_dotenv(override=True) load_dotenv(override=True)
class Config: class Config:
# Flask Environment Settings
FLASK_ENV = os.getenv("FLASK_ENV", "development") FLASK_ENV = os.getenv("FLASK_ENV", "development")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t") DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t")
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key") SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
# MongoDB Settings
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/yourdb") MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/yourdb")
# Google OAuth Settings
GOOGLE_PROJECT_ID = os.getenv("GOOGLE_PROJECT_ID") GOOGLE_PROJECT_ID = os.getenv("GOOGLE_PROJECT_ID")
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID") GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET") GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
@ -22,6 +25,17 @@ class Config:
"GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token" "GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token"
) )
GOOGLE_AUTH_PROVIDER_X509_CERT_URL = os.getenv("GOOGLE_AUTH_PROVIDER_X509_CERT_URL") GOOGLE_AUTH_PROVIDER_X509_CERT_URL = os.getenv("GOOGLE_AUTH_PROVIDER_X509_CERT_URL")
GOOGLE_SCOPE = "email profile" GOOGLE_SCOPE = "email profile"
GOOGLE_BASE_URL = "https://www.googleapis.com/oauth2/v1/" GOOGLE_BASE_URL = "https://www.googleapis.com/oauth2/v1/"
# Redis Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
@property
def REDIS_URL(self):
if self.REDIS_PASSWORD:
return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"

View File

@ -1,10 +1,17 @@
from .auth_controller import AuthController from .auth_controller import AuthController
from .user_controller import UserController from .user_controller import UserController
from .quiz_controller import QuizController from .quiz_controller import QuizController
from .history_controller import HistoryController
from .subject_controller import SubjectController
from .socket_conroller import SocketController
from .session_controller import SessionController
__all__ = [ __all__ = [
"AuthController", "AuthController",
"UserController", "UserController",
"QuizController", "QuizController",
"HistoryController",
"SubjectController",
"SocketController",
"SessionController",
] ]

View File

@ -1,13 +1,12 @@
from flask import jsonify, request, current_app from flask import jsonify, request, current_app
from pydantic import ValidationError from pydantic import ValidationError
from models.login.login_response import UserResponseModel from app.schemas.basic_response_schema import ResponseSchema
from schemas.basic_response_schema import ResponseSchema from app.schemas.google_login_schema import GoogleLoginSchema
from schemas.google_login_schema import GoogleLoginSchema from app.schemas import LoginSchema
from schemas import LoginSchema from app.services import UserService, AuthService
from services import UserService, AuthService from app.exception import AuthException
from exception import AuthException from app.mapper import UserMapper
from mapper import UserMapper from app.helpers import make_response, make_error_response
from helpers import make_response
import logging import logging
logging = logging.getLogger(__name__) logging = logging.getLogger(__name__)
@ -26,15 +25,7 @@ class AuthController:
if response is None: if response is None:
return make_response(message="User is not registered", status_code=401) return make_response(message="User is not registered", status_code=401)
return ( return make_response(message="Login success", data=response)
jsonify(
ResponseSchema(
message="Register success",
data=UserMapper.user_entity_to_response(response),
).model_dump()
),
200,
)
except ValidationError as e: except ValidationError as e:
current_app.logger.error(f"Validation error: {e}") current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None) response = ResponseSchema(message="Invalid input", data=None, meta=None)
@ -54,25 +45,14 @@ class AuthController:
try: try:
data = request.get_json() data = request.get_json()
# Validasi data dengan Pydantic
validated_data = GoogleLoginSchema(**data) validated_data = GoogleLoginSchema(**data)
id_token = validated_data.token_id id_token = validated_data.token_id
# Verifikasi ID Token ke layanan AuthService
user_info = self.auth_service.verify_google_id_token(id_token) user_info = self.auth_service.verify_google_id_token(id_token)
if not user_info: if not user_info:
current_app.logger.error("Invalid Google ID Token") return make_response(message="Invalid Google ID Token", data=user_info)
response = ResponseSchema(
message="Invalid Google ID Token", data=None, meta=None
)
return jsonify(response.model_dump()), 401
response = ResponseSchema( return make_response(message="Login Success", data=user_info)
message="Login successful",
data=UserMapper.user_entity_to_response(user_info),
meta=None,
)
return jsonify(response.model_dump()), 200
except ValidationError as e: except ValidationError as e:
current_app.logger.error(f"Validation error: {e}") current_app.logger.error(f"Validation error: {e}")

View File

@ -0,0 +1,32 @@
from app.services import HistoryService
from app.helpers import make_error_response, make_response
class HistoryController:
def __init__(self, history_service: HistoryService):
self.history_service = history_service
def get_quiz_by_user(self, user_id: str):
try:
data = self.history_service.get_history_by_user_id(user_id)
return make_response(message="retrive history data", data=data)
except Exception as e:
return make_error_response(e)
def get_detail_quiz_history(self, answer_id: str):
try:
data = self.history_service.get_history_by_answer_id(answer_id)
return make_response(
message="success retrive detail history data", data=data
)
except Exception as e:
return make_error_response(e)
def get_session_history(self, session_id):
try:
result = self.history_service.get_session_history(session_id)
return make_response(message="success get history session", data=result)
except Exception as e:
return make_error_response(e)

View File

@ -1,22 +1,31 @@
from flask import jsonify import json
from pydantic import ValidationError from pydantic import ValidationError
from schemas.requests import QuizCreateSchema, UserAnswerSchema from app.schemas.requests import QuizCreateSchema, UserAnswerSchema
from schemas.response import QuizCreationResponse from app.schemas.response import QuizCreationResponse
from services import QuizService, AnswerService from app.services import QuizService, AnswerService, QuestionGenerationService
from helpers import make_response, make_error_response from app.helpers import make_response, make_error_response
from app.exception import ValidationException, DataNotFoundException
class QuizController: class QuizController:
def __init__(self, quiz_service: QuizService, answer_service: AnswerService): def __init__(
self,
quiz_service: QuizService,
answer_service: AnswerService,
question_generate_service: QuestionGenerationService,
):
self.quiz_service = quiz_service self.quiz_service = quiz_service
self.answer_service = answer_service self.answer_service = answer_service
self.question_generate_service = question_generate_service
def get_quiz(self, quiz_id): def get_quiz(self, quiz_id):
try: try:
result = self.quiz_service.get_quiz(quiz_id) result = self.quiz_service.get_quiz(quiz_id)
if not result: if not result:
return make_response(message="Quiz not found", status_code=404) return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.dict()) return make_response(message="Quiz Found", data=result.model_dump())
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
@ -29,41 +38,22 @@ class QuizController:
data=QuizCreationResponse(quiz_id=quiz_id), data=QuizCreationResponse(quiz_id=quiz_id),
status_code=201, status_code=201,
) )
except ValidationError as e: except (ValidationError, ValidationException) as e:
return make_response(e.errors(), status_code=400) return make_response(message="", status_code=400)
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
# def update_quiz(self, quiz_id, quiz_data):
# try:
# quiz_obj = QuizUpdateSchema(**quiz_data)
# success = self.quiz_service.update_quiz(
# quiz_id, quiz_obj.dict(exclude_unset=True)
# )
# if not success:
# return jsonify({"error": "Quiz not found or update failed"}), 400
# return jsonify({"message": "Quiz updated"}), 200
# except ValidationError as e:
# return jsonify({"error": "Validation error", "detail": e.errors()}), 400
def delete_quiz(self, quiz_id):
success = self.quiz_service.delete_quiz(quiz_id)
if not success:
return jsonify({"error": "Quiz not found"}), 400
return jsonify({"message": "Quiz deleted"}), 200
def quiz_recomendation(self): def quiz_recomendation(self):
try: try:
result = self.quiz_service.get_quiz_recommendation() result = self.quiz_service.get_quiz_recommendation()
if not result: if not result:
return make_response(message="Quiz not found", status_code=404) return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.dict()) return make_response(message="Quiz Found", data=result.model_dump())
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
def submit_answer(self, answer_data): def submit_answer(self, answer_data):
try: try:
# Assuming answer_data is a dictionary with the necessary fields
answer_obj = UserAnswerSchema(**answer_data) answer_obj = UserAnswerSchema(**answer_data)
answer_id = self.answer_service.create_answer(answer_obj) answer_id = self.answer_service.create_answer(answer_obj)
return make_response( return make_response(
@ -72,14 +62,20 @@ class QuizController:
status_code=201, status_code=201,
) )
except ValidationError as e: except ValidationError as e:
return make_response(e.errors(), status_code=400) return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except ValidationException as e:
return make_response(
message=f"validation issue {e.message}", status_code=400
)
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
def get_answer(self, quiz_id, user_id, session_id): def get_answer(self, quiz_id, user_id, session_id):
try: try:
# self.answer_service. # self.answer_service.
print("yps") pass
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
@ -89,7 +85,79 @@ class QuizController:
user_id=user_id, page=page, page_size=page_size user_id=user_id, page=page, page_size=page_size
) )
return make_response( return make_response(
message="User quizzes retrieved successfully", data=result message="User quizzes retrieved successfully",
data=result.quizzes,
page=page,
page_size=page_size,
total_all_data=result.total,
) )
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
def get_quiz_recommendation(self, page, limit):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(page=page, limit=limit)
return make_response(
message="success retrieve recommendation quiz", data=result
)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:
return make_response(message=str(e), data=None, status_code=400)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except Exception as e:
return make_error_response(e)
def search_quiz(self, keyword: str, subject_id: str, page: int, limit: int):
try:
quiz, total = self.quiz_service.search_quiz(
keyword, subject_id, page, limit
)
return make_response(
message="success",
data=quiz,
page=page,
page_size=limit,
total_all_data=total,
)
except Exception as e:
return make_error_response(e)
def create_quiz_auto(
self,
reqBody,
):
try:
result = self.question_generate_service.createQuizAutomate(
reqBody["sentence"]
)
return make_response(message="succes labeling", data=result)
except Exception as e:
return make_error_response(e)
def get_user_ans_session(self, body):
try:
session_id = body.get("session_id")
user_id = body.get("user_id")
if not session_id and not user_id:
return make_response(
message="session_id or user_id must be provided", status_code=400
)
data = self.answer_service.get_answer_session(
session_id=session_id,
user_id=user_id,
)
return make_response(message="Successfully retrieved the answer", data=data)
except KeyError as e:
return make_error_response(f"Missing required key: {str(e)}")
except Exception as e:
return make_error_response(f"An error occurred: {str(e)}")

View File

@ -0,0 +1,42 @@
from flask import request, jsonify
from flask.views import MethodView
from app.services.session_service import SessionService
from app.helpers import make_response
class SessionController(MethodView):
def __init__(self, session_service: SessionService):
self.session_service = session_service
def createRoom(self, data):
required_fields = [
"quiz_id",
"host_id",
"limit_participan",
]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing field: {field}"}), 400
session = self.session_service.create_session(
quiz_id=data["quiz_id"],
host_id=data["host_id"],
limit_participan=data["limit_participan"],
)
return make_response(
message="succes create room",
data=session,
status_code=201,
)
def summaryall(self, body):
self.session_service.summaryAllSessionData(
session_id=body.get("session_id"), start_time=""
)
return make_response(
message="succes create room",
data="",
status_code=201,
)

View File

@ -0,0 +1,229 @@
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request
from app.services import SessionService
import threading
import json
from redis import Redis
class SocketController:
def __init__(
self,
socketio: SocketIO,
session_service: SessionService,
):
self.socketio = socketio
self.session_service = session_service
self._register_events()
def _register_events(self):
@self.socketio.on("connect")
def on_connect():
print(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
@self.socketio.on("disconnect")
def on_disconnect():
print(f"Client disconnected: {request.sid}")
@self.socketio.on("join_room")
def handle_join_room(data):
session_code = data.get("session_code")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"})
return
session = self.session_service.join_session(
session_code=session_code,
user_id=user_id,
)
if session is None:
emit("error", {"message": "Failed to join session or session inactive"})
return
session_id = session["session_id"]
join_room(session_id)
if session["is_admin"]:
message = "Admin has joined the room."
else:
message = f"User {session['username']} has joined the room."
emit(
"room_message",
{
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
@self.socketio.on("leave_room")
def handle_leave_room(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(
session_id=session_id,
user_id=user_id,
)
leave_room(session_id)
if leave_result["is_success"]:
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
@self.socketio.on("send_message")
def on_send_message(data):
session_code = data.get("session_id")
message = data.get("message")
username = data.get("username", "anonymous")
emit(
"receive_message",
{"message": message, "from": username},
room=session_code,
)
@self.socketio.on("end_session")
def handle_end_session(data):
session_code = data.get("session_id")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_id and user_id required"})
return
# Validasi user berhak mengakhiri session
self.session_service.end_session(session_id=session_code, user_id=user_id)
# Bersihkan semua data session di Redis
for key in [
self._answers_key(session_code),
self._scores_key(session_code),
self._questions_key(session_code),
]:
self.redis_repo.delete_key(key)
emit(
"room_closed",
{"message": "Session has ended.", "room": session_code},
room=session_code,
)
@self.socketio.on("start_quiz")
def handle_start_quiz(data):
session_id = data.get("session_id")
if not session_id:
emit("error", {"message": "session_id is required"})
return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread(
target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start()
@self.socketio.on("submit_answer")
def handle_submit_answer(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
try:
result = self.session_service.submit_answer(
session_id=session_id,
user_id=user_id,
question_index=question_index,
answer=user_answer,
time_spent=time_spent,
)
except ValueError as exc:
emit("error", {"message": str(exc)})
return
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)

View File

@ -0,0 +1,48 @@
from app.services.subject_service import SubjectService
from app.helpers import make_response, make_error_response
class SubjectController:
def __init__(self, service: SubjectService):
self.service = service
def create(self, req_body):
try:
new_id = self.service.create_subject(req_body)
return make_response(message="Subject created", data={"id": new_id})
except Exception as e:
return make_error_response(e)
def get_all(self):
try:
subjects = self.service.get_all_subjects()
return make_response(message="success retrieve subject", data=subjects)
except Exception as e:
return make_error_response(e)
def get_by_id(self, subject_id: str):
try:
subject = self.service.get_subject_by_id(subject_id)
if not subject:
return make_response(message="Subject not found", status_code=404)
return make_response(data=subject.model_dump())
except Exception as e:
return make_error_response(e)
def update(self, subject_id: str, req_body):
try:
updated = self.service.update_subject(subject_id, req_body)
if not updated:
return make_response(message="No subject updated", status_code=404)
return make_response(message="Subject updated")
except Exception as e:
return make_error_response(e)
def delete(self, subject_id: str):
try:
deleted = self.service.delete_subject(subject_id)
if not deleted:
return make_response(message="No subject deleted", status_code=404)
return make_response(message="Subject deleted")
except Exception as e:
return make_error_response(e)

View File

@ -1,11 +1,12 @@
# /controllers/user_controller.py # /controllers/user_controller.py
from flask import jsonify, request, current_app from flask import jsonify, request, current_app
from services import UserService from app.services import UserService
from schemas import RegisterSchema from app.schemas import RegisterSchema
from pydantic import ValidationError from pydantic import ValidationError
from schemas import ResponseSchema from app.schemas import ResponseSchema
from exception import AlreadyExistException from app.exception import AlreadyExistException, DataNotFoundException
from helpers import make_response from app.helpers import make_response
from app.schemas.requests import ProfileUpdateSchema
class UserController: class UserController:
@ -23,7 +24,6 @@ class UserController:
current_app.logger.error(f"Validation error: {e}") current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None) response = ResponseSchema(message="Invalid input", data=None, meta=None)
return make_response("Invalid input", status_code=400) return make_response("Invalid input", status_code=400)
except AlreadyExistException as e: except AlreadyExistException as e:
return make_response("User already exists", status_code=409) return make_response("User already exists", status_code=409)
except Exception as e: except Exception as e:
@ -31,3 +31,86 @@ class UserController:
f"Error during Google login: {str(e)}", exc_info=True f"Error during Google login: {str(e)}", exc_info=True
) )
return make_response("Internal server error", status_code=500) return make_response("Internal server error", status_code=500)
def get_user_by_id(self, user_id):
try:
if not user_id:
return make_response("User ID is required", status_code=400)
user = self.user_service.get_user_by_id(user_id)
if user:
return make_response("User found", data=user)
else:
return make_response("User not found", status_code=404)
except Exception as e:
current_app.logger.error(
f"Error while retrieving user: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def update_profile(self):
try:
body = request.get_json()
reqBody = ProfileUpdateSchema(**body)
result = self.user_service.update_profile(reqBody)
if result:
return make_response(message="User profile updated successfully.")
else:
return make_response(
message="Failed to update user profile. Please check the submitted data.",
status_code=400,
)
except DataNotFoundException as e:
return make_response(message="User data not found.", status_code=404)
except ValueError as e:
return make_response(
message=f"Invalid data provided: {str(e)}", status_code=400
)
except Exception as e:
current_app.logger.error(
f"Error while updating profile: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def change_password(self):
try:
body = request.get_json()
user_id = body.get("id")
current_password = body.get("current_password")
new_password = body.get("new_password")
if not all([user_id, current_password, new_password]):
return make_response(
message="Missing required fields: id, current_password, new_password",
status_code=400,
)
result = self.user_service.change_password(
user_id, current_password, new_password
)
if result:
return make_response(message="Password changed successfully.")
else:
return make_response(
message="Failed to change password.",
status_code=400,
)
except DataNotFoundException as e:
return make_response(message="User data not found.", status_code=404)
except ValueError as e:
return make_response(message=f"{str(e)}", status_code=400)
except Exception as e:
current_app.logger.error(
f"Error while changing password: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)

View File

@ -1,5 +1,6 @@
from flask_pymongo import PyMongo from flask_pymongo import PyMongo
from flask import Flask, current_app from flask import Flask, current_app
from .seed.subject_seed import seed_subjects
def init_db(app: Flask) -> PyMongo: def init_db(app: Flask) -> PyMongo:
@ -8,8 +9,8 @@ def init_db(app: Flask) -> PyMongo:
mongo.cx.server_info() mongo.cx.server_info()
app.logger.info("MongoDB connection established") app.logger.info("MongoDB connection established")
seed_subjects(mongo)
return mongo return mongo
except Exception as e: except Exception as e:
app.logger.error(f"MongoDB connection failed: {e}") app.logger.error(f"MongoDB connection failed: {e}")
return None # Handle failure gracefully return None

View File

@ -0,0 +1,42 @@
from flask_pymongo import PyMongo
def seed_subjects(mongo: PyMongo):
subject_collection = mongo.db.subjects
base_subjects = [
{
"name": "Ilmu Pengetahuan Alam",
"short_name": "IPA",
"description": "Pelajaran tentang sains dan alam",
},
{
"name": "Ilmu Pengetahuan Sosial",
"short_name": "IPS",
"description": "Pelajaran tentang masyarakat dan geografi",
},
{
"name": "Sejarah",
"short_name": "Sejarah",
"description": "Pelajaran mengenai sejarah di indonesia",
},
{
"name": "Matematika",
"short_name": "Matematika",
"description": "Pelajaran tentang angka dan logika",
},
{
"name": "Bahasa Indonesia",
"short_name": "B.Indonesia",
"description": "Pelajaran tentang bahasa nasional",
},
{
"name": "Sejarah",
"short_name": "Sejarah",
"description": "Pelajaran sejarah Indonesia",
},
]
for subject in base_subjects:
if not subject_collection.find_one({"name": subject["name"]}):
subject_collection.insert_one(subject)

View File

@ -1,26 +1,132 @@
from dependency_injector import containers, providers from dependency_injector import containers, providers
from controllers import UserController, AuthController, QuizController from app.repositories import (
from repositories import UserRepository, QuizRepository, UserAnswerRepository UserRepository,
from services import UserService, AuthService, QuizService, AnswerService QuizRepository,
UserAnswerRepository,
SubjectRepository,
SessionRepository,
NERSRLRepository,
SessionMemoryRepository,
QuizMemoryRepository,
AnswerMemoryRepository,
ScoreMemoryRepository,
QuestionGenerationRepository,
AnswerGenerationRepository,
)
from app.services import (
UserService,
AuthService,
QuizService,
AnswerService,
HistoryService,
SubjectService,
SessionService,
QuestionGenerationService,
)
from app.controllers import (
UserController,
AuthController,
QuizController,
HistoryController,
SubjectController,
SocketController,
SessionController,
)
class Container(containers.DeclarativeContainer): class Container(containers.DeclarativeContainer):
"""Dependency Injection Container""" """Dependency Injection Container"""
mongo = providers.Dependency() mongo = providers.Dependency()
redis = providers.Dependency()
socketio = providers.Dependency()
# repository # repository
user_repository = providers.Factory(UserRepository, mongo.provided.db) user_repository = providers.Factory(UserRepository, mongo.provided.db)
quiz_repository = providers.Factory(QuizRepository, mongo.provided.db) quiz_repository = providers.Factory(QuizRepository, mongo.provided.db)
answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db) answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db)
subject_repository = providers.Factory(SubjectRepository, mongo.provided.db)
session_repository = providers.Factory(SessionRepository, mongo.provided.db)
ner_srl_repository = providers.Factory(NERSRLRepository)
question_generation_repository = providers.Factory(QuestionGenerationRepository)
answer_generator_repository = providers.Factory(AnswerGenerationRepository)
session_memory_repository = providers.Factory(SessionMemoryRepository, redis)
quiz_memory_repository = providers.Factory(QuizMemoryRepository, redis)
answer_memory_repository = providers.Factory(AnswerMemoryRepository, redis)
score_memory_repository = providers.Factory(ScoreMemoryRepository, redis)
# services # services
auth_service = providers.Factory(AuthService, user_repository) auth_service = providers.Factory(
user_service = providers.Factory(UserService, user_repository) AuthService,
quiz_service = providers.Factory(QuizService, quiz_repository) user_repository,
answer_service = providers.Factory(AnswerService, answer_repository) )
user_service = providers.Factory(
UserService,
user_repository,
)
quiz_service = providers.Factory(
QuizService,
quiz_repository,
user_repository,
subject_repository,
)
answer_service = providers.Factory(
AnswerService,
answer_repository,
quiz_repository,
user_repository,
)
history_service = providers.Factory(
HistoryService,
quiz_repository,
answer_repository,
session_repository,
user_repository,
)
subject_service = providers.Factory(
SubjectService,
subject_repository,
)
session_service = providers.Factory(
SessionService,
session_repository,
session_memory_repository,
quiz_memory_repository,
answer_memory_repository,
score_memory_repository,
user_repository,
quiz_repository,
answer_repository,
)
question_generation_service = providers.Factory(
QuestionGenerationService,
ner_srl_repository,
question_generation_repository,
answer_generator_repository,
)
# controllers # controllers
auth_controller = providers.Factory(AuthController, user_service, auth_service) auth_controller = providers.Factory(AuthController, user_service, auth_service)
user_controller = providers.Factory(UserController, user_service) user_controller = providers.Factory(UserController, user_service)
quiz_controller = providers.Factory(QuizController, quiz_service, answer_service) quiz_controller = providers.Factory(
QuizController,
quiz_service,
answer_service,
question_generation_service,
)
history_controller = providers.Factory(HistoryController, history_service)
subject_controller = providers.Factory(SubjectController, subject_service)
socket_controller = providers.Factory(
SocketController,
socketio,
session_service,
)
session_controller = providers.Factory(SessionController, session_service)

View File

@ -1,10 +1,12 @@
from .auth_exception import AuthException from .auth_exception import AuthException
from .already_exist_exception import AlreadyExistException from .already_exist_exception import AlreadyExistException
from .data_not_found_exception import DataNotFoundException from .data_not_found_exception import DataNotFoundException
from .validation_exception import ValidationException
__all__ = [ __all__ = [
"AuthException", "AuthException",
"AlreadyExistException", "AlreadyExistException",
"DataNotFoundException", "DataNotFoundException",
"ValidationException",
] ]

View File

@ -8,3 +8,6 @@ class BaseExceptionTemplate(Exception):
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}: {self.message}" return f"{self.__class__.__name__}: {self.message}"
def json(self):
return {"error": self.__class__.__name__, "message": self.message}

View File

@ -0,0 +1,8 @@
from .base_exception import BaseExceptionTemplate
class ValidationException(BaseExceptionTemplate):
"""Exception for validation"""
def __init__(self, message: str = "validation error, check yout input"):
super().__init__(message, status_code=400)

View File

@ -1,4 +1,9 @@
from .response_helper import make_response, make_error_response from .response_helper import make_response, make_error_response
from .datetime_util import DatetimeUtil
__all__ = ["make_response", "make_error_response"] __all__ = [
"make_response",
"make_error_response",
"DatetimeUtil",
]

View File

@ -0,0 +1,44 @@
from datetime import datetime
from zoneinfo import ZoneInfo
class DatetimeUtil:
@staticmethod
def now():
"""Waktu UTC (timezone-aware)"""
return datetime.now(tz=ZoneInfo("UTC"))
@staticmethod
def now_iso():
"""Waktu UTC dalam format ISO 8601 string"""
return datetime.now(tz=ZoneInfo("UTC")).isoformat()
@staticmethod
def now_jakarta():
"""Waktu sekarang di zona Asia/Jakarta (WIB)"""
return datetime.now(tz=ZoneInfo("Asia/Jakarta"))
@staticmethod
def to_string(dt: datetime, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
"""Convert UTC datetime to Asia/Jakarta time and format as string"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
jakarta_time = dt.astimezone(ZoneInfo("Asia/Jakarta"))
return jakarta_time.strftime(fmt)
@staticmethod
def from_string(
date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: str = "UTC"
) -> datetime:
"""Convert string ke datetime dengan timezone"""
dt = datetime.strptime(date_str, fmt)
return dt.replace(tzinfo=ZoneInfo(tz))
@staticmethod
def from_iso(date_str: str, tz: str = "UTC") -> datetime:
"""Convert ISO 8601 string to datetime with timezone awareness"""
dt = datetime.fromisoformat(date_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=ZoneInfo(tz))
return dt

View File

@ -1,15 +1,37 @@
from flask import jsonify, current_app from flask import jsonify, current_app
from typing import Optional, Union from typing import Optional, Union
from schemas import ResponseSchema from app.schemas import ResponseSchema, MetaSchema
import math
def calculate_total_page(total_data: int, page_size: int) -> int:
if not page_size or page_size <= 0:
return 1
return math.ceil(total_data / page_size)
def make_response( def make_response(
message: str, message: str,
data: Optional[dict] = None, data: Optional[Union[dict, list]] = None,
meta: Optional[dict] = None, page: Optional[int] = None,
page_size: Optional[int] = None,
total_all_data: Optional[int] = None,
status_code: int = 200, status_code: int = 200,
): ):
response = ResponseSchema(message=message, data=data, meta=meta) meta = None
if page is not None and page_size is not None and total_all_data is not None:
meta = MetaSchema(
current_page=page,
total_all_data=total_all_data,
total_data=len(data) if data else 0,
total_page=calculate_total_page(total_all_data, page_size),
)
response = ResponseSchema(
message=message,
data=data,
meta=meta,
)
return jsonify(response.model_dump()), status_code return jsonify(response.model_dump()), status_code

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,52 +1,75 @@
from di_container import Container import eventlet
from configs import Config, LoggerConfig
from flask import Flask eventlet.monkey_patch()
from blueprints import auth_blueprint, user_blueprint, quiz_bp, default_blueprint
from database import init_db
import logging import logging
from flask import Flask
from flask_socketio import SocketIO
from app.di_container import Container
from app.configs import Config, LoggerConfig
from app.blueprints import (
auth_blueprint,
user_blueprint,
quiz_bp,
default_blueprint,
history_blueprint,
subject_blueprint,
session_bp,
)
from app.database import init_db
from redis import Redis
def createApp() -> Flask: def createApp() -> tuple[Flask, SocketIO]:
app = Flask(__name__) app = Flask(__name__)
app.config.from_object(Config)
LoggerConfig.init_logger(app)
logging.basicConfig( logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
) )
app.config.from_object(Config)
LoggerConfig.init_logger(app)
container = Container() container = Container()
app.container = container app.container = container
mongo = init_db(app) mongo = init_db(app)
if mongo is not None: if mongo is not None:
container.mongo.override(mongo) container.mongo.override(mongo)
# container.wire(modules=["blueprints.auth"]) redis_url = Config().REDIS_URL
# container.wire(modules=["blueprints.user"]) redis_client = Redis.from_url(redis_url)
# container.wire(modules=["blueprints.quiz"]) redis_client.ping()
container.redis.override(redis_client)
socketio = SocketIO(
cors_allowed_origins="*",
# message_queue=redis_url,
async_mode="eventlet",
)
container.socketio.override(socketio)
container.socket_controller()
socketio.init_app(app)
container.wire( container.wire(
modules=[ modules=[
"blueprints.auth", "app.blueprints.auth",
"blueprints.user", "app.blueprints.user",
"blueprints.quiz", "app.blueprints.quiz",
"app.blueprints.history",
"app.blueprints.subject",
"app.blueprints.session",
] ]
) )
# Register Blueprints
app.register_blueprint(default_blueprint) app.register_blueprint(default_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api") app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api") app.register_blueprint(user_blueprint, url_prefix="/api")
app.register_blueprint(quiz_bp, url_prefix="/api/quiz") app.register_blueprint(quiz_bp, url_prefix="/api/quiz")
app.register_blueprint(history_blueprint, url_prefix="/api/history")
app.register_blueprint(subject_blueprint, url_prefix="/api/subject")
app.register_blueprint(session_bp, url_prefix="/api/session")
for rule in app.url_map.iter_rules(): return app, socketio
print(f"Route: {rule} -> Methods: {rule.methods}")
return app
if __name__ == "__main__":
app = createApp()
app.run(host="0.0.0.0", debug=Config.DEBUG)

View File

@ -1,8 +1,10 @@
from .user_mapper import UserMapper from .user_mapper import UserMapper
from .quiz_mapper import map_quiz_entity_to_schema from .quiz_mapper import QuizMapper
from .subject_mapper import SubjectMapper
__all__ = [ __all__ = [
"UserMapper", "UserMapper",
"map_quiz_entity_to_schema", "QuizMapper",
"SubjectMapper",
] ]

View File

@ -1,26 +1,92 @@
from models import QuizEntity, QuestionItemEntity from datetime import datetime
from schemas import QuizGetSchema, QuestionItemSchema from app.helpers import DatetimeUtil
from app.models import QuizEntity, QuestionItemEntity, UserEntity
from app.models.entities import SubjectEntity
from app.schemas import QuizGetSchema, QuestionItemSchema
from app.schemas.response import ListingQuizResponse
from app.schemas.requests import QuizCreateSchema
def map_question_entity_to_schema(entity: QuestionItemEntity) -> QuestionItemSchema: class QuizMapper:
@staticmethod
def map_question_entity_to_schema(entity: QuestionItemEntity) -> QuestionItemSchema:
return QuestionItemSchema( return QuestionItemSchema(
index=entity.index,
question=entity.question, question=entity.question,
target_answer=entity.target_answer, target_answer=entity.target_answer,
duration=entity.duration, duration=entity.duration,
type=entity.type, type=entity.type,
options=entity.options,
) )
@staticmethod
def map_question_schema_to_entity(schema: QuestionItemSchema) -> QuestionItemEntity:
return QuestionItemEntity(
index=schema.index,
question=schema.question,
target_answer=schema.target_answer,
duration=schema.duration,
type=schema.type,
options=schema.options,
)
def map_quiz_entity_to_schema(entity: QuizEntity) -> QuizGetSchema: @staticmethod
def map_quiz_entity_to_schema(
entity: QuizEntity,
subjectE: SubjectEntity,
) -> QuizGetSchema:
return QuizGetSchema( return QuizGetSchema(
id=str(entity.id),
author_id=entity.author_id, author_id=entity.author_id,
subject_id=str(subjectE.id),
subject_alias=subjectE.short_name,
title=entity.title, title=entity.title,
description=entity.description, description=entity.description,
is_public=entity.is_public, is_public=entity.is_public,
date=entity.date.strftime("%Y-%m-%d %H:%M:%S") if entity.date else None, date=DatetimeUtil.to_string(entity.date, "%d-%m-%Y"),
time=DatetimeUtil.to_string(entity.date, "%H:%M:%S"),
total_quiz=entity.total_quiz or 0, total_quiz=entity.total_quiz or 0,
limit_duration=entity.limit_duration or 0, limit_duration=entity.limit_duration or 0,
question_listings=[ question_listings=[
map_question_entity_to_schema(q) for q in entity.question_listings or [] QuizMapper.map_question_entity_to_schema(q)
for q in entity.question_listings or []
], ],
) )
@staticmethod
def map_quiz_schema_to_entity(
schema: QuizCreateSchema,
datetime: datetime,
total_duration: int,
) -> QuizEntity:
return QuizEntity(
author_id=schema.author_id,
subject_id=schema.subject_id,
title=schema.title,
description=schema.description,
is_public=schema.is_public,
date=datetime,
total_quiz=len(schema.question_listings),
limit_duration=total_duration,
question_listings=[
QuizMapper.map_question_schema_to_entity(q)
for q in schema.question_listings or []
],
)
@staticmethod
def quiz_to_recomendation_mapper(
quiz_entity: QuizEntity,
user_entity: UserEntity,
) -> ListingQuizResponse:
return ListingQuizResponse(
quiz_id=str(quiz_entity.id),
author_id=str(user_entity.id),
author_name=user_entity.name,
title=quiz_entity.title,
description=quiz_entity.description,
date=quiz_entity.date.strftime("%d-%B-%Y") if quiz_entity.date else None,
duration=quiz_entity.limit_duration,
total_quiz=quiz_entity.total_quiz,
)

View File

@ -0,0 +1,12 @@
from app.schemas.requests import SubjectCreateRequest
from app.models.entities import SubjectEntity
class SubjectMapper:
@staticmethod
def to_entity(data: SubjectCreateRequest) -> SubjectEntity:
return SubjectEntity(
name=data.name,
short_name=data.alias,
description=data.description,
)

View File

@ -1,7 +1,9 @@
from datetime import datetime from datetime import datetime
from typing import Dict, Optional from typing import Dict, Optional
from models import UserEntity, UserResponseModel from app.models import UserEntity
from schemas import RegisterSchema from app.schemas import RegisterSchema
from app.schemas.response import LoginResponseSchema
from app.helpers import DatetimeUtil
class UserMapper: class UserMapper:
@ -41,16 +43,15 @@ class UserMapper:
) )
@staticmethod @staticmethod
def user_entity_to_response(user: UserEntity) -> UserResponseModel: def user_entity_to_response(user: UserEntity) -> LoginResponseSchema:
return UserResponseModel( return LoginResponseSchema(
id=str(user.id) if user.id else None, id=str(user.id) if user.id else None,
google_id=user.google_id,
email=user.email, email=user.email,
name=user.name, name=user.name,
birth_date=user.birth_date, birth_date=(
DatetimeUtil.to_string(user.birth_date) if user.birth_date else None
),
pic_url=user.pic_url, pic_url=user.pic_url,
phone=user.phone, phone=user.phone,
locale=user.locale, locale=user.locale,
# created_at=user.created_at,
# updated_at=user.updated_at,
) )

View File

@ -3,6 +3,9 @@ from .base import PyObjectId
from .quiz_entity import QuizEntity from .quiz_entity import QuizEntity
from .question_item_entity import QuestionItemEntity from .question_item_entity import QuestionItemEntity
from .user_answer_entity import UserAnswerEntity from .user_answer_entity import UserAnswerEntity
from .answer_item import AnswerItemEntity
from .subject_entity import SubjectEntity
from .session_entity import SessionEntity
__all__ = [ __all__ = [
"UserEntity", "UserEntity",
@ -10,4 +13,7 @@ __all__ = [
"QuizEntity", "QuizEntity",
"QuestionItemEntity", "QuestionItemEntity",
"UserAnswerEntity", "UserAnswerEntity",
"AnswerItemEntity",
"SubjectEntity",
"SessionEntity",
] ]

View File

@ -1,11 +1,9 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Union
class AnswerItemEntity(BaseModel): class AnswerItemEntity(BaseModel):
question_index: int question_index: int
question: str answer: Union[str | int | bool]
answer: str
correct_answer: str
is_correct: bool is_correct: bool
duration: int
time_spent: float time_spent: float

View File

@ -1,12 +1,11 @@
from typing import Optional, List from typing import Optional, List, Union
from pydantic import BaseModel from pydantic import BaseModel
from datetime import datetime
from .base import PyObjectId
class QuestionItemEntity(BaseModel): class QuestionItemEntity(BaseModel):
_id: Optional[PyObjectId] = None index: int
question: str question: str
target_answer: str target_answer: Union[str, bool, int]
duration: int duration: int
type: str # "isian" | "true_false" type: str
options: Optional[List[str]] = None

View File

@ -1,21 +1,24 @@
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel, Field
from datetime import datetime from datetime import datetime
from .base import PyObjectId from .base import PyObjectId
from .question_item_entity import QuestionItemEntity from .question_item_entity import QuestionItemEntity
class QuizEntity(BaseModel): class QuizEntity(BaseModel):
_id: Optional[PyObjectId] = None id: Optional[PyObjectId] = Field(default=None, alias="_id")
author_id: Optional[str] = None author_id: Optional[str] = None
subject_id: str
title: str title: str
description: Optional[str] = None description: Optional[str] = None
is_public: bool = False is_public: bool = False
date: Optional[datetime] = None date: datetime
total_quiz: Optional[int] = 0 total_quiz: int = 0
limit_duration: Optional[int] = 0 limit_duration: Optional[int] = 0 # in
total_user_playing: int = 0
question_listings: Optional[list[QuestionItemEntity]] = [] question_listings: Optional[list[QuestionItemEntity]] = []
class Config: class ConfigDict:
arbitrary_types_allowed = True arbitrary_types_allowed = True
populate_by_name = True
json_encoders = {PyObjectId: str} json_encoders = {PyObjectId: str}

View File

@ -0,0 +1,18 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from app.models.entities import PyObjectId
class SessionEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
session_code: str
quiz_id: str
host_id: str
created_at: datetime = Field(default_factory=datetime.now)
started_at: datetime | None = None
ended_at: datetime | None = None
is_active: bool = True
participan_limit: int = 10
participants: List[dict] = []
current_question_index: int = 0

View File

@ -0,0 +1,17 @@
from typing import Optional
from bson import ObjectId
from pydantic import BaseModel, Field
from app.models.entities import PyObjectId
class SubjectEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
name: str
short_name: str
description: Optional[str] = None
icon: Optional[str] = None
class ConfigDict:
populate_by_name = True
json_encoders = {ObjectId: str}
json_schema_extra = {}

View File

@ -8,17 +8,16 @@ from .base import PyObjectId
class UserAnswerEntity(BaseModel): class UserAnswerEntity(BaseModel):
_id: Optional[PyObjectId] = None id: Optional[PyObjectId] = Field(default=None, alias="_id")
session_id: Optional[PyObjectId] session_id: Optional[str]
quiz_id: PyObjectId quiz_id: str
user_id: str user_id: str
answered_at: datetime answered_at: datetime
answers: List[AnswerItemEntity] answers: List[AnswerItemEntity]
total_score: int total_score: int
total_correct: int total_correct: int
total_questions: int
class Config: class ConfigDict:
populate_by_name = True populate_by_name = True
arbitrary_types_allowed = True arbitrary_types_allowed = True
json_encoders = {ObjectId: str} json_encoders = {ObjectId: str}

View File

@ -1,5 +1,5 @@
from typing import Optional from typing import Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime from datetime import datetime
from .base import PyObjectId from .base import PyObjectId
@ -17,6 +17,4 @@ class UserEntity(BaseModel):
created_at: Optional[datetime] = None created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None updated_at: Optional[datetime] = None
class Config: model_config = ConfigDict(populate_by_name=True, json_encoders={PyObjectId: str})
populate_by_name = True
json_encoders = {PyObjectId: str}

View File

@ -13,7 +13,7 @@ class UserResponseModel(BaseModel):
phone: Optional[str] = None phone: Optional[str] = None
locale: str locale: str
class Config: class ConfigDict:
populate_by_name = True populate_by_name = True
json_encoders = { json_encoders = {
datetime: lambda v: v.isoformat(), datetime: lambda v: v.isoformat(),

View File

@ -1,9 +1,27 @@
from .user_repository import UserRepository from .user_repository import UserRepository
from .quiz_repositroy import QuizRepository from .quiz_repositroy import QuizRepository
from .answer_repository import UserAnswerRepository from .answer_repository import UserAnswerRepository
from .subject_repository import SubjectRepository
from .session_repostory import SessionRepository
from .ner_srl_repository import NERSRLRepository
from .session_memory_repository import SessionMemoryRepository
from .quiz_memory_repository import QuizMemoryRepository
from .answer_memory_repository import AnswerMemoryRepository
from .score_memory_repository import ScoreMemoryRepository
from .question_generation_repository import QuestionGenerationRepository
from .answer_generation_repository import AnswerGenerationRepository
__all__ = [ __all__ = [
"UserRepository", "UserRepository",
"QuizRepository", "QuizRepository",
"UserAnswerRepository", "UserAnswerRepository",
"SubjectRepository",
"SessionRepository",
"NERSRLRepository",
"SessionMemoryRepository",
"QuizMemoryRepository",
"AnswerMemoryRepository",
"ScoreMemoryRepository",
"QuestionGenerationRepository",
"AnswerGenerationRepository",
] ]

View File

@ -0,0 +1,70 @@
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
import re
class AnswerGenerationRepository:
MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras"
TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json"
def __init__(self):
with open(self.TOKENIZER_PATH, "r") as f:
tokenizer_data = json.load(f)
self.tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
self.answer_tokenizer = tokenizer_from_json(tokenizer_data["answer_tokenizer"])
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
self.max_context_len = tokenizer_data["max_context_len"]
self.max_question_len = tokenizer_data["max_question_len"]
self.max_token_len = tokenizer_data["max_token_len"]
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
self.model = load_model(self.MODEL_PATH)
def preprocess_text(self, text):
text = text.lower()
text = re.sub(r"\s+", " ", text).strip()
return text
def predict_answer(self, context, question, tokens, ner, srl, q_type):
context_seq = self.tokenizer.texts_to_sequences([self.preprocess_text(context)])
question_seq = self.tokenizer.texts_to_sequences(
[self.preprocess_text(question)]
)
token_seq = [self.tokenizer.texts_to_sequences([" ".join(tokens)])[0]]
ner_seq = [self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]]
srl_seq = [self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]]
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
q_type_cat = tf.keras.utils.to_categorical(
[q_type_idx], num_classes=self.q_type_vocab_size
)
context_pad = pad_sequences(
context_seq, maxlen=self.max_context_len, padding="post"
)
question_pad = pad_sequences(
question_seq, maxlen=self.max_question_len, padding="post"
)
token_pad = pad_sequences(token_seq, maxlen=self.max_token_len, padding="post")
ner_pad = pad_sequences(ner_seq, maxlen=self.max_token_len, padding="post")
srl_pad = pad_sequences(srl_seq, maxlen=self.max_token_len, padding="post")
prediction = self.model.predict(
[context_pad, question_pad, token_pad, ner_pad, srl_pad, q_type_cat],
verbose=0,
)
answer_idx = np.argmax(prediction[0])
for word, idx in self.answer_tokenizer.word_index.items():
if idx == answer_idx:
return word
return "Unknown"

View File

@ -0,0 +1,138 @@
import json
from typing import Any, Dict, List
from redis import Redis
class AnswerMemoryRepository:
KEY_TEMPLATE = "answer:{session_id}:{user_id}"
KEY_PATTERN_TEMPLATE = "answer:{session_id}:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str, user_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id, user_id=user_id)
def _build_pattern_key(self, session_id: str) -> str:
return self.KEY_PATTERN_TEMPLATE.format(session_id=session_id)
def initialize_empty_answers(
self,
session_id: str,
user_ids: List[str],
total_questions: int,
):
"""
Initialize empty answers for all users at the start of the quiz.
"""
for user_id in user_ids:
key = self._build_key(session_id, user_id)
answers = [
{
"question_index": idx + 1,
"answer": "",
"is_true": False,
"time_spent": 0.0,
}
for idx in range(total_questions)
]
self.set_data(key, answers)
def save_user_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
correct: bool,
time_spent: float,
):
"""
Update user's answer for a specific question.
Assumes answers have been initialized.
"""
key = self._build_key(session_id, user_id)
answers = self.get_data(key) or []
for ans in answers:
if ans.get("question_index") == question_index:
ans.update(
{
"answer": answer,
"is_true": correct,
"time_spent": time_spent,
}
)
break
self.set_data(key, answers)
def get_user_answers(self, session_id: str, user_id: str) -> List[Dict[str, Any]]:
key = self._build_key(session_id, user_id)
return self.get_data(key) or []
def get_all_user_answers(self, session_id: str) -> Dict[str, List[Dict[str, Any]]]:
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
all_answers = {}
for key in keys:
user_id = key.decode().split(":")[-1]
all_answers[user_id] = self.get_data(key) or []
return all_answers
def delete_all_answers(self, session_id: str):
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
def set_data(self, key: str, value: Any):
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Any:
data = self.redis.get(key)
return json.loads(data) if data else None
def auto_fill_incorrect_answers(
self,
session_id: str,
question_index: int,
default_time_spent: float = 0.0,
) -> List[str]:
"""
Auto-fill unanswered specific question (by index) as incorrect for all users.
:return: List of user IDs who had not answered the specific question.
"""
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
users_with_unanswered = []
for key in keys:
answers = self.get_data(key) or []
has_unanswered = False
for ans in answers:
if (
ans.get("question_index") == question_index
and ans.get("answer") == ""
):
has_unanswered = True
ans.update(
{
"answer": "",
"is_true": False,
"time_spent": default_time_spent,
}
)
break # No need to check other answers for this user
if has_unanswered:
user_id = key.decode().split(":")[-1]
users_with_unanswered.append(user_id)
self.set_data(key, answers)
return users_with_unanswered

View File

@ -1,7 +1,7 @@
from pymongo.collection import Collection from pymongo.collection import Collection
from bson import ObjectId from bson import ObjectId
from typing import Optional, List from typing import Optional, List
from models import UserAnswerEntity from app.models import UserAnswerEntity
class UserAnswerRepository: class UserAnswerRepository:
@ -9,13 +9,27 @@ class UserAnswerRepository:
self.collection: Collection = db.user_answers self.collection: Collection = db.user_answers
def create(self, answer_session: UserAnswerEntity) -> str: def create(self, answer_session: UserAnswerEntity) -> str:
data = answer_session.model_dump(by_alias=True) data = answer_session.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(data) result = self.collection.insert_one(data)
return str(result.inserted_id) return str(result.inserted_id)
def get_by_id(self, id: str) -> Optional[dict]: def get_by_id(self, id: str) -> Optional[UserAnswerEntity]:
result = self.collection.find_one({"_id": ObjectId(id)}) result = self.collection.find_one({"_id": ObjectId(id)})
return result if not result:
return None
return UserAnswerEntity(**result)
def get_by_userid_and_sessionid(
self,
user_id: str,
session_id: str,
) -> Optional[UserAnswerEntity]:
result = self.collection.find_one(
{"user_id": user_id, "session_id": session_id}
)
if not result:
return None
return UserAnswerEntity(**result)
def get_by_user_and_quiz(self, user_id: str, quiz_id: str) -> List[dict]: def get_by_user_and_quiz(self, user_id: str, quiz_id: str) -> List[dict]:
result = self.collection.find( result = self.collection.find(
@ -23,6 +37,10 @@ class UserAnswerRepository:
) )
return list(result) return list(result)
def get_by_user(self, user_id: str) -> list[UserAnswerEntity]:
result = self.collection.find({"user_id": user_id})
return [UserAnswerEntity(**doc) for doc in result]
def get_by_session(self, session_id: str) -> List[dict]: def get_by_session(self, session_id: str) -> List[dict]:
result = self.collection.find({"session_id": ObjectId(session_id)}) result = self.collection.find({"session_id": ObjectId(session_id)})
return list(result) return list(result)

View File

@ -0,0 +1,49 @@
import numpy as np
import pickle
from tensorflow.keras.models import load_model # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
import re
class NERSRLRepository:
def __init__(self):
# Load model and artifacts
self.model = load_model("app/lstm_model/ner_srl/lstm_ner_srl_model.keras")
with open("app/lstm_model/ner_srl/word2idx.pkl", "rb") as f:
self.word2idx = pickle.load(f)
with open("app/lstm_model/ner_srl/tag2idx_ner.pkl", "rb") as f:
self.tag2idx_ner = pickle.load(f)
with open("app/lstm_model/ner_srl/tag2idx_srl.pkl", "rb") as f:
self.tag2idx_srl = pickle.load(f)
self.idx2tag_ner = {i: t for t, i in self.tag2idx_ner.items()}
self.idx2tag_srl = {i: t for t, i in self.tag2idx_srl.items()}
self.PAD_WORD_ID = self.word2idx["PAD"]
self.MAXLEN = self.model.input_shape[1]
def _preprocess_tokens(self, tokens: list[str]) -> np.ndarray:
seq = [self.word2idx.get(tok.lower(), self.word2idx["UNK"]) for tok in tokens]
return pad_sequences(
[seq], maxlen=self.MAXLEN, padding="post", value=self.PAD_WORD_ID
)
def predict_sentence(self, sentence: str) -> dict:
tokens = re.findall(r"\d{1,2}\.\d{2}|\w+|[^\w\s]", sentence.lower())
print(tokens)
seq_padded = self._preprocess_tokens(tokens)
pred_ner_prob, pred_srl_prob = self.model.predict(seq_padded, verbose=0)
pred_ner = pred_ner_prob.argmax(-1)[0][: len(tokens)]
pred_srl = pred_srl_prob.argmax(-1)[0][: len(tokens)]
return {
"tokens": tokens,
"ner": [self.idx2tag_ner[int(i)] for i in pred_ner],
"srl": [self.idx2tag_srl[int(i)] for i in pred_srl],
}
def labeling_token(self, tokens: list[str]) -> dict:
sentence = " ".join(tokens)
return self.predict_sentence(sentence)

View File

@ -0,0 +1,87 @@
import numpy as np
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
from tensorflow.keras.models import load_model # type: ignore
import re
class QuestionGenerationRepository:
# Static paths for model and tokenizer
MODEL_PATH = "app/lstm_model/question_generation/new_model/question_prediction_model_final.h5"
TOKENIZER_PATH = "app/lstm_model/question_generation/new_model/question_prediction_tokenizers.json"
def __init__(self):
"""
Initialize question prediction model with pre-trained model and tokenizers
using static paths
"""
# Load model
self.model = load_model(self.MODEL_PATH)
# Load tokenizers
with open(self.TOKENIZER_PATH, "r") as f:
tokenizer_data = json.load(f)
# Reconstruct tokenizers
self.word_tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
# Get max lengths
self.max_context_len = tokenizer_data["max_context_len"]
self.max_question_len = tokenizer_data["max_question_len"]
self.max_token_len = tokenizer_data["max_token_len"]
# Get vocabulary sizes
self.vocab_size = len(self.word_tokenizer.word_index) + 1
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
def preprocess_text(self, text):
"""Basic text preprocessing"""
text = text.lower()
text = re.sub(r"\s+", " ", text).strip()
return text
def predict_question(self, context, tokens, ner, srl, q_type):
"""Prediksi pertanyaan berdasarkan konteks dan fitur lainnya"""
# Preprocess
context = self.preprocess_text(context)
# Convert to sequences
context_seq = self.word_tokenizer.texts_to_sequences([context])[0]
token_seq = self.word_tokenizer.texts_to_sequences([" ".join(tokens)])[0]
ner_seq = self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]
srl_seq = self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]
# Pad sequences
context_padded = pad_sequences(
[context_seq], maxlen=self.max_context_len, padding="post"
)
token_padded = pad_sequences(
[token_seq], maxlen=self.max_token_len, padding="post"
)
ner_padded = pad_sequences([ner_seq], maxlen=self.max_token_len, padding="post")
srl_padded = pad_sequences([srl_seq], maxlen=self.max_token_len, padding="post")
# Q-type one-hot encoding
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
q_type_one_hot = tf.keras.utils.to_categorical(
[q_type_idx], num_classes=self.q_type_vocab_size
)
# Predict
pred = self.model.predict(
[context_padded, token_padded, ner_padded, srl_padded, q_type_one_hot]
)
# Convert prediction to words
pred_seq = np.argmax(pred[0], axis=1)
# Convert indices to words
reverse_word_map = {v: k for k, v in self.word_tokenizer.word_index.items()}
pred_words = [reverse_word_map.get(i, "") for i in pred_seq if i != 0]
return " ".join(pred_words)

View File

@ -0,0 +1,32 @@
import json
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import QuizEntity
class QuizMemoryRepository:
KEY_TEMPLATE = "quiz:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_quiz_for_session(self, session_id: str, quiz_data: QuizEntity):
data = quiz_data.model_dump()
data["id"] = str(data["id"])
data["date"] = DatetimeUtil.to_string(data["date"])
self.redis.set(self._build_key(session_id), json.dumps(data))
def get_quiz_for_session(self, session_id: str) -> Optional[QuizEntity]:
data = self.redis.get(self._build_key(session_id))
if data:
data = json.loads(data)
data["date"] = DatetimeUtil.from_string(data["date"])
return QuizEntity(**data)
return None
def delete_quiz_for_session(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -1,15 +1,16 @@
from bson import ObjectId from bson import ObjectId
from typing import List, Optional from typing import List, Optional
from models import QuizEntity from app.models.entities import QuizEntity
from pymongo.database import Database from pymongo.database import Database
from pymongo.collection import Collection
class QuizRepository: class QuizRepository:
def __init__(self, db: Database): def __init__(self, db: Database):
self.collection = db.quiz self.collection: Collection = db.quiz
def create(self, quiz: QuizEntity) -> str: def create(self, quiz: QuizEntity) -> str:
quiz_dict = quiz.dict(by_alias=True, exclude_none=True) quiz_dict = quiz.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(quiz_dict) result = self.collection.insert_one(quiz_dict)
return str(result.inserted_id) return str(result.inserted_id)
@ -19,16 +20,88 @@ class QuizRepository:
return QuizEntity(**data) return QuizEntity(**data)
return None return None
# def search_by_title_or_category(
# self, keyword: str, page: int, page_size: int
# ) -> List[QuizEntity]:
# skip = (page - 1) * page_size
# pipeline = [
# {
# "$lookup": {
# "from": "category",
# "localField": "category_id",
# "foreignField": "_id",
# "as": "category_info",
# }
# },
# {"$unwind": "$category_info"},
# {
# "$match": {
# "$or": [
# {"title": {"$regex": keyword, "$options": "i"}},
# {"category_info.name": {"$regex": keyword, "$options": "i"}},
# ]
# }
# },
# {"$skip": skip},
# {"$limit": page_size},
# ]
# cursor = self.collection.aggregate(pipeline)
# return [QuizEntity(**doc) for doc in cursor]
def search_by_title_or_category(
self, keyword: str, subject_id: Optional[str], page: int, page_size: int
) -> List[QuizEntity]:
skip = (page - 1) * page_size
query_conditions = [
{"is_public": True},
{
"$or": [
{"title": {"$regex": keyword, "$options": "i"}},
# {"category": {"$regex": keyword, "$options": "i"}},
]
},
]
if subject_id:
query_conditions.append({"subject_id": subject_id})
cursor = (
self.collection.find({"$and": query_conditions}).skip(skip).limit(page_size)
)
return [QuizEntity(**doc) for doc in cursor]
def count_by_search(self, keyword: str) -> int:
return self.collection.count_documents(
{
"$or": [
{"title": {"$regex": keyword, "$options": "i"}},
{"category": {"$regex": keyword, "$options": "i"}},
]
}
)
def get_by_ids(self, quiz_ids: List[str]) -> Optional[List[QuizEntity]]:
object_ids = [ObjectId(qid) for qid in quiz_ids]
cursor = self.collection.find({"_id": {"$in": object_ids}})
datas = list(cursor)
if not datas:
return None
return [QuizEntity(**data) for data in datas]
def get_by_user_id( def get_by_user_id(
self, user_id: str, page: int = 1, page_size: int = 10 self, user_id: str, page: int = 1, page_size: int = 10
) -> List[QuizEntity]: ) -> List[QuizEntity]:
skip = (page - 1) * page_size skip = (page - 1) * page_size
cursor = ( cursor = (
self.collection.find({"user_id": ObjectId(user_id)}) self.collection.find({"author_id": user_id}).skip(skip).limit(page_size)
.skip(skip)
.limit(page_size)
) )
return [QuizEntity(**doc) for doc in cursor]
return [QuizEntity(**data) for data in cursor]
def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]: def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]:
cursor = self.collection.find().skip(skip).limit(limit) cursor = self.collection.find().skip(skip).limit(limit)
@ -40,6 +113,27 @@ class QuizRepository:
) )
return result.modified_count > 0 return result.modified_count > 0
def update_user_playing(self, quiz_id: str, total_user: int) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(quiz_id)}, {"$set": {"total_user_playing": total_user}}
)
return result.modified_count > 0
def delete(self, quiz_id: str) -> bool: def delete(self, quiz_id: str) -> bool:
result = self.collection.delete_one({"_id": ObjectId(quiz_id)}) result = self.collection.delete_one({"_id": ObjectId(quiz_id)})
return result.deleted_count > 0 return result.deleted_count > 0
def count_by_user_id(self, user_id: str) -> int:
return self.collection.count_documents({"author_id": user_id})
def get_top_played_quizzes(
self, page: int = 1, limit: int = 3, is_public: bool = True
) -> List[QuizEntity]:
skip = (page - 1) * limit
cursor = (
self.collection.find({"is_public": is_public})
.sort("total_user_playing", -1)
.skip(skip)
.limit(limit)
)
return [QuizEntity(**doc) for doc in cursor]

View File

@ -0,0 +1,28 @@
from typing import Dict
from redis import Redis
class ScoreMemoryRepository:
KEY_TEMPLATE = "score:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def update_user_score(self, session_id: str, user_id: str, correct: bool):
hkey = self._build_key(session_id)
field = f"{user_id}:{'correct' if correct else 'incorrect'}"
self.redis.hincrby(hkey, field, 1)
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
raw = self.redis.hgetall(self._build_key(session_id))
scores = {}
for k, v in raw.items():
uid, category = k.decode().split(":")
scores.setdefault(uid, {"correct": 0, "incorrect": 0})[category] = int(v)
return scores
def delete_scores(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -0,0 +1,108 @@
import json
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import SessionEntity
class SessionMemoryRepository:
KEY_TEMPLATE = "session:{session_id}"
KEY_PATTERN = "session:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_data(self, key: str, value: Any):
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Optional[Any]:
data = self.redis.get(key)
return json.loads(data) if data else None
def delete_key(self, key: str):
self.redis.delete(key)
def create_session(self, session_id: str, initial_data: SessionEntity) -> str:
data = initial_data.model_dump()
data["id"] = data["id"]
data["created_at"] = str(data["created_at"])
self.set_data(self._build_key(session_id), data)
return session_id
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
return self.get_data(self._build_key(session_id))
def close_session(self, session_id: str) -> Optional[Dict[str, Any]]:
session = self.get_data(self._build_key(session_id))
if not session:
return None
session["status"] = "closed"
session["closed_at"] = DatetimeUtil.now_iso()
self.delete_key(self._build_key(session_id))
return session
def find_session_by_code(self, session_code: str) -> Optional[Dict[str, Any]]:
session_keys = self.redis.keys(self.KEY_PATTERN)
for key in session_keys:
session_data = self.get_data(key)
if session_data and session_data.get("session_code") == session_code:
return session_data
return None
def add_user_to_session(
self, session_id: str, user_data: Dict[str, Any] = None
) -> bool:
session = self.get_session(session_id)
if not session:
return False
user_entry = {
**(user_data or {}),
"joined_at": DatetimeUtil.now_iso(),
}
existing_users = session.get("participants", [])
existing_users.append(user_entry)
session["participants"] = existing_users
self.set_data(self._build_key(session_id), session)
return True
def get_user_in_session(self, session_id: str):
session = self.get_session(session_id)
if not session:
return []
return session.get("participants", [])
def remove_user_from_session(self, session_id: str, user_id: str) -> bool:
session = self.get_session(session_id)
if not session:
return False
session["participants"] = [
user
for user in session.get("participants", [])
if user.get("id") != user_id
]
self.set_data(self._build_key(session_id), session)
return True
def delete_session(self, session_id: str) -> bool:
"""
Delete a session by its session_id.
Args:
session_id (str): The ID of the session to delete.
Returns:
bool: True if the session was deleted, False if it did not exist.
"""
key = self._build_key(session_id)
if self.redis.exists(key):
self.delete_key(key)
return True
return False

View File

@ -0,0 +1,49 @@
from pymongo.collection import Collection
from pymongo.database import Database
from typing import Optional
from app.models.entities import SessionEntity
from bson import ObjectId
class SessionRepository:
COLLECTION_NAME = "session"
def __init__(self, db: Database):
self.collection: Collection = db[self.COLLECTION_NAME]
# self.collection.create_index("id", unique=True)
def insert(self, session_data: SessionEntity) -> str:
result = self.collection.insert_one(
session_data.model_dump(by_alias=True, exclude_none=True)
)
return str(result.inserted_id)
def find_by_session_id(self, session_id: str) -> Optional[SessionEntity]:
doc = self.collection.find_one({"_id": ObjectId(session_id)})
return SessionEntity(**doc) if doc else None
def find_by_session_code(self, session_code: str) -> Optional[SessionEntity]:
doc = self.collection.find_one({"session_code": session_code})
return SessionEntity(**doc) if doc else None
def update(self, session_id: str, update_fields: SessionEntity) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(session_id)},
{"$set": update_fields.model_dump(by_alias=True, exclude_none=True)},
)
return result.modified_count > 0
def add_participant(self, session_id: str, user_id: str) -> bool:
"""Add user_id to participants array without duplicates"""
result = self.collection.update_one(
{"_id": session_id}, {"$addToSet": {"participants": user_id}}
)
return result.modified_count > 0
def delete(self, session_id: str) -> bool:
result = self.collection.delete_one({"_id": session_id})
return result.deleted_count > 0
def list_active_sessions(self) -> list[SessionEntity]:
docs = self.collection.find({"is_active": True})
return [SessionEntity(**doc) for doc in docs]

View File

@ -0,0 +1,61 @@
from typing import List, Optional
from pymongo.database import Database
from pymongo.collection import Collection
from bson import ObjectId, errors as bson_errors
from app.models.entities import SubjectEntity
class SubjectRepository:
COLLECTION_NAME = "subjects"
def __init__(self, db: Database):
self.collection: Collection = db[self.COLLECTION_NAME]
def create(self, subject: SubjectEntity) -> str:
subject_dict = subject.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(subject_dict)
return str(result.inserted_id)
def get_all(self) -> List[SubjectEntity]:
return [SubjectEntity(**doc) for doc in self.collection.find()]
def get_by_id(self, subject_id: str) -> Optional[SubjectEntity]:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return None
doc = self.collection.find_one({"_id": oid})
return SubjectEntity(**doc) if doc else None
def get_by_ids(self, subject_ids: List[str]) -> List[SubjectEntity]:
object_ids = []
for sid in subject_ids:
try:
object_ids.append(ObjectId(sid))
except bson_errors.InvalidId:
continue
if not object_ids:
return []
cursor = self.collection.find({"_id": {"$in": object_ids}})
return [SubjectEntity(**doc) for doc in cursor]
def update(self, subject_id: str, update_data: dict) -> bool:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return False
result = self.collection.update_one({"_id": oid}, {"$set": update_data})
return result.modified_count > 0
def delete(self, subject_id: str) -> bool:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return False
result = self.collection.delete_one({"_id": oid})
return result.deleted_count > 0

View File

@ -1,6 +1,6 @@
from typing import Optional from typing import Optional
from bson import ObjectId from bson import ObjectId
from models import UserEntity from app.models.entities import UserEntity
class UserRepository: class UserRepository:

View File

@ -8,6 +8,12 @@ from .quiz import (
from .answer.answer_request_schema import UserAnswerSchema from .answer.answer_request_schema import UserAnswerSchema
from .answer.answer_item_request_schema import AnswerItemSchema from .answer.answer_item_request_schema import AnswerItemSchema
from .subject.create_subject_schema import SubjectCreateRequest
from .subject.update_subject_schema import SubjectUpdateRequest
from .user.profile_update_schema import ProfileUpdateSchema
from .user.password_change_schema import PasswordChangeSchema
__all__ = [ __all__ = [
"RegisterSchema", "RegisterSchema",
@ -15,4 +21,8 @@ __all__ = [
"QuizCreateSchema", "QuizCreateSchema",
"UserAnswerSchema", "UserAnswerSchema",
"AnswerItemSchema", "AnswerItemSchema",
"SubjectCreateRequest",
"SubjectUpdateRequest",
"PasswordChangeSchema",
"ProfileUpdateSchema",
] ]

View File

@ -1,11 +1,9 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Union
class AnswerItemSchema(BaseModel): class AnswerItemSchema(BaseModel):
question_index: int question_index: int
question: str answer: Union[str | int | bool]
answer: str
correct_answer: str
is_correct: bool is_correct: bool
duration: int
time_spent: float time_spent: float

View File

@ -5,11 +5,8 @@ from .answer_item_request_schema import AnswerItemSchema
class UserAnswerSchema(BaseModel): class UserAnswerSchema(BaseModel):
session_id: Optional[str] = None
quiz_id: str quiz_id: str
user_id: str user_id: str
session_id: Optional[str] = None
answered_at: datetime answered_at: datetime
total_score: int
total_correct: int
total_questions: int
answers: List[AnswerItemSchema] answers: List[AnswerItemSchema]

View File

@ -8,8 +8,6 @@ class QuizCreateSchema(BaseModel):
title: str title: str
description: Optional[str] = None description: Optional[str] = None
is_public: bool = False is_public: bool = False
date: Optional[str] = None subject_id: str
total_quiz: Optional[int] = 0
limit_duration: Optional[int] = 0
author_id: Optional[str] = None author_id: Optional[str] = None
question_listings: Optional[List[QuestionItemSchema]] = [] question_listings: Optional[List[QuestionItemSchema]] = []

View File

@ -1,10 +1,11 @@
from typing import List, Optional from typing import List, Optional, Union
from pydantic import BaseModel from pydantic import BaseModel
class QuestionItemSchema(BaseModel): class QuestionItemSchema(BaseModel):
index: int
question: str question: str
target_answer: str target_answer: Union[str, bool, int]
duration: int duration: int
type: str type: str
options: Optional[List[str]] = None options: Optional[List[str]] = None

View File

@ -0,0 +1,82 @@
from pydantic import BaseModel, Field
from typing import List, Dict, Union, Optional
class ConnectionResponse(BaseModel):
status: str = "connected"
sid: str
class JoinRoomRequest(BaseModel):
session_code: str = Field(..., description="Unique code for the quiz session")
user_id: str = Field(..., description="Unique identifier for the user")
class RoomMessageResponse(BaseModel):
type: str = Field(..., description="Type of room message (join/leave)")
message: str
room: str
argument: Optional[str] = None
data: Optional[Dict] = None
# Question Models
class QuestionType:
FILL_THE_BLANK = "fill_the_blank"
TRUE_FALSE = "true_false"
OPTION = "option"
class QuizQuestion(BaseModel):
index: int
question: str
duration: int
type: str = Field(..., description="Type of question: fill_the_blank, true_false, or option")
options: Optional[List[str]] = None
# Answer Submission Models
class AnswerSubmissionRequest(BaseModel):
session_id: str
user_id: str
question_index: int
answer: Union[str, bool, int]
class AnswerSubmissionResponse(BaseModel):
user_id: str
question_index: int
answer: Union[str, bool, int]
correct: bool
# Scoring Models
class UserScore(BaseModel):
correct: int = 0
incorrect: int = 0
class ScoreUpdateResponse(BaseModel):
scores: Dict[str, UserScore]
# Session Management Models
class LeaveRoomRequest(BaseModel):
session_id: str
user_id: str
username: Optional[str] = "anonymous"
class EndSessionRequest(BaseModel):
session_id: str
user_id: str
# Messaging Models
class SendMessageRequest(BaseModel):
session_id: str
message: str
username: Optional[str] = "anonymous"
class ReceiveMessageResponse(BaseModel):
message: str
from_user: str = Field(alias="from")
# Quiz Flow Models
class QuizStartRequest(BaseModel):
session_code: str
class QuizStatusResponse(BaseModel):
message: str
# Error Model
class ErrorResponse(BaseModel):
message: str

View File

@ -0,0 +1,10 @@
from pydantic import BaseModel, Field
from typing import Optional
class SubjectCreateRequest(BaseModel):
name: str = Field(..., example="Ilmu Pengetahuan ALam")
alias: str = Field(..., examples="IPA", alias="short_name")
description: Optional[str] = Field(
None, example="Pelajaran tentang angka dan logika"
)

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel, Field
from typing import Optional
class SubjectUpdateRequest(BaseModel):
name: Optional[str] = Field(None, example="Fisika")
description: Optional[str] = Field(None, example="Pelajaran tentang hukum alam")

View File

@ -0,0 +1,8 @@
from pydantic import BaseModel
class PasswordChangeSchema(BaseModel):
"""Schema for changing user password"""
current_password: str
new_password: str

View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
from datetime import datetime
class ProfileUpdateSchema(BaseModel):
id: str
name: Optional[str] = None
birth_date: Optional[str] = None
locale: Optional[str] = None
phone: Optional[str] = None

View File

@ -1,9 +1,26 @@
from .quiz.quiz_creation_response import QuizCreationResponse from .quiz.quiz_creation_response import QuizCreationResponse
from .quiz.quiz_get_response import QuizGetSchema from .quiz.quiz_get_response import QuizGetSchema
from .quiz.question_item_schema import QuestionItemSchema from .quiz.question_item_schema import QuestionItemSchema
from .quiz.quiz_data_rsp_schema import UserQuizListResponse
from .history.history_response import HistoryResultSchema
from .history.detail_history_response import QuizHistoryResponse, QuestionResult
from .recomendation.recomendation_response_schema import ListingQuizResponse
from .subject.get_subject_schema import GetSubjectResponse
from .auth.login_response import LoginResponseSchema
from .user.user_response_scema import UserResponseSchema
from .answer.answer_session_response import AnsweredQuizResponse
__all__ = [ __all__ = [
"QuizCreationResponse", "QuizCreationResponse",
"QuizGetSchema", "QuizGetSchema",
"QuestionItemSchema", "QuestionItemSchema",
"UserQuizListResponse",
"HistoryResultSchema",
"QuizHistoryResponse",
"QuestionResult",
"ListingQuizResponse",
"GetSubjectResponse",
"LoginResponseSchema",
"UserResponseSchema",
"AnsweredQuizResponse",
] ]

View File

@ -0,0 +1,13 @@
from pydantic import BaseModel
from typing import List, Dict
class AnsweredQuizResponse(BaseModel):
id: str
session_id: str
quiz_id: str
user_id: str
answered_at: str
answers: List[Dict]
total_score: int
total_correct: int

View File

@ -0,0 +1,14 @@
from pydantic import BaseModel, EmailStr, field_serializer
from app.models import UserEntity
from typing import Optional
from datetime import datetime
class LoginResponseSchema(BaseModel):
id: str
email: EmailStr
name: str
birth_date: Optional[str] = None
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: Optional[str] = None

View File

@ -0,0 +1,27 @@
from typing import List, Optional, Union
from pydantic import BaseModel
from datetime import datetime
class QuestionResult(BaseModel):
index: int
question: str
type: str
target_answer: Union[str | bool | int]
user_answer: Optional[Union[str | bool | int]]
is_correct: Optional[bool]
time_spent: Optional[float]
options: Optional[List[str]]
class QuizHistoryResponse(BaseModel):
answer_id: str
quiz_id: str
title: str
description: str
author_id: str
answered_at: str
total_correct: int
total_score: int
total_solve_time: float
question_listings: List[QuestionResult]

View File

@ -0,0 +1,12 @@
from pydantic import BaseModel, Field
from typing import Optional
class HistoryResultSchema(BaseModel):
quiz_id: str = Field(..., description="ID dari kuis")
answer_id: str = Field(..., description="ID dari jawaban")
title: str = Field(..., description="Judul kuis")
description: Optional[str] = Field(None, description="Deskripsi kuis")
total_correct: int = Field(..., description="Jumlah jawaban benar")
total_question: int = Field(..., description="Total soal dalam kuis")
date: str

View File

@ -1,8 +1,11 @@
from typing import List, Optional, Union
from pydantic import BaseModel from pydantic import BaseModel
class QuestionItemSchema(BaseModel): class QuestionItemSchema(BaseModel):
index: int
question: str question: str
target_answer: str target_answer: Union[str | int | bool]
duration: int duration: int
type: str type: str
options: Optional[List[str]] = None

View File

@ -0,0 +1,10 @@
from typing import List
from pydantic import BaseModel
from app.schemas.response.recomendation.recomendation_response_schema import (
ListingQuizResponse,
)
class UserQuizListResponse(BaseModel):
total: int
quizzes: List[ListingQuizResponse]

View File

@ -5,11 +5,15 @@ from .question_item_schema import QuestionItemSchema
class QuizGetSchema(BaseModel): class QuizGetSchema(BaseModel):
id: str
author_id: str author_id: str
subject_id: str
subject_alias: str
title: str title: str
description: Optional[str] = None description: Optional[str] = None
is_public: bool = False is_public: bool = False
date: Optional[str] = None date: str
time: str
total_quiz: int = 0 total_quiz: int = 0
limit_duration: int = 0 limit_duration: int = 0
question_listings: List[QuestionItemSchema] = [] question_listings: List[QuestionItemSchema] = []

View File

@ -0,0 +1,12 @@
from pydantic import BaseModel
class ListingQuizResponse(BaseModel):
quiz_id: str
author_id: str
author_name: str
title: str
description: str
date: str
total_quiz: int
duration: int

View File

@ -0,0 +1,13 @@
from pydantic import BaseModel, Field
from typing import Optional
class GetSubjectResponse(BaseModel):
id: str
name: str
alias: str
description: Optional[str]
class ConfigDict:
from_attributes = True
populate_by_name = True

View File

@ -0,0 +1,15 @@
from pydantic import BaseModel
from typing import Optional
class UserResponseSchema(BaseModel):
id: str
google_id: Optional[str]
email: str
name: str
birth_date: Optional[str]
pic_url: Optional[str]
phone: Optional[str]
locale: str
created_at: str
updated_at: str

View File

@ -2,11 +2,18 @@ from .auth_service import AuthService
from .user_service import UserService from .user_service import UserService
from .quiz_service import QuizService from .quiz_service import QuizService
from .answer_service import AnswerService from .answer_service import AnswerService
from .history_service import HistoryService
from .subject_service import SubjectService
from .session_service import SessionService
from .question_generation_service import QuestionGenerationService
__all__ = [ __all__ = [
"AuthService", "AuthService",
"UserService", "UserService",
"QuizService", "QuizService",
"AnswerService", "AnswerService",
"HistoryService",
"SubjectService",
"SessionService",
"QuestionGenerationService",
] ]

View File

@ -1,9 +1,22 @@
from repositories import UserAnswerRepository from app.repositories import UserAnswerRepository, QuizRepository, UserRepository
from app.schemas.requests import UserAnswerSchema
from app.schemas.response import AnsweredQuizResponse
from app.models import UserAnswerEntity
from app.models.entities import AnswerItemEntity
from app.exception import ValidationException
from app.helpers import DatetimeUtil
class AnswerService: class AnswerService:
def __init__(self, answer_repository: UserAnswerRepository): def __init__(
self,
answer_repository: UserAnswerRepository,
quiz_repository: QuizRepository,
user_repositroy: UserRepository,
):
self.answer_repository = answer_repository self.answer_repository = answer_repository
self.quiz_repository = quiz_repository
self.user_repositroy = user_repositroy
def get_answer_by_id(self, answer_id): def get_answer_by_id(self, answer_id):
return self.answer_repository.get_answer_by_id(answer_id) return self.answer_repository.get_answer_by_id(answer_id)
@ -12,11 +25,127 @@ class AnswerService:
if quiz_id is not None: if quiz_id is not None:
return self.answer_repository return self.answer_repository
def create_answer(self, answer_data): def create_answer(self, answer_data: UserAnswerSchema):
return self.answer_repository.create(answer_data) quiz_data = self.quiz_repository.get_by_id(answer_data.quiz_id)
if not quiz_data:
raise ValidationException(message="Quiz not found")
user_data = self.user_repositroy.get_user_by_id(answer_data.user_id)
if not user_data:
raise ValidationException(message="user is not registered")
total_quiz_played = quiz_data.total_user_playing + 1
self.quiz_repository.update_user_playing(
quiz_id=quiz_data.id, total_user=total_quiz_played
)
question_map = {q.index: q for q in quiz_data.question_listings}
answer_item_Entity = []
total_correct = 0
for user_answer in answer_data.answers:
question = question_map.get(user_answer.question_index)
if question is None:
raise ValueError(
f"Question index {user_answer.question_index} tidak ditemukan di kuis."
)
correct = False
if question.type == "fill_the_blank":
correct = (
user_answer.answer.strip().lower()
== question.target_answer.strip().lower()
)
elif question.type == "true_false":
correct = user_answer.answer == question.target_answer
elif question.type == "option":
answer_index = int(user_answer.answer)
if 0 <= answer_index < len(question.options):
correct = str(answer_index) == question.target_answer
else:
raise ValueError(
f"Index jawaban tidak valid untuk soal {question.index}"
)
else:
raise ValueError(f"Tipe soal tidak dikenali: {question.type}")
user_answer.is_correct = correct
if correct:
total_correct += 1
answer_item_Entity.append(
AnswerItemEntity(
question_index=user_answer.question_index,
answer=user_answer.answer,
is_correct=user_answer.is_correct,
time_spent=user_answer.time_spent,
)
)
total_questions = len(quiz_data.question_listings)
total_score = (
total_correct * 100 // total_questions
) # contoh perhitungan: nilai 100 dibagi rata
# Buat entitas yang akan disimpan
answer_entity = UserAnswerEntity(
session_id=answer_data.session_id,
quiz_id=answer_data.quiz_id,
user_id=answer_data.user_id,
answered_at=answer_data.answered_at,
answers=answer_item_Entity,
total_correct=total_correct,
total_questions=total_questions,
total_score=total_score,
)
return self.answer_repository.create(answer_entity)
def update_answer(self, answer_id, answer_data): def update_answer(self, answer_id, answer_data):
return self.answer_repository.update(answer_id, answer_data) return self.answer_repository.update(answer_id, answer_data)
def delete_answer(self, answer_id): def delete_answer(self, answer_id):
return self.answer_repository.delete_by_id(answer_id) return self.answer_repository.delete_by_id(answer_id)
def get_answer_session(self, session_id: str, user_id: str) -> AnsweredQuizResponse:
answer_data: UserAnswerEntity = (
self.answer_repository.get_by_userid_and_sessionid(
session_id=session_id,
user_id=user_id,
)
)
if not answer_data:
return None
quiz = self.quiz_repository.get_by_id(answer_data.quiz_id)
question_listings = quiz.question_listings # List[QuestionItemEntity]
# Mapping question_index ke QuestionItemEntity
question_map = {q.index: q for q in question_listings}
answers_data = []
for ans in answer_data.answers:
question_entity = question_map.get(ans.question_index)
question_fields = question_entity.dict() if question_entity else {}
answers_data.append(
{
**question_fields, # Langsung unpack semua field QuestionItemEntity ke dalam dictionary
"answer": ans.answer,
"is_correct": ans.is_correct,
"time_spent": ans.time_spent,
}
)
data = AnsweredQuizResponse(
id=str(answer_data.id),
session_id=answer_data.session_id,
quiz_id=answer_data.quiz_id,
user_id=answer_data.user_id,
answered_at=DatetimeUtil.to_string(answer_data.answered_at),
answers=answers_data,
total_score=answer_data.total_score,
total_correct=answer_data.total_correct,
)
return data

View File

@ -1,11 +1,14 @@
from schemas import LoginSchema from app.schemas import LoginSchema
from repositories import UserRepository from app.schemas.response import LoginResponseSchema
from mapper import UserMapper from app.repositories import UserRepository
from app.mapper import UserMapper
from google.oauth2 import id_token from google.oauth2 import id_token
from google.auth.transport import requests from google.auth.transport import requests
from configs import Config from app.configs import Config
from exception import AuthException from app.exception import AuthException
from werkzeug.security import check_password_hash from werkzeug.security import check_password_hash
from app.helpers import DatetimeUtil
from app.mapper import UserMapper
class AuthService: class AuthService:
@ -17,23 +20,23 @@ class AuthService:
id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID
) )
if not payload:
raise AuthException("Invalid Google ID Token")
google_id = payload.get("sub") google_id = payload.get("sub")
email = payload.get("email") email = payload.get("email")
existing_user = self.user_repository.get_by_google_id(google_id) existing_user = self.user_repository.get_by_google_id(google_id)
if existing_user: if existing_user:
if existing_user.email == email: if existing_user.email == email:
return existing_user
return UserMapper.user_entity_to_response(existing_user)
raise AuthException("Email not match") raise AuthException("Email not match")
new_user = UserMapper.from_google_payload(google_id, email, payload) new_user = UserMapper.from_google_payload(google_id, email, payload)
user_id = self.user_repository.insert_user(user_data=new_user) user_id = self.user_repository.insert_user(user_data=new_user)
return self.user_repository.get_user_by_id(user_id=user_id) user_data = self.user_repository.get_user_by_id(user_id=user_id)
return UserMapper.user_entity_to_response(user_data)
def login(self, data: LoginSchema): def login(self, data: LoginSchema):
user_data = self.user_repository.get_user_by_email(data.email) user_data = self.user_repository.get_user_by_email(data.email)
@ -41,7 +44,8 @@ class AuthService:
if user_data is None: if user_data is None:
return None return None
if check_password_hash(user_data.password, data.password): if not check_password_hash(user_data.password, data.password):
user_data.password = None
return user_data
return None return None
user_data.password = None
return UserMapper.user_entity_to_response(user_data)

View File

@ -0,0 +1,119 @@
from app.repositories import (
UserAnswerRepository,
QuizRepository,
SessionRepository,
UserRepository,
)
from app.schemas.response import (
HistoryResultSchema,
QuizHistoryResponse,
QuestionResult,
)
from app.helpers import DatetimeUtil
class HistoryService:
def __init__(
self,
quiz_repository: QuizRepository,
answer_repository: UserAnswerRepository,
session_repository: SessionRepository,
user_repository: UserRepository,
):
self.quiz_repository = quiz_repository
self.answer_repository = answer_repository
self.session_repository = session_repository
self.user_repository = user_repository
def get_history_by_user_id(self, user_id: str):
answer_data = self.answer_repository.get_by_user(user_id)
if not answer_data:
return []
quiz_ids = [asn.quiz_id for asn in answer_data]
quiz_data = self.quiz_repository.get_by_ids(quiz_ids)
quiz_map = {str(quiz.id): quiz for quiz in quiz_data}
result = []
for answer in answer_data:
quiz = quiz_map.get(answer.quiz_id)
if quiz:
result.append(
HistoryResultSchema(
quiz_id=str(quiz.id),
answer_id=str(answer.id),
title=quiz.title,
description=quiz.description,
total_correct=answer.total_correct,
total_question=quiz.total_quiz,
date=answer.answered_at.strftime("%Y-%m-%d %H:%M:%S"),
)
)
return result
def get_history_by_answer_id(self, answer_id: str):
answer = self.answer_repository.get_by_id(answer_id)
quiz = self.quiz_repository.get_by_id(answer.quiz_id)
total_solve_time = sum([a.time_spent for a in answer.answers])
question_results = []
for q in quiz.question_listings:
user_answer = next(
(a for a in answer.answers if a.question_index == q.index), None
)
question_results.append(
QuestionResult(
index=q.index,
question=q.question,
type=q.type,
target_answer=q.target_answer,
user_answer=user_answer.answer if user_answer else None,
is_correct=user_answer.is_correct if user_answer else None,
time_spent=user_answer.time_spent if user_answer else None,
options=q.options,
)
)
result = QuizHistoryResponse(
answer_id=str(answer.id),
quiz_id=str(quiz.id),
title=quiz.title,
description=quiz.description,
author_id=quiz.author_id,
answered_at=answer.answered_at.strftime("%d-%B-%Y"),
total_correct=answer.total_correct,
total_score=answer.total_score,
total_solve_time=total_solve_time,
question_listings=question_results,
)
return result
def get_session_history(self, session_id):
session_data = self.session_repository.find_by_session_id(session_id)
participants = []
for participant in session_data.participants:
answer = self.answer_repository.get_by_userid_and_sessionid(
user_id=participant["id"],
session_id=session_id,
)
user = self.user_repository.get_user_by_id(user_id=participant["id"])
participants.append(
{
"id": str(user.id),
"name": user.name,
"score": answer.total_score,
}
)
session_data.id = str(session_data.id)
session_data.participants = participants
session_data.created_at = DatetimeUtil.to_string(session_data.created_at)
session_data.started_at = DatetimeUtil.to_string(session_data.started_at)
session_data.ended_at = DatetimeUtil.to_string(session_data.ended_at)
return session_data

View File

@ -0,0 +1,51 @@
from app.repositories import (
NERSRLRepository,
QuestionGenerationRepository,
AnswerGenerationRepository,
)
import re
class QuestionGenerationService:
def __init__(
self,
ner_srl_repository: NERSRLRepository,
question_generate_repository: QuestionGenerationRepository,
answer_generate_repository: AnswerGenerationRepository,
):
self._ner_srl_repository = ner_srl_repository
self._question_generation_repository = question_generate_repository
self._answer_generation_repository = answer_generate_repository
def createQuizAutomate(self, sentence: str):
# Gunakan regex untuk split hanya pada titik yang diikuti spasi atau akhir kalimat,
# dan bukan bagian dari angka (contoh: 19.00 tidak dipisah)
split_pattern = r"\.(?=\s|$)(?!\d)"
# split sentence using regex
sentences = [s.strip() for s in re.split(split_pattern, sentence) if s.strip()]
results = []
for s in sentences:
result = self._ner_srl_repository.predict_sentence(s)
question = self._question_generation_repository.predict_question(
context=s,
ner=result["ner"],
tokens=result["tokens"],
srl=result["srl"],
q_type=1,
)
answer = self._answer_generation_repository.predict_answer(
context=s,
question=question,
ner=result["ner"],
tokens=result["tokens"],
srl=result["srl"],
q_type=1,
)
results.append({"qustion": question, "answer": answer})
return results

View File

@ -1,29 +1,98 @@
from typing import List from app.repositories import QuizRepository, UserRepository, SubjectRepository
from repositories import QuizRepository from app.schemas.requests import QuizCreateSchema
from schemas import QuizGetSchema from app.schemas.response import (
from exception import DataNotFoundException UserQuizListResponse,
from mapper import map_quiz_entity_to_schema ListingQuizResponse,
QuizGetSchema,
)
from app.exception import DataNotFoundException, ValidationException
from app.mapper import QuizMapper
from app.helpers import DatetimeUtil
class QuizService: class QuizService:
def __init__(self, quiz_repository=QuizRepository): def __init__(
self,
quiz_repository=QuizRepository,
user_repository=UserRepository,
subject_repository=SubjectRepository,
):
self.quiz_repository = quiz_repository self.quiz_repository = quiz_repository
self.user_repostory = user_repository
self.subject_repository = subject_repository
def get_quiz(self, quiz_id) -> QuizGetSchema: def get_quiz(self, quiz_id) -> QuizGetSchema:
data = self.quiz_repository.get_by_id(quiz_id) data = self.quiz_repository.get_by_id(quiz_id)
if data is None: if data is None:
raise DataNotFoundException("Quiz not found") raise DataNotFoundException("Quiz not found")
quiz_subject = self.subject_repository.get_by_id(data.subject_id)
return QuizMapper.map_quiz_entity_to_schema(data, quiz_subject)
return map_quiz_entity_to_schema(data) def search_quiz(
self, keyword: str, subject_id: str, page: int = 1, page_size: int = 10
) -> tuple[list[ListingQuizResponse], int]:
quizzes = self.quiz_repository.search_by_title_or_category(
keyword=keyword,
page=page,
page_size=page_size,
subject_id=subject_id,
)
total = self.quiz_repository.count_by_search(keyword)
mapped_quizzes = []
for quiz in quizzes:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
if author is None:
continue
mapped_quizzes.append(
QuizMapper.quiz_to_recomendation_mapper(
quiz_entity=quiz,
user_entity=author,
)
)
return mapped_quizzes, total
def get_user_quiz( def get_user_quiz(
self, user_id: str, page: int = 1, page_size: int = 10 self, user_id: str, page: int = 1, page_size: int = 10
) -> List[QuizGetSchema]: ) -> UserQuizListResponse:
quizzes = self.quiz_repository.get_by_user_id(user_id, page, page_size) quizzes = self.quiz_repository.get_by_user_id(user_id, page, page_size)
return [QuizGetSchema.model_validate(quiz) for quiz in quizzes] if not quizzes:
return UserQuizListResponse(total=0, quizzes=[])
def create_quiz(self, quiz_data): total_user_quiz = self.quiz_repository.count_by_user_id(user_id)
return self.quiz_repository.create(quiz_data)
user = self.user_repostory.get_user_by_id(user_id)
quiz_data = [
QuizMapper.quiz_to_recomendation_mapper(quiz, user) for quiz in quizzes
]
return UserQuizListResponse(total=total_user_quiz, quizzes=quiz_data)
def create_quiz(self, quiz_data: QuizCreateSchema):
total_time = 0
for question in quiz_data.question_listings:
if question.type == "option" and (
not question.options or len(question.options) != 4
):
raise ValidationException(
"Option type questions must have exactly 4 options."
)
total_time += question.duration
datetime_now = DatetimeUtil.now_iso()
data = QuizMapper.map_quiz_schema_to_entity(
schema=quiz_data,
datetime=datetime_now,
total_duration=total_time,
)
return self.quiz_repository.create(data)
def update_quiz(self, quiz_id, quiz_data): def update_quiz(self, quiz_id, quiz_data):
return self.quiz_repository.update(quiz_id, quiz_data) return self.quiz_repository.update(quiz_id, quiz_data)
@ -31,9 +100,19 @@ class QuizService:
def delete_quiz(self, quiz_id): def delete_quiz(self, quiz_id):
return self.quiz_repository.delete(quiz_id) return self.quiz_repository.delete(quiz_id)
def quiz_recommendation(self): def get_quiz_recommendation(self, page: int, limit: int):
data = self.quiz_repository
if data is None: data = self.quiz_repository.get_top_played_quizzes(page=page, limit=limit)
if not data:
raise DataNotFoundException("Quiz not found") raise DataNotFoundException("Quiz not found")
return map_quiz_entity_to_schema(data) result = []
for quiz in data:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
result.append(
QuizMapper.quiz_to_recomendation_mapper(
quiz_entity=quiz,
user_entity=author,
)
)
return result

View File

@ -0,0 +1,334 @@
from typing import Any, Dict
from uuid import uuid4
from app.repositories import (
SessionRepository,
UserRepository,
SessionMemoryRepository,
QuizRepository,
UserAnswerRepository,
QuizMemoryRepository,
AnswerMemoryRepository,
ScoreMemoryRepository,
)
from app.models.entities import SessionEntity, UserAnswerEntity, AnswerItemEntity
from app.helpers import DatetimeUtil
from flask_socketio import SocketIO
import time
from bson import ObjectId
class SessionService:
def __init__(
self,
session_mongo_repository: SessionRepository,
session_redis_repository: SessionMemoryRepository,
quiz_redis_repository: QuizMemoryRepository,
answer_redis_repository: AnswerMemoryRepository,
score_redis_repostory: ScoreMemoryRepository,
user_repository: UserRepository,
quiz_repository: QuizRepository,
answer_repository: UserAnswerRepository,
):
self.session_mongo_repository = session_mongo_repository
self.session_redis_repository = session_redis_repository
self.quiz_redis_repository = quiz_redis_repository
self.answer_redis_repository = answer_redis_repository
self.score_redis_repository = score_redis_repostory
self.user_repository = user_repository
self.quiz_repository = quiz_repository
self.answer_repository = answer_repository
def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str:
generateed_code = uuid4().hex[:6].upper()
session = SessionEntity(
session_code=generateed_code,
quiz_id=quiz_id,
host_id=host_id,
created_at=DatetimeUtil.now_iso(),
limit_participan=limit_participan,
participants=[],
current_question_index=0,
is_active=True,
)
session_id = self.session_mongo_repository.insert(session)
session.id = session_id
self.session_redis_repository.create_session(session_id, session)
data = self.quiz_repository.get_by_id(quiz_id=quiz_id)
self.quiz_redis_repository.set_quiz_for_session(session_id, data)
return {
"session_id": session_id,
"session_code": generateed_code,
}
def join_session(self, session_code: str, user_id: str) -> dict:
user = self.user_repository.get_user_by_id(user_id)
session = self.session_redis_repository.find_session_by_code(session_code)
if session is None:
return None
session_id = session["id"]
is_existing_user = any(
u["id"] == user_id for u in session.get("participants", [])
)
session_quiz = self.quiz_redis_repository.get_quiz_for_session(session["id"])
quiz_info = {
"title": session_quiz.title,
"description": session_quiz.description,
"total_quiz": session_quiz.total_quiz,
"limit_duration": session_quiz.limit_duration,
}
if session["host_id"] == user_id:
return {
"session_id": session_id,
"is_admin": True,
"message": "admin joined",
"session_info": session,
"quiz_info": quiz_info,
}
if is_existing_user:
return {
"session_id": session_id,
"is_admin": False,
"user_id": str(user.id),
"username": user.name,
"user_pic": user.pic_url,
"session_info": session,
"quiz_info": quiz_info,
"new_user": not is_existing_user,
}
self.session_redis_repository.add_user_to_session(
session_id=session["id"],
user_data={
"id": str(user.id),
"username": user.name,
"user_pic": user.pic_url,
},
)
session = self.session_redis_repository.get_session(session["id"])
response = {
"session_id": session_id,
"is_admin": False,
"user_id": str(user.id),
"username": user.name,
"user_pic": user.pic_url,
"session_info": session if not is_existing_user else None,
"quiz_info": quiz_info,
"new_user": not is_existing_user,
}
return response
def leave_session(self, session_id: str, user_id: str) -> dict:
is_success = self.session_redis_repository.remove_user_from_session(
session_id, user_id
)
if is_success:
participant_left = self.session_redis_repository.get_user_in_session(
session_id
)
return {"is_success": True, "participants": participant_left}
return {"is_success": False}
def run_quiz_flow(self, session_id: str, socketio: SocketIO):
users = self.session_redis_repository.get_user_in_session(session_id)
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
self.answer_redis_repository.initialize_empty_answers(
session_id=session_id,
user_ids=[u["id"] for u in users if "id" in u],
total_questions=quiz.total_quiz,
)
questions = quiz.question_listings
start_quiz = DatetimeUtil.now_iso()
time.sleep(2)
for q in questions:
print(f"\nMengirim pertanyaan {q.index} ke room {session_id}")
question_to_send = q.model_dump(exclude={"target_answer"})
socketio.emit("quiz_question", question_to_send, room=session_id)
time.sleep(q.duration)
usersNotAnswer = self.answer_redis_repository.auto_fill_incorrect_answers(
session_id=session_id,
question_index=q.index,
default_time_spent=q.duration,
)
for userId in usersNotAnswer:
self.score_redis_repository.update_user_score(
session_id=session_id,
user_id=userId,
correct=False,
)
socketio.emit(
"score_update",
{
"scores": self.get_ranked_scores(session_id),
},
room=session_id,
)
socketio.emit("clean_up", room=session_id)
self.summaryAllSessionData(session_id=session_id, start_time=start_quiz)
socketio.emit("quiz_done", room=session_id)
def submit_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
time_spent: int,
) -> Dict[str, Any]:
quiz = self.quiz_redis_repository.get_quiz_for_session(session_id)
question = next(
(q for q in quiz.question_listings if q.index == question_index),
None,
)
if question is None:
raise ValueError(
f"Question {question_index} not found in session {session_id}"
)
is_correct = self._is_correct(question, answer)
print(answer)
self.answer_redis_repository.save_user_answer(
session_id=session_id,
user_id=user_id,
question_index=question_index,
answer=answer,
correct=is_correct,
time_spent=time_spent,
)
scores = self.score_redis_repository.update_user_score(
session_id=session_id,
user_id=user_id,
correct=is_correct,
)
return {
"user_id": user_id,
"question_index": question_index,
"answer": answer,
"correct": is_correct,
"scores": scores,
}
def get_ranked_scores(self, session_id: str):
raw = self.score_redis_repository.get_scores(session_id)
ranked = [
{
"user_id": uid,
"correct": v.get("correct", 0),
"incorrect": v.get("incorrect", 0),
"total_score": v.get("correct", 0) * 10,
}
for uid, v in raw.items()
]
ranked.sort(key=lambda x: x["total_score"], reverse=True)
return ranked
def _is_correct(self, q, ans) -> bool:
result = False
if q.type == "true_false":
result = str(ans).strip().lower() == str(q.target_answer).strip().lower()
elif q.type in ["multiple_choice", "option"]:
try:
result = int(ans) == int(q.target_answer)
except (ValueError, TypeError):
result = False
elif q.type == "fill_the_blank":
result = str(q.target_answer).strip().lower() in str(ans).strip().lower()
else:
result = False
# Print informasi evaluasi
print(f"Tipe Soal: {q.type}")
print(f"Jawaban User: {ans}")
print(f"Jawaban Benar: {q.target_answer}")
print(f"Hasil: {'Benar' if result else 'Salah'}\n")
return result
def summaryAllSessionData(self, session_id: str, start_time):
session = self.session_redis_repository.get_session(session_id=session_id)
now = DatetimeUtil.now_iso()
session["id"] = ObjectId(session["id"])
session["participants"] = [
{"id": user["id"], "joined_at": user["joined_at"]}
for user in session["participants"]
]
session["created_at"] = DatetimeUtil.from_iso(session["created_at"])
session["started_at"] = DatetimeUtil.from_iso(start_time)
session["ended_at"] = DatetimeUtil.from_iso(now)
newData = SessionEntity(**session)
newData.is_active = False
answers = self.answer_redis_repository.get_all_user_answers(
session_id=session_id
)
quiz = self.quiz_repository.get_by_id(newData.quiz_id)
self.quiz_repository.update_user_playing(
quiz_id=quiz.id, total_user=quiz.total_user_playing + len(answers)
)
self.session_mongo_repository.update(
session_id=session_id,
update_fields=newData,
)
for key, value_list in answers.items():
answer_items = []
total_correct = 0
for item in sorted(value_list, key=lambda x: x["question_index"]):
is_correct = item["is_true"]
if is_correct:
total_correct += 1
answer_item = AnswerItemEntity(
question_index=item["question_index"],
answer=item["answer"],
is_correct=is_correct,
time_spent=item["time_spent"],
)
answer_items.append(answer_item)
total_questions = len(value_list)
total_score = (
(total_correct / total_questions) * 100 if total_questions > 0 else 0.0
)
userAnswer = UserAnswerEntity(
user_id=key,
quiz_id=str(quiz.id),
session_id=session_id,
total_correct=total_correct,
total_score=round(total_score, 2),
answers=answer_items,
answered_at=newData.started_at,
)
self.answer_repository.create(userAnswer)
self.session_redis_repository.delete_session(session_id=session_id)
self.quiz_redis_repository.delete_quiz_for_session(session_id=session_id)
self.answer_redis_repository.delete_all_answers(session_id=session_id)
self.score_redis_repository.delete_scores(session_id=session_id)

View File

@ -0,0 +1,45 @@
from typing import List, Optional
from app.models.entities import SubjectEntity
from app.schemas.requests import SubjectCreateRequest, SubjectUpdateRequest
from app.schemas.response import GetSubjectResponse
from app.repositories import SubjectRepository
from app.mapper import SubjectMapper
class SubjectService:
def __init__(self, repository: SubjectRepository):
self.repository = repository
def create_subject(self, request: SubjectCreateRequest) -> str:
subject = SubjectMapper.to_entity(request)
return self.repository.create(subject)
def get_all_subjects(self) -> List[GetSubjectResponse]:
subjects = self.repository.get_all()
return [
GetSubjectResponse(
id=str(subject.id),
name=subject.name,
alias=subject.short_name,
description=subject.description,
)
for subject in subjects
]
def get_subject_by_id(self, subject_id: str) -> Optional[GetSubjectResponse]:
subject = self.repository.get_by_id(subject_id)
if subject:
return GetSubjectResponse(
id=str(subject.id),
name=subject.name,
alias=subject.short_name,
description=subject.description,
)
return None
def update_subject(self, subject_id: str, request: SubjectUpdateRequest) -> bool:
update_data = request.model_dump(exclude_unset=True)
return self.repository.update(subject_id, update_data)
def delete_subject(self, subject_id: str) -> bool:
return self.repository.delete(subject_id)

View File

@ -1,9 +1,12 @@
from flask import current_app from datetime import datetime
from repositories import UserRepository from app.repositories import UserRepository
from schemas import RegisterSchema from app.schemas import RegisterSchema
from mapper import UserMapper from app.schemas.requests import ProfileUpdateSchema
from exception import AlreadyExistException from app.schemas.response import UserResponseSchema
from werkzeug.security import generate_password_hash from app.mapper import UserMapper
from app.exception import AlreadyExistException, DataNotFoundException
from werkzeug.security import generate_password_hash, check_password_hash
from app.helpers import DatetimeUtil
class UserService: class UserService:
@ -23,3 +26,71 @@ class UserService:
data = UserMapper.from_register(user_data) data = UserMapper.from_register(user_data)
return self.user_repository.insert_user(data) return self.user_repository.insert_user(data)
def update_profile(self, new_profile: ProfileUpdateSchema):
user = self.user_repository.get_user_by_id(new_profile.id)
if not user:
raise DataNotFoundException(entity="User")
update_data = {}
if new_profile.name is not None:
update_data["name"] = new_profile.name
if new_profile.birth_date is not None:
update_data["birth_date"] = DatetimeUtil.from_string(
new_profile.birth_date, fmt="%d-%m-%Y"
)
if new_profile.locale is not None:
update_data["locale"] = new_profile.locale
if new_profile.phone is not None:
update_data["phone"] = new_profile.phone
if not update_data:
return True
update_data["updated_at"] = DatetimeUtil.now_iso()
return self.user_repository.update_user(new_profile.id, update_data)
def change_password(self, user_id: str, current_password: str, new_password: str):
user = self.user_repository.get_user_by_id(user_id)
if not user:
raise DataNotFoundException(entity="User")
if not user.password or not check_password_hash(
user.password, current_password
):
raise ValueError("Current password is incorrect")
encrypted_password = generate_password_hash(new_password)
update_data = {
"password": encrypted_password,
"updated_at": DatetimeUtil.now_iso(),
}
return self.user_repository.update_user(user_id, update_data)
def get_user_by_id(self, user_id: str):
user = self.user_repository.get_user_by_id(user_id)
if not user:
raise DataNotFoundException(entity="User")
user_dict = user.model_dump()
if "password" in user_dict:
del user_dict["password"]
if "id" in user_dict:
user_dict["id"] = str(user.id)
if "birth_date" in user_dict and user_dict["birth_date"]:
user_dict["birth_date"] = DatetimeUtil.to_string(
user_dict["birth_date"], fmt="%d-%m-%Y"
)
if "created_at" in user_dict and user_dict["created_at"]:
user_dict["created_at"] = DatetimeUtil.to_string(user_dict["created_at"])
if "updated_at" in user_dict and user_dict["updated_at"]:
user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"])
return UserResponseSchema(**user_dict)

Some files were not shown because too many files have changed in this diff Show More