Compare commits

..

No commits in common. "develop" and "main" have entirely different histories.

169 changed files with 168 additions and 7617 deletions

3
.env Normal file
View File

@ -0,0 +1,3 @@
MONGO_URI=mongodb://localhost:27017/quiz_app
FLASK_ENV=development
DEBUG=True

View File

@ -1,19 +0,0 @@
# Existing Configurations
MONGO_URI=
FLASK_ENV=development
DEBUG=True
SECRET_KEY=
GOOGLE_PROJECT_ID=
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GOOGLE_AUHT_URI=
GOOGLE_TOKEN_URI=
GOOGLE_AUTH_PROVIDER_X509_CERT_URL=
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

137
.gitignore vendored
View File

@ -1,137 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Only ignore __pycache__ inside app
app/**/__pycache__/
# Ignore compiled Python files inside app
app/**/*.pyc
app/**/*.pyo
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Logs
logs/
*.log
# Django stuff:
*.sqlite3
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre
.pyre/
# pytype
.pytype/
# Cython debug symbols
cython_debug/
# VS Code
.vscode/
# JetBrains IDEs
.idea/
*.iml
# MacOS
.DS_Store
# Thumbs.db (Windows)
Thumbs.db
ehthumbs.db
# Others
*.bak
*.swp
*.swo
*~
# Local dev files
local_settings.py

View File

@ -1,5 +0,0 @@
{
"yaml.schemas": {
"openapi:v3": "file:///mnt/disc1/code/thesis_quiz_project/quiz_maker/docs/rest_api_docs.yaml"
}
}

Binary file not shown.

Binary file not shown.

View File

@ -1,2 +0,0 @@
# from flask import Flask
from app.main import createApp

View File

@ -1,23 +1,3 @@
from .default import default_blueprint from .default import default_blueprint
from .auth import auth_blueprint from .auth import auth_blueprint
from .user import user_blueprint from .user import user_blueprint
from .swagger import swagger_blueprint
from .quiz import quiz_bp
from .history import history_blueprint
from .subject import subject_blueprint
from .session import session_bp
__all__ = [
"default_blueprint",
"auth_blueprint",
"user_blueprint",
"quiz_bp",
"history_blueprint",
"subject_blueprint",
"session_bp",
"swagger_blueprint",
]
# from .user import user_blueprint

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,25 +1,22 @@
from flask import Blueprint from flask import Blueprint
from app.controllers import AuthController from controllers import AuthController
from app.di_container import Container
from dependency_injector.wiring import inject, Provide
# Inisialisasi blueprint
auth_blueprint = Blueprint("auth", __name__) auth_blueprint = Blueprint("auth", __name__)
auth_controller = AuthController()
# Daftarkan rute ke controller
@auth_blueprint.route("/register", methods=["POST"])
def register():
return auth_controller.register()
@auth_blueprint.route("/login", methods=["POST"]) @auth_blueprint.route("/login", methods=["POST"])
@inject def login():
def login(auth_controller: AuthController = Provide[Container.auth_controller]):
return auth_controller.login() return auth_controller.login()
@auth_blueprint.route("/login/google", methods=["POST"])
@inject
def google_login(auth_controller: AuthController = Provide[Container.auth_controller]):
return auth_controller.google_login()
@auth_blueprint.route("/logout", methods=["DELETE"]) @auth_blueprint.route("/logout", methods=["DELETE"])
@inject def logout():
def logout(auth_controller: AuthController = Provide[Container.auth_controller]):
return auth_controller.logout() return auth_controller.logout()

View File

@ -1,33 +0,0 @@
from flask import Blueprint
from app.controllers import HistoryController
from app.di_container import Container
from dependency_injector.wiring import inject, Provide
history_blueprint = Blueprint("history", __name__)
@history_blueprint.route("/<user_id>", methods=["GET"])
@inject
def user_history(
user_id: str, controller: HistoryController = Provide[Container.history_controller]
):
return controller.get_quiz_by_user(user_id)
@history_blueprint.route("/detail/<answer_id>", methods=["GET"])
@inject
def user_detail_history(
answer_id: str,
controller: HistoryController = Provide[Container.history_controller],
):
print(answer_id)
return controller.get_detail_quiz_history(answer_id)
@history_blueprint.route("/session/<session_id>", methods=["GET"])
@inject
def session_history(
session_id: str,
controller: HistoryController = Provide[Container.history_controller],
):
return controller.get_session_history(session_id)

View File

@ -1,112 +0,0 @@
from flask import Blueprint, request
from app.di_container import Container
from dependency_injector.wiring import inject, Provide
from app.controllers import QuizController
quiz_bp = Blueprint("quiz", __name__)
@quiz_bp.route("", methods=["POST"])
@inject
def create_quiz(controller: QuizController = Provide[Container.quiz_controller]):
reqBody = request.get_json()
return controller.create_quiz(reqBody)
@quiz_bp.route("/ai", methods=["POST"])
@inject
def create_quiz_auto(controller: QuizController = Provide[Container.quiz_controller]):
reqBody = request.get_json()
return controller.create_quiz_auto(reqBody)
@quiz_bp.route("/<quiz_id>", methods=["GET"])
@inject
def get_quiz(
quiz_id: str, controller: QuizController = Provide[Container.quiz_controller]
):
return controller.get_quiz(quiz_id)
@quiz_bp.route("/answer", methods=["POST"])
@inject
def submit_answer(controller: QuizController = Provide[Container.quiz_controller]):
req_body = request.get_json()
return controller.submit_answer(req_body)
@quiz_bp.route("/answer/session", methods=["POST"])
@inject
def get_answer_session(controller: QuizController = Provide[Container.quiz_controller]):
req_body = request.get_json()
return controller.get_user_ans_session(req_body)
@quiz_bp.route("/answer", methods=["GET"])
@inject
def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
quiz_id = request.args.get("quiz_id")
user_id = request.args.get("user_id")
session_id = request.args.get("session_id")
return controller.get_answer(
quiz_id=quiz_id, user_id=user_id, session_id=session_id
)
@quiz_bp.route("/populer", methods=["GET"])
@inject
def get_quiz_populer(
controller: QuizController = Provide[Container.quiz_controller],
):
page = request.args.get("page")
limit = request.args.get("limit")
lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_populer(
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/recommendation", methods=["GET"])
@inject
def get_quiz_recommendation(
controller: QuizController = Provide[Container.quiz_controller],
):
user_id = request.args.get("user_id")
page = request.args.get("page")
limit = request.args.get("limit")
lang_code = request.args.get("lang_code") or "id"
return controller.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
@quiz_bp.route("/user/<user_id>", methods=["GET"])
@inject
def get_user_quiz(
user_id: str, controller: QuizController = Provide[Container.quiz_controller]
):
page = request.args.get("page", default=1, type=int)
page_size = request.args.get("page_size", default=10, type=int)
return controller.get_user_quiz(user_id=user_id, page=page, page_size=page_size)
@quiz_bp.route("/search", methods=["GET"])
@inject
def search_quiz(controller: QuizController = Provide[Container.quiz_controller]):
keyword = request.args.get("keyword", "")
subject_id = request.args.get("subject_id")
page = int(request.args.get("page", 1))
limit = int(request.args.get("limit", 10))
lang_code = request.args.get("lang_code") or "id"
return controller.search_quiz(
keyword=keyword, subject_id=subject_id, page=page, limit=limit
)

View File

@ -1,18 +0,0 @@
from flask import Blueprint, request
from dependency_injector.wiring import inject, Provide
from app.di_container import Container
from app.controllers import SessionController
session_bp = Blueprint("session", __name__)
@session_bp.route("", methods=["POST"])
@inject
def sessionGet(controller: SessionController = Provide[Container.session_controller]):
return controller.createRoom(request.get_json())
@session_bp.route("/summary", methods=["POST"])
@inject
def summary(controller: SessionController = Provide[Container.session_controller]):
return controller.summaryall(request.get_json())

View File

@ -1,50 +0,0 @@
from flask import Blueprint, request
from dependency_injector.wiring import inject, Provide
from app.di_container import Container
from app.controllers import SubjectController
subject_blueprint = Blueprint("subject", __name__)
@subject_blueprint.route("", methods=["POST"])
@inject
def create_subject(
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.create(request.get_json())
@subject_blueprint.route("", methods=["GET"])
@inject
def get_all_subjects(
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.get_all()
@subject_blueprint.route("/<subject_id>", methods=["GET"])
@inject
def get_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.get_by_id(subject_id)
@subject_blueprint.route("/<subject_id>", methods=["PUT"])
@inject
def update_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.update(subject_id, request.get_json())
@subject_blueprint.route("/<subject_id>", methods=["DELETE"])
@inject
def delete_subject(
subject_id: str,
controller: SubjectController = Provide[Container.subject_controller],
):
return controller.delete(subject_id)

View File

@ -1,23 +0,0 @@
from flask import Blueprint, jsonify, send_file
from flask_swagger_ui import get_swaggerui_blueprint
import os
swagger_blueprint = Blueprint("swagger", __name__)
SWAGGER_URL = "/swagger"
API_URL = "http://127.0.0.1:5000/swagger/docs"
swagger_ui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={"app_name": "Quiz Maker API"},
)
swagger_blueprint.register_blueprint(swagger_ui_blueprint)
@swagger_blueprint.route("/swagger/docs")
def serve_openapi():
"""Serve the OpenAPI spec from a file."""
docs_path = os.path.abspath("docs/rest_api_docs.yaml")
return send_file(docs_path, mimetype="application/yaml")

View File

@ -1,48 +1,12 @@
# /blueprints/user.py
from flask import Blueprint from flask import Blueprint
from app.di_container import Container from controllers import UserController
from app.controllers import UserController
from dependency_injector.wiring import inject, Provide
user_blueprint = Blueprint("user", __name__) user_blueprint = Blueprint("user", __name__)
user_controller = UserController()
@user_blueprint.route("/users", methods=["GET"]) @user_blueprint.route("/users", methods=["GET"])
@inject def get_users():
def get_users(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.get_users() return user_controller.get_users()
@user_blueprint.route("/register", methods=["POST"])
@inject
def register(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.register()
@user_blueprint.route("/user/update", methods=["POST"])
@inject
def update_user(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.update_profile()
@user_blueprint.route("/user/change-password", methods=["POST"])
@inject
def change_password(
user_controller: UserController = Provide[Container.user_controller],
):
return user_controller.change_password()
@user_blueprint.route("/user/<string:user_id>", methods=["GET"])
@inject
def get_user(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.get_user_by_id(user_id)
@user_blueprint.route("/user/status/<string:user_id>", methods=["GET"])
@inject
def get_user_stat(
user_id, user_controller: UserController = Provide[Container.user_controller]
):
return user_controller.user_stat(user_id)

View File

@ -1,2 +1 @@
from .config import Config from .config import Config
from .logger_config import LoggerConfig

Binary file not shown.

Binary file not shown.

View File

@ -1,42 +1,18 @@
from dotenv import load_dotenv
import os import os
from dotenv import load_dotenv
# Load variables from .env load_dotenv() # Load environment variables from .env
load_dotenv(override=True)
class Config: class Config:
# Flask Environment Settings MONGO_URI = os.getenv(
FLASK_ENV = os.getenv("FLASK_ENV", "development") "MONGO_URI", "mongodb://localhost:27017/quiz_app"
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t") ) # Default value if not set
API_VERSION = os.getenv("API_VERSION", "v1")
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
# MongoDB Settings FLASK_ENV = os.getenv("FLASK_ENV", "development") # Default to development
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/yourdb")
# Google OAuth Settings DEBUG = os.getenv("DEBUG", "True").lower() in [
GOOGLE_PROJECT_ID = os.getenv("GOOGLE_PROJECT_ID") "true",
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID") "1",
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET") "t",
GOOGLE_AUTH_URI = os.getenv( ] # Convert string to boolean
"GOOGLE_AUTH_URI", "https://accounts.google.com/o/oauth2/auth"
)
GOOGLE_TOKEN_URI = os.getenv(
"GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token"
)
GOOGLE_AUTH_PROVIDER_X509_CERT_URL = os.getenv("GOOGLE_AUTH_PROVIDER_X509_CERT_URL")
GOOGLE_SCOPE = "email profile"
GOOGLE_BASE_URL = "https://www.googleapis.com/oauth2/v1/"
# Redis Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
@property
def REDIS_URL(self):
if self.REDIS_PASSWORD:
return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"

View File

@ -1,76 +0,0 @@
import logging
import os
from logging.handlers import RotatingFileHandler
class LoggerConfig:
"""A class to configure logging for the Flask application."""
LOG_DIR = "logs" # Define the log directory
@staticmethod
def init_logger(app, production_mode=False):
"""
Initializes separate log files for INFO, ERROR, and WARNING levels.
Args:
app: Flask application instance
production_mode (bool): If True, disables console output and only logs to files
"""
# Ensure the logs directory exists
if not os.path.exists(LoggerConfig.LOG_DIR):
os.makedirs(LoggerConfig.LOG_DIR)
# Remove default handlers to prevent duplicate logging
for handler in app.logger.handlers[:]:
app.logger.removeHandler(handler)
# Disable propagation to root logger to prevent console output in production
if production_mode:
app.logger.propagate = False
# Create separate loggers
info_handler = LoggerConfig._setup_logger(
"info_logger", "info.log", logging.INFO, logging.WARNING
)
error_handler = LoggerConfig._setup_logger(
"error_logger", "error.log", logging.ERROR, logging.CRITICAL
)
warning_handler = LoggerConfig._setup_logger(
"warning_logger", "warning.log", logging.WARNING, logging.WARNING
)
# Attach handlers to Flask app logger
app.logger.addHandler(info_handler)
app.logger.addHandler(error_handler)
app.logger.addHandler(warning_handler)
app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs
# Add console handler only in development mode
if not production_mode:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(console_formatter)
app.logger.addHandler(console_handler)
app.logger.info("Logger has been initialized for Flask application.")
@staticmethod
def _setup_logger(name, filename, level, max_level):
"""Helper method to configure loggers for specific levels."""
logger = logging.getLogger(name)
logger.setLevel(level)
log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename)
log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3)
log_handler.setLevel(level)
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter)
log_handler.addFilter(lambda record: level <= record.levelno <= max_level)
logger.addHandler(log_handler)
return log_handler

View File

@ -1,17 +1,2 @@
from .auth_controller import AuthController from .auth_controller import AuthController
from .user_controller import UserController from .user_controller import UserController
from .quiz_controller import QuizController
from .history_controller import HistoryController
from .subject_controller import SubjectController
from .socket_conroller import SocketController
from .session_controller import SessionController
__all__ = [
"AuthController",
"UserController",
"QuizController",
"HistoryController",
"SubjectController",
"SocketController",
"SessionController",
]

Binary file not shown.

View File

@ -1,77 +1,19 @@
from flask import jsonify, request, current_app from flask import jsonify, request
from pydantic import ValidationError from services import UserService
from app.schemas.basic_response_schema import ResponseSchema from services import AuthService
from app.schemas.google_login_schema import GoogleLoginSchema
from app.schemas import LoginSchema
from app.services import UserService, AuthService
from app.exception import AuthException
from app.mapper import UserMapper
from app.helpers import make_response, make_error_response
import logging
logging = logging.getLogger(__name__)
class AuthController: class AuthController:
def __init__(self, userService: UserService, authService: AuthService): def __init__(self):
self.user_service = userService self.user_service = UserService()
self.auth_service = authService self.auth_service = AuthService()
def login(self): def login(self):
try:
data = request.get_json() data = request.get_json()
dataSchema = LoginSchema(**data) users = self.auth_service.login(data)
response = self.auth_service.login(dataSchema) response = {
"status": True,
if response is None: "message": "success retrive data",
return make_response(message="User is not registered", status_code=401) "data": users,
return make_response(message="Login success", data=response) }
except ValidationError as e: return jsonify(response)
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400
except Exception as e:
current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema(
message="Internal server error", data=None, meta=None
)
return jsonify(response.model_dump()), 500
def google_login(self):
"""Handles Google Login via ID Token verification"""
try:
data = request.get_json()
validated_data = GoogleLoginSchema(**data)
id_token = validated_data.token_id
user_info = self.auth_service.verify_google_id_token(id_token)
if not user_info:
return make_response(message="Invalid Google ID Token", data=user_info)
return make_response(message="Login Success", data=user_info)
except ValidationError as e:
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400
except AuthException as e:
current_app.logger.error(f"Auth error: {e}")
response = ResponseSchema(message=e, data=None, meta=None)
return jsonify(response.model_dump()), 400
except Exception as e:
current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema(
message="Internal server error", data=None, meta=None
)
return jsonify(response.model_dump()), 500
def logout(self):
return jsonify({"message": "logout"}), 200

View File

@ -1,32 +0,0 @@
from app.services import HistoryService
from app.helpers import make_error_response, make_response
class HistoryController:
def __init__(self, history_service: HistoryService):
self.history_service = history_service
def get_quiz_by_user(self, user_id: str):
try:
data = self.history_service.get_history_by_user_id(user_id)
return make_response(message="retrive history data", data=data)
except Exception as e:
return make_error_response(e)
def get_detail_quiz_history(self, answer_id: str):
try:
data = self.history_service.get_history_by_answer_id(answer_id)
return make_response(
message="success retrive detail history data", data=data
)
except Exception as e:
return make_error_response(e)
def get_session_history(self, session_id):
try:
result = self.history_service.get_session_history(session_id)
return make_response(message="success get history session", data=result)
except Exception as e:
return make_error_response(e)

View File

@ -1,188 +0,0 @@
import json
from pydantic import ValidationError
from app.schemas.requests import QuizCreateSchema, UserAnswerSchema
from app.schemas.response import QuizCreationResponse
from app.services import QuizService, AnswerService, QuestionGenerationService
from app.helpers import make_response, make_error_response
from app.exception import ValidationException, DataNotFoundException
class QuizController:
def __init__(
self,
quiz_service: QuizService,
answer_service: AnswerService,
question_generate_service: QuestionGenerationService,
):
self.quiz_service = quiz_service
self.answer_service = answer_service
self.question_generate_service = question_generate_service
def get_quiz(self, quiz_id):
try:
result = self.quiz_service.get_quiz(quiz_id)
if not result:
return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.model_dump())
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except Exception as e:
return make_error_response(e)
def create_quiz(self, quiz_data):
try:
quiz_obj = QuizCreateSchema(**quiz_data)
quiz_id = self.quiz_service.create_quiz(quiz_obj)
return make_response(
message="Quiz created",
data=QuizCreationResponse(quiz_id=quiz_id),
status_code=201,
)
except (ValidationError, ValidationException) as e:
return make_response(message="", status_code=400)
except Exception as e:
return make_error_response(e)
def quiz_populer(self):
try:
result = self.quiz_service.get_quiz_populer()
if not result:
return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.model_dump())
except Exception as e:
return make_error_response(e)
def submit_answer(self, answer_data):
try:
answer_obj = UserAnswerSchema(**answer_data)
answer_id = self.answer_service.create_answer(answer_obj)
return make_response(
message="Answer submitted",
data={"answer_id": answer_id},
status_code=201,
)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except ValidationException as e:
return make_response(
message=f"validation issue {e.message}", status_code=400
)
except Exception as e:
return make_error_response(e)
def get_answer(self, quiz_id, user_id, session_id):
try:
# self.answer_service.
pass
except Exception as e:
return make_error_response(e)
def get_user_quiz(self, user_id, page=1, page_size=10):
try:
result = self.quiz_service.get_user_quiz(
user_id=user_id, page=page, page_size=page_size
)
return make_response(
message="User quizzes retrieved successfully",
data=result.quizzes,
page=page,
page_size=page_size,
total_all_data=result.total,
)
except Exception as e:
return make_error_response(e)
def get_quiz_populer(self, page, limit, lang_code):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_populer(
page=page,
limit=limit,
lang_code=lang_code,
)
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:
return make_response(message=str(e), data=None, status_code=400)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except Exception as e:
return make_error_response(e)
def get_quiz_recommendation(self, user_id, page, limit, lang_code):
try:
page = int(page) if page is not None else 1
limit = int(limit) if limit is not None else 3
result = self.quiz_service.get_quiz_recommendation(
user_id=user_id,
page=page,
limit=limit,
lang_code=lang_code,
)
return make_response(message="success retrieve populer quiz", data=result)
except DataNotFoundException as e:
return make_response(message=e.message, status_code=e.status_code)
except ValueError as e:
return make_response(message=str(e), data=None, status_code=400)
except ValidationError as e:
return make_response(
message="validation error", data=json.loads(e.json()), status_code=400
)
except Exception as e:
return make_error_response(e)
def search_quiz(self, keyword: str, subject_id: str, page: int, limit: int):
try:
quiz, total = self.quiz_service.search_quiz(
keyword, subject_id, page, limit
)
return make_response(
message="success",
data=quiz,
page=page,
page_size=limit,
total_all_data=total,
)
except Exception as e:
return make_error_response(e)
def create_quiz_auto(
self,
reqBody,
):
try:
result = self.question_generate_service.createQuizAutomate(
reqBody["sentence"]
)
return make_response(message="succes labeling", data=result)
except Exception as e:
return make_error_response(e)
def get_user_ans_session(self, body):
try:
session_id = body.get("session_id")
user_id = body.get("user_id")
if not session_id and not user_id:
return make_response(
message="session_id or user_id must be provided", status_code=400
)
data = self.answer_service.get_answer_session(
session_id=session_id,
user_id=user_id,
)
return make_response(message="Successfully retrieved the answer", data=data)
except KeyError as e:
return make_error_response(f"Missing required key: {str(e)}")
except Exception as e:
return make_error_response(f"An error occurred: {str(e)}")

View File

@ -1,44 +0,0 @@
from flask import request, jsonify
from flask.views import MethodView
from app.services.session_service import SessionService
from app.helpers import make_response
class SessionController(MethodView):
def __init__(self, session_service: SessionService):
self.session_service = session_service
def createRoom(self, data):
required_fields = [
"quiz_id",
"host_id",
"room_name",
"limit_participan",
]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing field: {field}"}), 400
session = self.session_service.create_session(
quiz_id=data["quiz_id"],
host_id=data["host_id"],
room_name=data["room_name"],
limit_participan=data["limit_participan"],
)
return make_response(
message="succes create room",
data=session,
status_code=201,
)
def summaryall(self, body):
self.session_service.summaryAllSessionData(
session_id=body.get("session_id"), start_time=""
)
return make_response(
message="succes create room",
data="",
status_code=201,
)

View File

@ -1,261 +0,0 @@
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request, current_app
from app.services import SessionService
import threading
class SocketController:
def __init__(
self,
socketio: SocketIO,
session_service: SessionService,
):
self.socketio = socketio
self.session_service = session_service
self._register_events()
def _register_events(self):
@self.socketio.on("connect")
def on_connect():
try:
current_app.logger.info(f"Client connected: {request.sid}")
emit("connection_response", {"status": "connected", "sid": request.sid})
except Exception as e:
emit("error", {"message": f"Connect error: {str(e)}"})
current_app.logger.error(f"Connect error: {str(e)}")
@self.socketio.on("disconnect")
def on_disconnect():
try:
current_app.logger.info(f"Client disconnected: {request.sid}")
except Exception as e:
emit("error", {"message": f"Disconnect error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("join_room")
def handle_join_room(data):
try:
session_code = data.get("session_code")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_code and user_id are required"})
return
session = self.session_service.join_session(session_code, user_id)
if session is None:
emit(
"error",
{"message": "Failed to join session or session inactive"},
)
return
session_id = session["session_id"]
join_room(session_id)
message = (
"Admin has joined the room."
if session["is_admin"]
else f"User {session['username']} has joined the room."
)
current_app.logger.info(f"Client joined: {message}")
emit(
"room_message",
{
"type": "join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"session_info": session["session_info"],
"quiz_info": session["quiz_info"],
},
},
to=request.sid,
)
emit(
"room_message",
{
"type": "participan_join",
"message": message,
"room": session_id,
"argument": "adm_update",
"data": {
"participants": session["session_info"]["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
except Exception as e:
emit("error", {"message": f"Join room error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("leave_room")
def handle_leave_room(data):
try:
session_id = data.get("session_id")
user_id = data.get("user_id")
username = data.get("username", "anonymous")
leave_result = self.session_service.leave_session(session_id, user_id)
leave_room(session_id)
if leave_result["is_success"]:
emit(
"room_message",
{
"type": "participan_leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": {
"participants": leave_result["participants"],
},
},
room=session_id,
skip_sid=request.sid,
)
emit(
"room_message",
{
"type": "leave",
"message": f"{username} has left the room.",
"room": session_id,
"argument": "adm_update",
"data": None,
},
room=session_id,
to=request.sid,
)
except Exception as e:
emit("error", {"message": f"Leave room error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("send_message")
def on_send_message(data):
try:
session_code = data.get("session_id")
message = data.get("message")
username = data.get("username", "anonymous")
emit(
"receive_message",
{"message": message, "from": username},
room=session_code,
)
except Exception as e:
emit("error", {"message": f"Send message error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("end_session")
def handle_end_session(data):
try:
session_code = data.get("session_id")
user_id = data.get("user_id")
if not session_code or not user_id:
emit("error", {"message": "session_id and user_id required"})
return
self.session_service.end_session(
session_id=session_code, user_id=user_id
)
for key in [
self._answers_key(session_code),
self._scores_key(session_code),
self._questions_key(session_code),
]:
self.redis_repo.delete_key(key)
emit(
"room_closed",
{"message": "Session has ended.", "room": session_code},
room=session_code,
)
except Exception as e:
emit("error", {"message": f"End session error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("start_quiz")
def handle_start_quiz(data):
try:
session_id = data.get("session_id")
if not session_id:
emit("error", {"message": "session_id is required"})
return
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
threading.Thread(
target=self.session_service.run_quiz_flow,
args=(session_id, self.socketio),
daemon=True,
).start()
except Exception as e:
emit("error", {"message": f"Start quiz error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")
@self.socketio.on("submit_answer")
def handle_submit_answer(data):
try:
session_id = data.get("session_id")
user_id = data.get("user_id")
question_index = data.get("question_index")
user_answer = data.get("answer")
time_spent = data.get("time_spent")
if not all(
[
session_id,
user_id,
question_index is not None,
user_answer is not None,
time_spent is not None,
]
):
emit(
"error",
{
"message": "session_id, user_id, question_index, and answer are required"
},
)
return
result = self.session_service.submit_answer(
session_id=session_id,
user_id=user_id,
question_index=question_index,
answer=user_answer,
time_spent=time_spent,
)
emit(
"answer_submitted",
{
"question_index": result["question_index"],
"answer": result["answer"],
"correct": result["correct"],
"score": result["scores"],
},
to=request.sid,
)
emit(
"score_update",
{
"scores": self.session_service.get_ranked_scores(session_id),
},
room=session_id,
)
except ValueError as exc:
emit("error", {"message": str(exc)})
current_app.logger.error(f"error: {str(exc)}")
except Exception as e:
emit("error", {"message": f"Submit answer error: {str(e)}"})
current_app.logger.error(f"error: {str(e)}")

View File

@ -1,50 +0,0 @@
from app.services.subject_service import SubjectService
from app.helpers import make_response, make_error_response
from app.schemas.requests import SubjectCreateRequest
class SubjectController:
def __init__(self, service: SubjectService):
self.service = service
def create(self, req_body):
try:
data = SubjectCreateRequest(**req_body)
new_id = self.service.create_subject(data)
return make_response(message="Subject created", data={"id": new_id})
except Exception as e:
return make_error_response(e)
def get_all(self):
try:
subjects = self.service.get_all_subjects()
return make_response(message="success retrieve subject", data=subjects)
except Exception as e:
return make_error_response(e)
def get_by_id(self, subject_id: str):
try:
subject = self.service.get_subject_by_id(subject_id)
if not subject:
return make_response(message="Subject not found", status_code=404)
return make_response(data=subject.model_dump())
except Exception as e:
return make_error_response(e)
def update(self, subject_id: str, req_body):
try:
updated = self.service.update_subject(subject_id, req_body)
if not updated:
return make_response(message="No subject updated", status_code=404)
return make_response(message="Subject updated")
except Exception as e:
return make_error_response(e)
def delete(self, subject_id: str):
try:
deleted = self.service.delete_subject(subject_id)
if not deleted:
return make_response(message="No subject deleted", status_code=404)
return make_response(message="Subject deleted")
except Exception as e:
return make_error_response(e)

View File

@ -1,129 +1,12 @@
# /controllers/user_controller.py # /controllers/user_controller.py
from flask import jsonify, request, current_app from flask import jsonify
from app.services import UserService from services import UserService
from app.schemas import RegisterSchema
from pydantic import ValidationError
from app.schemas import ResponseSchema
from app.exception import AlreadyExistException, DataNotFoundException
from app.helpers import make_response
from app.schemas.requests import ProfileUpdateSchema
class UserController: class UserController:
def __init__(self, userService: UserService): def __init__(self):
self.user_service = userService self.user_service = UserService()
def register(self): def get_users(self):
try: users = self.user_service.get_all_users()
request_data = request.get_json() return jsonify(users)
register_data = RegisterSchema(**request_data)
self.user_service.register_user(register_data)
return make_response("Register Success")
except ValidationError as e:
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return make_response("Invalid input", status_code=400)
except AlreadyExistException as e:
return make_response("User already exists", status_code=409)
except Exception as e:
current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
return make_response("Internal server error", status_code=500)
def get_user_by_id(self, user_id):
try:
if not user_id:
return make_response("User ID is required", status_code=400)
user = self.user_service.get_user_by_id(user_id)
if user:
return make_response("User found", data=user)
else:
return make_response("User not found", status_code=404)
except Exception as e:
current_app.logger.error(
f"Error while retrieving user: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def update_profile(self):
try:
body = request.get_json()
reqBody = ProfileUpdateSchema(**body)
result = self.user_service.update_profile(reqBody)
if result:
return make_response(message="User profile updated successfully.")
else:
return make_response(
message="Failed to update user profile. Please check the submitted data.",
status_code=400,
)
except DataNotFoundException as e:
return make_response(message="User data not found.", status_code=404)
except ValueError as e:
return make_response(
message=f"Invalid data provided: {str(e)}", status_code=400
)
except Exception as e:
current_app.logger.error(
f"Error while updating profile: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def change_password(self):
try:
body = request.get_json()
user_id = body.get("id")
current_password = body.get("current_password")
new_password = body.get("new_password")
if not all([user_id, current_password, new_password]):
return make_response(
message="Missing required fields: id, current_password, new_password",
status_code=400,
)
result = self.user_service.change_password(
user_id, current_password, new_password
)
if result:
return make_response(message="Password changed successfully.")
else:
return make_response(
message="Failed to change password.",
status_code=400,
)
except DataNotFoundException as e:
return make_response(message="User data not found.", status_code=404)
except ValueError as e:
return make_response(message=f"{str(e)}", status_code=400)
except Exception as e:
current_app.logger.error(
f"Error while changing password: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)
def user_stat(self, user_id):
try:
response = self.user_service.get_user_status(user_id)
return make_response(message="Success retrive user stat", data=response)
except Exception as e:
current_app.logger.error(
f"Error while changing password: {str(e)}", exc_info=True
)
return make_response(
message="An internal server error occurred. Please try again later.",
status_code=500,
)

View File

@ -1,2 +1 @@
from .db import init_db from .db import init_db

Binary file not shown.

Binary file not shown.

View File

@ -1,16 +1,11 @@
from flask_pymongo import PyMongo from flask_pymongo import PyMongo
from flask import Flask, current_app from flask import Flask
from .seed.subject_seed import seed_subjects from configs import Config
mongo = PyMongo()
def init_db(app: Flask) -> PyMongo: def init_db(app: Flask):
try: app.config["MONGO_URI"] = Config.MONGO_URI
mongo = PyMongo(app) print(Config.MONGO_URI)
mongo.init_app(app)
mongo.cx.server_info()
app.logger.info("MongoDB connection established")
seed_subjects(mongo)
return mongo
except Exception as e:
app.logger.error(f"MongoDB connection failed: {e}")
return None

View File

@ -1,42 +0,0 @@
from flask_pymongo import PyMongo
def seed_subjects(mongo: PyMongo):
subject_collection = mongo.db.subjects
base_subjects = [
{
"name": "Ilmu Pengetahuan Alam",
"short_name": "IPA",
"description": "Pelajaran tentang sains dan alam",
},
{
"name": "Ilmu Pengetahuan Sosial",
"short_name": "IPS",
"description": "Pelajaran tentang masyarakat dan geografi",
},
{
"name": "Sejarah",
"short_name": "Sejarah",
"description": "Pelajaran mengenai sejarah di indonesia",
},
{
"name": "Matematika",
"short_name": "Matematika",
"description": "Pelajaran tentang angka dan logika",
},
{
"name": "Bahasa Indonesia",
"short_name": "B.Indonesia",
"description": "Pelajaran tentang bahasa nasional",
},
{
"name": "Sejarah",
"short_name": "Sejarah",
"description": "Pelajaran sejarah Indonesia",
},
]
for subject in base_subjects:
if not subject_collection.find_one({"name": subject["name"]}):
subject_collection.insert_one(subject)

View File

@ -1,136 +0,0 @@
from dependency_injector import containers, providers
from app.repositories import (
UserRepository,
QuizRepository,
UserAnswerRepository,
SubjectRepository,
SessionRepository,
NERSRLRepository,
SessionMemoryRepository,
QuizMemoryRepository,
AnswerMemoryRepository,
ScoreMemoryRepository,
QuestionGenerationRepository,
AnswerGenerationRepository,
)
from app.services import (
UserService,
AuthService,
QuizService,
AnswerService,
HistoryService,
SubjectService,
SessionService,
QuestionGenerationService,
)
from app.controllers import (
UserController,
AuthController,
QuizController,
HistoryController,
SubjectController,
SocketController,
SessionController,
)
class Container(containers.DeclarativeContainer):
"""Dependency Injection Container"""
mongo = providers.Dependency()
redis = providers.Dependency()
socketio = providers.Dependency()
# repository
user_repository = providers.Factory(UserRepository, mongo.provided.db)
quiz_repository = providers.Factory(QuizRepository, mongo.provided.db)
answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db)
subject_repository = providers.Factory(SubjectRepository, mongo.provided.db)
session_repository = providers.Factory(SessionRepository, mongo.provided.db)
ner_srl_repository = providers.Factory(NERSRLRepository)
question_generation_repository = providers.Factory(QuestionGenerationRepository)
answer_generator_repository = providers.Factory(AnswerGenerationRepository)
session_memory_repository = providers.Factory(SessionMemoryRepository, redis)
quiz_memory_repository = providers.Factory(QuizMemoryRepository, redis)
answer_memory_repository = providers.Factory(AnswerMemoryRepository, redis)
score_memory_repository = providers.Factory(ScoreMemoryRepository, redis)
# services
auth_service = providers.Factory(
AuthService,
user_repository,
)
user_service = providers.Factory(
UserService,
user_repository,
answer_repository,
quiz_repository,
)
quiz_service = providers.Factory(
QuizService,
quiz_repository,
user_repository,
subject_repository,
answer_repository,
)
answer_service = providers.Factory(
AnswerService,
answer_repository,
quiz_repository,
user_repository,
)
history_service = providers.Factory(
HistoryService,
quiz_repository,
answer_repository,
session_repository,
user_repository,
)
subject_service = providers.Factory(
SubjectService,
subject_repository,
)
session_service = providers.Factory(
SessionService,
session_repository,
session_memory_repository,
quiz_memory_repository,
answer_memory_repository,
score_memory_repository,
user_repository,
quiz_repository,
answer_repository,
)
question_generation_service = providers.Factory(
QuestionGenerationService,
ner_srl_repository,
question_generation_repository,
answer_generator_repository,
)
# controllers
auth_controller = providers.Factory(AuthController, user_service, auth_service)
user_controller = providers.Factory(UserController, user_service)
quiz_controller = providers.Factory(
QuizController,
quiz_service,
answer_service,
question_generation_service,
)
history_controller = providers.Factory(HistoryController, history_service)
subject_controller = providers.Factory(SubjectController, subject_service)
socket_controller = providers.Factory(
SocketController,
socketio,
session_service,
)
session_controller = providers.Factory(SessionController, session_service)

View File

@ -1,12 +0,0 @@
from .auth_exception import AuthException
from .already_exist_exception import AlreadyExistException
from .data_not_found_exception import DataNotFoundException
from .validation_exception import ValidationException
__all__ = [
"AuthException",
"AlreadyExistException",
"DataNotFoundException",
"ValidationException",
]

View File

@ -1,6 +0,0 @@
class AlreadyExistException(Exception):
def __init__(self, entity: str, message: str = None):
if message is None:
message = f"{entity} already exists"
self.message = message
super().__init__(self.message)

View File

@ -1,8 +0,0 @@
from .base_exception import BaseExceptionTemplate
class AuthException(BaseExceptionTemplate):
"""Exception for authentication-related errors"""
def __init__(self, message: str = "Authentication failed"):
super().__init__(message, status_code=401)

View File

@ -1,13 +0,0 @@
class BaseExceptionTemplate(Exception):
"""Base exception template for custom exceptions"""
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(self.message)
def __str__(self):
return f"{self.__class__.__name__}: {self.message}"
def json(self):
return {"error": self.__class__.__name__, "message": self.message}

View File

@ -1,8 +0,0 @@
from .base_exception import BaseExceptionTemplate
class DataNotFoundException(BaseExceptionTemplate):
"""Exception for data not found"""
def __init__(self, message: str = "Data Not Found"):
super().__init__(message, status_code=404)

View File

@ -1,8 +0,0 @@
from .base_exception import BaseExceptionTemplate
class ValidationException(BaseExceptionTemplate):
"""Exception for validation"""
def __init__(self, message: str = "validation error, check yout input"):
super().__init__(message, status_code=400)

View File

@ -1,9 +0,0 @@
from .response_helper import make_response, make_error_response
from .datetime_util import DatetimeUtil
__all__ = [
"make_response",
"make_error_response",
"DatetimeUtil",
]

View File

@ -1,44 +0,0 @@
from datetime import datetime
from zoneinfo import ZoneInfo
class DatetimeUtil:
@staticmethod
def now():
"""Waktu UTC (timezone-aware)"""
return datetime.now(tz=ZoneInfo("UTC"))
@staticmethod
def now_iso():
"""Waktu UTC dalam format ISO 8601 string"""
return datetime.now(tz=ZoneInfo("UTC")).isoformat()
@staticmethod
def now_jakarta():
"""Waktu sekarang di zona Asia/Jakarta (WIB)"""
return datetime.now(tz=ZoneInfo("Asia/Jakarta"))
@staticmethod
def to_string(dt: datetime, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
"""Convert UTC datetime to Asia/Jakarta time and format as string"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
jakarta_time = dt.astimezone(ZoneInfo("Asia/Jakarta"))
return jakarta_time.strftime(fmt)
@staticmethod
def from_string(
date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: str = "UTC"
) -> datetime:
"""Convert string ke datetime dengan timezone"""
dt = datetime.strptime(date_str, fmt)
return dt.replace(tzinfo=ZoneInfo(tz))
@staticmethod
def from_iso(date_str: str, tz: str = "UTC") -> datetime:
"""Convert ISO 8601 string to datetime with timezone awareness"""
dt = datetime.fromisoformat(date_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=ZoneInfo(tz))
return dt

View File

@ -1,49 +0,0 @@
from flask import jsonify, current_app
from typing import Optional, Union
from app.schemas import ResponseSchema, MetaSchema
import math
def calculate_total_page(total_data: int, page_size: int) -> int:
if not page_size or page_size <= 0:
return 1
return math.ceil(total_data / page_size)
def make_response(
message: str,
data: Optional[Union[dict, list]] = None,
page: Optional[int] = None,
page_size: Optional[int] = None,
total_all_data: Optional[int] = None,
status_code: int = 200,
):
meta = None
if page is not None and page_size is not None and total_all_data is not None:
meta = MetaSchema(
current_page=page,
total_all_data=total_all_data,
total_data=len(data) if data else 0,
total_page=calculate_total_page(total_all_data, page_size),
)
response = ResponseSchema(
message=message,
data=data,
meta=meta,
)
return jsonify(response.model_dump()), status_code
def make_error_response(
err: Union[Exception, str],
log_message: Optional[str] = None,
status_code: int = 500,
):
"""Logs the error and returns a standardized error response"""
error_message = str(err) if isinstance(err, Exception) else err
log_msg = log_message or f"An error occurred: {error_message}"
current_app.logger.error(log_msg, exc_info=True)
response = ResponseSchema(message="Internal server error", data=None, meta=None)
return jsonify(response.model_dump()), status_code

Binary file not shown.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,77 +1,21 @@
import eventlet
eventlet.monkey_patch()
import logging
from flask import Flask from flask import Flask
from flask_socketio import SocketIO from blueprints import auth_blueprint, user_blueprint, default_blueprint
from database import init_db
from configs import Config # Import the config class
from app.di_container import Container app = Flask(__name__)
from app.configs import Config, LoggerConfig
from app.blueprints import (
auth_blueprint,
user_blueprint,
quiz_bp,
default_blueprint,
history_blueprint,
subject_blueprint,
session_bp,
swagger_blueprint,
)
from app.database import init_db
from redis import Redis
# Apply configurations environtment
app.config["FLASK_ENV"] = Config.FLASK_ENV
app.config["DEBUG"] = Config.DEBUG
def createApp() -> tuple[Flask, SocketIO]: # Initialize database
app = Flask(__name__) init_db(app)
app.config.from_object(Config)
LoggerConfig.init_logger(app, not Config.DEBUG)
logging.basicConfig( # Register blueprints
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" app.register_blueprint(default_blueprint)
) app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api")
container = Container() if __name__ == "__main__":
app.container = container app.run(debug=Config.DEBUG)
mongo = init_db(app)
if mongo is not None:
container.mongo.override(mongo)
redis_url = Config().REDIS_URL
redis_client = Redis.from_url(redis_url)
redis_client.ping()
container.redis.override(redis_client)
socketio = SocketIO(
cors_allowed_origins="*",
# message_queue=redis_url,
async_mode="eventlet",
)
container.socketio.override(socketio)
container.socket_controller()
socketio.init_app(app)
container.wire(
modules=[
"app.blueprints.auth",
"app.blueprints.user",
"app.blueprints.quiz",
"app.blueprints.history",
"app.blueprints.subject",
"app.blueprints.session",
]
)
app.register_blueprint(default_blueprint)
app.register_blueprint(swagger_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api")
app.register_blueprint(quiz_bp, url_prefix="/api/quiz")
app.register_blueprint(history_blueprint, url_prefix="/api/history")
app.register_blueprint(subject_blueprint, url_prefix="/api/subject")
app.register_blueprint(session_bp, url_prefix="/api/session")
return app, socketio

View File

@ -1,10 +0,0 @@
from .user_mapper import UserMapper
from .quiz_mapper import QuizMapper
from .subject_mapper import SubjectMapper
__all__ = [
"UserMapper",
"QuizMapper",
"SubjectMapper",
]

View File

@ -1,93 +0,0 @@
from datetime import datetime
from app.helpers import DatetimeUtil
from app.models import QuizEntity, QuestionItemEntity, UserEntity
from app.models.entities import SubjectEntity
from app.schemas import QuizGetSchema, QuestionItemSchema
from app.schemas.response import ListingQuizResponse
from app.schemas.requests import QuizCreateSchema
class QuizMapper:
@staticmethod
def map_question_entity_to_schema(entity: QuestionItemEntity) -> QuestionItemSchema:
return QuestionItemSchema(
index=entity.index,
question=entity.question,
target_answer=entity.target_answer,
duration=entity.duration,
type=entity.type,
options=entity.options,
)
@staticmethod
def map_question_schema_to_entity(schema: QuestionItemSchema) -> QuestionItemEntity:
return QuestionItemEntity(
index=schema.index,
question=schema.question,
target_answer=schema.target_answer,
duration=schema.duration,
type=schema.type,
options=schema.options,
)
@staticmethod
def map_quiz_entity_to_schema(
entity: QuizEntity,
subjectE: SubjectEntity,
) -> QuizGetSchema:
return QuizGetSchema(
id=str(entity.id),
author_id=entity.author_id,
subject_id=str(subjectE.id),
subject_alias=subjectE.short_name,
title=entity.title,
description=entity.description,
is_public=entity.is_public,
date=DatetimeUtil.to_string(entity.date, "%d-%m-%Y"),
time=DatetimeUtil.to_string(entity.date, "%H:%M:%S"),
total_quiz=entity.total_quiz or 0,
limit_duration=entity.limit_duration or 0,
question_listings=[
QuizMapper.map_question_entity_to_schema(q)
for q in entity.question_listings or []
],
)
@staticmethod
def map_quiz_schema_to_entity(
schema: QuizCreateSchema,
datetime: datetime,
total_duration: int,
) -> QuizEntity:
return QuizEntity(
author_id=schema.author_id,
subject_id=schema.subject_id,
title=schema.title,
description=schema.description,
is_public=schema.is_public,
date=datetime,
language_code=schema.lang_code,
total_quiz=len(schema.question_listings),
limit_duration=total_duration,
question_listings=[
QuizMapper.map_question_schema_to_entity(q)
for q in schema.question_listings or []
],
)
@staticmethod
def quiz_to_populer_mapper(
quiz_entity: QuizEntity,
user_entity: UserEntity,
) -> ListingQuizResponse:
return ListingQuizResponse(
quiz_id=str(quiz_entity.id),
author_id=str(user_entity.id),
author_name=user_entity.name,
title=quiz_entity.title,
description=quiz_entity.description,
date=quiz_entity.date.strftime("%d-%B-%Y") if quiz_entity.date else None,
duration=quiz_entity.limit_duration,
total_quiz=quiz_entity.total_quiz,
)

View File

@ -1,12 +0,0 @@
from app.schemas.requests import SubjectCreateRequest
from app.models.entities import SubjectEntity
class SubjectMapper:
@staticmethod
def to_entity(data: SubjectCreateRequest) -> SubjectEntity:
return SubjectEntity(
name=data.name,
short_name=data.alias,
description=data.description,
)

View File

@ -1,64 +0,0 @@
from datetime import datetime
from typing import Dict, Optional
from app.models import UserEntity
from app.schemas import RegisterSchema
from app.schemas.response import LoginResponseSchema
from app.helpers import DatetimeUtil
class UserMapper:
@staticmethod
def from_google_payload(
google_id: str, email: str, payload: Dict[str, Optional[str]]
) -> UserEntity:
return UserEntity(
google_id=google_id,
email=email,
name=payload.get("name"),
pic_url=payload.get("picture"),
birth_date=None,
phone=None,
role="user",
is_active=True,
address=None,
created_at=datetime.now(),
updated_at=datetime.now(),
verification_token=None,
)
@staticmethod
def from_register(data: RegisterSchema) -> UserEntity:
return UserEntity(
email=data.email,
password=data.password,
name=data.name,
birth_date=datetime.strptime(data.birth_date, "%d-%m-%Y").date(),
phone=data.phone,
role="user",
is_active=False,
address=None,
created_at=datetime.now(),
updated_at=datetime.now(),
verification_token=None,
)
@staticmethod
def user_entity_to_response(user: UserEntity) -> LoginResponseSchema:
return LoginResponseSchema(
id=str(user.id) if user.id else None,
email=user.email,
name=user.name,
birth_date=(
DatetimeUtil.to_string(user.birth_date, fmt="%d-%m-%Y")
if user.birth_date
else None
),
pic_url=user.pic_url,
phone=user.phone,
locale=user.locale,
created_at=(
DatetimeUtil.to_string(user.created_at, fmt="%d-%m-%Y")
if user.created_at
else None
),
)

View File

@ -1,13 +0,0 @@
# app/models/__init__.py
from .entities import UserEntity, QuizEntity, QuestionItemEntity, UserAnswerEntity
from .login import UserResponseModel
__all__ = [
"UserEntity",
"UserDTO",
"UserResponseModel",
"QuizEntity",
"QuestionItemEntity",
"UserAnswerEntity",
]

View File

@ -1 +0,0 @@
from .response import ApiResponse

View File

@ -1,22 +0,0 @@
# from pydantic import BaseModel
# from typing import Generic, TypeVar, Optional
# T = TypeVar("T")
# class ApiResponse(BaseModel, Generic[T]):
# success: bool
# message: str
# data: Optional[T] = None
# def to_json(self) -> str:
# """
# Convert the model to a properly formatted JSON string.
# """
# return self.model_dump_json(indent=4)
# def to_dict(self) -> dict:
# """
# Convert the model to a dictionary with proper key-value pairs.
# """
# return self.model_dump()

View File

@ -1,19 +0,0 @@
from .user_entity import UserEntity
from .base import PyObjectId
from .quiz_entity import QuizEntity
from .question_item_entity import QuestionItemEntity
from .user_answer_entity import UserAnswerEntity
from .answer_item import AnswerItemEntity
from .subject_entity import SubjectEntity
from .session_entity import SessionEntity
__all__ = [
"UserEntity",
"PyObjectId",
"QuizEntity",
"QuestionItemEntity",
"UserAnswerEntity",
"AnswerItemEntity",
"SubjectEntity",
"SessionEntity",
]

View File

@ -1,9 +0,0 @@
from pydantic import BaseModel
from typing import Union
class AnswerItemEntity(BaseModel):
question_index: int
answer: Union[str | int | bool]
is_correct: bool
time_spent: float

View File

@ -1,29 +0,0 @@
from bson import ObjectId
from pydantic import GetCoreSchemaHandler
from pydantic_core import core_schema
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 to handle MongoDB _id"""
@classmethod
def __get_pydantic_core_schema__(
cls, source, handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
return core_schema.no_info_after_validator_function(
cls.validate,
core_schema.union_schema(
[
core_schema.str_schema(),
core_schema.is_instance_schema(ObjectId),
]
),
)
@classmethod
def validate(cls, v):
if isinstance(v, ObjectId):
return v
if isinstance(v, str) and ObjectId.is_valid(v):
return ObjectId(v)
raise ValueError(f"Invalid ObjectId: {v}")

View File

@ -1,11 +0,0 @@
from typing import Optional, List, Union
from pydantic import BaseModel
class QuestionItemEntity(BaseModel):
index: int
question: str
target_answer: Union[str, bool, int]
duration: int
type: str
options: Optional[List[str]] = None

View File

@ -1,25 +0,0 @@
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime
from .base import PyObjectId
from .question_item_entity import QuestionItemEntity
class QuizEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
author_id: Optional[str] = None
subject_id: str
title: str
description: Optional[str] = None
is_public: bool = False
date: datetime
total_quiz: int = 0
limit_duration: Optional[int] = 0
total_user_playing: int = 0
language_code: Optional[str] = "id"
question_listings: Optional[list[QuestionItemEntity]] = []
class ConfigDict:
arbitrary_types_allowed = True
populate_by_name = True
json_encoders = {PyObjectId: str}

View File

@ -1,19 +0,0 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from app.models.entities import PyObjectId
class SessionEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
session_code: str
quiz_id: str
host_id: str
room_name: str
created_at: datetime = Field(default_factory=datetime.now)
started_at: datetime | None = None
ended_at: datetime | None = None
is_active: bool = True
participan_limit: int = 10
participants: List[dict] = []
current_question_index: int = 0

View File

@ -1,17 +0,0 @@
from typing import Optional
from bson import ObjectId
from pydantic import BaseModel, Field
from app.models.entities import PyObjectId
class SubjectEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
name: str
short_name: str
description: Optional[str] = None
icon: Optional[str] = None
class ConfigDict:
populate_by_name = True
json_encoders = {ObjectId: str}
json_schema_extra = {}

View File

@ -1,23 +0,0 @@
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from bson import ObjectId
from .answer_item import AnswerItemEntity
from .base import PyObjectId
class UserAnswerEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
session_id: Optional[str]
quiz_id: str
user_id: str
answered_at: datetime
answers: List[AnswerItemEntity]
total_score: int
total_correct: int
class ConfigDict:
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}

View File

@ -1,20 +0,0 @@
from typing import Optional
from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime
from .base import PyObjectId
class UserEntity(BaseModel):
id: Optional[PyObjectId] = Field(default=None, alias="_id")
google_id: Optional[str] = None
email: str
password: Optional[str] = None
name: str
birth_date: Optional[datetime] = None
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: str = "en-US"
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
model_config = ConfigDict(populate_by_name=True, json_encoders={PyObjectId: str})

View File

@ -1 +0,0 @@
from .login_response import UserResponseModel

View File

@ -1,20 +0,0 @@
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class UserResponseModel(BaseModel):
id: Optional[str] = Field(alias="_id")
google_id: Optional[str] = None
email: EmailStr
name: str
birth_date: Optional[datetime] = None
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: str
class ConfigDict:
populate_by_name = True
json_encoders = {
datetime: lambda v: v.isoformat(),
}

View File

@ -1,27 +1 @@
from .user_repository import UserRepository from .user_repository import UserRepository
from .quiz_repositroy import QuizRepository
from .answer_repository import UserAnswerRepository
from .subject_repository import SubjectRepository
from .session_repostory import SessionRepository
from .ner_srl_repository import NERSRLRepository
from .session_memory_repository import SessionMemoryRepository
from .quiz_memory_repository import QuizMemoryRepository
from .answer_memory_repository import AnswerMemoryRepository
from .score_memory_repository import ScoreMemoryRepository
from .question_generation_repository import QuestionGenerationRepository
from .answer_generation_repository import AnswerGenerationRepository
__all__ = [
"UserRepository",
"QuizRepository",
"UserAnswerRepository",
"SubjectRepository",
"SessionRepository",
"NERSRLRepository",
"SessionMemoryRepository",
"QuizMemoryRepository",
"AnswerMemoryRepository",
"ScoreMemoryRepository",
"QuestionGenerationRepository",
"AnswerGenerationRepository",
]

Binary file not shown.

View File

@ -1,71 +0,0 @@
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
import re
class AnswerGenerationRepository:
MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras"
TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json"
def __init__(self):
with open(self.TOKENIZER_PATH, "r") as f:
tokenizer_data = json.load(f)
self.tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
self.answer_tokenizer = tokenizer_from_json(tokenizer_data["answer_tokenizer"])
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
self.max_context_len = tokenizer_data["max_context_len"]
self.max_question_len = tokenizer_data["max_question_len"]
self.max_token_len = tokenizer_data["max_token_len"]
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
self.model = load_model(self.MODEL_PATH)
def preprocess_text(self, text):
text = text.lower()
text = re.sub(r"\s+", " ", text).strip()
return text
def predict_answer(self, context, question, tokens, ner, srl, q_type):
context_seq = self.tokenizer.texts_to_sequences([self.preprocess_text(context)])
question_seq = self.tokenizer.texts_to_sequences(
[self.preprocess_text(question)]
)
token_seq = [self.tokenizer.texts_to_sequences([" ".join(tokens)])[0]]
ner_seq = [self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]]
srl_seq = [self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]]
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
q_type_cat = tf.keras.utils.to_categorical(
[q_type_idx], num_classes=self.q_type_vocab_size
)
context_pad = pad_sequences(
context_seq, maxlen=self.max_context_len, padding="post"
)
question_pad = pad_sequences(
question_seq, maxlen=self.max_question_len, padding="post"
)
token_pad = pad_sequences(token_seq, maxlen=self.max_token_len, padding="post")
ner_pad = pad_sequences(ner_seq, maxlen=self.max_token_len, padding="post")
srl_pad = pad_sequences(srl_seq, maxlen=self.max_token_len, padding="post")
prediction = self.model.predict(
[context_pad, question_pad, token_pad, ner_pad, srl_pad, q_type_cat],
verbose=0,
)
answer_idx = np.argmax(prediction[0])
for word, idx in self.answer_tokenizer.word_index.items():
if idx == answer_idx:
return word
return "Unknown"

View File

@ -1,138 +0,0 @@
import json
from typing import Any, Dict, List
from redis import Redis
class AnswerMemoryRepository:
KEY_TEMPLATE = "answer:{session_id}:{user_id}"
KEY_PATTERN_TEMPLATE = "answer:{session_id}:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str, user_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id, user_id=user_id)
def _build_pattern_key(self, session_id: str) -> str:
return self.KEY_PATTERN_TEMPLATE.format(session_id=session_id)
def initialize_empty_answers(
self,
session_id: str,
user_ids: List[str],
total_questions: int,
):
"""
Initialize empty answers for all users at the start of the quiz.
"""
for user_id in user_ids:
key = self._build_key(session_id, user_id)
answers = [
{
"question_index": idx + 1,
"answer": "",
"is_true": False,
"time_spent": 0.0,
}
for idx in range(total_questions)
]
self.set_data(key, answers)
def save_user_answer(
self,
session_id: str,
user_id: str,
question_index: int,
answer: Any,
correct: bool,
time_spent: float,
):
"""
Update user's answer for a specific question.
Assumes answers have been initialized.
"""
key = self._build_key(session_id, user_id)
answers = self.get_data(key) or []
for ans in answers:
if ans.get("question_index") == question_index:
ans.update(
{
"answer": answer,
"is_true": correct,
"time_spent": time_spent,
}
)
break
self.set_data(key, answers)
def get_user_answers(self, session_id: str, user_id: str) -> List[Dict[str, Any]]:
key = self._build_key(session_id, user_id)
return self.get_data(key) or []
def get_all_user_answers(self, session_id: str) -> Dict[str, List[Dict[str, Any]]]:
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
all_answers = {}
for key in keys:
user_id = key.decode().split(":")[-1]
all_answers[user_id] = self.get_data(key) or []
return all_answers
def delete_all_answers(self, session_id: str):
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
def set_data(self, key: str, value: Any):
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Any:
data = self.redis.get(key)
return json.loads(data) if data else None
def auto_fill_incorrect_answers(
self,
session_id: str,
question_index: int,
default_time_spent: float = 0.0,
) -> List[str]:
"""
Auto-fill unanswered specific question (by index) as incorrect for all users.
:return: List of user IDs who had not answered the specific question.
"""
pattern = self._build_pattern_key(session_id)
keys = self.redis.keys(pattern)
users_with_unanswered = []
for key in keys:
answers = self.get_data(key) or []
has_unanswered = False
for ans in answers:
if (
ans.get("question_index") == question_index
and ans.get("answer") == ""
):
has_unanswered = True
ans.update(
{
"answer": "",
"is_true": False,
"time_spent": default_time_spent,
}
)
break # No need to check other answers for this user
if has_unanswered:
user_id = key.decode().split(":")[-1]
users_with_unanswered.append(user_id)
self.set_data(key, answers)
return users_with_unanswered

View File

@ -1,50 +0,0 @@
from pymongo.collection import Collection
from bson import ObjectId
from typing import Optional, List
from app.models import UserAnswerEntity
class UserAnswerRepository:
def __init__(self, db):
self.collection: Collection = db.user_answers
def create(self, answer_session: UserAnswerEntity) -> str:
data = answer_session.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(data)
return str(result.inserted_id)
def get_by_id(self, id: str) -> Optional[UserAnswerEntity]:
result = self.collection.find_one({"_id": ObjectId(id)})
if not result:
return None
return UserAnswerEntity(**result)
def get_by_userid_and_sessionid(
self,
user_id: str,
session_id: str,
) -> Optional[UserAnswerEntity]:
result = self.collection.find_one(
{"user_id": user_id, "session_id": session_id}
)
if not result:
return None
return UserAnswerEntity(**result)
def get_by_user_and_quiz(self, user_id: str, quiz_id: str) -> List[dict]:
result = self.collection.find(
{"user_id": user_id, "quiz_id": ObjectId(quiz_id)}
)
return list(result)
def get_by_user(self, user_id: str) -> list[UserAnswerEntity]:
result = self.collection.find({"user_id": user_id}).sort("answered_at", -1)
return [UserAnswerEntity(**doc) for doc in result]
def get_by_session(self, session_id: str) -> List[dict]:
result = self.collection.find({"session_id": ObjectId(session_id)})
return list(result)
def delete_by_id(self, id: str) -> bool:
result = self.collection.delete_one({"_id": ObjectId(id)})
return result.deleted_count > 0

View File

@ -1,49 +0,0 @@
import numpy as np
import pickle
from tensorflow.keras.models import load_model # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
import re
class NERSRLRepository:
def __init__(self):
# Load model and artifacts
self.model = load_model("app/lstm_model/ner_srl/lstm_ner_srl_model.keras")
with open("app/lstm_model/ner_srl/word2idx.pkl", "rb") as f:
self.word2idx = pickle.load(f)
with open("app/lstm_model/ner_srl/tag2idx_ner.pkl", "rb") as f:
self.tag2idx_ner = pickle.load(f)
with open("app/lstm_model/ner_srl/tag2idx_srl.pkl", "rb") as f:
self.tag2idx_srl = pickle.load(f)
self.idx2tag_ner = {i: t for t, i in self.tag2idx_ner.items()}
self.idx2tag_srl = {i: t for t, i in self.tag2idx_srl.items()}
self.PAD_WORD_ID = self.word2idx["PAD"]
self.MAXLEN = self.model.input_shape[1]
def _preprocess_tokens(self, tokens: list[str]) -> np.ndarray:
seq = [self.word2idx.get(tok.lower(), self.word2idx["UNK"]) for tok in tokens]
return pad_sequences(
[seq], maxlen=self.MAXLEN, padding="post", value=self.PAD_WORD_ID
)
def predict_sentence(self, sentence: str) -> dict:
tokens = re.findall(r"\d{1,2}\.\d{2}|\w+|[^\w\s]", sentence.lower())
seq_padded = self._preprocess_tokens(tokens)
pred_ner_prob, pred_srl_prob = self.model.predict(seq_padded, verbose=0)
pred_ner = pred_ner_prob.argmax(-1)[0][: len(tokens)]
pred_srl = pred_srl_prob.argmax(-1)[0][: len(tokens)]
return {
"tokens": tokens,
"ner": [self.idx2tag_ner[int(i)] for i in pred_ner],
"srl": [self.idx2tag_srl[int(i)] for i in pred_srl],
}
def labeling_token(self, tokens: list[str]) -> dict:
sentence = " ".join(tokens)
return self.predict_sentence(sentence)

View File

@ -1,87 +0,0 @@
import numpy as np
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
from tensorflow.keras.models import load_model # type: ignore
import re
class QuestionGenerationRepository:
# Static paths for model and tokenizer
MODEL_PATH = "app/lstm_model/question_generation/new_model/question_prediction_model_final.h5"
TOKENIZER_PATH = "app/lstm_model/question_generation/new_model/question_prediction_tokenizers.json"
def __init__(self):
"""
Initialize question prediction model with pre-trained model and tokenizers
using static paths
"""
# Load model
self.model = load_model(self.MODEL_PATH)
# Load tokenizers
with open(self.TOKENIZER_PATH, "r") as f:
tokenizer_data = json.load(f)
# Reconstruct tokenizers
self.word_tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
# Get max lengths
self.max_context_len = tokenizer_data["max_context_len"]
self.max_question_len = tokenizer_data["max_question_len"]
self.max_token_len = tokenizer_data["max_token_len"]
# Get vocabulary sizes
self.vocab_size = len(self.word_tokenizer.word_index) + 1
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
def preprocess_text(self, text):
"""Basic text preprocessing"""
text = text.lower()
text = re.sub(r"\s+", " ", text).strip()
return text
def predict_question(self, context, tokens, ner, srl, q_type):
"""Prediksi pertanyaan berdasarkan konteks dan fitur lainnya"""
# Preprocess
context = self.preprocess_text(context)
# Convert to sequences
context_seq = self.word_tokenizer.texts_to_sequences([context])[0]
token_seq = self.word_tokenizer.texts_to_sequences([" ".join(tokens)])[0]
ner_seq = self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]
srl_seq = self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]
# Pad sequences
context_padded = pad_sequences(
[context_seq], maxlen=self.max_context_len, padding="post"
)
token_padded = pad_sequences(
[token_seq], maxlen=self.max_token_len, padding="post"
)
ner_padded = pad_sequences([ner_seq], maxlen=self.max_token_len, padding="post")
srl_padded = pad_sequences([srl_seq], maxlen=self.max_token_len, padding="post")
# Q-type one-hot encoding
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
q_type_one_hot = tf.keras.utils.to_categorical(
[q_type_idx], num_classes=self.q_type_vocab_size
)
# Predict
pred = self.model.predict(
[context_padded, token_padded, ner_padded, srl_padded, q_type_one_hot]
)
# Convert prediction to words
pred_seq = np.argmax(pred[0], axis=1)
# Convert indices to words
reverse_word_map = {v: k for k, v in self.word_tokenizer.word_index.items()}
pred_words = [reverse_word_map.get(i, "") for i in pred_seq if i != 0]
return " ".join(pred_words)

View File

@ -1,32 +0,0 @@
import json
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import QuizEntity
class QuizMemoryRepository:
KEY_TEMPLATE = "quiz:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_quiz_for_session(self, session_id: str, quiz_data: QuizEntity):
data = quiz_data.model_dump()
data["id"] = str(data["id"])
data["date"] = DatetimeUtil.to_string(data["date"])
self.redis.set(self._build_key(session_id), json.dumps(data))
def get_quiz_for_session(self, session_id: str) -> Optional[QuizEntity]:
data = self.redis.get(self._build_key(session_id))
if data:
data = json.loads(data)
data["date"] = DatetimeUtil.from_string(data["date"])
return QuizEntity(**data)
return None
def delete_quiz_for_session(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -1,155 +0,0 @@
from bson import ObjectId
from typing import List, Optional
from app.models.entities import QuizEntity
from pymongo.database import Database
from pymongo.collection import Collection
class QuizRepository:
def __init__(self, db: Database):
self.collection: Collection = db.quiz
def create(self, quiz: QuizEntity) -> str:
quiz_dict = quiz.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(quiz_dict)
return str(result.inserted_id)
def get_by_id(self, quiz_id: str) -> Optional[QuizEntity]:
data = self.collection.find_one({"_id": ObjectId(quiz_id)})
if data:
return QuizEntity(**data)
return None
def search_by_title_or_category(
self, keyword: str, subject_id: Optional[str], page: int, page_size: int
) -> List[QuizEntity]:
skip = (page - 1) * page_size
query_conditions = [
{"is_public": True},
{
"$or": [
{"title": {"$regex": keyword, "$options": "i"}},
# {"category": {"$regex": keyword, "$options": "i"}},
]
},
]
if subject_id:
query_conditions.append({"subject_id": subject_id})
cursor = (
self.collection.find({"$and": query_conditions}).skip(skip).limit(page_size)
)
return [QuizEntity(**doc) for doc in cursor]
def count_by_search(self, keyword: str) -> int:
return self.collection.count_documents(
{
"$or": [
{"title": {"$regex": keyword, "$options": "i"}},
{"category": {"$regex": keyword, "$options": "i"}},
]
}
)
def get_by_ids(self, quiz_ids: List[str]) -> Optional[List[QuizEntity]]:
object_ids = [ObjectId(qid) for qid in quiz_ids]
cursor = self.collection.find({"_id": {"$in": object_ids}})
datas = list(cursor)
if not datas:
return None
return [QuizEntity(**data) for data in datas]
def get_by_user_id(
self, user_id: str, page: int = 1, page_size: int = 10
) -> List[QuizEntity]:
skip = (page - 1) * page_size
cursor = (
self.collection.find({"author_id": user_id}).skip(skip).limit(page_size)
)
return [QuizEntity(**data) for data in cursor]
def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]:
cursor = self.collection.find().skip(skip).limit(limit)
return [QuizEntity(**doc) for doc in cursor]
def update(self, quiz_id: str, update_data: dict) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(quiz_id)}, {"$set": update_data}
)
return result.modified_count > 0
def update_user_playing(self, quiz_id: str, total_user: int) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(quiz_id)}, {"$set": {"total_user_playing": total_user}}
)
return result.modified_count > 0
def delete(self, quiz_id: str) -> bool:
result = self.collection.delete_one({"_id": ObjectId(quiz_id)})
return result.deleted_count > 0
def count_by_user_id(self, user_id: str) -> int:
return self.collection.count_documents({"author_id": user_id})
def get_top_played_quizzes(
self,
page: int = 1,
limit: int = 3,
is_public: bool = True,
lang_code: str = "id",
) -> List[QuizEntity]:
skip = (page - 1) * limit
cursor = (
self.collection.find(
{
"is_public": is_public,
"language_code": lang_code,
}
)
.sort("total_user_playing", -1)
.skip(skip)
.limit(limit)
)
return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes(
self,
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]
def get_random_quizzes_by_subjects(
self,
subject_ids: List[str],
limit: int = 3,
lang_code: str = "id",
) -> List[QuizEntity]:
pipeline = [
{
"$match": {
"subject_id": {"$in": subject_ids},
"is_public": True,
"language_code": lang_code,
}
},
{"$sample": {"size": limit}},
]
cursor = self.collection.aggregate(pipeline)
return [QuizEntity(**doc) for doc in cursor]

View File

@ -1,28 +0,0 @@
from typing import Dict
from redis import Redis
class ScoreMemoryRepository:
KEY_TEMPLATE = "score:{session_id}"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def update_user_score(self, session_id: str, user_id: str, correct: bool):
hkey = self._build_key(session_id)
field = f"{user_id}:{'correct' if correct else 'incorrect'}"
self.redis.hincrby(hkey, field, 1)
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
raw = self.redis.hgetall(self._build_key(session_id))
scores = {}
for k, v in raw.items():
uid, category = k.decode().split(":")
scores.setdefault(uid, {"correct": 0, "incorrect": 0})[category] = int(v)
return scores
def delete_scores(self, session_id: str):
self.redis.delete(self._build_key(session_id))

View File

@ -1,108 +0,0 @@
import json
from typing import Dict, Any, Optional
from redis import Redis
from app.helpers import DatetimeUtil
from app.models.entities import SessionEntity
class SessionMemoryRepository:
KEY_TEMPLATE = "session:{session_id}"
KEY_PATTERN = "session:*"
def __init__(self, redis: Redis):
self.redis = redis
def _build_key(self, session_id: str) -> str:
return self.KEY_TEMPLATE.format(session_id=session_id)
def set_data(self, key: str, value: Any):
self.redis.set(key, json.dumps(value))
def get_data(self, key: str) -> Optional[Any]:
data = self.redis.get(key)
return json.loads(data) if data else None
def delete_key(self, key: str):
self.redis.delete(key)
def create_session(self, session_id: str, initial_data: SessionEntity) -> str:
data = initial_data.model_dump()
data["id"] = data["id"]
data["created_at"] = str(data["created_at"])
self.set_data(self._build_key(session_id), data)
return session_id
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
return self.get_data(self._build_key(session_id))
def close_session(self, session_id: str) -> Optional[Dict[str, Any]]:
session = self.get_data(self._build_key(session_id))
if not session:
return None
session["status"] = "closed"
session["closed_at"] = DatetimeUtil.now_iso()
self.delete_key(self._build_key(session_id))
return session
def find_session_by_code(self, session_code: str) -> Optional[Dict[str, Any]]:
session_keys = self.redis.keys(self.KEY_PATTERN)
for key in session_keys:
session_data = self.get_data(key)
if session_data and session_data.get("session_code") == session_code:
return session_data
return None
def add_user_to_session(
self, session_id: str, user_data: Dict[str, Any] = None
) -> bool:
session = self.get_session(session_id)
if not session:
return False
user_entry = {
**(user_data or {}),
"joined_at": DatetimeUtil.now_iso(),
}
existing_users = session.get("participants", [])
existing_users.append(user_entry)
session["participants"] = existing_users
self.set_data(self._build_key(session_id), session)
return True
def get_user_in_session(self, session_id: str):
session = self.get_session(session_id)
if not session:
return []
return session.get("participants", [])
def remove_user_from_session(self, session_id: str, user_id: str) -> bool:
session = self.get_session(session_id)
if not session:
return False
session["participants"] = [
user
for user in session.get("participants", [])
if user.get("id") != user_id
]
self.set_data(self._build_key(session_id), session)
return True
def delete_session(self, session_id: str) -> bool:
"""
Delete a session by its session_id.
Args:
session_id (str): The ID of the session to delete.
Returns:
bool: True if the session was deleted, False if it did not exist.
"""
key = self._build_key(session_id)
if self.redis.exists(key):
self.delete_key(key)
return True
return False

View File

@ -1,49 +0,0 @@
from pymongo.collection import Collection
from pymongo.database import Database
from typing import Optional
from app.models.entities import SessionEntity
from bson import ObjectId
class SessionRepository:
COLLECTION_NAME = "session"
def __init__(self, db: Database):
self.collection: Collection = db[self.COLLECTION_NAME]
# self.collection.create_index("id", unique=True)
def insert(self, session_data: SessionEntity) -> str:
result = self.collection.insert_one(
session_data.model_dump(by_alias=True, exclude_none=True)
)
return str(result.inserted_id)
def find_by_session_id(self, session_id: str) -> Optional[SessionEntity]:
doc = self.collection.find_one({"_id": ObjectId(session_id)})
return SessionEntity(**doc) if doc else None
def find_by_session_code(self, session_code: str) -> Optional[SessionEntity]:
doc = self.collection.find_one({"session_code": session_code})
return SessionEntity(**doc) if doc else None
def update(self, session_id: str, update_fields: SessionEntity) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(session_id)},
{"$set": update_fields.model_dump(by_alias=True, exclude_none=True)},
)
return result.modified_count > 0
def add_participant(self, session_id: str, user_id: str) -> bool:
"""Add user_id to participants array without duplicates"""
result = self.collection.update_one(
{"_id": session_id}, {"$addToSet": {"participants": user_id}}
)
return result.modified_count > 0
def delete(self, session_id: str) -> bool:
result = self.collection.delete_one({"_id": session_id})
return result.deleted_count > 0
def list_active_sessions(self) -> list[SessionEntity]:
docs = self.collection.find({"is_active": True})
return [SessionEntity(**doc) for doc in docs]

View File

@ -1,61 +0,0 @@
from typing import List, Optional
from pymongo.database import Database
from pymongo.collection import Collection
from bson import ObjectId, errors as bson_errors
from app.models.entities import SubjectEntity
class SubjectRepository:
COLLECTION_NAME = "subjects"
def __init__(self, db: Database):
self.collection: Collection = db[self.COLLECTION_NAME]
def create(self, subject: SubjectEntity) -> str:
subject_dict = subject.model_dump(by_alias=True, exclude_none=True)
result = self.collection.insert_one(subject_dict)
return str(result.inserted_id)
def get_all(self) -> List[SubjectEntity]:
return [SubjectEntity(**doc) for doc in self.collection.find()]
def get_by_id(self, subject_id: str) -> Optional[SubjectEntity]:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return None
doc = self.collection.find_one({"_id": oid})
return SubjectEntity(**doc) if doc else None
def get_by_ids(self, subject_ids: List[str]) -> List[SubjectEntity]:
object_ids = []
for sid in subject_ids:
try:
object_ids.append(ObjectId(sid))
except bson_errors.InvalidId:
continue
if not object_ids:
return []
cursor = self.collection.find({"_id": {"$in": object_ids}})
return [SubjectEntity(**doc) for doc in cursor]
def update(self, subject_id: str, update_data: dict) -> bool:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return False
result = self.collection.update_one({"_id": oid}, {"$set": update_data})
return result.modified_count > 0
def delete(self, subject_id: str) -> bool:
try:
oid = ObjectId(subject_id)
except bson_errors.InvalidId:
return False
result = self.collection.delete_one({"_id": oid})
return result.deleted_count > 0

View File

@ -1,54 +1,24 @@
from typing import Optional
from bson import ObjectId
from app.models.entities import UserEntity
class UserRepository: class UserRepository:
def __init__(self, db): users = [
self.collection = db.users {
"id": 1,
"name": "akhdan",
"email": "akhdanre@gmail.com",
"password": "password123",
},
{
"id": 2,
"name": "Bob",
"email": "bob@example.com",
"password": "password123",
},
]
def get_all_users(self) -> list[UserEntity]: def get_all_users(self):
"""Retrieve all users from the database.""" return self.users
users = list(self.collection.find({}, {"_id": 0}))
return [UserEntity(**user) for user in users]
def get_user_by_email(self, email: str) -> Optional[UserEntity]: def get_user_by_email(self, email):
"""Retrieve a user based on their email address.""" for user in self.users:
user = self.collection.find_one({"email": email}) if user.get("email") == email:
return UserEntity(**user) if user else None return user
return None
def get_user_by_id(self, user_id: str) -> Optional[UserEntity]:
"""Retrieve a user based on their ID."""
object_id = ObjectId(user_id)
user = self.collection.find_one({"_id": object_id})
return UserEntity(**user) if user else None
def get_by_google_id(self, google_id: str) -> Optional[UserEntity]:
"""Retrieve a user based on their Google ID."""
user_data = self.collection.find_one({"google_id": google_id})
return UserEntity(**user_data) if user_data else None
def insert_user(self, user_data: UserEntity) -> str:
"""Insert a new user into the database and return the user's ID."""
result = self.collection.insert_one(user_data.model_dump())
return str(result.inserted_id)
def update_user(self, user_id: str, update_data: dict) -> bool:
"""Update all fields of a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one({"_id": object_id}, {"$set": update_data})
return result.modified_count > 0
def update_user_field(self, user_id: str, field: str, value) -> bool:
"""Update a single field of a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one(
{"_id": object_id}, {"$set": {field: value}}
)
return result.modified_count > 0
def delete_user(self, user_id: str) -> bool:
"""Delete a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.delete_one({"_id": object_id})
return result.deleted_count > 0

View File

@ -1,15 +0,0 @@
from .login_schema import LoginSchema
from .basic_response_schema import ResponseSchema, MetaSchema
from .requests import RegisterSchema
from .response import QuizCreationResponse, QuizGetSchema, QuestionItemSchema
__all__ = [
"LoginSchema",
"ResponseSchema",
"MetaSchema",
"RegisterSchema",
"QuizCreationResponse",
"QuizGetSchema",
"QuestionItemSchema",
]

Some files were not shown because too many files have changed in this diff Show More