Compare commits
No commits in common. "develop" and "main" have entirely different histories.
|
@ -0,0 +1,3 @@
|
|||
MONGO_URI=mongodb://localhost:27017/quiz_app
|
||||
FLASK_ENV=development
|
||||
DEBUG=True
|
19
.env.example
19
.env.example
|
@ -1,19 +0,0 @@
|
|||
# Existing Configurations
|
||||
MONGO_URI=
|
||||
FLASK_ENV=development
|
||||
DEBUG=True
|
||||
|
||||
SECRET_KEY=
|
||||
|
||||
GOOGLE_PROJECT_ID=
|
||||
GOOGLE_CLIENT_ID=
|
||||
GOOGLE_CLIENT_SECRET=
|
||||
GOOGLE_AUHT_URI=
|
||||
GOOGLE_TOKEN_URI=
|
||||
GOOGLE_AUTH_PROVIDER_X509_CERT_URL=
|
||||
|
||||
# Redis Configuration
|
||||
REDIS_HOST=localhost
|
||||
REDIS_PORT=6379
|
||||
REDIS_DB=0
|
||||
REDIS_PASSWORD=
|
|
@ -1,137 +0,0 @@
|
|||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# Only ignore __pycache__ inside app
|
||||
app/**/__pycache__/
|
||||
# Ignore compiled Python files inside app
|
||||
app/**/*.pyc
|
||||
app/**/*.pyo
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Logs
|
||||
logs/
|
||||
*.log
|
||||
|
||||
# Django stuff:
|
||||
*.sqlite3
|
||||
db.sqlite3
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre
|
||||
.pyre/
|
||||
|
||||
# pytype
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# VS Code
|
||||
.vscode/
|
||||
|
||||
# JetBrains IDEs
|
||||
.idea/
|
||||
*.iml
|
||||
|
||||
# MacOS
|
||||
.DS_Store
|
||||
|
||||
# Thumbs.db (Windows)
|
||||
Thumbs.db
|
||||
ehthumbs.db
|
||||
|
||||
# Others
|
||||
*.bak
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# Local dev files
|
||||
local_settings.py
|
|
@ -1,5 +0,0 @@
|
|||
{
|
||||
"yaml.schemas": {
|
||||
"openapi:v3": "file:///mnt/disc1/code/thesis_quiz_project/quiz_maker/docs/rest_api_docs.yaml"
|
||||
}
|
||||
}
|
Binary file not shown.
Binary file not shown.
|
@ -1,2 +0,0 @@
|
|||
# from flask import Flask
|
||||
from app.main import createApp
|
|
@ -1,23 +1,3 @@
|
|||
from .default import default_blueprint
|
||||
|
||||
from .auth import auth_blueprint
|
||||
from .user import user_blueprint
|
||||
from .swagger import swagger_blueprint
|
||||
from .quiz import quiz_bp
|
||||
from .history import history_blueprint
|
||||
from .subject import subject_blueprint
|
||||
from .session import session_bp
|
||||
|
||||
__all__ = [
|
||||
"default_blueprint",
|
||||
"auth_blueprint",
|
||||
"user_blueprint",
|
||||
"quiz_bp",
|
||||
"history_blueprint",
|
||||
"subject_blueprint",
|
||||
"session_bp",
|
||||
"swagger_blueprint",
|
||||
]
|
||||
|
||||
|
||||
# from .user import user_blueprint
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -1,25 +1,22 @@
|
|||
from flask import Blueprint
|
||||
from app.controllers import AuthController
|
||||
from app.di_container import Container
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
|
||||
from controllers import AuthController
|
||||
|
||||
# Inisialisasi blueprint
|
||||
auth_blueprint = Blueprint("auth", __name__)
|
||||
auth_controller = AuthController()
|
||||
|
||||
|
||||
# Daftarkan rute ke controller
|
||||
@auth_blueprint.route("/register", methods=["POST"])
|
||||
def register():
|
||||
return auth_controller.register()
|
||||
|
||||
|
||||
@auth_blueprint.route("/login", methods=["POST"])
|
||||
@inject
|
||||
def login(auth_controller: AuthController = Provide[Container.auth_controller]):
|
||||
def login():
|
||||
return auth_controller.login()
|
||||
|
||||
|
||||
@auth_blueprint.route("/login/google", methods=["POST"])
|
||||
@inject
|
||||
def google_login(auth_controller: AuthController = Provide[Container.auth_controller]):
|
||||
return auth_controller.google_login()
|
||||
|
||||
|
||||
@auth_blueprint.route("/logout", methods=["DELETE"])
|
||||
@inject
|
||||
def logout(auth_controller: AuthController = Provide[Container.auth_controller]):
|
||||
def logout():
|
||||
return auth_controller.logout()
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
from flask import Blueprint
|
||||
from app.controllers import HistoryController
|
||||
from app.di_container import Container
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
|
||||
history_blueprint = Blueprint("history", __name__)
|
||||
|
||||
|
||||
@history_blueprint.route("/<user_id>", methods=["GET"])
|
||||
@inject
|
||||
def user_history(
|
||||
user_id: str, controller: HistoryController = Provide[Container.history_controller]
|
||||
):
|
||||
return controller.get_quiz_by_user(user_id)
|
||||
|
||||
|
||||
@history_blueprint.route("/detail/<answer_id>", methods=["GET"])
|
||||
@inject
|
||||
def user_detail_history(
|
||||
answer_id: str,
|
||||
controller: HistoryController = Provide[Container.history_controller],
|
||||
):
|
||||
print(answer_id)
|
||||
return controller.get_detail_quiz_history(answer_id)
|
||||
|
||||
|
||||
@history_blueprint.route("/session/<session_id>", methods=["GET"])
|
||||
@inject
|
||||
def session_history(
|
||||
session_id: str,
|
||||
controller: HistoryController = Provide[Container.history_controller],
|
||||
):
|
||||
return controller.get_session_history(session_id)
|
|
@ -1,112 +0,0 @@
|
|||
from flask import Blueprint, request
|
||||
from app.di_container import Container
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
from app.controllers import QuizController
|
||||
|
||||
|
||||
quiz_bp = Blueprint("quiz", __name__)
|
||||
|
||||
|
||||
@quiz_bp.route("", methods=["POST"])
|
||||
@inject
|
||||
def create_quiz(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
reqBody = request.get_json()
|
||||
return controller.create_quiz(reqBody)
|
||||
|
||||
|
||||
@quiz_bp.route("/ai", methods=["POST"])
|
||||
@inject
|
||||
def create_quiz_auto(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
|
||||
reqBody = request.get_json()
|
||||
return controller.create_quiz_auto(reqBody)
|
||||
|
||||
|
||||
@quiz_bp.route("/<quiz_id>", methods=["GET"])
|
||||
@inject
|
||||
def get_quiz(
|
||||
quiz_id: str, controller: QuizController = Provide[Container.quiz_controller]
|
||||
):
|
||||
return controller.get_quiz(quiz_id)
|
||||
|
||||
|
||||
@quiz_bp.route("/answer", methods=["POST"])
|
||||
@inject
|
||||
def submit_answer(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
req_body = request.get_json()
|
||||
return controller.submit_answer(req_body)
|
||||
|
||||
|
||||
@quiz_bp.route("/answer/session", methods=["POST"])
|
||||
@inject
|
||||
def get_answer_session(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
req_body = request.get_json()
|
||||
return controller.get_user_ans_session(req_body)
|
||||
|
||||
|
||||
@quiz_bp.route("/answer", methods=["GET"])
|
||||
@inject
|
||||
def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
quiz_id = request.args.get("quiz_id")
|
||||
user_id = request.args.get("user_id")
|
||||
session_id = request.args.get("session_id")
|
||||
|
||||
return controller.get_answer(
|
||||
quiz_id=quiz_id, user_id=user_id, session_id=session_id
|
||||
)
|
||||
|
||||
|
||||
@quiz_bp.route("/populer", methods=["GET"])
|
||||
@inject
|
||||
def get_quiz_populer(
|
||||
controller: QuizController = Provide[Container.quiz_controller],
|
||||
):
|
||||
page = request.args.get("page")
|
||||
limit = request.args.get("limit")
|
||||
lang_code = request.args.get("lang_code") or "id"
|
||||
return controller.get_quiz_populer(
|
||||
page=page,
|
||||
limit=limit,
|
||||
lang_code=lang_code,
|
||||
)
|
||||
|
||||
|
||||
@quiz_bp.route("/recommendation", methods=["GET"])
|
||||
@inject
|
||||
def get_quiz_recommendation(
|
||||
controller: QuizController = Provide[Container.quiz_controller],
|
||||
):
|
||||
user_id = request.args.get("user_id")
|
||||
page = request.args.get("page")
|
||||
limit = request.args.get("limit")
|
||||
lang_code = request.args.get("lang_code") or "id"
|
||||
return controller.get_quiz_recommendation(
|
||||
user_id=user_id,
|
||||
page=page,
|
||||
limit=limit,
|
||||
lang_code=lang_code,
|
||||
)
|
||||
|
||||
|
||||
@quiz_bp.route("/user/<user_id>", methods=["GET"])
|
||||
@inject
|
||||
def get_user_quiz(
|
||||
user_id: str, controller: QuizController = Provide[Container.quiz_controller]
|
||||
):
|
||||
page = request.args.get("page", default=1, type=int)
|
||||
page_size = request.args.get("page_size", default=10, type=int)
|
||||
return controller.get_user_quiz(user_id=user_id, page=page, page_size=page_size)
|
||||
|
||||
|
||||
@quiz_bp.route("/search", methods=["GET"])
|
||||
@inject
|
||||
def search_quiz(controller: QuizController = Provide[Container.quiz_controller]):
|
||||
keyword = request.args.get("keyword", "")
|
||||
subject_id = request.args.get("subject_id")
|
||||
page = int(request.args.get("page", 1))
|
||||
limit = int(request.args.get("limit", 10))
|
||||
lang_code = request.args.get("lang_code") or "id"
|
||||
|
||||
return controller.search_quiz(
|
||||
keyword=keyword, subject_id=subject_id, page=page, limit=limit
|
||||
)
|
|
@ -1,18 +0,0 @@
|
|||
from flask import Blueprint, request
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
from app.di_container import Container
|
||||
from app.controllers import SessionController
|
||||
|
||||
session_bp = Blueprint("session", __name__)
|
||||
|
||||
|
||||
@session_bp.route("", methods=["POST"])
|
||||
@inject
|
||||
def sessionGet(controller: SessionController = Provide[Container.session_controller]):
|
||||
return controller.createRoom(request.get_json())
|
||||
|
||||
|
||||
@session_bp.route("/summary", methods=["POST"])
|
||||
@inject
|
||||
def summary(controller: SessionController = Provide[Container.session_controller]):
|
||||
return controller.summaryall(request.get_json())
|
|
@ -1,50 +0,0 @@
|
|||
from flask import Blueprint, request
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
from app.di_container import Container
|
||||
from app.controllers import SubjectController
|
||||
|
||||
|
||||
subject_blueprint = Blueprint("subject", __name__)
|
||||
|
||||
|
||||
@subject_blueprint.route("", methods=["POST"])
|
||||
@inject
|
||||
def create_subject(
|
||||
controller: SubjectController = Provide[Container.subject_controller],
|
||||
):
|
||||
return controller.create(request.get_json())
|
||||
|
||||
|
||||
@subject_blueprint.route("", methods=["GET"])
|
||||
@inject
|
||||
def get_all_subjects(
|
||||
controller: SubjectController = Provide[Container.subject_controller],
|
||||
):
|
||||
return controller.get_all()
|
||||
|
||||
|
||||
@subject_blueprint.route("/<subject_id>", methods=["GET"])
|
||||
@inject
|
||||
def get_subject(
|
||||
subject_id: str,
|
||||
controller: SubjectController = Provide[Container.subject_controller],
|
||||
):
|
||||
return controller.get_by_id(subject_id)
|
||||
|
||||
|
||||
@subject_blueprint.route("/<subject_id>", methods=["PUT"])
|
||||
@inject
|
||||
def update_subject(
|
||||
subject_id: str,
|
||||
controller: SubjectController = Provide[Container.subject_controller],
|
||||
):
|
||||
return controller.update(subject_id, request.get_json())
|
||||
|
||||
|
||||
@subject_blueprint.route("/<subject_id>", methods=["DELETE"])
|
||||
@inject
|
||||
def delete_subject(
|
||||
subject_id: str,
|
||||
controller: SubjectController = Provide[Container.subject_controller],
|
||||
):
|
||||
return controller.delete(subject_id)
|
|
@ -1,23 +0,0 @@
|
|||
from flask import Blueprint, jsonify, send_file
|
||||
from flask_swagger_ui import get_swaggerui_blueprint
|
||||
import os
|
||||
|
||||
swagger_blueprint = Blueprint("swagger", __name__)
|
||||
|
||||
SWAGGER_URL = "/swagger"
|
||||
API_URL = "http://127.0.0.1:5000/swagger/docs"
|
||||
|
||||
swagger_ui_blueprint = get_swaggerui_blueprint(
|
||||
SWAGGER_URL,
|
||||
API_URL,
|
||||
config={"app_name": "Quiz Maker API"},
|
||||
)
|
||||
|
||||
swagger_blueprint.register_blueprint(swagger_ui_blueprint)
|
||||
|
||||
|
||||
@swagger_blueprint.route("/swagger/docs")
|
||||
def serve_openapi():
|
||||
"""Serve the OpenAPI spec from a file."""
|
||||
docs_path = os.path.abspath("docs/rest_api_docs.yaml")
|
||||
return send_file(docs_path, mimetype="application/yaml")
|
|
@ -1,48 +1,12 @@
|
|||
# /blueprints/user.py
|
||||
|
||||
from flask import Blueprint
|
||||
from app.di_container import Container
|
||||
from app.controllers import UserController
|
||||
from dependency_injector.wiring import inject, Provide
|
||||
from controllers import UserController
|
||||
|
||||
user_blueprint = Blueprint("user", __name__)
|
||||
user_controller = UserController()
|
||||
|
||||
|
||||
@user_blueprint.route("/users", methods=["GET"])
|
||||
@inject
|
||||
def get_users(user_controller: UserController = Provide[Container.user_controller]):
|
||||
def get_users():
|
||||
return user_controller.get_users()
|
||||
|
||||
|
||||
@user_blueprint.route("/register", methods=["POST"])
|
||||
@inject
|
||||
def register(user_controller: UserController = Provide[Container.user_controller]):
|
||||
return user_controller.register()
|
||||
|
||||
|
||||
@user_blueprint.route("/user/update", methods=["POST"])
|
||||
@inject
|
||||
def update_user(user_controller: UserController = Provide[Container.user_controller]):
|
||||
return user_controller.update_profile()
|
||||
|
||||
|
||||
@user_blueprint.route("/user/change-password", methods=["POST"])
|
||||
@inject
|
||||
def change_password(
|
||||
user_controller: UserController = Provide[Container.user_controller],
|
||||
):
|
||||
return user_controller.change_password()
|
||||
|
||||
|
||||
@user_blueprint.route("/user/<string:user_id>", methods=["GET"])
|
||||
@inject
|
||||
def get_user(
|
||||
user_id, user_controller: UserController = Provide[Container.user_controller]
|
||||
):
|
||||
return user_controller.get_user_by_id(user_id)
|
||||
|
||||
|
||||
@user_blueprint.route("/user/status/<string:user_id>", methods=["GET"])
|
||||
@inject
|
||||
def get_user_stat(
|
||||
user_id, user_controller: UserController = Provide[Container.user_controller]
|
||||
):
|
||||
return user_controller.user_stat(user_id)
|
||||
|
|
|
@ -1,2 +1 @@
|
|||
from .config import Config
|
||||
from .logger_config import LoggerConfig
|
||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -1,42 +1,18 @@
|
|||
from dotenv import load_dotenv
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load variables from .env
|
||||
load_dotenv(override=True)
|
||||
load_dotenv() # Load environment variables from .env
|
||||
|
||||
|
||||
class Config:
|
||||
# Flask Environment Settings
|
||||
FLASK_ENV = os.getenv("FLASK_ENV", "development")
|
||||
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "t")
|
||||
API_VERSION = os.getenv("API_VERSION", "v1")
|
||||
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
|
||||
MONGO_URI = os.getenv(
|
||||
"MONGO_URI", "mongodb://localhost:27017/quiz_app"
|
||||
) # Default value if not set
|
||||
|
||||
# MongoDB Settings
|
||||
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/yourdb")
|
||||
FLASK_ENV = os.getenv("FLASK_ENV", "development") # Default to development
|
||||
|
||||
# Google OAuth Settings
|
||||
GOOGLE_PROJECT_ID = os.getenv("GOOGLE_PROJECT_ID")
|
||||
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
|
||||
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
|
||||
GOOGLE_AUTH_URI = os.getenv(
|
||||
"GOOGLE_AUTH_URI", "https://accounts.google.com/o/oauth2/auth"
|
||||
)
|
||||
GOOGLE_TOKEN_URI = os.getenv(
|
||||
"GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token"
|
||||
)
|
||||
GOOGLE_AUTH_PROVIDER_X509_CERT_URL = os.getenv("GOOGLE_AUTH_PROVIDER_X509_CERT_URL")
|
||||
GOOGLE_SCOPE = "email profile"
|
||||
GOOGLE_BASE_URL = "https://www.googleapis.com/oauth2/v1/"
|
||||
|
||||
# Redis Configuration
|
||||
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
|
||||
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
|
||||
REDIS_DB = int(os.getenv("REDIS_DB", 0))
|
||||
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
|
||||
|
||||
@property
|
||||
def REDIS_URL(self):
|
||||
if self.REDIS_PASSWORD:
|
||||
return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
|
||||
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
|
||||
DEBUG = os.getenv("DEBUG", "True").lower() in [
|
||||
"true",
|
||||
"1",
|
||||
"t",
|
||||
] # Convert string to boolean
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
|
||||
class LoggerConfig:
|
||||
"""A class to configure logging for the Flask application."""
|
||||
|
||||
LOG_DIR = "logs" # Define the log directory
|
||||
|
||||
@staticmethod
|
||||
def init_logger(app, production_mode=False):
|
||||
"""
|
||||
Initializes separate log files for INFO, ERROR, and WARNING levels.
|
||||
|
||||
Args:
|
||||
app: Flask application instance
|
||||
production_mode (bool): If True, disables console output and only logs to files
|
||||
"""
|
||||
# Ensure the logs directory exists
|
||||
if not os.path.exists(LoggerConfig.LOG_DIR):
|
||||
os.makedirs(LoggerConfig.LOG_DIR)
|
||||
|
||||
# Remove default handlers to prevent duplicate logging
|
||||
for handler in app.logger.handlers[:]:
|
||||
app.logger.removeHandler(handler)
|
||||
|
||||
# Disable propagation to root logger to prevent console output in production
|
||||
if production_mode:
|
||||
app.logger.propagate = False
|
||||
|
||||
# Create separate loggers
|
||||
info_handler = LoggerConfig._setup_logger(
|
||||
"info_logger", "info.log", logging.INFO, logging.WARNING
|
||||
)
|
||||
error_handler = LoggerConfig._setup_logger(
|
||||
"error_logger", "error.log", logging.ERROR, logging.CRITICAL
|
||||
)
|
||||
warning_handler = LoggerConfig._setup_logger(
|
||||
"warning_logger", "warning.log", logging.WARNING, logging.WARNING
|
||||
)
|
||||
|
||||
# Attach handlers to Flask app logger
|
||||
app.logger.addHandler(info_handler)
|
||||
app.logger.addHandler(error_handler)
|
||||
app.logger.addHandler(warning_handler)
|
||||
app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs
|
||||
|
||||
# Add console handler only in development mode
|
||||
if not production_mode:
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setLevel(logging.DEBUG)
|
||||
console_formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
console_handler.setFormatter(console_formatter)
|
||||
app.logger.addHandler(console_handler)
|
||||
|
||||
app.logger.info("Logger has been initialized for Flask application.")
|
||||
|
||||
@staticmethod
|
||||
def _setup_logger(name, filename, level, max_level):
|
||||
"""Helper method to configure loggers for specific levels."""
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(level)
|
||||
|
||||
log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename)
|
||||
log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3)
|
||||
log_handler.setLevel(level)
|
||||
|
||||
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||||
log_handler.setFormatter(log_formatter)
|
||||
log_handler.addFilter(lambda record: level <= record.levelno <= max_level)
|
||||
|
||||
logger.addHandler(log_handler)
|
||||
return log_handler
|
|
@ -1,17 +1,2 @@
|
|||
from .auth_controller import AuthController
|
||||
from .user_controller import UserController
|
||||
from .quiz_controller import QuizController
|
||||
from .history_controller import HistoryController
|
||||
from .subject_controller import SubjectController
|
||||
from .socket_conroller import SocketController
|
||||
from .session_controller import SessionController
|
||||
|
||||
__all__ = [
|
||||
"AuthController",
|
||||
"UserController",
|
||||
"QuizController",
|
||||
"HistoryController",
|
||||
"SubjectController",
|
||||
"SocketController",
|
||||
"SessionController",
|
||||
]
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -1,77 +1,19 @@
|
|||
from flask import jsonify, request, current_app
|
||||
from pydantic import ValidationError
|
||||
from app.schemas.basic_response_schema import ResponseSchema
|
||||
from app.schemas.google_login_schema import GoogleLoginSchema
|
||||
from app.schemas import LoginSchema
|
||||
from app.services import UserService, AuthService
|
||||
from app.exception import AuthException
|
||||
from app.mapper import UserMapper
|
||||
from app.helpers import make_response, make_error_response
|
||||
import logging
|
||||
|
||||
logging = logging.getLogger(__name__)
|
||||
from flask import jsonify, request
|
||||
from services import UserService
|
||||
from services import AuthService
|
||||
|
||||
|
||||
class AuthController:
|
||||
def __init__(self, userService: UserService, authService: AuthService):
|
||||
self.user_service = userService
|
||||
self.auth_service = authService
|
||||
def __init__(self):
|
||||
self.user_service = UserService()
|
||||
self.auth_service = AuthService()
|
||||
|
||||
def login(self):
|
||||
try:
|
||||
data = request.get_json()
|
||||
dataSchema = LoginSchema(**data)
|
||||
response = self.auth_service.login(dataSchema)
|
||||
|
||||
if response is None:
|
||||
return make_response(message="User is not registered", status_code=401)
|
||||
return make_response(message="Login success", data=response)
|
||||
except ValidationError as e:
|
||||
current_app.logger.error(f"Validation error: {e}")
|
||||
response = ResponseSchema(message="Invalid input", data=None, meta=None)
|
||||
return jsonify(response.model_dump()), 400
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error during Google login: {str(e)}", exc_info=True
|
||||
)
|
||||
response = ResponseSchema(
|
||||
message="Internal server error", data=None, meta=None
|
||||
)
|
||||
return jsonify(response.model_dump()), 500
|
||||
|
||||
def google_login(self):
|
||||
"""Handles Google Login via ID Token verification"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
|
||||
validated_data = GoogleLoginSchema(**data)
|
||||
id_token = validated_data.token_id
|
||||
|
||||
user_info = self.auth_service.verify_google_id_token(id_token)
|
||||
if not user_info:
|
||||
return make_response(message="Invalid Google ID Token", data=user_info)
|
||||
|
||||
return make_response(message="Login Success", data=user_info)
|
||||
|
||||
except ValidationError as e:
|
||||
current_app.logger.error(f"Validation error: {e}")
|
||||
response = ResponseSchema(message="Invalid input", data=None, meta=None)
|
||||
return jsonify(response.model_dump()), 400
|
||||
|
||||
except AuthException as e:
|
||||
current_app.logger.error(f"Auth error: {e}")
|
||||
response = ResponseSchema(message=e, data=None, meta=None)
|
||||
return jsonify(response.model_dump()), 400
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error during Google login: {str(e)}", exc_info=True
|
||||
)
|
||||
response = ResponseSchema(
|
||||
message="Internal server error", data=None, meta=None
|
||||
)
|
||||
return jsonify(response.model_dump()), 500
|
||||
|
||||
def logout(self):
|
||||
return jsonify({"message": "logout"}), 200
|
||||
users = self.auth_service.login(data)
|
||||
response = {
|
||||
"status": True,
|
||||
"message": "success retrive data",
|
||||
"data": users,
|
||||
}
|
||||
return jsonify(response)
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
from app.services import HistoryService
|
||||
from app.helpers import make_error_response, make_response
|
||||
|
||||
|
||||
class HistoryController:
|
||||
|
||||
def __init__(self, history_service: HistoryService):
|
||||
self.history_service = history_service
|
||||
|
||||
def get_quiz_by_user(self, user_id: str):
|
||||
try:
|
||||
data = self.history_service.get_history_by_user_id(user_id)
|
||||
return make_response(message="retrive history data", data=data)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_detail_quiz_history(self, answer_id: str):
|
||||
try:
|
||||
data = self.history_service.get_history_by_answer_id(answer_id)
|
||||
return make_response(
|
||||
message="success retrive detail history data", data=data
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_session_history(self, session_id):
|
||||
try:
|
||||
result = self.history_service.get_session_history(session_id)
|
||||
|
||||
return make_response(message="success get history session", data=result)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
|
@ -1,188 +0,0 @@
|
|||
import json
|
||||
from pydantic import ValidationError
|
||||
from app.schemas.requests import QuizCreateSchema, UserAnswerSchema
|
||||
from app.schemas.response import QuizCreationResponse
|
||||
from app.services import QuizService, AnswerService, QuestionGenerationService
|
||||
from app.helpers import make_response, make_error_response
|
||||
from app.exception import ValidationException, DataNotFoundException
|
||||
|
||||
|
||||
class QuizController:
|
||||
def __init__(
|
||||
self,
|
||||
quiz_service: QuizService,
|
||||
answer_service: AnswerService,
|
||||
question_generate_service: QuestionGenerationService,
|
||||
):
|
||||
self.quiz_service = quiz_service
|
||||
self.answer_service = answer_service
|
||||
self.question_generate_service = question_generate_service
|
||||
|
||||
def get_quiz(self, quiz_id):
|
||||
try:
|
||||
result = self.quiz_service.get_quiz(quiz_id)
|
||||
if not result:
|
||||
return make_response(message="Quiz not found", status_code=404)
|
||||
return make_response(message="Quiz Found", data=result.model_dump())
|
||||
except DataNotFoundException as e:
|
||||
return make_response(message=e.message, status_code=e.status_code)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def create_quiz(self, quiz_data):
|
||||
try:
|
||||
quiz_obj = QuizCreateSchema(**quiz_data)
|
||||
quiz_id = self.quiz_service.create_quiz(quiz_obj)
|
||||
return make_response(
|
||||
message="Quiz created",
|
||||
data=QuizCreationResponse(quiz_id=quiz_id),
|
||||
status_code=201,
|
||||
)
|
||||
except (ValidationError, ValidationException) as e:
|
||||
return make_response(message="", status_code=400)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def quiz_populer(self):
|
||||
try:
|
||||
result = self.quiz_service.get_quiz_populer()
|
||||
if not result:
|
||||
return make_response(message="Quiz not found", status_code=404)
|
||||
return make_response(message="Quiz Found", data=result.model_dump())
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def submit_answer(self, answer_data):
|
||||
try:
|
||||
answer_obj = UserAnswerSchema(**answer_data)
|
||||
answer_id = self.answer_service.create_answer(answer_obj)
|
||||
return make_response(
|
||||
message="Answer submitted",
|
||||
data={"answer_id": answer_id},
|
||||
status_code=201,
|
||||
)
|
||||
except ValidationError as e:
|
||||
return make_response(
|
||||
message="validation error", data=json.loads(e.json()), status_code=400
|
||||
)
|
||||
except ValidationException as e:
|
||||
return make_response(
|
||||
message=f"validation issue {e.message}", status_code=400
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_answer(self, quiz_id, user_id, session_id):
|
||||
try:
|
||||
# self.answer_service.
|
||||
pass
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_user_quiz(self, user_id, page=1, page_size=10):
|
||||
try:
|
||||
result = self.quiz_service.get_user_quiz(
|
||||
user_id=user_id, page=page, page_size=page_size
|
||||
)
|
||||
return make_response(
|
||||
message="User quizzes retrieved successfully",
|
||||
data=result.quizzes,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
total_all_data=result.total,
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_quiz_populer(self, page, limit, lang_code):
|
||||
try:
|
||||
page = int(page) if page is not None else 1
|
||||
limit = int(limit) if limit is not None else 3
|
||||
result = self.quiz_service.get_quiz_populer(
|
||||
page=page,
|
||||
limit=limit,
|
||||
lang_code=lang_code,
|
||||
)
|
||||
return make_response(message="success retrieve populer quiz", data=result)
|
||||
except DataNotFoundException as e:
|
||||
return make_response(message=e.message, status_code=e.status_code)
|
||||
except ValueError as e:
|
||||
return make_response(message=str(e), data=None, status_code=400)
|
||||
except ValidationError as e:
|
||||
return make_response(
|
||||
message="validation error", data=json.loads(e.json()), status_code=400
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_quiz_recommendation(self, user_id, page, limit, lang_code):
|
||||
try:
|
||||
page = int(page) if page is not None else 1
|
||||
limit = int(limit) if limit is not None else 3
|
||||
|
||||
result = self.quiz_service.get_quiz_recommendation(
|
||||
user_id=user_id,
|
||||
page=page,
|
||||
limit=limit,
|
||||
lang_code=lang_code,
|
||||
)
|
||||
return make_response(message="success retrieve populer quiz", data=result)
|
||||
except DataNotFoundException as e:
|
||||
return make_response(message=e.message, status_code=e.status_code)
|
||||
except ValueError as e:
|
||||
return make_response(message=str(e), data=None, status_code=400)
|
||||
except ValidationError as e:
|
||||
return make_response(
|
||||
message="validation error", data=json.loads(e.json()), status_code=400
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def search_quiz(self, keyword: str, subject_id: str, page: int, limit: int):
|
||||
try:
|
||||
quiz, total = self.quiz_service.search_quiz(
|
||||
keyword, subject_id, page, limit
|
||||
)
|
||||
return make_response(
|
||||
message="success",
|
||||
data=quiz,
|
||||
page=page,
|
||||
page_size=limit,
|
||||
total_all_data=total,
|
||||
)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def create_quiz_auto(
|
||||
self,
|
||||
reqBody,
|
||||
):
|
||||
try:
|
||||
result = self.question_generate_service.createQuizAutomate(
|
||||
reqBody["sentence"]
|
||||
)
|
||||
return make_response(message="succes labeling", data=result)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_user_ans_session(self, body):
|
||||
try:
|
||||
session_id = body.get("session_id")
|
||||
user_id = body.get("user_id")
|
||||
|
||||
if not session_id and not user_id:
|
||||
return make_response(
|
||||
message="session_id or user_id must be provided", status_code=400
|
||||
)
|
||||
|
||||
data = self.answer_service.get_answer_session(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
return make_response(message="Successfully retrieved the answer", data=data)
|
||||
|
||||
except KeyError as e:
|
||||
return make_error_response(f"Missing required key: {str(e)}")
|
||||
except Exception as e:
|
||||
return make_error_response(f"An error occurred: {str(e)}")
|
|
@ -1,44 +0,0 @@
|
|||
from flask import request, jsonify
|
||||
from flask.views import MethodView
|
||||
from app.services.session_service import SessionService
|
||||
from app.helpers import make_response
|
||||
|
||||
|
||||
class SessionController(MethodView):
|
||||
def __init__(self, session_service: SessionService):
|
||||
self.session_service = session_service
|
||||
|
||||
def createRoom(self, data):
|
||||
|
||||
required_fields = [
|
||||
"quiz_id",
|
||||
"host_id",
|
||||
"room_name",
|
||||
"limit_participan",
|
||||
]
|
||||
for field in required_fields:
|
||||
if field not in data:
|
||||
return jsonify({"error": f"Missing field: {field}"}), 400
|
||||
|
||||
session = self.session_service.create_session(
|
||||
quiz_id=data["quiz_id"],
|
||||
host_id=data["host_id"],
|
||||
room_name=data["room_name"],
|
||||
limit_participan=data["limit_participan"],
|
||||
)
|
||||
|
||||
return make_response(
|
||||
message="succes create room",
|
||||
data=session,
|
||||
status_code=201,
|
||||
)
|
||||
|
||||
def summaryall(self, body):
|
||||
self.session_service.summaryAllSessionData(
|
||||
session_id=body.get("session_id"), start_time=""
|
||||
)
|
||||
return make_response(
|
||||
message="succes create room",
|
||||
data="",
|
||||
status_code=201,
|
||||
)
|
|
@ -1,261 +0,0 @@
|
|||
from flask_socketio import SocketIO, emit, join_room, leave_room
|
||||
from flask import request, current_app
|
||||
from app.services import SessionService
|
||||
import threading
|
||||
|
||||
|
||||
class SocketController:
|
||||
def __init__(
|
||||
self,
|
||||
socketio: SocketIO,
|
||||
session_service: SessionService,
|
||||
):
|
||||
self.socketio = socketio
|
||||
self.session_service = session_service
|
||||
self._register_events()
|
||||
|
||||
def _register_events(self):
|
||||
@self.socketio.on("connect")
|
||||
def on_connect():
|
||||
try:
|
||||
current_app.logger.info(f"Client connected: {request.sid}")
|
||||
emit("connection_response", {"status": "connected", "sid": request.sid})
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Connect error: {str(e)}"})
|
||||
current_app.logger.error(f"Connect error: {str(e)}")
|
||||
|
||||
@self.socketio.on("disconnect")
|
||||
def on_disconnect():
|
||||
try:
|
||||
current_app.logger.info(f"Client disconnected: {request.sid}")
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Disconnect error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("join_room")
|
||||
def handle_join_room(data):
|
||||
try:
|
||||
session_code = data.get("session_code")
|
||||
user_id = data.get("user_id")
|
||||
|
||||
if not session_code or not user_id:
|
||||
emit("error", {"message": "session_code and user_id are required"})
|
||||
return
|
||||
|
||||
session = self.session_service.join_session(session_code, user_id)
|
||||
if session is None:
|
||||
emit(
|
||||
"error",
|
||||
{"message": "Failed to join session or session inactive"},
|
||||
)
|
||||
return
|
||||
|
||||
session_id = session["session_id"]
|
||||
join_room(session_id)
|
||||
|
||||
message = (
|
||||
"Admin has joined the room."
|
||||
if session["is_admin"]
|
||||
else f"User {session['username']} has joined the room."
|
||||
)
|
||||
|
||||
current_app.logger.info(f"Client joined: {message}")
|
||||
emit(
|
||||
"room_message",
|
||||
{
|
||||
"type": "join",
|
||||
"message": message,
|
||||
"room": session_id,
|
||||
"argument": "adm_update",
|
||||
"data": {
|
||||
"session_info": session["session_info"],
|
||||
"quiz_info": session["quiz_info"],
|
||||
},
|
||||
},
|
||||
to=request.sid,
|
||||
)
|
||||
|
||||
emit(
|
||||
"room_message",
|
||||
{
|
||||
"type": "participan_join",
|
||||
"message": message,
|
||||
"room": session_id,
|
||||
"argument": "adm_update",
|
||||
"data": {
|
||||
"participants": session["session_info"]["participants"],
|
||||
},
|
||||
},
|
||||
room=session_id,
|
||||
skip_sid=request.sid,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Join room error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("leave_room")
|
||||
def handle_leave_room(data):
|
||||
try:
|
||||
session_id = data.get("session_id")
|
||||
user_id = data.get("user_id")
|
||||
username = data.get("username", "anonymous")
|
||||
|
||||
leave_result = self.session_service.leave_session(session_id, user_id)
|
||||
leave_room(session_id)
|
||||
|
||||
if leave_result["is_success"]:
|
||||
emit(
|
||||
"room_message",
|
||||
{
|
||||
"type": "participan_leave",
|
||||
"message": f"{username} has left the room.",
|
||||
"room": session_id,
|
||||
"argument": "adm_update",
|
||||
"data": {
|
||||
"participants": leave_result["participants"],
|
||||
},
|
||||
},
|
||||
room=session_id,
|
||||
skip_sid=request.sid,
|
||||
)
|
||||
|
||||
emit(
|
||||
"room_message",
|
||||
{
|
||||
"type": "leave",
|
||||
"message": f"{username} has left the room.",
|
||||
"room": session_id,
|
||||
"argument": "adm_update",
|
||||
"data": None,
|
||||
},
|
||||
room=session_id,
|
||||
to=request.sid,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Leave room error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("send_message")
|
||||
def on_send_message(data):
|
||||
try:
|
||||
session_code = data.get("session_id")
|
||||
message = data.get("message")
|
||||
username = data.get("username", "anonymous")
|
||||
|
||||
emit(
|
||||
"receive_message",
|
||||
{"message": message, "from": username},
|
||||
room=session_code,
|
||||
)
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Send message error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("end_session")
|
||||
def handle_end_session(data):
|
||||
try:
|
||||
session_code = data.get("session_id")
|
||||
user_id = data.get("user_id")
|
||||
if not session_code or not user_id:
|
||||
emit("error", {"message": "session_id and user_id required"})
|
||||
return
|
||||
|
||||
self.session_service.end_session(
|
||||
session_id=session_code, user_id=user_id
|
||||
)
|
||||
|
||||
for key in [
|
||||
self._answers_key(session_code),
|
||||
self._scores_key(session_code),
|
||||
self._questions_key(session_code),
|
||||
]:
|
||||
self.redis_repo.delete_key(key)
|
||||
|
||||
emit(
|
||||
"room_closed",
|
||||
{"message": "Session has ended.", "room": session_code},
|
||||
room=session_code,
|
||||
)
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"End session error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("start_quiz")
|
||||
def handle_start_quiz(data):
|
||||
try:
|
||||
session_id = data.get("session_id")
|
||||
if not session_id:
|
||||
emit("error", {"message": "session_id is required"})
|
||||
return
|
||||
|
||||
emit("quiz_started", {"message": "Quiz has started!"}, room=session_id)
|
||||
threading.Thread(
|
||||
target=self.session_service.run_quiz_flow,
|
||||
args=(session_id, self.socketio),
|
||||
daemon=True,
|
||||
).start()
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Start quiz error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
||||
|
||||
@self.socketio.on("submit_answer")
|
||||
def handle_submit_answer(data):
|
||||
try:
|
||||
session_id = data.get("session_id")
|
||||
user_id = data.get("user_id")
|
||||
question_index = data.get("question_index")
|
||||
user_answer = data.get("answer")
|
||||
time_spent = data.get("time_spent")
|
||||
|
||||
if not all(
|
||||
[
|
||||
session_id,
|
||||
user_id,
|
||||
question_index is not None,
|
||||
user_answer is not None,
|
||||
time_spent is not None,
|
||||
]
|
||||
):
|
||||
emit(
|
||||
"error",
|
||||
{
|
||||
"message": "session_id, user_id, question_index, and answer are required"
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
result = self.session_service.submit_answer(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
question_index=question_index,
|
||||
answer=user_answer,
|
||||
time_spent=time_spent,
|
||||
)
|
||||
|
||||
emit(
|
||||
"answer_submitted",
|
||||
{
|
||||
"question_index": result["question_index"],
|
||||
"answer": result["answer"],
|
||||
"correct": result["correct"],
|
||||
"score": result["scores"],
|
||||
},
|
||||
to=request.sid,
|
||||
)
|
||||
|
||||
emit(
|
||||
"score_update",
|
||||
{
|
||||
"scores": self.session_service.get_ranked_scores(session_id),
|
||||
},
|
||||
room=session_id,
|
||||
)
|
||||
|
||||
except ValueError as exc:
|
||||
emit("error", {"message": str(exc)})
|
||||
current_app.logger.error(f"error: {str(exc)}")
|
||||
except Exception as e:
|
||||
emit("error", {"message": f"Submit answer error: {str(e)}"})
|
||||
current_app.logger.error(f"error: {str(e)}")
|
|
@ -1,50 +0,0 @@
|
|||
from app.services.subject_service import SubjectService
|
||||
from app.helpers import make_response, make_error_response
|
||||
from app.schemas.requests import SubjectCreateRequest
|
||||
|
||||
|
||||
class SubjectController:
|
||||
def __init__(self, service: SubjectService):
|
||||
self.service = service
|
||||
|
||||
def create(self, req_body):
|
||||
try:
|
||||
data = SubjectCreateRequest(**req_body)
|
||||
new_id = self.service.create_subject(data)
|
||||
return make_response(message="Subject created", data={"id": new_id})
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_all(self):
|
||||
try:
|
||||
subjects = self.service.get_all_subjects()
|
||||
return make_response(message="success retrieve subject", data=subjects)
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def get_by_id(self, subject_id: str):
|
||||
try:
|
||||
subject = self.service.get_subject_by_id(subject_id)
|
||||
if not subject:
|
||||
return make_response(message="Subject not found", status_code=404)
|
||||
return make_response(data=subject.model_dump())
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def update(self, subject_id: str, req_body):
|
||||
try:
|
||||
updated = self.service.update_subject(subject_id, req_body)
|
||||
if not updated:
|
||||
return make_response(message="No subject updated", status_code=404)
|
||||
return make_response(message="Subject updated")
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
||||
|
||||
def delete(self, subject_id: str):
|
||||
try:
|
||||
deleted = self.service.delete_subject(subject_id)
|
||||
if not deleted:
|
||||
return make_response(message="No subject deleted", status_code=404)
|
||||
return make_response(message="Subject deleted")
|
||||
except Exception as e:
|
||||
return make_error_response(e)
|
|
@ -1,129 +1,12 @@
|
|||
# /controllers/user_controller.py
|
||||
from flask import jsonify, request, current_app
|
||||
from app.services import UserService
|
||||
from app.schemas import RegisterSchema
|
||||
from pydantic import ValidationError
|
||||
from app.schemas import ResponseSchema
|
||||
from app.exception import AlreadyExistException, DataNotFoundException
|
||||
from app.helpers import make_response
|
||||
from app.schemas.requests import ProfileUpdateSchema
|
||||
from flask import jsonify
|
||||
from services import UserService
|
||||
|
||||
|
||||
class UserController:
|
||||
def __init__(self, userService: UserService):
|
||||
self.user_service = userService
|
||||
def __init__(self):
|
||||
self.user_service = UserService()
|
||||
|
||||
def register(self):
|
||||
try:
|
||||
request_data = request.get_json()
|
||||
register_data = RegisterSchema(**request_data)
|
||||
self.user_service.register_user(register_data)
|
||||
return make_response("Register Success")
|
||||
|
||||
except ValidationError as e:
|
||||
current_app.logger.error(f"Validation error: {e}")
|
||||
response = ResponseSchema(message="Invalid input", data=None, meta=None)
|
||||
return make_response("Invalid input", status_code=400)
|
||||
except AlreadyExistException as e:
|
||||
return make_response("User already exists", status_code=409)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error during Google login: {str(e)}", exc_info=True
|
||||
)
|
||||
return make_response("Internal server error", status_code=500)
|
||||
|
||||
def get_user_by_id(self, user_id):
|
||||
try:
|
||||
if not user_id:
|
||||
return make_response("User ID is required", status_code=400)
|
||||
|
||||
user = self.user_service.get_user_by_id(user_id)
|
||||
if user:
|
||||
return make_response("User found", data=user)
|
||||
else:
|
||||
return make_response("User not found", status_code=404)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error while retrieving user: {str(e)}", exc_info=True
|
||||
)
|
||||
return make_response(
|
||||
message="An internal server error occurred. Please try again later.",
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
def update_profile(self):
|
||||
try:
|
||||
body = request.get_json()
|
||||
reqBody = ProfileUpdateSchema(**body)
|
||||
result = self.user_service.update_profile(reqBody)
|
||||
|
||||
if result:
|
||||
return make_response(message="User profile updated successfully.")
|
||||
else:
|
||||
return make_response(
|
||||
message="Failed to update user profile. Please check the submitted data.",
|
||||
status_code=400,
|
||||
)
|
||||
except DataNotFoundException as e:
|
||||
return make_response(message="User data not found.", status_code=404)
|
||||
except ValueError as e:
|
||||
return make_response(
|
||||
message=f"Invalid data provided: {str(e)}", status_code=400
|
||||
)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error while updating profile: {str(e)}", exc_info=True
|
||||
)
|
||||
return make_response(
|
||||
message="An internal server error occurred. Please try again later.",
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
def change_password(self):
|
||||
try:
|
||||
body = request.get_json()
|
||||
user_id = body.get("id")
|
||||
current_password = body.get("current_password")
|
||||
new_password = body.get("new_password")
|
||||
|
||||
if not all([user_id, current_password, new_password]):
|
||||
return make_response(
|
||||
message="Missing required fields: id, current_password, new_password",
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
result = self.user_service.change_password(
|
||||
user_id, current_password, new_password
|
||||
)
|
||||
if result:
|
||||
return make_response(message="Password changed successfully.")
|
||||
else:
|
||||
return make_response(
|
||||
message="Failed to change password.",
|
||||
status_code=400,
|
||||
)
|
||||
except DataNotFoundException as e:
|
||||
return make_response(message="User data not found.", status_code=404)
|
||||
except ValueError as e:
|
||||
return make_response(message=f"{str(e)}", status_code=400)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error while changing password: {str(e)}", exc_info=True
|
||||
)
|
||||
return make_response(
|
||||
message="An internal server error occurred. Please try again later.",
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
def user_stat(self, user_id):
|
||||
try:
|
||||
response = self.user_service.get_user_status(user_id)
|
||||
return make_response(message="Success retrive user stat", data=response)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"Error while changing password: {str(e)}", exc_info=True
|
||||
)
|
||||
return make_response(
|
||||
message="An internal server error occurred. Please try again later.",
|
||||
status_code=500,
|
||||
)
|
||||
def get_users(self):
|
||||
users = self.user_service.get_all_users()
|
||||
return jsonify(users)
|
||||
|
|
|
@ -1,2 +1 @@
|
|||
|
||||
from .db import init_db
|
||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -1,16 +1,11 @@
|
|||
from flask_pymongo import PyMongo
|
||||
from flask import Flask, current_app
|
||||
from .seed.subject_seed import seed_subjects
|
||||
from flask import Flask
|
||||
from configs import Config
|
||||
|
||||
mongo = PyMongo()
|
||||
|
||||
|
||||
def init_db(app: Flask) -> PyMongo:
|
||||
try:
|
||||
mongo = PyMongo(app)
|
||||
|
||||
mongo.cx.server_info()
|
||||
app.logger.info("MongoDB connection established")
|
||||
seed_subjects(mongo)
|
||||
return mongo
|
||||
except Exception as e:
|
||||
app.logger.error(f"MongoDB connection failed: {e}")
|
||||
return None
|
||||
def init_db(app: Flask):
|
||||
app.config["MONGO_URI"] = Config.MONGO_URI
|
||||
print(Config.MONGO_URI)
|
||||
mongo.init_app(app)
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
from flask_pymongo import PyMongo
|
||||
|
||||
|
||||
def seed_subjects(mongo: PyMongo):
|
||||
subject_collection = mongo.db.subjects
|
||||
|
||||
base_subjects = [
|
||||
{
|
||||
"name": "Ilmu Pengetahuan Alam",
|
||||
"short_name": "IPA",
|
||||
"description": "Pelajaran tentang sains dan alam",
|
||||
},
|
||||
{
|
||||
"name": "Ilmu Pengetahuan Sosial",
|
||||
"short_name": "IPS",
|
||||
"description": "Pelajaran tentang masyarakat dan geografi",
|
||||
},
|
||||
{
|
||||
"name": "Sejarah",
|
||||
"short_name": "Sejarah",
|
||||
"description": "Pelajaran mengenai sejarah di indonesia",
|
||||
},
|
||||
{
|
||||
"name": "Matematika",
|
||||
"short_name": "Matematika",
|
||||
"description": "Pelajaran tentang angka dan logika",
|
||||
},
|
||||
{
|
||||
"name": "Bahasa Indonesia",
|
||||
"short_name": "B.Indonesia",
|
||||
"description": "Pelajaran tentang bahasa nasional",
|
||||
},
|
||||
{
|
||||
"name": "Sejarah",
|
||||
"short_name": "Sejarah",
|
||||
"description": "Pelajaran sejarah Indonesia",
|
||||
},
|
||||
]
|
||||
|
||||
for subject in base_subjects:
|
||||
if not subject_collection.find_one({"name": subject["name"]}):
|
||||
subject_collection.insert_one(subject)
|
|
@ -1,136 +0,0 @@
|
|||
from dependency_injector import containers, providers
|
||||
from app.repositories import (
|
||||
UserRepository,
|
||||
QuizRepository,
|
||||
UserAnswerRepository,
|
||||
SubjectRepository,
|
||||
SessionRepository,
|
||||
NERSRLRepository,
|
||||
SessionMemoryRepository,
|
||||
QuizMemoryRepository,
|
||||
AnswerMemoryRepository,
|
||||
ScoreMemoryRepository,
|
||||
QuestionGenerationRepository,
|
||||
AnswerGenerationRepository,
|
||||
)
|
||||
|
||||
from app.services import (
|
||||
UserService,
|
||||
AuthService,
|
||||
QuizService,
|
||||
AnswerService,
|
||||
HistoryService,
|
||||
SubjectService,
|
||||
SessionService,
|
||||
QuestionGenerationService,
|
||||
)
|
||||
|
||||
from app.controllers import (
|
||||
UserController,
|
||||
AuthController,
|
||||
QuizController,
|
||||
HistoryController,
|
||||
SubjectController,
|
||||
SocketController,
|
||||
SessionController,
|
||||
)
|
||||
|
||||
|
||||
class Container(containers.DeclarativeContainer):
|
||||
"""Dependency Injection Container"""
|
||||
|
||||
mongo = providers.Dependency()
|
||||
redis = providers.Dependency()
|
||||
socketio = providers.Dependency()
|
||||
|
||||
# repository
|
||||
user_repository = providers.Factory(UserRepository, mongo.provided.db)
|
||||
quiz_repository = providers.Factory(QuizRepository, mongo.provided.db)
|
||||
answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db)
|
||||
subject_repository = providers.Factory(SubjectRepository, mongo.provided.db)
|
||||
session_repository = providers.Factory(SessionRepository, mongo.provided.db)
|
||||
ner_srl_repository = providers.Factory(NERSRLRepository)
|
||||
question_generation_repository = providers.Factory(QuestionGenerationRepository)
|
||||
answer_generator_repository = providers.Factory(AnswerGenerationRepository)
|
||||
session_memory_repository = providers.Factory(SessionMemoryRepository, redis)
|
||||
quiz_memory_repository = providers.Factory(QuizMemoryRepository, redis)
|
||||
answer_memory_repository = providers.Factory(AnswerMemoryRepository, redis)
|
||||
score_memory_repository = providers.Factory(ScoreMemoryRepository, redis)
|
||||
|
||||
# services
|
||||
auth_service = providers.Factory(
|
||||
AuthService,
|
||||
user_repository,
|
||||
)
|
||||
|
||||
user_service = providers.Factory(
|
||||
UserService,
|
||||
user_repository,
|
||||
answer_repository,
|
||||
quiz_repository,
|
||||
)
|
||||
|
||||
quiz_service = providers.Factory(
|
||||
QuizService,
|
||||
quiz_repository,
|
||||
user_repository,
|
||||
subject_repository,
|
||||
answer_repository,
|
||||
)
|
||||
|
||||
answer_service = providers.Factory(
|
||||
AnswerService,
|
||||
answer_repository,
|
||||
quiz_repository,
|
||||
user_repository,
|
||||
)
|
||||
|
||||
history_service = providers.Factory(
|
||||
HistoryService,
|
||||
quiz_repository,
|
||||
answer_repository,
|
||||
session_repository,
|
||||
user_repository,
|
||||
)
|
||||
|
||||
subject_service = providers.Factory(
|
||||
SubjectService,
|
||||
subject_repository,
|
||||
)
|
||||
|
||||
session_service = providers.Factory(
|
||||
SessionService,
|
||||
session_repository,
|
||||
session_memory_repository,
|
||||
quiz_memory_repository,
|
||||
answer_memory_repository,
|
||||
score_memory_repository,
|
||||
user_repository,
|
||||
quiz_repository,
|
||||
answer_repository,
|
||||
)
|
||||
|
||||
question_generation_service = providers.Factory(
|
||||
QuestionGenerationService,
|
||||
ner_srl_repository,
|
||||
question_generation_repository,
|
||||
answer_generator_repository,
|
||||
)
|
||||
|
||||
# controllers
|
||||
auth_controller = providers.Factory(AuthController, user_service, auth_service)
|
||||
user_controller = providers.Factory(UserController, user_service)
|
||||
quiz_controller = providers.Factory(
|
||||
QuizController,
|
||||
quiz_service,
|
||||
answer_service,
|
||||
question_generation_service,
|
||||
)
|
||||
history_controller = providers.Factory(HistoryController, history_service)
|
||||
subject_controller = providers.Factory(SubjectController, subject_service)
|
||||
socket_controller = providers.Factory(
|
||||
SocketController,
|
||||
socketio,
|
||||
session_service,
|
||||
)
|
||||
session_controller = providers.Factory(SessionController, session_service)
|
|
@ -1,12 +0,0 @@
|
|||
from .auth_exception import AuthException
|
||||
from .already_exist_exception import AlreadyExistException
|
||||
from .data_not_found_exception import DataNotFoundException
|
||||
from .validation_exception import ValidationException
|
||||
|
||||
|
||||
__all__ = [
|
||||
"AuthException",
|
||||
"AlreadyExistException",
|
||||
"DataNotFoundException",
|
||||
"ValidationException",
|
||||
]
|
|
@ -1,6 +0,0 @@
|
|||
class AlreadyExistException(Exception):
|
||||
def __init__(self, entity: str, message: str = None):
|
||||
if message is None:
|
||||
message = f"{entity} already exists"
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
|
@ -1,8 +0,0 @@
|
|||
from .base_exception import BaseExceptionTemplate
|
||||
|
||||
|
||||
class AuthException(BaseExceptionTemplate):
|
||||
"""Exception for authentication-related errors"""
|
||||
|
||||
def __init__(self, message: str = "Authentication failed"):
|
||||
super().__init__(message, status_code=401)
|
|
@ -1,13 +0,0 @@
|
|||
class BaseExceptionTemplate(Exception):
|
||||
"""Base exception template for custom exceptions"""
|
||||
|
||||
def __init__(self, message: str, status_code: int = 400):
|
||||
self.message = message
|
||||
self.status_code = status_code
|
||||
super().__init__(self.message)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}: {self.message}"
|
||||
|
||||
def json(self):
|
||||
return {"error": self.__class__.__name__, "message": self.message}
|
|
@ -1,8 +0,0 @@
|
|||
from .base_exception import BaseExceptionTemplate
|
||||
|
||||
|
||||
class DataNotFoundException(BaseExceptionTemplate):
|
||||
"""Exception for data not found"""
|
||||
|
||||
def __init__(self, message: str = "Data Not Found"):
|
||||
super().__init__(message, status_code=404)
|
|
@ -1,8 +0,0 @@
|
|||
from .base_exception import BaseExceptionTemplate
|
||||
|
||||
|
||||
class ValidationException(BaseExceptionTemplate):
|
||||
"""Exception for validation"""
|
||||
|
||||
def __init__(self, message: str = "validation error, check yout input"):
|
||||
super().__init__(message, status_code=400)
|
|
@ -1,9 +0,0 @@
|
|||
from .response_helper import make_response, make_error_response
|
||||
from .datetime_util import DatetimeUtil
|
||||
|
||||
|
||||
__all__ = [
|
||||
"make_response",
|
||||
"make_error_response",
|
||||
"DatetimeUtil",
|
||||
]
|
|
@ -1,44 +0,0 @@
|
|||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
|
||||
class DatetimeUtil:
|
||||
@staticmethod
|
||||
def now():
|
||||
"""Waktu UTC (timezone-aware)"""
|
||||
return datetime.now(tz=ZoneInfo("UTC"))
|
||||
|
||||
@staticmethod
|
||||
def now_iso():
|
||||
"""Waktu UTC dalam format ISO 8601 string"""
|
||||
return datetime.now(tz=ZoneInfo("UTC")).isoformat()
|
||||
|
||||
@staticmethod
|
||||
def now_jakarta():
|
||||
"""Waktu sekarang di zona Asia/Jakarta (WIB)"""
|
||||
return datetime.now(tz=ZoneInfo("Asia/Jakarta"))
|
||||
|
||||
@staticmethod
|
||||
def to_string(dt: datetime, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||||
"""Convert UTC datetime to Asia/Jakarta time and format as string"""
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
|
||||
jakarta_time = dt.astimezone(ZoneInfo("Asia/Jakarta"))
|
||||
return jakarta_time.strftime(fmt)
|
||||
|
||||
@staticmethod
|
||||
def from_string(
|
||||
date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: str = "UTC"
|
||||
) -> datetime:
|
||||
"""Convert string ke datetime dengan timezone"""
|
||||
dt = datetime.strptime(date_str, fmt)
|
||||
return dt.replace(tzinfo=ZoneInfo(tz))
|
||||
|
||||
@staticmethod
|
||||
def from_iso(date_str: str, tz: str = "UTC") -> datetime:
|
||||
"""Convert ISO 8601 string to datetime with timezone awareness"""
|
||||
dt = datetime.fromisoformat(date_str)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=ZoneInfo(tz))
|
||||
return dt
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
from flask import jsonify, current_app
|
||||
from typing import Optional, Union
|
||||
from app.schemas import ResponseSchema, MetaSchema
|
||||
import math
|
||||
|
||||
|
||||
def calculate_total_page(total_data: int, page_size: int) -> int:
|
||||
if not page_size or page_size <= 0:
|
||||
return 1
|
||||
return math.ceil(total_data / page_size)
|
||||
|
||||
|
||||
def make_response(
|
||||
message: str,
|
||||
data: Optional[Union[dict, list]] = None,
|
||||
page: Optional[int] = None,
|
||||
page_size: Optional[int] = None,
|
||||
total_all_data: Optional[int] = None,
|
||||
status_code: int = 200,
|
||||
):
|
||||
meta = None
|
||||
if page is not None and page_size is not None and total_all_data is not None:
|
||||
meta = MetaSchema(
|
||||
current_page=page,
|
||||
total_all_data=total_all_data,
|
||||
total_data=len(data) if data else 0,
|
||||
total_page=calculate_total_page(total_all_data, page_size),
|
||||
)
|
||||
|
||||
response = ResponseSchema(
|
||||
message=message,
|
||||
data=data,
|
||||
meta=meta,
|
||||
)
|
||||
return jsonify(response.model_dump()), status_code
|
||||
|
||||
|
||||
def make_error_response(
|
||||
err: Union[Exception, str],
|
||||
log_message: Optional[str] = None,
|
||||
status_code: int = 500,
|
||||
):
|
||||
"""Logs the error and returns a standardized error response"""
|
||||
error_message = str(err) if isinstance(err, Exception) else err
|
||||
log_msg = log_message or f"An error occurred: {error_message}"
|
||||
current_app.logger.error(log_msg, exc_info=True)
|
||||
|
||||
response = ResponseSchema(message="Internal server error", data=None, meta=None)
|
||||
return jsonify(response.model_dump()), status_code
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
86
app/main.py
86
app/main.py
|
@ -1,77 +1,21 @@
|
|||
import eventlet
|
||||
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import logging
|
||||
from flask import Flask
|
||||
from flask_socketio import SocketIO
|
||||
from blueprints import auth_blueprint, user_blueprint, default_blueprint
|
||||
from database import init_db
|
||||
from configs import Config # Import the config class
|
||||
|
||||
from app.di_container import Container
|
||||
from app.configs import Config, LoggerConfig
|
||||
from app.blueprints import (
|
||||
auth_blueprint,
|
||||
user_blueprint,
|
||||
quiz_bp,
|
||||
default_blueprint,
|
||||
history_blueprint,
|
||||
subject_blueprint,
|
||||
session_bp,
|
||||
swagger_blueprint,
|
||||
)
|
||||
from app.database import init_db
|
||||
from redis import Redis
|
||||
app = Flask(__name__)
|
||||
|
||||
# Apply configurations environtment
|
||||
app.config["FLASK_ENV"] = Config.FLASK_ENV
|
||||
app.config["DEBUG"] = Config.DEBUG
|
||||
|
||||
def createApp() -> tuple[Flask, SocketIO]:
|
||||
app = Flask(__name__)
|
||||
app.config.from_object(Config)
|
||||
LoggerConfig.init_logger(app, not Config.DEBUG)
|
||||
# Initialize database
|
||||
init_db(app)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||||
)
|
||||
# Register blueprints
|
||||
app.register_blueprint(default_blueprint)
|
||||
app.register_blueprint(auth_blueprint, url_prefix="/api")
|
||||
app.register_blueprint(user_blueprint, url_prefix="/api")
|
||||
|
||||
container = Container()
|
||||
app.container = container
|
||||
|
||||
mongo = init_db(app)
|
||||
if mongo is not None:
|
||||
container.mongo.override(mongo)
|
||||
|
||||
redis_url = Config().REDIS_URL
|
||||
redis_client = Redis.from_url(redis_url)
|
||||
redis_client.ping()
|
||||
container.redis.override(redis_client)
|
||||
|
||||
socketio = SocketIO(
|
||||
cors_allowed_origins="*",
|
||||
# message_queue=redis_url,
|
||||
async_mode="eventlet",
|
||||
)
|
||||
|
||||
container.socketio.override(socketio)
|
||||
container.socket_controller()
|
||||
|
||||
socketio.init_app(app)
|
||||
|
||||
container.wire(
|
||||
modules=[
|
||||
"app.blueprints.auth",
|
||||
"app.blueprints.user",
|
||||
"app.blueprints.quiz",
|
||||
"app.blueprints.history",
|
||||
"app.blueprints.subject",
|
||||
"app.blueprints.session",
|
||||
]
|
||||
)
|
||||
|
||||
app.register_blueprint(default_blueprint)
|
||||
app.register_blueprint(swagger_blueprint)
|
||||
app.register_blueprint(auth_blueprint, url_prefix="/api")
|
||||
app.register_blueprint(user_blueprint, url_prefix="/api")
|
||||
app.register_blueprint(quiz_bp, url_prefix="/api/quiz")
|
||||
app.register_blueprint(history_blueprint, url_prefix="/api/history")
|
||||
app.register_blueprint(subject_blueprint, url_prefix="/api/subject")
|
||||
app.register_blueprint(session_bp, url_prefix="/api/session")
|
||||
|
||||
return app, socketio
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=Config.DEBUG)
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
from .user_mapper import UserMapper
|
||||
from .quiz_mapper import QuizMapper
|
||||
from .subject_mapper import SubjectMapper
|
||||
|
||||
|
||||
__all__ = [
|
||||
"UserMapper",
|
||||
"QuizMapper",
|
||||
"SubjectMapper",
|
||||
]
|
|
@ -1,93 +0,0 @@
|
|||
from datetime import datetime
|
||||
from app.helpers import DatetimeUtil
|
||||
from app.models import QuizEntity, QuestionItemEntity, UserEntity
|
||||
from app.models.entities import SubjectEntity
|
||||
from app.schemas import QuizGetSchema, QuestionItemSchema
|
||||
from app.schemas.response import ListingQuizResponse
|
||||
from app.schemas.requests import QuizCreateSchema
|
||||
|
||||
|
||||
class QuizMapper:
|
||||
|
||||
@staticmethod
|
||||
def map_question_entity_to_schema(entity: QuestionItemEntity) -> QuestionItemSchema:
|
||||
return QuestionItemSchema(
|
||||
index=entity.index,
|
||||
question=entity.question,
|
||||
target_answer=entity.target_answer,
|
||||
duration=entity.duration,
|
||||
type=entity.type,
|
||||
options=entity.options,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def map_question_schema_to_entity(schema: QuestionItemSchema) -> QuestionItemEntity:
|
||||
return QuestionItemEntity(
|
||||
index=schema.index,
|
||||
question=schema.question,
|
||||
target_answer=schema.target_answer,
|
||||
duration=schema.duration,
|
||||
type=schema.type,
|
||||
options=schema.options,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def map_quiz_entity_to_schema(
|
||||
entity: QuizEntity,
|
||||
subjectE: SubjectEntity,
|
||||
) -> QuizGetSchema:
|
||||
return QuizGetSchema(
|
||||
id=str(entity.id),
|
||||
author_id=entity.author_id,
|
||||
subject_id=str(subjectE.id),
|
||||
subject_alias=subjectE.short_name,
|
||||
title=entity.title,
|
||||
description=entity.description,
|
||||
is_public=entity.is_public,
|
||||
date=DatetimeUtil.to_string(entity.date, "%d-%m-%Y"),
|
||||
time=DatetimeUtil.to_string(entity.date, "%H:%M:%S"),
|
||||
total_quiz=entity.total_quiz or 0,
|
||||
limit_duration=entity.limit_duration or 0,
|
||||
question_listings=[
|
||||
QuizMapper.map_question_entity_to_schema(q)
|
||||
for q in entity.question_listings or []
|
||||
],
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def map_quiz_schema_to_entity(
|
||||
schema: QuizCreateSchema,
|
||||
datetime: datetime,
|
||||
total_duration: int,
|
||||
) -> QuizEntity:
|
||||
return QuizEntity(
|
||||
author_id=schema.author_id,
|
||||
subject_id=schema.subject_id,
|
||||
title=schema.title,
|
||||
description=schema.description,
|
||||
is_public=schema.is_public,
|
||||
date=datetime,
|
||||
language_code=schema.lang_code,
|
||||
total_quiz=len(schema.question_listings),
|
||||
limit_duration=total_duration,
|
||||
question_listings=[
|
||||
QuizMapper.map_question_schema_to_entity(q)
|
||||
for q in schema.question_listings or []
|
||||
],
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def quiz_to_populer_mapper(
|
||||
quiz_entity: QuizEntity,
|
||||
user_entity: UserEntity,
|
||||
) -> ListingQuizResponse:
|
||||
return ListingQuizResponse(
|
||||
quiz_id=str(quiz_entity.id),
|
||||
author_id=str(user_entity.id),
|
||||
author_name=user_entity.name,
|
||||
title=quiz_entity.title,
|
||||
description=quiz_entity.description,
|
||||
date=quiz_entity.date.strftime("%d-%B-%Y") if quiz_entity.date else None,
|
||||
duration=quiz_entity.limit_duration,
|
||||
total_quiz=quiz_entity.total_quiz,
|
||||
)
|
|
@ -1,12 +0,0 @@
|
|||
from app.schemas.requests import SubjectCreateRequest
|
||||
from app.models.entities import SubjectEntity
|
||||
|
||||
|
||||
class SubjectMapper:
|
||||
@staticmethod
|
||||
def to_entity(data: SubjectCreateRequest) -> SubjectEntity:
|
||||
return SubjectEntity(
|
||||
name=data.name,
|
||||
short_name=data.alias,
|
||||
description=data.description,
|
||||
)
|
|
@ -1,64 +0,0 @@
|
|||
from datetime import datetime
|
||||
from typing import Dict, Optional
|
||||
from app.models import UserEntity
|
||||
from app.schemas import RegisterSchema
|
||||
from app.schemas.response import LoginResponseSchema
|
||||
from app.helpers import DatetimeUtil
|
||||
|
||||
|
||||
class UserMapper:
|
||||
@staticmethod
|
||||
def from_google_payload(
|
||||
google_id: str, email: str, payload: Dict[str, Optional[str]]
|
||||
) -> UserEntity:
|
||||
return UserEntity(
|
||||
google_id=google_id,
|
||||
email=email,
|
||||
name=payload.get("name"),
|
||||
pic_url=payload.get("picture"),
|
||||
birth_date=None,
|
||||
phone=None,
|
||||
role="user",
|
||||
is_active=True,
|
||||
address=None,
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
verification_token=None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_register(data: RegisterSchema) -> UserEntity:
|
||||
return UserEntity(
|
||||
email=data.email,
|
||||
password=data.password,
|
||||
name=data.name,
|
||||
birth_date=datetime.strptime(data.birth_date, "%d-%m-%Y").date(),
|
||||
phone=data.phone,
|
||||
role="user",
|
||||
is_active=False,
|
||||
address=None,
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
verification_token=None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def user_entity_to_response(user: UserEntity) -> LoginResponseSchema:
|
||||
return LoginResponseSchema(
|
||||
id=str(user.id) if user.id else None,
|
||||
email=user.email,
|
||||
name=user.name,
|
||||
birth_date=(
|
||||
DatetimeUtil.to_string(user.birth_date, fmt="%d-%m-%Y")
|
||||
if user.birth_date
|
||||
else None
|
||||
),
|
||||
pic_url=user.pic_url,
|
||||
phone=user.phone,
|
||||
locale=user.locale,
|
||||
created_at=(
|
||||
DatetimeUtil.to_string(user.created_at, fmt="%d-%m-%Y")
|
||||
if user.created_at
|
||||
else None
|
||||
),
|
||||
)
|
|
@ -1,13 +0,0 @@
|
|||
# app/models/__init__.py
|
||||
from .entities import UserEntity, QuizEntity, QuestionItemEntity, UserAnswerEntity
|
||||
from .login import UserResponseModel
|
||||
|
||||
|
||||
__all__ = [
|
||||
"UserEntity",
|
||||
"UserDTO",
|
||||
"UserResponseModel",
|
||||
"QuizEntity",
|
||||
"QuestionItemEntity",
|
||||
"UserAnswerEntity",
|
||||
]
|
|
@ -1 +0,0 @@
|
|||
from .response import ApiResponse
|
|
@ -1,22 +0,0 @@
|
|||
# from pydantic import BaseModel
|
||||
# from typing import Generic, TypeVar, Optional
|
||||
|
||||
# T = TypeVar("T")
|
||||
|
||||
|
||||
# class ApiResponse(BaseModel, Generic[T]):
|
||||
# success: bool
|
||||
# message: str
|
||||
# data: Optional[T] = None
|
||||
|
||||
# def to_json(self) -> str:
|
||||
# """
|
||||
# Convert the model to a properly formatted JSON string.
|
||||
# """
|
||||
# return self.model_dump_json(indent=4)
|
||||
|
||||
# def to_dict(self) -> dict:
|
||||
# """
|
||||
# Convert the model to a dictionary with proper key-value pairs.
|
||||
# """
|
||||
# return self.model_dump()
|
|
@ -1,19 +0,0 @@
|
|||
from .user_entity import UserEntity
|
||||
from .base import PyObjectId
|
||||
from .quiz_entity import QuizEntity
|
||||
from .question_item_entity import QuestionItemEntity
|
||||
from .user_answer_entity import UserAnswerEntity
|
||||
from .answer_item import AnswerItemEntity
|
||||
from .subject_entity import SubjectEntity
|
||||
from .session_entity import SessionEntity
|
||||
|
||||
__all__ = [
|
||||
"UserEntity",
|
||||
"PyObjectId",
|
||||
"QuizEntity",
|
||||
"QuestionItemEntity",
|
||||
"UserAnswerEntity",
|
||||
"AnswerItemEntity",
|
||||
"SubjectEntity",
|
||||
"SessionEntity",
|
||||
]
|
|
@ -1,9 +0,0 @@
|
|||
from pydantic import BaseModel
|
||||
from typing import Union
|
||||
|
||||
|
||||
class AnswerItemEntity(BaseModel):
|
||||
question_index: int
|
||||
answer: Union[str | int | bool]
|
||||
is_correct: bool
|
||||
time_spent: float
|
|
@ -1,29 +0,0 @@
|
|||
from bson import ObjectId
|
||||
from pydantic import GetCoreSchemaHandler
|
||||
from pydantic_core import core_schema
|
||||
|
||||
|
||||
class PyObjectId(ObjectId):
|
||||
"""Custom ObjectId type for Pydantic v2 to handle MongoDB _id"""
|
||||
|
||||
@classmethod
|
||||
def __get_pydantic_core_schema__(
|
||||
cls, source, handler: GetCoreSchemaHandler
|
||||
) -> core_schema.CoreSchema:
|
||||
return core_schema.no_info_after_validator_function(
|
||||
cls.validate,
|
||||
core_schema.union_schema(
|
||||
[
|
||||
core_schema.str_schema(),
|
||||
core_schema.is_instance_schema(ObjectId),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def validate(cls, v):
|
||||
if isinstance(v, ObjectId):
|
||||
return v
|
||||
if isinstance(v, str) and ObjectId.is_valid(v):
|
||||
return ObjectId(v)
|
||||
raise ValueError(f"Invalid ObjectId: {v}")
|
|
@ -1,11 +0,0 @@
|
|||
from typing import Optional, List, Union
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class QuestionItemEntity(BaseModel):
|
||||
index: int
|
||||
question: str
|
||||
target_answer: Union[str, bool, int]
|
||||
duration: int
|
||||
type: str
|
||||
options: Optional[List[str]] = None
|
|
@ -1,25 +0,0 @@
|
|||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
from .base import PyObjectId
|
||||
from .question_item_entity import QuestionItemEntity
|
||||
|
||||
|
||||
class QuizEntity(BaseModel):
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
author_id: Optional[str] = None
|
||||
subject_id: str
|
||||
title: str
|
||||
description: Optional[str] = None
|
||||
is_public: bool = False
|
||||
date: datetime
|
||||
total_quiz: int = 0
|
||||
limit_duration: Optional[int] = 0
|
||||
total_user_playing: int = 0
|
||||
language_code: Optional[str] = "id"
|
||||
question_listings: Optional[list[QuestionItemEntity]] = []
|
||||
|
||||
class ConfigDict:
|
||||
arbitrary_types_allowed = True
|
||||
populate_by_name = True
|
||||
json_encoders = {PyObjectId: str}
|
|
@ -1,19 +0,0 @@
|
|||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from app.models.entities import PyObjectId
|
||||
|
||||
|
||||
class SessionEntity(BaseModel):
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
session_code: str
|
||||
quiz_id: str
|
||||
host_id: str
|
||||
room_name: str
|
||||
created_at: datetime = Field(default_factory=datetime.now)
|
||||
started_at: datetime | None = None
|
||||
ended_at: datetime | None = None
|
||||
is_active: bool = True
|
||||
participan_limit: int = 10
|
||||
participants: List[dict] = []
|
||||
current_question_index: int = 0
|
|
@ -1,17 +0,0 @@
|
|||
from typing import Optional
|
||||
from bson import ObjectId
|
||||
from pydantic import BaseModel, Field
|
||||
from app.models.entities import PyObjectId
|
||||
|
||||
|
||||
class SubjectEntity(BaseModel):
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
name: str
|
||||
short_name: str
|
||||
description: Optional[str] = None
|
||||
icon: Optional[str] = None
|
||||
|
||||
class ConfigDict:
|
||||
populate_by_name = True
|
||||
json_encoders = {ObjectId: str}
|
||||
json_schema_extra = {}
|
|
@ -1,23 +0,0 @@
|
|||
from typing import List, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
from bson import ObjectId
|
||||
|
||||
from .answer_item import AnswerItemEntity
|
||||
from .base import PyObjectId
|
||||
|
||||
|
||||
class UserAnswerEntity(BaseModel):
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
session_id: Optional[str]
|
||||
quiz_id: str
|
||||
user_id: str
|
||||
answered_at: datetime
|
||||
answers: List[AnswerItemEntity]
|
||||
total_score: int
|
||||
total_correct: int
|
||||
|
||||
class ConfigDict:
|
||||
populate_by_name = True
|
||||
arbitrary_types_allowed = True
|
||||
json_encoders = {ObjectId: str}
|
|
@ -1,20 +0,0 @@
|
|||
from typing import Optional
|
||||
from pydantic import BaseModel, Field, ConfigDict
|
||||
from datetime import datetime
|
||||
from .base import PyObjectId
|
||||
|
||||
|
||||
class UserEntity(BaseModel):
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
google_id: Optional[str] = None
|
||||
email: str
|
||||
password: Optional[str] = None
|
||||
name: str
|
||||
birth_date: Optional[datetime] = None
|
||||
pic_url: Optional[str] = None
|
||||
phone: Optional[str] = None
|
||||
locale: str = "en-US"
|
||||
created_at: Optional[datetime] = None
|
||||
updated_at: Optional[datetime] = None
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, json_encoders={PyObjectId: str})
|
|
@ -1 +0,0 @@
|
|||
from .login_response import UserResponseModel
|
|
@ -1,20 +0,0 @@
|
|||
from pydantic import BaseModel, EmailStr, Field
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class UserResponseModel(BaseModel):
|
||||
id: Optional[str] = Field(alias="_id")
|
||||
google_id: Optional[str] = None
|
||||
email: EmailStr
|
||||
name: str
|
||||
birth_date: Optional[datetime] = None
|
||||
pic_url: Optional[str] = None
|
||||
phone: Optional[str] = None
|
||||
locale: str
|
||||
|
||||
class ConfigDict:
|
||||
populate_by_name = True
|
||||
json_encoders = {
|
||||
datetime: lambda v: v.isoformat(),
|
||||
}
|
|
@ -1,27 +1 @@
|
|||
from .user_repository import UserRepository
|
||||
from .quiz_repositroy import QuizRepository
|
||||
from .answer_repository import UserAnswerRepository
|
||||
from .subject_repository import SubjectRepository
|
||||
from .session_repostory import SessionRepository
|
||||
from .ner_srl_repository import NERSRLRepository
|
||||
from .session_memory_repository import SessionMemoryRepository
|
||||
from .quiz_memory_repository import QuizMemoryRepository
|
||||
from .answer_memory_repository import AnswerMemoryRepository
|
||||
from .score_memory_repository import ScoreMemoryRepository
|
||||
from .question_generation_repository import QuestionGenerationRepository
|
||||
from .answer_generation_repository import AnswerGenerationRepository
|
||||
|
||||
__all__ = [
|
||||
"UserRepository",
|
||||
"QuizRepository",
|
||||
"UserAnswerRepository",
|
||||
"SubjectRepository",
|
||||
"SessionRepository",
|
||||
"NERSRLRepository",
|
||||
"SessionMemoryRepository",
|
||||
"QuizMemoryRepository",
|
||||
"AnswerMemoryRepository",
|
||||
"ScoreMemoryRepository",
|
||||
"QuestionGenerationRepository",
|
||||
"AnswerGenerationRepository",
|
||||
]
|
||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -1,71 +0,0 @@
|
|||
import json
|
||||
import numpy as np
|
||||
import tensorflow as tf
|
||||
from tensorflow.keras.models import load_model # type: ignore
|
||||
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
|
||||
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
|
||||
import re
|
||||
|
||||
|
||||
|
||||
class AnswerGenerationRepository:
|
||||
MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final_v2.keras"
|
||||
TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers_v2.json"
|
||||
|
||||
def __init__(self):
|
||||
with open(self.TOKENIZER_PATH, "r") as f:
|
||||
tokenizer_data = json.load(f)
|
||||
|
||||
self.tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
|
||||
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
|
||||
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
|
||||
self.answer_tokenizer = tokenizer_from_json(tokenizer_data["answer_tokenizer"])
|
||||
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
|
||||
|
||||
self.max_context_len = tokenizer_data["max_context_len"]
|
||||
self.max_question_len = tokenizer_data["max_question_len"]
|
||||
self.max_token_len = tokenizer_data["max_token_len"]
|
||||
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
|
||||
|
||||
self.model = load_model(self.MODEL_PATH)
|
||||
|
||||
def preprocess_text(self, text):
|
||||
text = text.lower()
|
||||
text = re.sub(r"\s+", " ", text).strip()
|
||||
return text
|
||||
|
||||
def predict_answer(self, context, question, tokens, ner, srl, q_type):
|
||||
context_seq = self.tokenizer.texts_to_sequences([self.preprocess_text(context)])
|
||||
question_seq = self.tokenizer.texts_to_sequences(
|
||||
[self.preprocess_text(question)]
|
||||
)
|
||||
token_seq = [self.tokenizer.texts_to_sequences([" ".join(tokens)])[0]]
|
||||
ner_seq = [self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]]
|
||||
srl_seq = [self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]]
|
||||
|
||||
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
|
||||
q_type_cat = tf.keras.utils.to_categorical(
|
||||
[q_type_idx], num_classes=self.q_type_vocab_size
|
||||
)
|
||||
|
||||
context_pad = pad_sequences(
|
||||
context_seq, maxlen=self.max_context_len, padding="post"
|
||||
)
|
||||
question_pad = pad_sequences(
|
||||
question_seq, maxlen=self.max_question_len, padding="post"
|
||||
)
|
||||
token_pad = pad_sequences(token_seq, maxlen=self.max_token_len, padding="post")
|
||||
ner_pad = pad_sequences(ner_seq, maxlen=self.max_token_len, padding="post")
|
||||
srl_pad = pad_sequences(srl_seq, maxlen=self.max_token_len, padding="post")
|
||||
|
||||
prediction = self.model.predict(
|
||||
[context_pad, question_pad, token_pad, ner_pad, srl_pad, q_type_cat],
|
||||
verbose=0,
|
||||
)
|
||||
answer_idx = np.argmax(prediction[0])
|
||||
|
||||
for word, idx in self.answer_tokenizer.word_index.items():
|
||||
if idx == answer_idx:
|
||||
return word
|
||||
|
||||
return "Unknown"
|
|
@ -1,138 +0,0 @@
|
|||
import json
|
||||
from typing import Any, Dict, List
|
||||
from redis import Redis
|
||||
|
||||
|
||||
class AnswerMemoryRepository:
|
||||
KEY_TEMPLATE = "answer:{session_id}:{user_id}"
|
||||
KEY_PATTERN_TEMPLATE = "answer:{session_id}:*"
|
||||
|
||||
def __init__(self, redis: Redis):
|
||||
self.redis = redis
|
||||
|
||||
def _build_key(self, session_id: str, user_id: str) -> str:
|
||||
return self.KEY_TEMPLATE.format(session_id=session_id, user_id=user_id)
|
||||
|
||||
def _build_pattern_key(self, session_id: str) -> str:
|
||||
return self.KEY_PATTERN_TEMPLATE.format(session_id=session_id)
|
||||
|
||||
def initialize_empty_answers(
|
||||
self,
|
||||
session_id: str,
|
||||
user_ids: List[str],
|
||||
total_questions: int,
|
||||
):
|
||||
"""
|
||||
Initialize empty answers for all users at the start of the quiz.
|
||||
"""
|
||||
for user_id in user_ids:
|
||||
key = self._build_key(session_id, user_id)
|
||||
answers = [
|
||||
{
|
||||
"question_index": idx + 1,
|
||||
"answer": "",
|
||||
"is_true": False,
|
||||
"time_spent": 0.0,
|
||||
}
|
||||
for idx in range(total_questions)
|
||||
]
|
||||
self.set_data(key, answers)
|
||||
|
||||
def save_user_answer(
|
||||
self,
|
||||
session_id: str,
|
||||
user_id: str,
|
||||
question_index: int,
|
||||
answer: Any,
|
||||
correct: bool,
|
||||
time_spent: float,
|
||||
):
|
||||
"""
|
||||
Update user's answer for a specific question.
|
||||
Assumes answers have been initialized.
|
||||
"""
|
||||
key = self._build_key(session_id, user_id)
|
||||
answers = self.get_data(key) or []
|
||||
|
||||
for ans in answers:
|
||||
if ans.get("question_index") == question_index:
|
||||
ans.update(
|
||||
{
|
||||
"answer": answer,
|
||||
"is_true": correct,
|
||||
"time_spent": time_spent,
|
||||
}
|
||||
)
|
||||
break
|
||||
|
||||
self.set_data(key, answers)
|
||||
|
||||
def get_user_answers(self, session_id: str, user_id: str) -> List[Dict[str, Any]]:
|
||||
key = self._build_key(session_id, user_id)
|
||||
return self.get_data(key) or []
|
||||
|
||||
def get_all_user_answers(self, session_id: str) -> Dict[str, List[Dict[str, Any]]]:
|
||||
pattern = self._build_pattern_key(session_id)
|
||||
keys = self.redis.keys(pattern)
|
||||
all_answers = {}
|
||||
|
||||
for key in keys:
|
||||
user_id = key.decode().split(":")[-1]
|
||||
all_answers[user_id] = self.get_data(key) or []
|
||||
|
||||
return all_answers
|
||||
|
||||
def delete_all_answers(self, session_id: str):
|
||||
pattern = self._build_pattern_key(session_id)
|
||||
keys = self.redis.keys(pattern)
|
||||
if keys:
|
||||
self.redis.delete(*keys)
|
||||
|
||||
def set_data(self, key: str, value: Any):
|
||||
self.redis.set(key, json.dumps(value))
|
||||
|
||||
def get_data(self, key: str) -> Any:
|
||||
data = self.redis.get(key)
|
||||
return json.loads(data) if data else None
|
||||
|
||||
def auto_fill_incorrect_answers(
|
||||
self,
|
||||
session_id: str,
|
||||
question_index: int,
|
||||
default_time_spent: float = 0.0,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Auto-fill unanswered specific question (by index) as incorrect for all users.
|
||||
:return: List of user IDs who had not answered the specific question.
|
||||
"""
|
||||
pattern = self._build_pattern_key(session_id)
|
||||
keys = self.redis.keys(pattern)
|
||||
|
||||
users_with_unanswered = []
|
||||
|
||||
for key in keys:
|
||||
answers = self.get_data(key) or []
|
||||
has_unanswered = False
|
||||
|
||||
for ans in answers:
|
||||
if (
|
||||
ans.get("question_index") == question_index
|
||||
and ans.get("answer") == ""
|
||||
):
|
||||
has_unanswered = True
|
||||
ans.update(
|
||||
{
|
||||
"answer": "",
|
||||
"is_true": False,
|
||||
"time_spent": default_time_spent,
|
||||
}
|
||||
)
|
||||
break # No need to check other answers for this user
|
||||
|
||||
if has_unanswered:
|
||||
user_id = key.decode().split(":")[-1]
|
||||
users_with_unanswered.append(user_id)
|
||||
|
||||
self.set_data(key, answers)
|
||||
|
||||
return users_with_unanswered
|
|
@ -1,50 +0,0 @@
|
|||
from pymongo.collection import Collection
|
||||
from bson import ObjectId
|
||||
from typing import Optional, List
|
||||
from app.models import UserAnswerEntity
|
||||
|
||||
|
||||
class UserAnswerRepository:
|
||||
def __init__(self, db):
|
||||
self.collection: Collection = db.user_answers
|
||||
|
||||
def create(self, answer_session: UserAnswerEntity) -> str:
|
||||
data = answer_session.model_dump(by_alias=True, exclude_none=True)
|
||||
result = self.collection.insert_one(data)
|
||||
return str(result.inserted_id)
|
||||
|
||||
def get_by_id(self, id: str) -> Optional[UserAnswerEntity]:
|
||||
result = self.collection.find_one({"_id": ObjectId(id)})
|
||||
if not result:
|
||||
return None
|
||||
return UserAnswerEntity(**result)
|
||||
|
||||
def get_by_userid_and_sessionid(
|
||||
self,
|
||||
user_id: str,
|
||||
session_id: str,
|
||||
) -> Optional[UserAnswerEntity]:
|
||||
result = self.collection.find_one(
|
||||
{"user_id": user_id, "session_id": session_id}
|
||||
)
|
||||
if not result:
|
||||
return None
|
||||
return UserAnswerEntity(**result)
|
||||
|
||||
def get_by_user_and_quiz(self, user_id: str, quiz_id: str) -> List[dict]:
|
||||
result = self.collection.find(
|
||||
{"user_id": user_id, "quiz_id": ObjectId(quiz_id)}
|
||||
)
|
||||
return list(result)
|
||||
|
||||
def get_by_user(self, user_id: str) -> list[UserAnswerEntity]:
|
||||
result = self.collection.find({"user_id": user_id}).sort("answered_at", -1)
|
||||
return [UserAnswerEntity(**doc) for doc in result]
|
||||
|
||||
def get_by_session(self, session_id: str) -> List[dict]:
|
||||
result = self.collection.find({"session_id": ObjectId(session_id)})
|
||||
return list(result)
|
||||
|
||||
def delete_by_id(self, id: str) -> bool:
|
||||
result = self.collection.delete_one({"_id": ObjectId(id)})
|
||||
return result.deleted_count > 0
|
|
@ -1,49 +0,0 @@
|
|||
import numpy as np
|
||||
import pickle
|
||||
from tensorflow.keras.models import load_model # type: ignore
|
||||
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
|
||||
import re
|
||||
|
||||
|
||||
class NERSRLRepository:
|
||||
def __init__(self):
|
||||
# Load model and artifacts
|
||||
self.model = load_model("app/lstm_model/ner_srl/lstm_ner_srl_model.keras")
|
||||
|
||||
with open("app/lstm_model/ner_srl/word2idx.pkl", "rb") as f:
|
||||
self.word2idx = pickle.load(f)
|
||||
with open("app/lstm_model/ner_srl/tag2idx_ner.pkl", "rb") as f:
|
||||
self.tag2idx_ner = pickle.load(f)
|
||||
with open("app/lstm_model/ner_srl/tag2idx_srl.pkl", "rb") as f:
|
||||
self.tag2idx_srl = pickle.load(f)
|
||||
|
||||
self.idx2tag_ner = {i: t for t, i in self.tag2idx_ner.items()}
|
||||
self.idx2tag_srl = {i: t for t, i in self.tag2idx_srl.items()}
|
||||
|
||||
self.PAD_WORD_ID = self.word2idx["PAD"]
|
||||
self.MAXLEN = self.model.input_shape[1]
|
||||
|
||||
def _preprocess_tokens(self, tokens: list[str]) -> np.ndarray:
|
||||
seq = [self.word2idx.get(tok.lower(), self.word2idx["UNK"]) for tok in tokens]
|
||||
return pad_sequences(
|
||||
[seq], maxlen=self.MAXLEN, padding="post", value=self.PAD_WORD_ID
|
||||
)
|
||||
|
||||
def predict_sentence(self, sentence: str) -> dict:
|
||||
tokens = re.findall(r"\d{1,2}\.\d{2}|\w+|[^\w\s]", sentence.lower())
|
||||
|
||||
seq_padded = self._preprocess_tokens(tokens)
|
||||
|
||||
pred_ner_prob, pred_srl_prob = self.model.predict(seq_padded, verbose=0)
|
||||
pred_ner = pred_ner_prob.argmax(-1)[0][: len(tokens)]
|
||||
pred_srl = pred_srl_prob.argmax(-1)[0][: len(tokens)]
|
||||
|
||||
return {
|
||||
"tokens": tokens,
|
||||
"ner": [self.idx2tag_ner[int(i)] for i in pred_ner],
|
||||
"srl": [self.idx2tag_srl[int(i)] for i in pred_srl],
|
||||
}
|
||||
|
||||
def labeling_token(self, tokens: list[str]) -> dict:
|
||||
sentence = " ".join(tokens)
|
||||
return self.predict_sentence(sentence)
|
|
@ -1,87 +0,0 @@
|
|||
import numpy as np
|
||||
import json
|
||||
import tensorflow as tf
|
||||
from tensorflow.keras.preprocessing.text import tokenizer_from_json # type: ignore
|
||||
from tensorflow.keras.preprocessing.sequence import pad_sequences # type: ignore
|
||||
from tensorflow.keras.models import load_model # type: ignore
|
||||
import re
|
||||
|
||||
|
||||
class QuestionGenerationRepository:
|
||||
# Static paths for model and tokenizer
|
||||
MODEL_PATH = "app/lstm_model/question_generation/new_model/question_prediction_model_final.h5"
|
||||
TOKENIZER_PATH = "app/lstm_model/question_generation/new_model/question_prediction_tokenizers.json"
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize question prediction model with pre-trained model and tokenizers
|
||||
using static paths
|
||||
"""
|
||||
# Load model
|
||||
self.model = load_model(self.MODEL_PATH)
|
||||
|
||||
# Load tokenizers
|
||||
with open(self.TOKENIZER_PATH, "r") as f:
|
||||
tokenizer_data = json.load(f)
|
||||
|
||||
# Reconstruct tokenizers
|
||||
self.word_tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
|
||||
self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
|
||||
self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
|
||||
self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
|
||||
|
||||
# Get max lengths
|
||||
self.max_context_len = tokenizer_data["max_context_len"]
|
||||
self.max_question_len = tokenizer_data["max_question_len"]
|
||||
self.max_token_len = tokenizer_data["max_token_len"]
|
||||
|
||||
# Get vocabulary sizes
|
||||
self.vocab_size = len(self.word_tokenizer.word_index) + 1
|
||||
self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
|
||||
|
||||
def preprocess_text(self, text):
|
||||
"""Basic text preprocessing"""
|
||||
text = text.lower()
|
||||
text = re.sub(r"\s+", " ", text).strip()
|
||||
return text
|
||||
|
||||
def predict_question(self, context, tokens, ner, srl, q_type):
|
||||
"""Prediksi pertanyaan berdasarkan konteks dan fitur lainnya"""
|
||||
# Preprocess
|
||||
context = self.preprocess_text(context)
|
||||
|
||||
# Convert to sequences
|
||||
context_seq = self.word_tokenizer.texts_to_sequences([context])[0]
|
||||
token_seq = self.word_tokenizer.texts_to_sequences([" ".join(tokens)])[0]
|
||||
ner_seq = self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]
|
||||
srl_seq = self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]
|
||||
|
||||
# Pad sequences
|
||||
context_padded = pad_sequences(
|
||||
[context_seq], maxlen=self.max_context_len, padding="post"
|
||||
)
|
||||
token_padded = pad_sequences(
|
||||
[token_seq], maxlen=self.max_token_len, padding="post"
|
||||
)
|
||||
ner_padded = pad_sequences([ner_seq], maxlen=self.max_token_len, padding="post")
|
||||
srl_padded = pad_sequences([srl_seq], maxlen=self.max_token_len, padding="post")
|
||||
|
||||
# Q-type one-hot encoding
|
||||
q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
|
||||
q_type_one_hot = tf.keras.utils.to_categorical(
|
||||
[q_type_idx], num_classes=self.q_type_vocab_size
|
||||
)
|
||||
|
||||
# Predict
|
||||
pred = self.model.predict(
|
||||
[context_padded, token_padded, ner_padded, srl_padded, q_type_one_hot]
|
||||
)
|
||||
|
||||
# Convert prediction to words
|
||||
pred_seq = np.argmax(pred[0], axis=1)
|
||||
|
||||
# Convert indices to words
|
||||
reverse_word_map = {v: k for k, v in self.word_tokenizer.word_index.items()}
|
||||
pred_words = [reverse_word_map.get(i, "") for i in pred_seq if i != 0]
|
||||
|
||||
return " ".join(pred_words)
|
|
@ -1,32 +0,0 @@
|
|||
import json
|
||||
from typing import Dict, Any, Optional
|
||||
from redis import Redis
|
||||
from app.helpers import DatetimeUtil
|
||||
from app.models.entities import QuizEntity
|
||||
|
||||
|
||||
class QuizMemoryRepository:
|
||||
KEY_TEMPLATE = "quiz:{session_id}"
|
||||
|
||||
def __init__(self, redis: Redis):
|
||||
self.redis = redis
|
||||
|
||||
def _build_key(self, session_id: str) -> str:
|
||||
return self.KEY_TEMPLATE.format(session_id=session_id)
|
||||
|
||||
def set_quiz_for_session(self, session_id: str, quiz_data: QuizEntity):
|
||||
data = quiz_data.model_dump()
|
||||
data["id"] = str(data["id"])
|
||||
data["date"] = DatetimeUtil.to_string(data["date"])
|
||||
self.redis.set(self._build_key(session_id), json.dumps(data))
|
||||
|
||||
def get_quiz_for_session(self, session_id: str) -> Optional[QuizEntity]:
|
||||
data = self.redis.get(self._build_key(session_id))
|
||||
if data:
|
||||
data = json.loads(data)
|
||||
data["date"] = DatetimeUtil.from_string(data["date"])
|
||||
return QuizEntity(**data)
|
||||
return None
|
||||
|
||||
def delete_quiz_for_session(self, session_id: str):
|
||||
self.redis.delete(self._build_key(session_id))
|
|
@ -1,155 +0,0 @@
|
|||
from bson import ObjectId
|
||||
from typing import List, Optional
|
||||
from app.models.entities import QuizEntity
|
||||
from pymongo.database import Database
|
||||
from pymongo.collection import Collection
|
||||
|
||||
|
||||
class QuizRepository:
|
||||
def __init__(self, db: Database):
|
||||
self.collection: Collection = db.quiz
|
||||
|
||||
def create(self, quiz: QuizEntity) -> str:
|
||||
quiz_dict = quiz.model_dump(by_alias=True, exclude_none=True)
|
||||
result = self.collection.insert_one(quiz_dict)
|
||||
return str(result.inserted_id)
|
||||
|
||||
def get_by_id(self, quiz_id: str) -> Optional[QuizEntity]:
|
||||
data = self.collection.find_one({"_id": ObjectId(quiz_id)})
|
||||
if data:
|
||||
return QuizEntity(**data)
|
||||
return None
|
||||
|
||||
def search_by_title_or_category(
|
||||
self, keyword: str, subject_id: Optional[str], page: int, page_size: int
|
||||
) -> List[QuizEntity]:
|
||||
skip = (page - 1) * page_size
|
||||
|
||||
query_conditions = [
|
||||
{"is_public": True},
|
||||
{
|
||||
"$or": [
|
||||
{"title": {"$regex": keyword, "$options": "i"}},
|
||||
# {"category": {"$regex": keyword, "$options": "i"}},
|
||||
]
|
||||
},
|
||||
]
|
||||
|
||||
if subject_id:
|
||||
query_conditions.append({"subject_id": subject_id})
|
||||
|
||||
cursor = (
|
||||
self.collection.find({"$and": query_conditions}).skip(skip).limit(page_size)
|
||||
)
|
||||
|
||||
return [QuizEntity(**doc) for doc in cursor]
|
||||
|
||||
def count_by_search(self, keyword: str) -> int:
|
||||
return self.collection.count_documents(
|
||||
{
|
||||
"$or": [
|
||||
{"title": {"$regex": keyword, "$options": "i"}},
|
||||
{"category": {"$regex": keyword, "$options": "i"}},
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
def get_by_ids(self, quiz_ids: List[str]) -> Optional[List[QuizEntity]]:
|
||||
object_ids = [ObjectId(qid) for qid in quiz_ids]
|
||||
cursor = self.collection.find({"_id": {"$in": object_ids}})
|
||||
datas = list(cursor)
|
||||
|
||||
if not datas:
|
||||
return None
|
||||
|
||||
return [QuizEntity(**data) for data in datas]
|
||||
|
||||
def get_by_user_id(
|
||||
self, user_id: str, page: int = 1, page_size: int = 10
|
||||
) -> List[QuizEntity]:
|
||||
skip = (page - 1) * page_size
|
||||
cursor = (
|
||||
self.collection.find({"author_id": user_id}).skip(skip).limit(page_size)
|
||||
)
|
||||
|
||||
return [QuizEntity(**data) for data in cursor]
|
||||
|
||||
def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]:
|
||||
cursor = self.collection.find().skip(skip).limit(limit)
|
||||
return [QuizEntity(**doc) for doc in cursor]
|
||||
|
||||
def update(self, quiz_id: str, update_data: dict) -> bool:
|
||||
result = self.collection.update_one(
|
||||
{"_id": ObjectId(quiz_id)}, {"$set": update_data}
|
||||
)
|
||||
return result.modified_count > 0
|
||||
|
||||
def update_user_playing(self, quiz_id: str, total_user: int) -> bool:
|
||||
result = self.collection.update_one(
|
||||
{"_id": ObjectId(quiz_id)}, {"$set": {"total_user_playing": total_user}}
|
||||
)
|
||||
return result.modified_count > 0
|
||||
|
||||
def delete(self, quiz_id: str) -> bool:
|
||||
result = self.collection.delete_one({"_id": ObjectId(quiz_id)})
|
||||
return result.deleted_count > 0
|
||||
|
||||
def count_by_user_id(self, user_id: str) -> int:
|
||||
return self.collection.count_documents({"author_id": user_id})
|
||||
|
||||
def get_top_played_quizzes(
|
||||
self,
|
||||
page: int = 1,
|
||||
limit: int = 3,
|
||||
is_public: bool = True,
|
||||
lang_code: str = "id",
|
||||
) -> List[QuizEntity]:
|
||||
skip = (page - 1) * limit
|
||||
cursor = (
|
||||
self.collection.find(
|
||||
{
|
||||
"is_public": is_public,
|
||||
"language_code": lang_code,
|
||||
}
|
||||
)
|
||||
.sort("total_user_playing", -1)
|
||||
.skip(skip)
|
||||
.limit(limit)
|
||||
)
|
||||
return [QuizEntity(**doc) for doc in cursor]
|
||||
|
||||
def get_random_quizzes(
|
||||
self,
|
||||
limit: int = 3,
|
||||
lang_code: str = "id",
|
||||
) -> List[QuizEntity]:
|
||||
pipeline = [
|
||||
{
|
||||
"$match": {
|
||||
"is_public": True,
|
||||
"language_code": lang_code,
|
||||
}
|
||||
},
|
||||
{"$sample": {"size": limit}},
|
||||
]
|
||||
cursor = self.collection.aggregate(pipeline)
|
||||
return [QuizEntity(**doc) for doc in cursor]
|
||||
|
||||
def get_random_quizzes_by_subjects(
|
||||
self,
|
||||
subject_ids: List[str],
|
||||
limit: int = 3,
|
||||
lang_code: str = "id",
|
||||
) -> List[QuizEntity]:
|
||||
pipeline = [
|
||||
{
|
||||
"$match": {
|
||||
"subject_id": {"$in": subject_ids},
|
||||
"is_public": True,
|
||||
"language_code": lang_code,
|
||||
}
|
||||
},
|
||||
{"$sample": {"size": limit}},
|
||||
]
|
||||
cursor = self.collection.aggregate(pipeline)
|
||||
return [QuizEntity(**doc) for doc in cursor]
|
|
@ -1,28 +0,0 @@
|
|||
from typing import Dict
|
||||
from redis import Redis
|
||||
|
||||
|
||||
class ScoreMemoryRepository:
|
||||
KEY_TEMPLATE = "score:{session_id}"
|
||||
|
||||
def __init__(self, redis: Redis):
|
||||
self.redis = redis
|
||||
|
||||
def _build_key(self, session_id: str) -> str:
|
||||
return self.KEY_TEMPLATE.format(session_id=session_id)
|
||||
|
||||
def update_user_score(self, session_id: str, user_id: str, correct: bool):
|
||||
hkey = self._build_key(session_id)
|
||||
field = f"{user_id}:{'correct' if correct else 'incorrect'}"
|
||||
self.redis.hincrby(hkey, field, 1)
|
||||
|
||||
def get_scores(self, session_id: str) -> Dict[str, Dict[str, int]]:
|
||||
raw = self.redis.hgetall(self._build_key(session_id))
|
||||
scores = {}
|
||||
for k, v in raw.items():
|
||||
uid, category = k.decode().split(":")
|
||||
scores.setdefault(uid, {"correct": 0, "incorrect": 0})[category] = int(v)
|
||||
return scores
|
||||
|
||||
def delete_scores(self, session_id: str):
|
||||
self.redis.delete(self._build_key(session_id))
|
|
@ -1,108 +0,0 @@
|
|||
import json
|
||||
from typing import Dict, Any, Optional
|
||||
from redis import Redis
|
||||
from app.helpers import DatetimeUtil
|
||||
from app.models.entities import SessionEntity
|
||||
|
||||
|
||||
class SessionMemoryRepository:
|
||||
KEY_TEMPLATE = "session:{session_id}"
|
||||
KEY_PATTERN = "session:*"
|
||||
|
||||
def __init__(self, redis: Redis):
|
||||
self.redis = redis
|
||||
|
||||
def _build_key(self, session_id: str) -> str:
|
||||
return self.KEY_TEMPLATE.format(session_id=session_id)
|
||||
|
||||
def set_data(self, key: str, value: Any):
|
||||
self.redis.set(key, json.dumps(value))
|
||||
|
||||
def get_data(self, key: str) -> Optional[Any]:
|
||||
data = self.redis.get(key)
|
||||
return json.loads(data) if data else None
|
||||
|
||||
def delete_key(self, key: str):
|
||||
self.redis.delete(key)
|
||||
|
||||
def create_session(self, session_id: str, initial_data: SessionEntity) -> str:
|
||||
data = initial_data.model_dump()
|
||||
data["id"] = data["id"]
|
||||
data["created_at"] = str(data["created_at"])
|
||||
self.set_data(self._build_key(session_id), data)
|
||||
return session_id
|
||||
|
||||
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
||||
return self.get_data(self._build_key(session_id))
|
||||
|
||||
def close_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
||||
session = self.get_data(self._build_key(session_id))
|
||||
if not session:
|
||||
return None
|
||||
session["status"] = "closed"
|
||||
session["closed_at"] = DatetimeUtil.now_iso()
|
||||
self.delete_key(self._build_key(session_id))
|
||||
return session
|
||||
|
||||
def find_session_by_code(self, session_code: str) -> Optional[Dict[str, Any]]:
|
||||
session_keys = self.redis.keys(self.KEY_PATTERN)
|
||||
for key in session_keys:
|
||||
session_data = self.get_data(key)
|
||||
if session_data and session_data.get("session_code") == session_code:
|
||||
return session_data
|
||||
return None
|
||||
|
||||
def add_user_to_session(
|
||||
self, session_id: str, user_data: Dict[str, Any] = None
|
||||
) -> bool:
|
||||
session = self.get_session(session_id)
|
||||
if not session:
|
||||
return False
|
||||
|
||||
user_entry = {
|
||||
**(user_data or {}),
|
||||
"joined_at": DatetimeUtil.now_iso(),
|
||||
}
|
||||
|
||||
existing_users = session.get("participants", [])
|
||||
existing_users.append(user_entry)
|
||||
session["participants"] = existing_users
|
||||
|
||||
self.set_data(self._build_key(session_id), session)
|
||||
return True
|
||||
|
||||
def get_user_in_session(self, session_id: str):
|
||||
session = self.get_session(session_id)
|
||||
if not session:
|
||||
return []
|
||||
return session.get("participants", [])
|
||||
|
||||
def remove_user_from_session(self, session_id: str, user_id: str) -> bool:
|
||||
session = self.get_session(session_id)
|
||||
if not session:
|
||||
return False
|
||||
|
||||
session["participants"] = [
|
||||
user
|
||||
for user in session.get("participants", [])
|
||||
if user.get("id") != user_id
|
||||
]
|
||||
|
||||
self.set_data(self._build_key(session_id), session)
|
||||
return True
|
||||
|
||||
def delete_session(self, session_id: str) -> bool:
|
||||
"""
|
||||
Delete a session by its session_id.
|
||||
|
||||
Args:
|
||||
session_id (str): The ID of the session to delete.
|
||||
|
||||
Returns:
|
||||
bool: True if the session was deleted, False if it did not exist.
|
||||
"""
|
||||
key = self._build_key(session_id)
|
||||
if self.redis.exists(key):
|
||||
self.delete_key(key)
|
||||
return True
|
||||
return False
|
|
@ -1,49 +0,0 @@
|
|||
from pymongo.collection import Collection
|
||||
from pymongo.database import Database
|
||||
from typing import Optional
|
||||
from app.models.entities import SessionEntity
|
||||
from bson import ObjectId
|
||||
|
||||
|
||||
class SessionRepository:
|
||||
COLLECTION_NAME = "session"
|
||||
|
||||
def __init__(self, db: Database):
|
||||
self.collection: Collection = db[self.COLLECTION_NAME]
|
||||
# self.collection.create_index("id", unique=True)
|
||||
|
||||
def insert(self, session_data: SessionEntity) -> str:
|
||||
result = self.collection.insert_one(
|
||||
session_data.model_dump(by_alias=True, exclude_none=True)
|
||||
)
|
||||
return str(result.inserted_id)
|
||||
|
||||
def find_by_session_id(self, session_id: str) -> Optional[SessionEntity]:
|
||||
doc = self.collection.find_one({"_id": ObjectId(session_id)})
|
||||
return SessionEntity(**doc) if doc else None
|
||||
|
||||
def find_by_session_code(self, session_code: str) -> Optional[SessionEntity]:
|
||||
doc = self.collection.find_one({"session_code": session_code})
|
||||
return SessionEntity(**doc) if doc else None
|
||||
|
||||
def update(self, session_id: str, update_fields: SessionEntity) -> bool:
|
||||
result = self.collection.update_one(
|
||||
{"_id": ObjectId(session_id)},
|
||||
{"$set": update_fields.model_dump(by_alias=True, exclude_none=True)},
|
||||
)
|
||||
return result.modified_count > 0
|
||||
|
||||
def add_participant(self, session_id: str, user_id: str) -> bool:
|
||||
"""Add user_id to participants array without duplicates"""
|
||||
result = self.collection.update_one(
|
||||
{"_id": session_id}, {"$addToSet": {"participants": user_id}}
|
||||
)
|
||||
return result.modified_count > 0
|
||||
|
||||
def delete(self, session_id: str) -> bool:
|
||||
result = self.collection.delete_one({"_id": session_id})
|
||||
return result.deleted_count > 0
|
||||
|
||||
def list_active_sessions(self) -> list[SessionEntity]:
|
||||
docs = self.collection.find({"is_active": True})
|
||||
return [SessionEntity(**doc) for doc in docs]
|
|
@ -1,61 +0,0 @@
|
|||
from typing import List, Optional
|
||||
from pymongo.database import Database
|
||||
from pymongo.collection import Collection
|
||||
from bson import ObjectId, errors as bson_errors
|
||||
from app.models.entities import SubjectEntity
|
||||
|
||||
|
||||
class SubjectRepository:
|
||||
COLLECTION_NAME = "subjects"
|
||||
|
||||
def __init__(self, db: Database):
|
||||
self.collection: Collection = db[self.COLLECTION_NAME]
|
||||
|
||||
def create(self, subject: SubjectEntity) -> str:
|
||||
subject_dict = subject.model_dump(by_alias=True, exclude_none=True)
|
||||
result = self.collection.insert_one(subject_dict)
|
||||
return str(result.inserted_id)
|
||||
|
||||
def get_all(self) -> List[SubjectEntity]:
|
||||
return [SubjectEntity(**doc) for doc in self.collection.find()]
|
||||
|
||||
def get_by_id(self, subject_id: str) -> Optional[SubjectEntity]:
|
||||
try:
|
||||
oid = ObjectId(subject_id)
|
||||
except bson_errors.InvalidId:
|
||||
return None
|
||||
|
||||
doc = self.collection.find_one({"_id": oid})
|
||||
return SubjectEntity(**doc) if doc else None
|
||||
|
||||
def get_by_ids(self, subject_ids: List[str]) -> List[SubjectEntity]:
|
||||
object_ids = []
|
||||
for sid in subject_ids:
|
||||
try:
|
||||
object_ids.append(ObjectId(sid))
|
||||
except bson_errors.InvalidId:
|
||||
continue
|
||||
|
||||
if not object_ids:
|
||||
return []
|
||||
|
||||
cursor = self.collection.find({"_id": {"$in": object_ids}})
|
||||
return [SubjectEntity(**doc) for doc in cursor]
|
||||
|
||||
def update(self, subject_id: str, update_data: dict) -> bool:
|
||||
try:
|
||||
oid = ObjectId(subject_id)
|
||||
except bson_errors.InvalidId:
|
||||
return False
|
||||
|
||||
result = self.collection.update_one({"_id": oid}, {"$set": update_data})
|
||||
return result.modified_count > 0
|
||||
|
||||
def delete(self, subject_id: str) -> bool:
|
||||
try:
|
||||
oid = ObjectId(subject_id)
|
||||
except bson_errors.InvalidId:
|
||||
return False
|
||||
|
||||
result = self.collection.delete_one({"_id": oid})
|
||||
return result.deleted_count > 0
|
|
@ -1,54 +1,24 @@
|
|||
from typing import Optional
|
||||
from bson import ObjectId
|
||||
from app.models.entities import UserEntity
|
||||
|
||||
|
||||
class UserRepository:
|
||||
def __init__(self, db):
|
||||
self.collection = db.users
|
||||
users = [
|
||||
{
|
||||
"id": 1,
|
||||
"name": "akhdan",
|
||||
"email": "akhdanre@gmail.com",
|
||||
"password": "password123",
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Bob",
|
||||
"email": "bob@example.com",
|
||||
"password": "password123",
|
||||
},
|
||||
]
|
||||
|
||||
def get_all_users(self) -> list[UserEntity]:
|
||||
"""Retrieve all users from the database."""
|
||||
users = list(self.collection.find({}, {"_id": 0}))
|
||||
return [UserEntity(**user) for user in users]
|
||||
def get_all_users(self):
|
||||
return self.users
|
||||
|
||||
def get_user_by_email(self, email: str) -> Optional[UserEntity]:
|
||||
"""Retrieve a user based on their email address."""
|
||||
user = self.collection.find_one({"email": email})
|
||||
return UserEntity(**user) if user else None
|
||||
|
||||
def get_user_by_id(self, user_id: str) -> Optional[UserEntity]:
|
||||
"""Retrieve a user based on their ID."""
|
||||
object_id = ObjectId(user_id)
|
||||
user = self.collection.find_one({"_id": object_id})
|
||||
return UserEntity(**user) if user else None
|
||||
|
||||
def get_by_google_id(self, google_id: str) -> Optional[UserEntity]:
|
||||
"""Retrieve a user based on their Google ID."""
|
||||
user_data = self.collection.find_one({"google_id": google_id})
|
||||
return UserEntity(**user_data) if user_data else None
|
||||
|
||||
def insert_user(self, user_data: UserEntity) -> str:
|
||||
"""Insert a new user into the database and return the user's ID."""
|
||||
result = self.collection.insert_one(user_data.model_dump())
|
||||
return str(result.inserted_id)
|
||||
|
||||
def update_user(self, user_id: str, update_data: dict) -> bool:
|
||||
"""Update all fields of a user based on their ID."""
|
||||
object_id = ObjectId(user_id)
|
||||
result = self.collection.update_one({"_id": object_id}, {"$set": update_data})
|
||||
return result.modified_count > 0
|
||||
|
||||
def update_user_field(self, user_id: str, field: str, value) -> bool:
|
||||
"""Update a single field of a user based on their ID."""
|
||||
object_id = ObjectId(user_id)
|
||||
result = self.collection.update_one(
|
||||
{"_id": object_id}, {"$set": {field: value}}
|
||||
)
|
||||
return result.modified_count > 0
|
||||
|
||||
def delete_user(self, user_id: str) -> bool:
|
||||
"""Delete a user based on their ID."""
|
||||
object_id = ObjectId(user_id)
|
||||
result = self.collection.delete_one({"_id": object_id})
|
||||
return result.deleted_count > 0
|
||||
def get_user_by_email(self, email):
|
||||
for user in self.users:
|
||||
if user.get("email") == email:
|
||||
return user
|
||||
return None
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
from .login_schema import LoginSchema
|
||||
from .basic_response_schema import ResponseSchema, MetaSchema
|
||||
from .requests import RegisterSchema
|
||||
from .response import QuizCreationResponse, QuizGetSchema, QuestionItemSchema
|
||||
|
||||
|
||||
__all__ = [
|
||||
"LoginSchema",
|
||||
"ResponseSchema",
|
||||
"MetaSchema",
|
||||
"RegisterSchema",
|
||||
"QuizCreationResponse",
|
||||
"QuizGetSchema",
|
||||
"QuestionItemSchema",
|
||||
]
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue