Compare commits

..

18 Commits

Author SHA1 Message Date
akhdanre 9fd0caad19 feat: adding exception and and logger handler 2025-05-27 16:02:14 +07:00
akhdanre 43e7fcca30 feat: adding log info 2025-05-27 15:54:10 +07:00
akhdanre 4203db07af fix: session data response 2025-05-27 13:57:31 +07:00
akhdanre cbdce3da20 feat: add language filter for quizzes to match user locale 2025-05-26 17:56:31 +07:00
akhdanre d46b1e934e fix: validation answer input 2025-05-26 14:40:50 +07:00
akhdanre b3069687d0 fix: sorting by newest answer first 2025-05-26 11:40:14 +07:00
akhdanre 55f9e15468 feat: fix the response lgoin user data 2025-05-25 22:06:25 +07:00
akhdanre 82faab1cc5 fix: logic on the get quiz recomendation 2025-05-25 19:33:37 +07:00
akhdanre faf6a0f4b9 feat: adding recomendation 2025-05-25 19:08:51 +07:00
Akhdan Robbani a099da34f4
Merge pull request #5 from Akhdanre/feat/swagger-documentation
swagger and fix issue
2025-05-25 12:56:22 +07:00
akhdanre 324173ec84 feat: adding user stat request 2025-05-25 12:54:27 +07:00
akhdanre 7ba31325eb fix: subject add data and some documentation 2025-05-24 15:00:19 +07:00
akhdanre 9c1793088a feat: adding docuemntation 2025-05-24 14:13:34 +07:00
akhdanre 155e74678f feat: merge develop 2025-05-23 15:13:20 +07:00
akhdanre 599cd689f6 feat: adding documentation for user 2025-03-21 01:20:32 +07:00
akhdanre 71a3091df0 fix: prefix on the url path 2025-03-21 01:12:24 +07:00
akhdanre 4804495371 feat: setup swagger for documentation 2025-03-21 01:05:19 +07:00
akhdanre 687d31d3f1 feat: setup swagger for documentation 2025-03-21 00:54:44 +07:00
35 changed files with 1798 additions and 259 deletions

131
.gitignore vendored
View File

@ -1,14 +1,137 @@
# Ignore only __pycache__ inside the app directory # Byte-compiled / optimized / DLL files
app/**/__pycache__/ __pycache__/
*.py[cod]
*$py.class
# Only ignore __pycache__ inside app
app/**/__pycache__/
# Ignore compiled Python files inside app # Ignore compiled Python files inside app
app/**/*.pyc app/**/*.pyc
app/**/*.pyo app/**/*.pyo
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env .env
.venv .venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Logs
logs/ logs/
*.log
# Django stuff:
*.sqlite3
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre
.pyre/
# pytype
.pytype/
# Cython debug symbols
cython_debug/
# VS Code
.vscode/
# JetBrains IDEs
.idea/
*.iml
# MacOS
.DS_Store
# Thumbs.db (Windows)
Thumbs.db
ehthumbs.db
# Others
*.bak
*.swp
*.swo
*~
# Local dev files
local_settings.py

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"yaml.schemas": {
"openapi:v3": "file:///mnt/disc1/code/thesis_quiz_project/quiz_maker/docs/rest_api_docs.yaml"
}
}

View File

@ -2,6 +2,7 @@ from .default import default_blueprint
from .auth import auth_blueprint from .auth import auth_blueprint
from .user import user_blueprint from .user import user_blueprint
from .swagger import swagger_blueprint
from .quiz import quiz_bp from .quiz import quiz_bp
from .history import history_blueprint from .history import history_blueprint
from .subject import subject_blueprint from .subject import subject_blueprint
@ -15,6 +16,7 @@ __all__ = [
"history_blueprint", "history_blueprint",
"subject_blueprint", "subject_blueprint",
"session_bp", "session_bp",
"swagger_blueprint",
] ]

View File

@ -17,8 +17,10 @@ def user_history(
@history_blueprint.route("/detail/<answer_id>", methods=["GET"]) @history_blueprint.route("/detail/<answer_id>", methods=["GET"])
@inject @inject
def user_detail_history( def user_detail_history(
answer_id, controller: HistoryController = Provide[Container.history_controller] answer_id: str,
controller: HistoryController = Provide[Container.history_controller],
): ):
print(answer_id)
return controller.get_detail_quiz_history(answer_id) return controller.get_detail_quiz_history(answer_id)

View File

@ -56,14 +56,36 @@ def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
) )
@quiz_bp.route("/recomendation", methods=["GET"]) @quiz_bp.route("/populer", methods=["GET"])
@inject @inject
def get_quiz_recommendation( def get_quiz_populer(
controller: QuizController = Provide[Container.quiz_controller], controller: QuizController = Provide[Container.quiz_controller],
): ):
page = request.args.get("page") page = request.args.get("page")
limit = request.args.get("limit") limit = request.args.get("limit")
return controller.get_quiz_recommendation(page=page, limit=limit) lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_populer(
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/recommendation", methods=["GET"])
@inject
def get_quiz_recommendation(
controller: QuizController = Provide[Container.quiz_controller],
):
user_id = request.args.get("user_id")
page = request.args.get("page")
limit = request.args.get("limit")
lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/user/<user_id>", methods=["GET"]) @quiz_bp.route("/user/<user_id>", methods=["GET"])
@ -83,6 +105,7 @@ def search_quiz(controller: QuizController = Provide[Container.quiz_controller])
subject_id = request.args.get("subject_id") subject_id = request.args.get("subject_id")
page = int(request.args.get("page", 1)) page = int(request.args.get("page", 1))
limit = int(request.args.get("limit", 10)) limit = int(request.args.get("limit", 10))
lang_code = request.args.get("lang_code") or "id"
return controller.search_quiz( return controller.search_quiz(
keyword=keyword, subject_id=subject_id, page=page, limit=limit keyword=keyword, subject_id=subject_id, page=page, limit=limit

23
app/blueprints/swagger.py Normal file
View File

@ -0,0 +1,23 @@
from flask import Blueprint, jsonify, send_file
from flask_swagger_ui import get_swaggerui_blueprint
import os
swagger_blueprint = Blueprint("swagger", __name__)
SWAGGER_URL = "/swagger"
API_URL = "http://127.0.0.1:5000/swagger/docs"
swagger_ui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={"app_name": "Quiz Maker API"},
)
swagger_blueprint.register_blueprint(swagger_ui_blueprint)
@swagger_blueprint.route("/swagger/docs")
def serve_openapi():
"""Serve the OpenAPI spec from a file."""
docs_path = os.path.abspath("docs/rest_api_docs.yaml")
return send_file(docs_path, mimetype="application/yaml")

View File

@ -38,3 +38,11 @@ def get_user(
user_id, user_controller: UserController = Provide[Container.user_controller] user_id, user_controller: UserController = Provide[Container.user_controller]
): ):
return user_controller.get_user_by_id(user_id) return user_controller.get_user_by_id(user_id)
@user_blueprint.route("/user/status/<string:user_id>", methods=["GET"])
@inject
def get_user_stat(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.user_stat(user_id)

View File

@ -9,6 +9,7 @@ class Config:
# Flask Environment Settings # Flask Environment Settings
FLASK_ENV = os.getenv("FLASK_ENV", "development") FLASK_ENV = os.getenv("FLASK_ENV", "development")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t") DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t")
API_VERSION = os.getenv("API_VERSION", "v1")
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key") SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
# MongoDB Settings # MongoDB Settings

View File

@ -9,9 +9,14 @@ class LoggerConfig:
LOG_DIR = "logs" # Define the log directory LOG_DIR = "logs" # Define the log directory
@staticmethod @staticmethod
def init_logger(app): def init_logger(app, production_mode=False):
"""Initializes separate log files for INFO, ERROR, and WARNING levels.""" """
Initializes separate log files for INFO, ERROR, and WARNING levels.
Args:
app: Flask application instance
production_mode (bool): If True, disables console output and only logs to files
"""
# Ensure the logs directory exists # Ensure the logs directory exists
if not os.path.exists(LoggerConfig.LOG_DIR): if not os.path.exists(LoggerConfig.LOG_DIR):
os.makedirs(LoggerConfig.LOG_DIR) os.makedirs(LoggerConfig.LOG_DIR)
@ -20,23 +25,37 @@ class LoggerConfig:
for handler in app.logger.handlers[:]: for handler in app.logger.handlers[:]:
app.logger.removeHandler(handler) app.logger.removeHandler(handler)
# Disable propagation to root logger to prevent console output in production
if production_mode:
app.logger.propagate = False
# Create separate loggers # Create separate loggers
info_logger = LoggerConfig._setup_logger( info_handler = LoggerConfig._setup_logger(
"info_logger", "info.log", logging.INFO, logging.WARNING "info_logger", "info.log", logging.INFO, logging.WARNING
) )
error_logger = LoggerConfig._setup_logger( error_handler = LoggerConfig._setup_logger(
"error_logger", "error.log", logging.ERROR, logging.CRITICAL "error_logger", "error.log", logging.ERROR, logging.CRITICAL
) )
warning_logger = LoggerConfig._setup_logger( warning_handler = LoggerConfig._setup_logger(
"warning_logger", "warning.log", logging.WARNING, logging.WARNING "warning_logger", "warning.log", logging.WARNING, logging.WARNING
) )
# Attach handlers to Flask app logger # Attach handlers to Flask app logger
app.logger.addHandler(info_logger) app.logger.addHandler(info_handler)
app.logger.addHandler(error_logger) app.logger.addHandler(error_handler)
app.logger.addHandler(warning_logger) app.logger.addHandler(warning_handler)
app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs
# Add console handler only in development mode
if not production_mode:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(console_formatter)
app.logger.addHandler(console_handler)
app.logger.info("Logger has been initialized for Flask application.") app.logger.info("Logger has been initialized for Flask application.")
@staticmethod @staticmethod
@ -44,11 +63,14 @@ class LoggerConfig:
"""Helper method to configure loggers for specific levels.""" """Helper method to configure loggers for specific levels."""
logger = logging.getLogger(name) logger = logging.getLogger(name)
logger.setLevel(level) logger.setLevel(level)
log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename) log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename)
log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3) log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3)
log_handler.setLevel(level) log_handler.setLevel(level)
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter) log_handler.setFormatter(log_formatter)
log_handler.addFilter(lambda record: level <= record.levelno <= max_level) log_handler.addFilter(lambda record: level <= record.levelno <= max_level)
logger.addHandler(log_handler) logger.addHandler(log_handler)
return log_handler return log_handler

View File

@ -43,9 +43,9 @@ class QuizController:
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
def quiz_recomendation(self): def quiz_populer(self):
try: try:
result = self.quiz_service.get_quiz_recommendation() result = self.quiz_service.get_quiz_populer()
if not result: if not result:
return make_response(message="Quiz not found", status_code=404) return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.model_dump()) return make_response(message="Quiz Found", data=result.model_dump())
@ -94,14 +94,39 @@ class QuizController:
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)
def get_quiz_recommendation(self, page, limit): def get_quiz_populer(self, page, limit, lang_code):
try: try:
page = int(page) if page is not None else 1 page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3 limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(page=page, limit=limit) result = self.quiz_service.get_quiz_populer(
return make_response( page=page,
message="success retrieve recommendation quiz", data=result limit=limit,
lang_code=lang_code,
) )
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:
return make_response(message=str(e), data=None, status_code=400)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except Exception as e:
return make_error_response(e)
def get_quiz_recommendation(self, user_id, page, limit, lang_code):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e: except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code) return make_response(message=e.message, status_code=e.status_code)
except ValueError as e: except ValueError as e:

View File

@ -13,6 +13,7 @@ class SessionController(MethodView):
required_fields = [ required_fields = [
"quiz_id", "quiz_id",
"host_id", "host_id",
"room_name",
"limit_participan", "limit_participan",
] ]
for field in required_fields: for field in required_fields:
@ -22,6 +23,7 @@ class SessionController(MethodView):
session = self.session_service.create_session( session = self.session_service.create_session(
quiz_id=data["quiz_id"], quiz_id=data["quiz_id"],
host_id=data["host_id"], host_id=data["host_id"],
room_name=data["room_name"],
limit_participan=data["limit_participan"], limit_participan=data["limit_participan"],
) )

View File

@ -1,5 +1,5 @@
from flask_socketio import SocketIO, emit, join_room, leave_room from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request from flask import request, current_app
from app.services import SessionService from app.services import SessionService
import threading import threading
@ -17,186 +17,215 @@ class SocketController:
def _register_events(self): def _register_events(self):
@self.socketio.on("connect") @self.socketio.on("connect")
def on_connect(): def on_connect():
# print(f"Client connected: {request.sid}") try:
emit("connection_response", {"status": "connected", "sid": request.sid}) current_app.logger.info(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
except Exception as e:
emit("error", {"message": f"Connect error: {str(e)}"})
current_app.logger.error(f"Connect error: {str(e)}")
@self.socketio.on("disconnect") @self.socketio.on("disconnect")
def on_disconnect(): def on_disconnect():
# print(f"Client disconnected: {request.sid}") try:
pass current_app.logger.info(f"Client disconnected: {request.sid}")
except Exception as e:
emit("error", {"message": f"Disconnect error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("join_room") @self.socketio.on("join_room")
def handle_join_room(data): def handle_join_room(data):
session_code = data.get("session_code") try:
user_id = data.get("user_id") session_code = data.get("session_code")
user_id = data.get("user_id")
if not session_code or not user_id: if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"}) emit("error", {"message": "session_code and user_id are required"})
return return
session = self.session_service.join_session( session = self.session_service.join_session(session_code, user_id)
session_code=session_code, if session is None:
user_id=user_id, emit(
) "error",
{"message": "Failed to join session or session inactive"},
)
return
if session is None: session_id = session["session_id"]
emit("error", {"message": "Failed to join session or session inactive"}) join_room(session_id)
return
session_id = session["session_id"] message = (
"Admin has joined the room."
if session["is_admin"]
else f"User {session['username']} has joined the room."
)
join_room(session_id) current_app.logger.info(f"Client joined: {message}")
if session["is_admin"]:
message = "Admin has joined the room."
else:
message = f"User {session['username']} has joined the room."
emit(
"room_message",
{
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
@self.socketio.on("leave_room")
def handle_leave_room(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(
session_id=session_id,
user_id=user_id,
)
leave_room(session_id)
if leave_result["is_success"]:
emit( emit(
"room_message", "room_message",
{ {
"type": "participan_leave", "type": "join",
"message": f"{username} has left the room.", "message": message,
"room": session_id, "room": session_id,
"argument": "adm_update", "argument": "adm_update",
"data": { "data": {
"participants": leave_result["participants"], "session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
}, },
}, },
room=session_id, room=session_id,
skip_sid=request.sid, skip_sid=request.sid,
) )
emit(
"room_message", except Exception as e:
{ emit("error", {"message": f"Join room error: {str(e)}"})
"type": "leave", current_app.logger.error(f"error: {str(e)}")
"message": f"{username} has left the room.",
"room": session_id, @self.socketio.on("leave_room")
"argument": "adm_update", def handle_leave_room(data):
"data": None, try:
}, session_id = data.get("session_id")
room=session_id, user_id = data.get("user_id")
to=request.sid, username = data.get("username", "anonymous")
)
leave_result = self.session_service.leave_session(session_id, user_id)
leave_room(session_id)
if leave_result["is_success"]:
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
except Exception as e:
emit("error", {"message": f"Leave room error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("send_message") @self.socketio.on("send_message")
def on_send_message(data): def on_send_message(data):
session_code = data.get("session_id") try:
message = data.get("message") session_code = data.get("session_id")
username = data.get("username", "anonymous") message = data.get("message")
emit( username = data.get("username", "anonymous")
"receive_message",
{"message": message, "from": username}, emit(
room=session_code, "receive_message",
) {"message": message, "from": username},
room=session_code,
)
except Exception as e:
emit("error", {"message": f"Send message error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("end_session") @self.socketio.on("end_session")
def handle_end_session(data): def handle_end_session(data):
session_code = data.get("session_id") try:
user_id = data.get("user_id") session_code = data.get("session_id")
if not session_code or not user_id: user_id = data.get("user_id")
emit("error", {"message": "session_id and user_id required"}) if not session_code or not user_id:
return emit("error", {"message": "session_id and user_id required"})
return
# Validasi user berhak mengakhiri session self.session_service.end_session(
self.session_service.end_session(session_id=session_code, user_id=user_id) session_id=session_code, user_id=user_id
)
# Bersihkan semua data session di Redis for key in [
for key in [ self._answers_key(session_code),
self._answers_key(session_code), self._scores_key(session_code),
self._scores_key(session_code), self._questions_key(session_code),
self._questions_key(session_code), ]:
]: self.redis_repo.delete_key(key)
self.redis_repo.delete_key(key)
emit( emit(
"room_closed", "room_closed",
{"message": "Session has ended.", "room": session_code}, {"message": "Session has ended.", "room": session_code},
room=session_code, room=session_code,
) )
except Exception as e:
emit("error", {"message": f"End session error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("start_quiz") @self.socketio.on("start_quiz")
def handle_start_quiz(data): def handle_start_quiz(data):
session_id = data.get("session_id") try:
if not session_id: session_id = data.get("session_id")
emit("error", {"message": "session_id is required"}) if not session_id:
return emit("error", {"message": "session_id is required"})
return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id) emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread( threading.Thread(
target=self.session_service.run_quiz_flow, target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio), args=(session_id, self.socketio),
daemon=True, daemon=True,
).start() ).start()
except Exception as e:
emit("error", {"message": f"Start quiz error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("submit_answer") @self.socketio.on("submit_answer")
def handle_submit_answer(data): def handle_submit_answer(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
try: try:
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
result = self.session_service.submit_answer( result = self.session_service.submit_answer(
session_id=session_id, session_id=session_id,
user_id=user_id, user_id=user_id,
@ -204,25 +233,29 @@ class SocketController:
answer=user_answer, answer=user_answer,
time_spent=time_spent, time_spent=time_spent,
) )
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)
except ValueError as exc: except ValueError as exc:
emit("error", {"message": str(exc)}) emit("error", {"message": str(exc)})
return current_app.logger.error(f"error: {str(exc)}")
except Exception as e:
emit( emit("error", {"message": f"Submit answer error: {str(e)}"})
"answer_submitted", current_app.logger.error(f"error: {str(e)}")
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)

View File

@ -1,5 +1,6 @@
from app.services.subject_service import SubjectService from app.services.subject_service import SubjectService
from app.helpers import make_response, make_error_response from app.helpers import make_response, make_error_response
from app.schemas.requests import SubjectCreateRequest
class SubjectController: class SubjectController:
@ -8,7 +9,8 @@ class SubjectController:
def create(self, req_body): def create(self, req_body):
try: try:
new_id = self.service.create_subject(req_body) data = SubjectCreateRequest(**req_body)
new_id = self.service.create_subject(data)
return make_response(message="Subject created", data={"id": new_id}) return make_response(message="Subject created", data={"id": new_id})
except Exception as e: except Exception as e:
return make_error_response(e) return make_error_response(e)

View File

@ -114,3 +114,16 @@ class UserController:
message="An internal server error occurred. Please try again later.", message="An internal server error occurred. Please try again later.",
status_code=500, status_code=500,
) )
def user_stat(self, user_id):
try:
response = self.user_service.get_user_status(user_id)
return make_response(message="Success retrive user stat", data=response)
except Exception as e:
current_app.logger.error(
f"Error while changing password: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)

View File

@ -66,6 +66,8 @@ class Container(containers.DeclarativeContainer):
user_service = providers.Factory( user_service = providers.Factory(
UserService, UserService,
user_repository, user_repository,
answer_repository,
quiz_repository,
) )
quiz_service = providers.Factory( quiz_service = providers.Factory(
@ -73,7 +75,9 @@ class Container(containers.DeclarativeContainer):
quiz_repository, quiz_repository,
user_repository, user_repository,
subject_repository, subject_repository,
answer_repository,
) )
answer_service = providers.Factory( answer_service = providers.Factory(
AnswerService, AnswerService,
answer_repository, answer_repository,

View File

@ -16,6 +16,7 @@ from app.blueprints import (
history_blueprint, history_blueprint,
subject_blueprint, subject_blueprint,
session_bp, session_bp,
swagger_blueprint,
) )
from app.database import init_db from app.database import init_db
from redis import Redis from redis import Redis
@ -24,7 +25,7 @@ from redis import Redis
def createApp() -> tuple[Flask, SocketIO]: def createApp() -> tuple[Flask, SocketIO]:
app = Flask(__name__) app = Flask(__name__)
app.config.from_object(Config) app.config.from_object(Config)
LoggerConfig.init_logger(app) LoggerConfig.init_logger(app, not Config.DEBUG)
logging.basicConfig( logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
@ -65,6 +66,7 @@ def createApp() -> tuple[Flask, SocketIO]:
) )
app.register_blueprint(default_blueprint) app.register_blueprint(default_blueprint)
app.register_blueprint(swagger_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api") app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api") app.register_blueprint(user_blueprint, url_prefix="/api")
app.register_blueprint(quiz_bp, url_prefix="/api/quiz") app.register_blueprint(quiz_bp, url_prefix="/api/quiz")

View File

@ -67,6 +67,7 @@ class QuizMapper:
description=schema.description, description=schema.description,
is_public=schema.is_public, is_public=schema.is_public,
date=datetime, date=datetime,
language_code=schema.lang_code,
total_quiz=len(schema.question_listings), total_quiz=len(schema.question_listings),
limit_duration=total_duration, limit_duration=total_duration,
question_listings=[ question_listings=[
@ -76,7 +77,7 @@ class QuizMapper:
) )
@staticmethod @staticmethod
def quiz_to_recomendation_mapper( def quiz_to_populer_mapper(
quiz_entity: QuizEntity, quiz_entity: QuizEntity,
user_entity: UserEntity, user_entity: UserEntity,
) -> ListingQuizResponse: ) -> ListingQuizResponse:

View File

@ -49,9 +49,16 @@ class UserMapper:
email=user.email, email=user.email,
name=user.name, name=user.name,
birth_date=( birth_date=(
DatetimeUtil.to_string(user.birth_date) if user.birth_date else None DatetimeUtil.to_string(user.birth_date, fmt="%d-%m-%Y")
if user.birth_date
else None
), ),
pic_url=user.pic_url, pic_url=user.pic_url,
phone=user.phone, phone=user.phone,
locale=user.locale, locale=user.locale,
created_at=(
DatetimeUtil.to_string(user.created_at, fmt="%d-%m-%Y")
if user.created_at
else None
),
) )

View File

@ -14,8 +14,9 @@ class QuizEntity(BaseModel):
is_public: bool = False is_public: bool = False
date: datetime date: datetime
total_quiz: int = 0 total_quiz: int = 0
limit_duration: Optional[int] = 0 # in limit_duration: Optional[int] = 0
total_user_playing: int = 0 total_user_playing: int = 0
language_code: Optional[str] = "id"
question_listings: Optional[list[QuestionItemEntity]] = [] question_listings: Optional[list[QuestionItemEntity]] = []
class ConfigDict: class ConfigDict:

View File

@ -9,6 +9,7 @@ class SessionEntity(BaseModel):
session_code: str session_code: str
quiz_id: str quiz_id: str
host_id: str host_id: str
room_name: str
created_at: datetime = Field(default_factory=datetime.now) created_at: datetime = Field(default_factory=datetime.now)
started_at: datetime | None = None started_at: datetime | None = None
ended_at: datetime | None = None ended_at: datetime | None = None

View File

@ -7,6 +7,7 @@ from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ign
import re import re
class AnswerGenerationRepository: class AnswerGenerationRepository:
MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras" MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras"
TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json" TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json"

View File

@ -38,7 +38,7 @@ class UserAnswerRepository:
return list(result) return list(result)
def get_by_user(self, user_id: str) -> list[UserAnswerEntity]: def get_by_user(self, user_id: str) -> list[UserAnswerEntity]:
result = self.collection.find({"user_id": user_id}) result = self.collection.find({"user_id": user_id}).sort("answered_at", -1)
return [UserAnswerEntity(**doc) for doc in result] return [UserAnswerEntity(**doc) for doc in result]
def get_by_session(self, session_id: str) -> List[dict]: def get_by_session(self, session_id: str) -> List[dict]:

View File

@ -20,34 +20,6 @@ class QuizRepository:
return QuizEntity(**data) return QuizEntity(**data)
return None return None
# def search_by_title_or_category(
# self, keyword: str, page: int, page_size: int
# ) -> List[QuizEntity]:
# skip = (page - 1) * page_size
# pipeline = [
# {
# "$lookup": {
# "from": "category",
# "localField": "category_id",
# "foreignField": "_id",
# "as": "category_info",
# }
# },
# {"$unwind": "$category_info"},
# {
# "$match": {
# "$or": [
# {"title": {"$regex": keyword, "$options": "i"}},
# {"category_info.name": {"$regex": keyword, "$options": "i"}},
# ]
# }
# },
# {"$skip": skip},
# {"$limit": page_size},
# ]
# cursor = self.collection.aggregate(pipeline)
# return [QuizEntity(**doc) for doc in cursor]
def search_by_title_or_category( def search_by_title_or_category(
self, keyword: str, subject_id: Optional[str], page: int, page_size: int self, keyword: str, subject_id: Optional[str], page: int, page_size: int
) -> List[QuizEntity]: ) -> List[QuizEntity]:
@ -86,7 +58,6 @@ class QuizRepository:
object_ids = [ObjectId(qid) for qid in quiz_ids] object_ids = [ObjectId(qid) for qid in quiz_ids]
cursor = self.collection.find({"_id": {"$in": object_ids}}) cursor = self.collection.find({"_id": {"$in": object_ids}})
datas = list(cursor) datas = list(cursor)
if not datas: if not datas:
return None return None
@ -127,13 +98,58 @@ class QuizRepository:
return self.collection.count_documents({"author_id": user_id}) return self.collection.count_documents({"author_id": user_id})
def get_top_played_quizzes( def get_top_played_quizzes(
self, page: int = 1, limit: int = 3, is_public: bool = True self,
page: int = 1,
limit: int = 3,
is_public: bool = True,
lang_code: str = "id",
) -> List[QuizEntity]: ) -> List[QuizEntity]:
skip = (page - 1) * limit skip = (page - 1) * limit
cursor = ( cursor = (
self.collection.find({"is_public": is_public}) self.collection.find(
{
"is_public": is_public,
"language_code": lang_code,
}
)
.sort("total_user_playing", -1) .sort("total_user_playing", -1)
.skip(skip) .skip(skip)
.limit(limit) .limit(limit)
) )
return [QuizEntity(**doc) for doc in cursor] return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes(
self,
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes_by_subjects(
self,
subject_ids: List[str],
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"subject_id": {"$in": subject_ids},
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]

View File

@ -15,3 +15,7 @@ class ResponseSchema(BaseModel, Generic[T]):
message: str message: str
data: Optional[T] = None data: Optional[T] = None
meta: Optional[MetaSchema] = None meta: Optional[MetaSchema] = None
class Config:
from_attributes = True
exclude_none = True

View File

@ -7,6 +7,7 @@ from .quiz_item_schema import QuestionItemSchema
class QuizCreateSchema(BaseModel): class QuizCreateSchema(BaseModel):
title: str title: str
description: Optional[str] = None description: Optional[str] = None
lang_code: str = "id"
is_public: bool = False is_public: bool = False
subject_id: str subject_id: str
author_id: Optional[str] = None author_id: Optional[str] = None

View File

@ -1,10 +1,8 @@
from pydantic import BaseModel, Field from pydantic import BaseModel
from typing import Optional from typing import Optional
class SubjectCreateRequest(BaseModel): class SubjectCreateRequest(BaseModel):
name: str = Field(..., example="Ilmu Pengetahuan ALam") name: str
alias: str = Field(..., examples="IPA", alias="short_name") alias: str
description: Optional[str] = Field( description: Optional[str]
None, example="Pelajaran tentang angka dan logika"
)

View File

@ -4,11 +4,11 @@ from .quiz.question_item_schema import QuestionItemSchema
from .quiz.quiz_data_rsp_schema import UserQuizListResponse from .quiz.quiz_data_rsp_schema import UserQuizListResponse
from .history.history_response import HistoryResultSchema from .history.history_response import HistoryResultSchema
from .history.detail_history_response import QuizHistoryResponse, QuestionResult from .history.detail_history_response import QuizHistoryResponse, QuestionResult
from .recomendation.recomendation_response_schema import ListingQuizResponse
from .subject.get_subject_schema import GetSubjectResponse from .subject.get_subject_schema import GetSubjectResponse
from .auth.login_response import LoginResponseSchema from .auth.login_response import LoginResponseSchema
from .user.user_response_scema import UserResponseSchema from .user.user_response_scema import UserResponseSchema
from .answer.answer_session_response import AnsweredQuizResponse from .answer.answer_session_response import AnsweredQuizResponse
from .recomendation.recomendation_response_schema import ListingQuizResponse
__all__ = [ __all__ = [
"QuizCreationResponse", "QuizCreationResponse",

View File

@ -12,3 +12,4 @@ class LoginResponseSchema(BaseModel):
pic_url: Optional[str] = None pic_url: Optional[str] = None
phone: Optional[str] = None phone: Optional[str] = None
locale: Optional[str] = None locale: Optional[str] = None
created_at: Optional[str] = None

View File

@ -1,8 +1,6 @@
from typing import List from typing import List
from pydantic import BaseModel from pydantic import BaseModel
from app.schemas.response.recomendation.recomendation_response_schema import ( from ..recomendation.recomendation_response_schema import ListingQuizResponse
ListingQuizResponse,
)
class UserQuizListResponse(BaseModel): class UserQuizListResponse(BaseModel):

View File

@ -60,7 +60,8 @@ class AnswerService:
answer_index = int(user_answer.answer) answer_index = int(user_answer.answer)
if 0 <= answer_index < len(question.options): if 0 <= answer_index < len(question.options):
correct = str(answer_index) == question.target_answer correct = answer_index == question.target_answer
else: else:
raise ValueError( raise ValueError(
f"Index jawaban tidak valid untuk soal {question.index}" f"Index jawaban tidak valid untuk soal {question.index}"
@ -82,11 +83,8 @@ class AnswerService:
) )
) )
total_questions = len(quiz_data.question_listings) total_questions = len(quiz_data.question_listings)
total_score = ( total_score = total_correct * 100 // total_questions
total_correct * 100 // total_questions
) # contoh perhitungan: nilai 100 dibagi rata
# Buat entitas yang akan disimpan
answer_entity = UserAnswerEntity( answer_entity = UserAnswerEntity(
session_id=answer_data.session_id, session_id=answer_data.session_id,
quiz_id=answer_data.quiz_id, quiz_id=answer_data.quiz_id,

View File

@ -1,9 +1,14 @@
from app.repositories import QuizRepository, UserRepository, SubjectRepository from app.repositories import (
QuizRepository,
UserRepository,
SubjectRepository,
UserAnswerRepository,
)
from app.schemas.requests import QuizCreateSchema from app.schemas.requests import QuizCreateSchema
from app.schemas.response import ( from app.schemas.response import (
UserQuizListResponse, UserQuizListResponse,
ListingQuizResponse,
QuizGetSchema, QuizGetSchema,
ListingQuizResponse,
) )
from app.exception import DataNotFoundException, ValidationException from app.exception import DataNotFoundException, ValidationException
from app.mapper import QuizMapper from app.mapper import QuizMapper
@ -13,13 +18,15 @@ from app.helpers import DatetimeUtil
class QuizService: class QuizService:
def __init__( def __init__(
self, self,
quiz_repository=QuizRepository, quiz_repository: QuizRepository,
user_repository=UserRepository, user_repository: UserRepository,
subject_repository=SubjectRepository, subject_repository: SubjectRepository,
answer_repository: UserAnswerRepository,
): ):
self.quiz_repository = quiz_repository self.quiz_repository = quiz_repository
self.user_repostory = user_repository self.user_repostory = user_repository
self.subject_repository = subject_repository self.subject_repository = subject_repository
self.answer_repository = answer_repository
def get_quiz(self, quiz_id) -> QuizGetSchema: def get_quiz(self, quiz_id) -> QuizGetSchema:
data = self.quiz_repository.get_by_id(quiz_id) data = self.quiz_repository.get_by_id(quiz_id)
@ -46,7 +53,7 @@ class QuizService:
if author is None: if author is None:
continue continue
mapped_quizzes.append( mapped_quizzes.append(
QuizMapper.quiz_to_recomendation_mapper( QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz, quiz_entity=quiz,
user_entity=author, user_entity=author,
) )
@ -63,13 +70,9 @@ class QuizService:
total_user_quiz = self.quiz_repository.count_by_user_id(user_id) total_user_quiz = self.quiz_repository.count_by_user_id(user_id)
user = self.user_repostory.get_user_by_id(user_id) user = self.user_repostory.get_user_by_id(user_id)
quiz_data = [ quiz_data = [QuizMapper.quiz_to_populer_mapper(quiz, user) for quiz in quizzes]
QuizMapper.quiz_to_recomendation_mapper(quiz, user) for quiz in quizzes
]
return UserQuizListResponse(total=total_user_quiz, quizzes=quiz_data) return UserQuizListResponse(total=total_user_quiz, quizzes=quiz_data)
@ -100,9 +103,14 @@ class QuizService:
def delete_quiz(self, quiz_id): def delete_quiz(self, quiz_id):
return self.quiz_repository.delete(quiz_id) return self.quiz_repository.delete(quiz_id)
def get_quiz_recommendation(self, page: int, limit: int): def get_quiz_populer(self, page: int, limit: int, lang_code: str):
data = self.quiz_repository.get_top_played_quizzes(
page=page,
limit=limit,
lang_code=lang_code,
)
data = self.quiz_repository.get_top_played_quizzes(page=page, limit=limit)
if not data: if not data:
raise DataNotFoundException("Quiz not found") raise DataNotFoundException("Quiz not found")
@ -110,9 +118,53 @@ class QuizService:
for quiz in data: for quiz in data:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id) author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
result.append( result.append(
QuizMapper.quiz_to_recomendation_mapper( QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz, quiz_entity=quiz,
user_entity=author, user_entity=author,
) )
) )
return result return result
def get_quiz_recommendation(
self,
user_id: str,
page: int,
limit: int,
lang_code: str,
):
quiz_data = []
if user_id:
user_answer = self.answer_repository.get_by_user(user_id=user_id)
if user_answer:
quiz_ids = list({answer.quiz_id for answer in user_answer})
quiz_work = self.quiz_repository.get_by_ids(quiz_ids)
if quiz_work:
quiz_subjects = list({quiz.subject_id for quiz in quiz_work})
if quiz_subjects:
quiz_data = self.quiz_repository.get_random_quizzes_by_subjects(
subject_ids=quiz_subjects,
limit=limit,
lang_code=lang_code,
)
if not quiz_data:
quiz_data = self.quiz_repository.get_random_quizzes(
limit=limit,
lang_code=lang_code,
)
result = []
for quiz in quiz_data:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
result.append(
QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz,
user_entity=author,
)
)
return result

View File

@ -38,12 +38,19 @@ class SessionService:
self.quiz_repository = quiz_repository self.quiz_repository = quiz_repository
self.answer_repository = answer_repository self.answer_repository = answer_repository
def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str: def create_session(
self,
quiz_id: str,
host_id: str,
room_name: str,
limit_participan: int,
) -> str:
generateed_code = uuid4().hex[:6].upper() generateed_code = uuid4().hex[:6].upper()
session = SessionEntity( session = SessionEntity(
session_code=generateed_code, session_code=generateed_code,
quiz_id=quiz_id, quiz_id=quiz_id,
host_id=host_id, host_id=host_id,
room_name=room_name,
created_at=DatetimeUtil.now_iso(), created_at=DatetimeUtil.now_iso(),
limit_participan=limit_participan, limit_participan=limit_participan,
participants=[], participants=[],

View File

@ -1,8 +1,8 @@
from datetime import datetime from app.repositories import UserRepository, UserAnswerRepository, QuizRepository
from app.repositories import UserRepository
from app.schemas import RegisterSchema from app.schemas import RegisterSchema
from app.schemas.requests import ProfileUpdateSchema from app.schemas.requests import ProfileUpdateSchema
from app.schemas.response import UserResponseSchema from app.schemas.response import UserResponseSchema
from app.models.entities import UserAnswerEntity
from app.mapper import UserMapper from app.mapper import UserMapper
from app.exception import AlreadyExistException, DataNotFoundException from app.exception import AlreadyExistException, DataNotFoundException
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
@ -10,8 +10,15 @@ from app.helpers import DatetimeUtil
class UserService: class UserService:
def __init__(self, user_repository: UserRepository): def __init__(
self,
user_repository: UserRepository,
answer_repository: UserAnswerRepository,
quiz_repository: QuizRepository,
):
self.user_repository = user_repository self.user_repository = user_repository
self.answer_repository = answer_repository
self.quiz_repository = quiz_repository
def get_all_users(self): def get_all_users(self):
return self.user_repository.get_all_users() return self.user_repository.get_all_users()
@ -94,3 +101,24 @@ class UserService:
user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"]) user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"])
return UserResponseSchema(**user_dict) return UserResponseSchema(**user_dict)
def get_user_status(self, user_id):
user_answers: list[UserAnswerEntity] = self.answer_repository.get_by_user(
user_id
)
total_quiz = self.quiz_repository.count_by_user_id(user_id)
if not user_answers:
return None
total_score = sum(answer.total_score for answer in user_answers)
total_questions = len(user_answers)
percentage = total_score / total_questions
return {
"avg_score": round(percentage, 2),
"total_solve": total_questions,
"total_quiz": total_quiz,
}

1135
docs/rest_api_docs.yaml Normal file

File diff suppressed because it is too large Load Diff

View File

@ -114,7 +114,7 @@
# assert result.quizzes == [] # assert result.quizzes == []
# def test_get_quiz_recommendation_found(quiz_service, mock_repositories): # def test_get_quiz_populer_found(quiz_service, mock_repositories):
# quiz = QuizEntity( # quiz = QuizEntity(
# id=ObjectId(), # id=ObjectId(),
# author_id="user1", # author_id="user1",
@ -131,14 +131,14 @@
# mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = [quiz] # mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = [quiz]
# mock_repositories["user_repository"].get_user_by_id.return_value = MagicMock() # mock_repositories["user_repository"].get_user_by_id.return_value = MagicMock()
# result = quiz_service.get_quiz_recommendation(1, 5) # result = quiz_service.get_quiz_populer(1, 5)
# assert len(result) == 1 # assert len(result) == 1
# def test_get_quiz_recommendation_not_found(quiz_service, mock_repositories): # def test_get_quiz_populer_not_found(quiz_service, mock_repositories):
# mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = [] # mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = []
# with pytest.raises(DataNotFoundException): # with pytest.raises(DataNotFoundException):
# quiz_service.get_quiz_recommendation(1, 5) # quiz_service.get_quiz_populer(1, 5)
# def test_update_quiz(quiz_service, mock_repositories): # def test_update_quiz(quiz_service, mock_repositories):