Compare commits

..

18 Commits

Author SHA1 Message Date
akhdanre 9fd0caad19 feat: adding exception and and logger handler 2025-05-27 16:02:14 +07:00
akhdanre 43e7fcca30 feat: adding log info 2025-05-27 15:54:10 +07:00
akhdanre 4203db07af fix: session data response 2025-05-27 13:57:31 +07:00
akhdanre cbdce3da20 feat: add language filter for quizzes to match user locale 2025-05-26 17:56:31 +07:00
akhdanre d46b1e934e fix: validation answer input 2025-05-26 14:40:50 +07:00
akhdanre b3069687d0 fix: sorting by newest answer first 2025-05-26 11:40:14 +07:00
akhdanre 55f9e15468 feat: fix the response lgoin user data 2025-05-25 22:06:25 +07:00
akhdanre 82faab1cc5 fix: logic on the get quiz recomendation 2025-05-25 19:33:37 +07:00
akhdanre faf6a0f4b9 feat: adding recomendation 2025-05-25 19:08:51 +07:00
Akhdan Robbani a099da34f4
Merge pull request #5 from Akhdanre/feat/swagger-documentation
swagger and fix issue
2025-05-25 12:56:22 +07:00
akhdanre 324173ec84 feat: adding user stat request 2025-05-25 12:54:27 +07:00
akhdanre 7ba31325eb fix: subject add data and some documentation 2025-05-24 15:00:19 +07:00
akhdanre 9c1793088a feat: adding docuemntation 2025-05-24 14:13:34 +07:00
akhdanre 155e74678f feat: merge develop 2025-05-23 15:13:20 +07:00
akhdanre 599cd689f6 feat: adding documentation for user 2025-03-21 01:20:32 +07:00
akhdanre 71a3091df0 fix: prefix on the url path 2025-03-21 01:12:24 +07:00
akhdanre 4804495371 feat: setup swagger for documentation 2025-03-21 01:05:19 +07:00
akhdanre 687d31d3f1 feat: setup swagger for documentation 2025-03-21 00:54:44 +07:00
35 changed files with 1798 additions and 259 deletions

131
.gitignore vendored
View File

@ -1,14 +1,137 @@
# Ignore only __pycache__ inside the app directory
app/**/__pycache__/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Only ignore __pycache__ inside app
app/**/__pycache__/
# Ignore compiled Python files inside app
app/**/*.pyc
app/**/*.pyo
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Logs
logs/
*.log
# Django stuff:
*.sqlite3
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre
.pyre/
# pytype
.pytype/
# Cython debug symbols
cython_debug/
# VS Code
.vscode/
# JetBrains IDEs
.idea/
*.iml
# MacOS
.DS_Store
# Thumbs.db (Windows)
Thumbs.db
ehthumbs.db
# Others
*.bak
*.swp
*.swo
*~
# Local dev files
local_settings.py

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"yaml.schemas": {
"openapi:v3": "file:///mnt/disc1/code/thesis_quiz_project/quiz_maker/docs/rest_api_docs.yaml"
}
}

View File

@ -2,6 +2,7 @@ from .default import default_blueprint
from .auth import auth_blueprint
from .user import user_blueprint
from .swagger import swagger_blueprint
from .quiz import quiz_bp
from .history import history_blueprint
from .subject import subject_blueprint
@ -15,6 +16,7 @@ __all__ = [
"history_blueprint",
"subject_blueprint",
"session_bp",
"swagger_blueprint",
]

View File

@ -17,8 +17,10 @@ def user_history(
@history_blueprint.route("/detail/<answer_id>", methods=["GET"])
@inject
def user_detail_history(
answer_id, controller: HistoryController = Provide[Container.history_controller]
answer_id: str,
controller: HistoryController = Provide[Container.history_controller],
):
print(answer_id)
return controller.get_detail_quiz_history(answer_id)

View File

@ -56,14 +56,36 @@ def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
)
@quiz_bp.route("/recomendation", methods=["GET"])
@quiz_bp.route("/populer", methods=["GET"])
@inject
def get_quiz_recommendation(
def get_quiz_populer(
controller: QuizController = Provide[Container.quiz_controller],
):
page = request.args.get("page")
limit = request.args.get("limit")
return controller.get_quiz_recommendation(page=page, limit=limit)
lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_populer(
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/recommendation", methods=["GET"])
@inject
def get_quiz_recommendation(
controller: QuizController = Provide[Container.quiz_controller],
):
user_id = request.args.get("user_id")
page = request.args.get("page")
limit = request.args.get("limit")
lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/user/<user_id>", methods=["GET"])
@ -83,6 +105,7 @@ def search_quiz(controller: QuizController = Provide[Container.quiz_controller])
subject_id = request.args.get("subject_id")
page = int(request.args.get("page", 1))
limit = int(request.args.get("limit", 10))
lang_code = request.args.get("lang_code") or "id"
return controller.search_quiz(
keyword=keyword, subject_id=subject_id, page=page, limit=limit

23
app/blueprints/swagger.py Normal file
View File

@ -0,0 +1,23 @@
from flask import Blueprint, jsonify, send_file
from flask_swagger_ui import get_swaggerui_blueprint
import os
swagger_blueprint = Blueprint("swagger", __name__)
SWAGGER_URL = "/swagger"
API_URL = "http://127.0.0.1:5000/swagger/docs"
swagger_ui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={"app_name": "Quiz Maker API"},
)
swagger_blueprint.register_blueprint(swagger_ui_blueprint)
@swagger_blueprint.route("/swagger/docs")
def serve_openapi():
"""Serve the OpenAPI spec from a file."""
docs_path = os.path.abspath("docs/rest_api_docs.yaml")
return send_file(docs_path, mimetype="application/yaml")

View File

@ -38,3 +38,11 @@ def get_user(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.get_user_by_id(user_id)
@user_blueprint.route("/user/status/<string:user_id>", methods=["GET"])
@inject
def get_user_stat(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.user_stat(user_id)

View File

@ -9,6 +9,7 @@ class Config:
# Flask Environment Settings
FLASK_ENV = os.getenv("FLASK_ENV", "development")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t")
API_VERSION = os.getenv("API_VERSION", "v1")
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
# MongoDB Settings

View File

@ -9,9 +9,14 @@ class LoggerConfig:
LOG_DIR = "logs" # Define the log directory
@staticmethod
def init_logger(app):
"""Initializes separate log files for INFO, ERROR, and WARNING levels."""
def init_logger(app, production_mode=False):
"""
Initializes separate log files for INFO, ERROR, and WARNING levels.
Args:
app: Flask application instance
production_mode (bool): If True, disables console output and only logs to files
"""
# Ensure the logs directory exists
if not os.path.exists(LoggerConfig.LOG_DIR):
os.makedirs(LoggerConfig.LOG_DIR)
@ -20,23 +25,37 @@ class LoggerConfig:
for handler in app.logger.handlers[:]:
app.logger.removeHandler(handler)
# Disable propagation to root logger to prevent console output in production
if production_mode:
app.logger.propagate = False
# Create separate loggers
info_logger = LoggerConfig._setup_logger(
info_handler = LoggerConfig._setup_logger(
"info_logger", "info.log", logging.INFO, logging.WARNING
)
error_logger = LoggerConfig._setup_logger(
error_handler = LoggerConfig._setup_logger(
"error_logger", "error.log", logging.ERROR, logging.CRITICAL
)
warning_logger = LoggerConfig._setup_logger(
warning_handler = LoggerConfig._setup_logger(
"warning_logger", "warning.log", logging.WARNING, logging.WARNING
)
# Attach handlers to Flask app logger
app.logger.addHandler(info_logger)
app.logger.addHandler(error_logger)
app.logger.addHandler(warning_logger)
app.logger.addHandler(info_handler)
app.logger.addHandler(error_handler)
app.logger.addHandler(warning_handler)
app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs
# Add console handler only in development mode
if not production_mode:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(console_formatter)
app.logger.addHandler(console_handler)
app.logger.info("Logger has been initialized for Flask application.")
@staticmethod
@ -44,11 +63,14 @@ class LoggerConfig:
"""Helper method to configure loggers for specific levels."""
logger = logging.getLogger(name)
logger.setLevel(level)
log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename)
log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3)
log_handler.setLevel(level)
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter)
log_handler.addFilter(lambda record: level <= record.levelno <= max_level)
logger.addHandler(log_handler)
return log_handler

View File

@ -43,9 +43,9 @@ class QuizController:
except Exception as e:
return make_error_response(e)
def quiz_recomendation(self):
def quiz_populer(self):
try:
result = self.quiz_service.get_quiz_recommendation()
result = self.quiz_service.get_quiz_populer()
if not result:
return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.model_dump())
@ -94,14 +94,39 @@ class QuizController:
except Exception as e:
return make_error_response(e)
def get_quiz_recommendation(self, page, limit):
def get_quiz_populer(self, page, limit, lang_code):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(page=page, limit=limit)
return make_response(
message="success retrieve recommendation quiz", data=result
result = self.quiz_service.get_quiz_populer(
page=page,
limit=limit,
lang_code=lang_code,
)
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:
return make_response(message=str(e), data=None, status_code=400)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except Exception as e:
return make_error_response(e)
def get_quiz_recommendation(self, user_id, page, limit, lang_code):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:

View File

@ -13,6 +13,7 @@ class SessionController(MethodView):
required_fields = [
"quiz_id",
"host_id",
"room_name",
"limit_participan",
]
for field in required_fields:
@ -22,6 +23,7 @@ class SessionController(MethodView):
session = self.session_service.create_session(
quiz_id=data["quiz_id"],
host_id=data["host_id"],
room_name=data["room_name"],
limit_participan=data["limit_participan"],
)

View File

@ -1,5 +1,5 @@
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request
from flask import request, current_app
from app.services import SessionService
import threading
@ -17,186 +17,215 @@ class SocketController:
def _register_events(self):
@self.socketio.on("connect")
def on_connect():
# print(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
try:
current_app.logger.info(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
except Exception as e:
emit("error", {"message": f"Connect error: {str(e)}"})
current_app.logger.error(f"Connect error: {str(e)}")
@self.socketio.on("disconnect")
def on_disconnect():
# print(f"Client disconnected: {request.sid}")
pass
try:
current_app.logger.info(f"Client disconnected: {request.sid}")
except Exception as e:
emit("error", {"message": f"Disconnect error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("join_room")
def handle_join_room(data):
session_code = data.get("session_code")
user_id = data.get("user_id")
try:
session_code = data.get("session_code")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"})
return
if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"})
return
session = self.session_service.join_session(
session_code=session_code,
user_id=user_id,
)
session = self.session_service.join_session(session_code, user_id)
if session is None:
emit(
"error",
{"message": "Failed to join session or session inactive"},
)
return
if session is None:
emit("error", {"message": "Failed to join session or session inactive"})
return
session_id = session["session_id"]
join_room(session_id)
session_id = session["session_id"]
message = (
"Admin has joined the room."
if session["is_admin"]
else f"User {session['username']} has joined the room."
)
join_room(session_id)
if session["is_admin"]:
message = "Admin has joined the room."
else:
message = f"User {session['username']} has joined the room."
emit(
"room_message",
{
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
@self.socketio.on("leave_room")
def handle_leave_room(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(
session_id=session_id,
user_id=user_id,
)
leave_room(session_id)
if leave_result["is_success"]:
current_app.logger.info(f"Client joined: {message}")
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
except Exception as e:
emit("error", {"message": f"Join room error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("leave_room")
def handle_leave_room(data):
try:
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(session_id, user_id)
leave_room(session_id)
if leave_result["is_success"]:
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
except Exception as e:
emit("error", {"message": f"Leave room error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("send_message")
def on_send_message(data):
session_code = data.get("session_id")
message = data.get("message")
username = data.get("username", "anonymous")
emit(
"receive_message",
{"message": message, "from": username},
room=session_code,
)
try:
session_code = data.get("session_id")
message = data.get("message")
username = data.get("username", "anonymous")
emit(
"receive_message",
{"message": message, "from": username},
room=session_code,
)
except Exception as e:
emit("error", {"message": f"Send message error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("end_session")
def handle_end_session(data):
session_code = data.get("session_id")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_id and user_id required"})
return
try:
session_code = data.get("session_id")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_id and user_id required"})
return
# Validasi user berhak mengakhiri session
self.session_service.end_session(session_id=session_code, user_id=user_id)
self.session_service.end_session(
session_id=session_code, user_id=user_id
)
# Bersihkan semua data session di Redis
for key in [
self._answers_key(session_code),
self._scores_key(session_code),
self._questions_key(session_code),
]:
self.redis_repo.delete_key(key)
for key in [
self._answers_key(session_code),
self._scores_key(session_code),
self._questions_key(session_code),
]:
self.redis_repo.delete_key(key)
emit(
"room_closed",
{"message": "Session has ended.", "room": session_code},
room=session_code,
)
emit(
"room_closed",
{"message": "Session has ended.", "room": session_code},
room=session_code,
)
except Exception as e:
emit("error", {"message": f"End session error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("start_quiz")
def handle_start_quiz(data):
session_id = data.get("session_id")
if not session_id:
emit("error", {"message": "session_id is required"})
return
try:
session_id = data.get("session_id")
if not session_id:
emit("error", {"message": "session_id is required"})
return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread(
target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start()
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread(
target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start()
except Exception as e:
emit("error", {"message": f"Start quiz error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("submit_answer")
def handle_submit_answer(data):
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
try:
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
result = self.session_service.submit_answer(
session_id=session_id,
user_id=user_id,
@ -204,25 +233,29 @@ class SocketController:
answer=user_answer,
time_spent=time_spent,
)
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)
except ValueError as exc:
emit("error", {"message": str(exc)})
return
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)
current_app.logger.error(f"error: {str(exc)}")
except Exception as e:
emit("error", {"message": f"Submit answer error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")

View File

@ -1,5 +1,6 @@
from app.services.subject_service import SubjectService
from app.helpers import make_response, make_error_response
from app.schemas.requests import SubjectCreateRequest
class SubjectController:
@ -8,7 +9,8 @@ class SubjectController:
def create(self, req_body):
try:
new_id = self.service.create_subject(req_body)
data = SubjectCreateRequest(**req_body)
new_id = self.service.create_subject(data)
return make_response(message="Subject created", data={"id": new_id})
except Exception as e:
return make_error_response(e)

View File

@ -114,3 +114,16 @@ class UserController:
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def user_stat(self, user_id):
try:
response = self.user_service.get_user_status(user_id)
return make_response(message="Success retrive user stat", data=response)
except Exception as e:
current_app.logger.error(
f"Error while changing password: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)

View File

@ -66,6 +66,8 @@ class Container(containers.DeclarativeContainer):
user_service = providers.Factory(
UserService,
user_repository,
answer_repository,
quiz_repository,
)
quiz_service = providers.Factory(
@ -73,7 +75,9 @@ class Container(containers.DeclarativeContainer):
quiz_repository,
user_repository,
subject_repository,
answer_repository,
)
answer_service = providers.Factory(
AnswerService,
answer_repository,

View File

@ -16,6 +16,7 @@ from app.blueprints import (
history_blueprint,
subject_blueprint,
session_bp,
swagger_blueprint,
)
from app.database import init_db
from redis import Redis
@ -24,7 +25,7 @@ from redis import Redis
def createApp() -> tuple[Flask, SocketIO]:
app = Flask(__name__)
app.config.from_object(Config)
LoggerConfig.init_logger(app)
LoggerConfig.init_logger(app, not Config.DEBUG)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
@ -65,6 +66,7 @@ def createApp() -> tuple[Flask, SocketIO]:
)
app.register_blueprint(default_blueprint)
app.register_blueprint(swagger_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api")
app.register_blueprint(quiz_bp, url_prefix="/api/quiz")

View File

@ -67,6 +67,7 @@ class QuizMapper:
description=schema.description,
is_public=schema.is_public,
date=datetime,
language_code=schema.lang_code,
total_quiz=len(schema.question_listings),
limit_duration=total_duration,
question_listings=[
@ -76,7 +77,7 @@ class QuizMapper:
)
@staticmethod
def quiz_to_recomendation_mapper(
def quiz_to_populer_mapper(
quiz_entity: QuizEntity,
user_entity: UserEntity,
) -> ListingQuizResponse:

View File

@ -49,9 +49,16 @@ class UserMapper:
email=user.email,
name=user.name,
birth_date=(
DatetimeUtil.to_string(user.birth_date) if user.birth_date else None
DatetimeUtil.to_string(user.birth_date, fmt="%d-%m-%Y")
if user.birth_date
else None
),
pic_url=user.pic_url,
phone=user.phone,
locale=user.locale,
created_at=(
DatetimeUtil.to_string(user.created_at, fmt="%d-%m-%Y")
if user.created_at
else None
),
)

View File

@ -14,8 +14,9 @@ class QuizEntity(BaseModel):
is_public: bool = False
date: datetime
total_quiz: int = 0
limit_duration: Optional[int] = 0 # in
limit_duration: Optional[int] = 0
total_user_playing: int = 0
language_code: Optional[str] = "id"
question_listings: Optional[list[QuestionItemEntity]] = []
class ConfigDict:

View File

@ -9,6 +9,7 @@ class SessionEntity(BaseModel):
session_code: str
quiz_id: str
host_id: str
room_name: str
created_at: datetime = Field(default_factory=datetime.now)
started_at: datetime | None = None
ended_at: datetime | None = None

View File

@ -7,6 +7,7 @@ from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ign
import re
class AnswerGenerationRepository:
MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras"
TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json"

View File

@ -38,7 +38,7 @@ class UserAnswerRepository:
return list(result)
def get_by_user(self, user_id: str) -> list[UserAnswerEntity]:
result = self.collection.find({"user_id": user_id})
result = self.collection.find({"user_id": user_id}).sort("answered_at", -1)
return [UserAnswerEntity(**doc) for doc in result]
def get_by_session(self, session_id: str) -> List[dict]:

View File

@ -20,34 +20,6 @@ class QuizRepository:
return QuizEntity(**data)
return None
# def search_by_title_or_category(
# self, keyword: str, page: int, page_size: int
# ) -> List[QuizEntity]:
# skip = (page - 1) * page_size
# pipeline = [
# {
# "$lookup": {
# "from": "category",
# "localField": "category_id",
# "foreignField": "_id",
# "as": "category_info",
# }
# },
# {"$unwind": "$category_info"},
# {
# "$match": {
# "$or": [
# {"title": {"$regex": keyword, "$options": "i"}},
# {"category_info.name": {"$regex": keyword, "$options": "i"}},
# ]
# }
# },
# {"$skip": skip},
# {"$limit": page_size},
# ]
# cursor = self.collection.aggregate(pipeline)
# return [QuizEntity(**doc) for doc in cursor]
def search_by_title_or_category(
self, keyword: str, subject_id: Optional[str], page: int, page_size: int
) -> List[QuizEntity]:
@ -86,7 +58,6 @@ class QuizRepository:
object_ids = [ObjectId(qid) for qid in quiz_ids]
cursor = self.collection.find({"_id": {"$in": object_ids}})
datas = list(cursor)
if not datas:
return None
@ -127,13 +98,58 @@ class QuizRepository:
return self.collection.count_documents({"author_id": user_id})
def get_top_played_quizzes(
self, page: int = 1, limit: int = 3, is_public: bool = True
self,
page: int = 1,
limit: int = 3,
is_public: bool = True,
lang_code: str = "id",
) -> List[QuizEntity]:
skip = (page - 1) * limit
cursor = (
self.collection.find({"is_public": is_public})
self.collection.find(
{
"is_public": is_public,
"language_code": lang_code,
}
)
.sort("total_user_playing", -1)
.skip(skip)
.limit(limit)
)
return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes(
self,
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes_by_subjects(
self,
subject_ids: List[str],
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"subject_id": {"$in": subject_ids},
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]

View File

@ -15,3 +15,7 @@ class ResponseSchema(BaseModel, Generic[T]):
message: str
data: Optional[T] = None
meta: Optional[MetaSchema] = None
class Config:
from_attributes = True
exclude_none = True

View File

@ -7,6 +7,7 @@ from .quiz_item_schema import QuestionItemSchema
class QuizCreateSchema(BaseModel):
title: str
description: Optional[str] = None
lang_code: str = "id"
is_public: bool = False
subject_id: str
author_id: Optional[str] = None

View File

@ -1,10 +1,8 @@
from pydantic import BaseModel, Field
from pydantic import BaseModel
from typing import Optional
class SubjectCreateRequest(BaseModel):
name: str = Field(..., example="Ilmu Pengetahuan ALam")
alias: str = Field(..., examples="IPA", alias="short_name")
description: Optional[str] = Field(
None, example="Pelajaran tentang angka dan logika"
)
name: str
alias: str
description: Optional[str]

View File

@ -4,11 +4,11 @@ from .quiz.question_item_schema import QuestionItemSchema
from .quiz.quiz_data_rsp_schema import UserQuizListResponse
from .history.history_response import HistoryResultSchema
from .history.detail_history_response import QuizHistoryResponse, QuestionResult
from .recomendation.recomendation_response_schema import ListingQuizResponse
from .subject.get_subject_schema import GetSubjectResponse
from .auth.login_response import LoginResponseSchema
from .user.user_response_scema import UserResponseSchema
from .answer.answer_session_response import AnsweredQuizResponse
from .recomendation.recomendation_response_schema import ListingQuizResponse
__all__ = [
"QuizCreationResponse",

View File

@ -12,3 +12,4 @@ class LoginResponseSchema(BaseModel):
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: Optional[str] = None
created_at: Optional[str] = None

View File

@ -1,8 +1,6 @@
from typing import List
from pydantic import BaseModel
from app.schemas.response.recomendation.recomendation_response_schema import (
ListingQuizResponse,
)
from ..recomendation.recomendation_response_schema import ListingQuizResponse
class UserQuizListResponse(BaseModel):

View File

@ -60,7 +60,8 @@ class AnswerService:
answer_index = int(user_answer.answer)
if 0 <= answer_index < len(question.options):
correct = str(answer_index) == question.target_answer
correct = answer_index == question.target_answer
else:
raise ValueError(
f"Index jawaban tidak valid untuk soal {question.index}"
@ -82,11 +83,8 @@ class AnswerService:
)
)
total_questions = len(quiz_data.question_listings)
total_score = (
total_correct * 100 // total_questions
) # contoh perhitungan: nilai 100 dibagi rata
total_score = total_correct * 100 // total_questions
# Buat entitas yang akan disimpan
answer_entity = UserAnswerEntity(
session_id=answer_data.session_id,
quiz_id=answer_data.quiz_id,

View File

@ -1,9 +1,14 @@
from app.repositories import QuizRepository, UserRepository, SubjectRepository
from app.repositories import (
QuizRepository,
UserRepository,
SubjectRepository,
UserAnswerRepository,
)
from app.schemas.requests import QuizCreateSchema
from app.schemas.response import (
UserQuizListResponse,
ListingQuizResponse,
QuizGetSchema,
ListingQuizResponse,
)
from app.exception import DataNotFoundException, ValidationException
from app.mapper import QuizMapper
@ -13,13 +18,15 @@ from app.helpers import DatetimeUtil
class QuizService:
def __init__(
self,
quiz_repository=QuizRepository,
user_repository=UserRepository,
subject_repository=SubjectRepository,
quiz_repository: QuizRepository,
user_repository: UserRepository,
subject_repository: SubjectRepository,
answer_repository: UserAnswerRepository,
):
self.quiz_repository = quiz_repository
self.user_repostory = user_repository
self.subject_repository = subject_repository
self.answer_repository = answer_repository
def get_quiz(self, quiz_id) -> QuizGetSchema:
data = self.quiz_repository.get_by_id(quiz_id)
@ -46,7 +53,7 @@ class QuizService:
if author is None:
continue
mapped_quizzes.append(
QuizMapper.quiz_to_recomendation_mapper(
QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz,
user_entity=author,
)
@ -63,13 +70,9 @@ class QuizService:
total_user_quiz = self.quiz_repository.count_by_user_id(user_id)
user = self.user_repostory.get_user_by_id(user_id)
quiz_data = [
QuizMapper.quiz_to_recomendation_mapper(quiz, user) for quiz in quizzes
]
quiz_data = [QuizMapper.quiz_to_populer_mapper(quiz, user) for quiz in quizzes]
return UserQuizListResponse(total=total_user_quiz, quizzes=quiz_data)
@ -100,9 +103,14 @@ class QuizService:
def delete_quiz(self, quiz_id):
return self.quiz_repository.delete(quiz_id)
def get_quiz_recommendation(self, page: int, limit: int):
def get_quiz_populer(self, page: int, limit: int, lang_code: str):
data = self.quiz_repository.get_top_played_quizzes(
page=page,
limit=limit,
lang_code=lang_code,
)
data = self.quiz_repository.get_top_played_quizzes(page=page, limit=limit)
if not data:
raise DataNotFoundException("Quiz not found")
@ -110,9 +118,53 @@ class QuizService:
for quiz in data:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
result.append(
QuizMapper.quiz_to_recomendation_mapper(
QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz,
user_entity=author,
)
)
return result
def get_quiz_recommendation(
self,
user_id: str,
page: int,
limit: int,
lang_code: str,
):
quiz_data = []
if user_id:
user_answer = self.answer_repository.get_by_user(user_id=user_id)
if user_answer:
quiz_ids = list({answer.quiz_id for answer in user_answer})
quiz_work = self.quiz_repository.get_by_ids(quiz_ids)
if quiz_work:
quiz_subjects = list({quiz.subject_id for quiz in quiz_work})
if quiz_subjects:
quiz_data = self.quiz_repository.get_random_quizzes_by_subjects(
subject_ids=quiz_subjects,
limit=limit,
lang_code=lang_code,
)
if not quiz_data:
quiz_data = self.quiz_repository.get_random_quizzes(
limit=limit,
lang_code=lang_code,
)
result = []
for quiz in quiz_data:
author = self.user_repostory.get_user_by_id(user_id=quiz.author_id)
result.append(
QuizMapper.quiz_to_populer_mapper(
quiz_entity=quiz,
user_entity=author,
)
)
return result

View File

@ -38,12 +38,19 @@ class SessionService:
self.quiz_repository = quiz_repository
self.answer_repository = answer_repository
def create_session(self, quiz_id: str, host_id: str, limit_participan: int) -> str:
def create_session(
self,
quiz_id: str,
host_id: str,
room_name: str,
limit_participan: int,
) -> str:
generateed_code = uuid4().hex[:6].upper()
session = SessionEntity(
session_code=generateed_code,
quiz_id=quiz_id,
host_id=host_id,
room_name=room_name,
created_at=DatetimeUtil.now_iso(),
limit_participan=limit_participan,
participants=[],

View File

@ -1,8 +1,8 @@
from datetime import datetime
from app.repositories import UserRepository
from app.repositories import UserRepository, UserAnswerRepository, QuizRepository
from app.schemas import RegisterSchema
from app.schemas.requests import ProfileUpdateSchema
from app.schemas.response import UserResponseSchema
from app.models.entities import UserAnswerEntity
from app.mapper import UserMapper
from app.exception import AlreadyExistException, DataNotFoundException
from werkzeug.security import generate_password_hash, check_password_hash
@ -10,8 +10,15 @@ from app.helpers import DatetimeUtil
class UserService:
def __init__(self, user_repository: UserRepository):
def __init__(
self,
user_repository: UserRepository,
answer_repository: UserAnswerRepository,
quiz_repository: QuizRepository,
):
self.user_repository = user_repository
self.answer_repository = answer_repository
self.quiz_repository = quiz_repository
def get_all_users(self):
return self.user_repository.get_all_users()
@ -94,3 +101,24 @@ class UserService:
user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"])
return UserResponseSchema(**user_dict)
def get_user_status(self, user_id):
user_answers: list[UserAnswerEntity] = self.answer_repository.get_by_user(
user_id
)
total_quiz = self.quiz_repository.count_by_user_id(user_id)
if not user_answers:
return None
total_score = sum(answer.total_score for answer in user_answers)
total_questions = len(user_answers)
percentage = total_score / total_questions
return {
"avg_score": round(percentage, 2),
"total_solve": total_questions,
"total_quiz": total_quiz,
}

1135
docs/rest_api_docs.yaml Normal file

File diff suppressed because it is too large Load Diff

View File

@ -114,7 +114,7 @@
# assert result.quizzes == []
# def test_get_quiz_recommendation_found(quiz_service, mock_repositories):
# def test_get_quiz_populer_found(quiz_service, mock_repositories):
# quiz = QuizEntity(
# id=ObjectId(),
# author_id="user1",
@ -131,14 +131,14 @@
# mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = [quiz]
# mock_repositories["user_repository"].get_user_by_id.return_value = MagicMock()
# result = quiz_service.get_quiz_recommendation(1, 5)
# result = quiz_service.get_quiz_populer(1, 5)
# assert len(result) == 1
# def test_get_quiz_recommendation_not_found(quiz_service, mock_repositories):
# def test_get_quiz_populer_not_found(quiz_service, mock_repositories):
# mock_repositories["quiz_repository"].get_top_played_quizzes.return_value = []
# with pytest.raises(DataNotFoundException):
# quiz_service.get_quiz_recommendation(1, 5)
# quiz_service.get_quiz_populer(1, 5)
# def test_update_quiz(quiz_service, mock_repositories):