Merge pull request #3 from Akhdanre/feat/realtime-system

Feat/realtime system
This commit is contained in:
Akhdan Robbani 2025-04-28 11:44:19 +07:00 committed by GitHub
commit bd3db93279
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
48 changed files with 824 additions and 109 deletions

View File

@ -2,5 +2,14 @@ from .default import default_blueprint
from .auth import auth_blueprint
from .user import user_blueprint
from .quiz import quiz_bp
__all__ = [
"default_blueprint",
"auth_blueprint",
"user_blueprint",
"quiz_bp",
]
# from .user import user_blueprint

59
app/blueprints/quiz.py Normal file
View File

@ -0,0 +1,59 @@
from flask import Blueprint, request
from di_container import Container
from dependency_injector.wiring import inject, Provide
from controllers import QuizController
quiz_bp = Blueprint("quiz", __name__)
@quiz_bp.route("", methods=["POST"])
@inject
def create_quiz(controller: QuizController = Provide[Container.quiz_controller]):
reqBody = request.get_json()
return controller.create_quiz(reqBody)
@quiz_bp.route("/<quiz_id>", methods=["GET"])
@inject
def get_quiz(
quiz_id: str, controller: QuizController = Provide[Container.quiz_controller]
):
return controller.get_quiz(quiz_id)
@quiz_bp.route("/answer", methods=["POST"])
@inject
def submit_answer(controller: QuizController = Provide[Container.quiz_controller]):
req_body = request.get_json()
return controller.submit_answer(req_body)
@quiz_bp.route("/answer", methods=["GET"])
@inject
def get_answer(controller: QuizController = Provide[Container.quiz_controller]):
quiz_id = request.args.get("quiz_id")
user_id = request.args.get("user_id")
session_id = request.args.get("session_id")
return controller.get_answer(
quiz_id=quiz_id, user_id=user_id, session_id=session_id
)
@quiz_bp.route("/recomendation", methods=["GET"])
@inject
def get_quiz_recommendation(
controller: QuizController = Provide[Container.quiz_controller],
):
return controller.get_quiz_recommendation()
@quiz_bp.route("/user/<user_id>", methods=["GET"])
@inject
def get_user_quiz(controller: QuizController = Provide[Container.quiz_controller]):
page = request.args.get("page", default=1, type=int)
page_size = request.args.get("page_size", default=10, type=int)
return controller.get_user_quiz(
user_id=request.view_args["user_id"], page=page, page_size=page_size
)

View File

@ -1,2 +1,10 @@
from .auth_controller import AuthController
from .user_controller import UserController
from .quiz_controller import QuizController
__all__ = [
"AuthController",
"UserController",
"QuizController",
]

View File

@ -1,10 +1,16 @@
from flask import jsonify, request, current_app
from pydantic import ValidationError
from models.login.login_response import UserResponseModel
from schemas.basic_response_schema import ResponseSchema
from schemas.google_login_schema import GoogleLoginSchema
from schemas import LoginSchema
from services import UserService, AuthService
from exception import AuthException
from mapper import UserMapper
from helpers import make_response
import logging
logging = logging.getLogger(__name__)
class AuthController:
@ -17,10 +23,14 @@ class AuthController:
data = request.get_json()
dataSchema = LoginSchema(**data)
response = self.auth_service.login(dataSchema)
if response is None:
return make_response(message="User is not registered", status_code=401)
return (
jsonify(
ResponseSchema(
message="Register success", data=response
message="Register success",
data=UserMapper.user_entity_to_response(response),
).model_dump()
),
200,
@ -59,7 +69,7 @@ class AuthController:
response = ResponseSchema(
message="Login successful",
data=user_info,
data=UserMapper.user_entity_to_response(user_info),
meta=None,
)
return jsonify(response.model_dump()), 200

View File

@ -0,0 +1,95 @@
from flask import jsonify
from pydantic import ValidationError
from schemas.requests import QuizCreateSchema, UserAnswerSchema
from schemas.response import QuizCreationResponse
from services import QuizService, AnswerService
from helpers import make_response, make_error_response
class QuizController:
def __init__(self, quiz_service: QuizService, answer_service: AnswerService):
self.quiz_service = quiz_service
self.answer_service = answer_service
def get_quiz(self, quiz_id):
try:
result = self.quiz_service.get_quiz(quiz_id)
if not result:
return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.dict())
except Exception as e:
return make_error_response(e)
def create_quiz(self, quiz_data):
try:
quiz_obj = QuizCreateSchema(**quiz_data)
quiz_id = self.quiz_service.create_quiz(quiz_obj)
return make_response(
message="Quiz created",
data=QuizCreationResponse(quiz_id=quiz_id),
status_code=201,
)
except ValidationError as e:
return make_response(e.errors(), status_code=400)
except Exception as e:
return make_error_response(e)
# def update_quiz(self, quiz_id, quiz_data):
# try:
# quiz_obj = QuizUpdateSchema(**quiz_data)
# success = self.quiz_service.update_quiz(
# quiz_id, quiz_obj.dict(exclude_unset=True)
# )
# if not success:
# return jsonify({"error": "Quiz not found or update failed"}), 400
# return jsonify({"message": "Quiz updated"}), 200
# except ValidationError as e:
# return jsonify({"error": "Validation error", "detail": e.errors()}), 400
def delete_quiz(self, quiz_id):
success = self.quiz_service.delete_quiz(quiz_id)
if not success:
return jsonify({"error": "Quiz not found"}), 400
return jsonify({"message": "Quiz deleted"}), 200
def quiz_recomendation(self):
try:
result = self.quiz_service.get_quiz_recommendation()
if not result:
return make_response(message="Quiz not found", status_code=404)
return make_response(message="Quiz Found", data=result.dict())
except Exception as e:
return make_error_response(e)
def submit_answer(self, answer_data):
try:
# Assuming answer_data is a dictionary with the necessary fields
answer_obj = UserAnswerSchema(**answer_data)
answer_id = self.answer_service.create_answer(answer_obj)
return make_response(
message="Answer submitted",
data={"answer_id": answer_id},
status_code=201,
)
except ValidationError as e:
return make_response(e.errors(), status_code=400)
except Exception as e:
return make_error_response(e)
def get_answer(self, quiz_id, user_id, session_id):
try:
# self.answer_service.
print("yps")
except Exception as e:
return make_error_response(e)
def get_user_quiz(self, user_id, page=1, page_size=10):
try:
result = self.quiz_service.get_user_quiz(
user_id=user_id, page=page, page_size=page_size
)
return make_response(
message="User quizzes retrieved successfully", data=result
)
except Exception as e:
return make_error_response(e)

View File

@ -5,6 +5,7 @@ from schemas import RegisterSchema
from pydantic import ValidationError
from schemas import ResponseSchema
from exception import AlreadyExistException
from helpers import make_response
class UserController:
@ -16,26 +17,17 @@ class UserController:
request_data = request.get_json()
register_data = RegisterSchema(**request_data)
self.user_service.register_user(register_data)
return jsonify(ResponseSchema(message="Register Success").model_dump()), 200
return make_response("Register Success")
except ValidationError as e:
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400
return make_response("Invalid input", status_code=400)
except AlreadyExistException as e:
return (
jsonify(
ResponseSchema(message=str(e), data=None, meta=None).model_dump()
),
409,
)
return make_response("User already exists", status_code=409)
except Exception as e:
current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema(
message="Internal server error", data=None, meta=None
)
return jsonify(response.model_dump()), 500
return make_response("Internal server error", status_code=500)

View File

@ -1,9 +1,7 @@
from dependency_injector import containers, providers
from controllers import UserController
from repositories.user_repository import UserRepository
from services import UserService, AuthService
from controllers import AuthController
from flask_pymongo import PyMongo
from controllers import UserController, AuthController, QuizController
from repositories import UserRepository, QuizRepository, UserAnswerRepository
from services import UserService, AuthService, QuizService, AnswerService
class Container(containers.DeclarativeContainer):
@ -13,11 +11,16 @@ class Container(containers.DeclarativeContainer):
# repository
user_repository = providers.Factory(UserRepository, mongo.provided.db)
quiz_repository = providers.Factory(QuizRepository, mongo.provided.db)
answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db)
# services
auth_service = providers.Factory(AuthService, user_repository)
user_service = providers.Factory(UserService, user_repository)
quiz_service = providers.Factory(QuizService, quiz_repository)
answer_service = providers.Factory(AnswerService, answer_repository)
# controllers
auth_controller = providers.Factory(AuthController, user_service, auth_service)
user_controller = providers.Factory(UserController, user_service)
quiz_controller = providers.Factory(QuizController, quiz_service, answer_service)

View File

@ -1,2 +1,10 @@
from .auth_exception import AuthException
from .already_exist_exception import AlreadyExistException
from .data_not_found_exception import DataNotFoundException
__all__ = [
"AuthException",
"AlreadyExistException",
"DataNotFoundException",
]

View File

@ -0,0 +1,8 @@
from .base_exception import BaseExceptionTemplate
class DataNotFoundException(BaseExceptionTemplate):
"""Exception for data not found"""
def __init__(self, message: str = "Data Not Found"):
super().__init__(message, status_code=404)

4
app/helpers/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .response_helper import make_response, make_error_response
__all__ = ["make_response", "make_error_response"]

View File

@ -0,0 +1,27 @@
from flask import jsonify, current_app
from typing import Optional, Union
from schemas import ResponseSchema
def make_response(
message: str,
data: Optional[dict] = None,
meta: Optional[dict] = None,
status_code: int = 200,
):
response = ResponseSchema(message=message, data=data, meta=meta)
return jsonify(response.model_dump()), status_code
def make_error_response(
err: Union[Exception, str],
log_message: Optional[str] = None,
status_code: int = 500,
):
"""Logs the error and returns a standardized error response"""
error_message = str(err) if isinstance(err, Exception) else err
log_msg = log_message or f"An error occurred: {error_message}"
current_app.logger.error(log_msg, exc_info=True)
response = ResponseSchema(message="Internal server error", data=None, meta=None)
return jsonify(response.model_dump()), status_code

View File

@ -1,13 +1,17 @@
from blueprints import default_blueprint
from di_container import Container
from configs import Config, LoggerConfig
from flask import Flask
from blueprints import auth_blueprint, user_blueprint
from blueprints import auth_blueprint, user_blueprint, quiz_bp, default_blueprint
from database import init_db
import logging
def createApp() -> Flask:
app = Flask(__name__)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
app.config.from_object(Config)
LoggerConfig.init_logger(app)
@ -19,13 +23,26 @@ def createApp() -> Flask:
if mongo is not None:
container.mongo.override(mongo)
container.wire(modules=["blueprints.auth"])
container.wire(modules=["blueprints.user"])
# container.wire(modules=["blueprints.auth"])
# container.wire(modules=["blueprints.user"])
# container.wire(modules=["blueprints.quiz"])
container.wire(
modules=[
"blueprints.auth",
"blueprints.user",
"blueprints.quiz",
]
)
# Register Blueprints
app.register_blueprint(default_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api")
app.register_blueprint(quiz_bp, url_prefix="/api/quiz")
for rule in app.url_map.iter_rules():
print(f"Route: {rule} -> Methods: {rule.methods}")
return app

View File

@ -1 +1,8 @@
from .user_mapper import UserMapper
from .quiz_mapper import map_quiz_entity_to_schema
__all__ = [
"UserMapper",
"map_quiz_entity_to_schema",
]

26
app/mapper/quiz_mapper.py Normal file
View File

@ -0,0 +1,26 @@
from models import QuizEntity, QuestionItemEntity
from schemas import QuizGetSchema, QuestionItemSchema
def map_question_entity_to_schema(entity: QuestionItemEntity) -> QuestionItemSchema:
return QuestionItemSchema(
question=entity.question,
target_answer=entity.target_answer,
duration=entity.duration,
type=entity.type,
)
def map_quiz_entity_to_schema(entity: QuizEntity) -> QuizGetSchema:
return QuizGetSchema(
author_id=entity.author_id,
title=entity.title,
description=entity.description,
is_public=entity.is_public,
date=entity.date.strftime("%Y-%m-%d %H:%M:%S") if entity.date else None,
total_quiz=entity.total_quiz or 0,
limit_duration=entity.limit_duration or 0,
question_listings=[
map_question_entity_to_schema(q) for q in entity.question_listings or []
],
)

View File

@ -1,6 +1,6 @@
from datetime import datetime
from typing import Dict, Optional
from models import UserEntity
from models import UserEntity, UserResponseModel
from schemas import RegisterSchema
@ -39,3 +39,18 @@ class UserMapper:
updated_at=datetime.now(),
verification_token=None,
)
@staticmethod
def user_entity_to_response(user: UserEntity) -> UserResponseModel:
return UserResponseModel(
id=str(user.id) if user.id else None,
google_id=user.google_id,
email=user.email,
name=user.name,
birth_date=user.birth_date,
pic_url=user.pic_url,
phone=user.phone,
locale=user.locale,
# created_at=user.created_at,
# updated_at=user.updated_at,
)

View File

@ -1,4 +1,13 @@
# app/models/__init__.py
from .entities import UserEntity
from .entities import UserEntity, QuizEntity, QuestionItemEntity, UserAnswerEntity
from .login import UserResponseModel
__all__ = ["UserEntity", "UserDTO"]
__all__ = [
"UserEntity",
"UserDTO",
"UserResponseModel",
"QuizEntity",
"QuestionItemEntity",
"UserAnswerEntity",
]

View File

@ -1,7 +1,13 @@
from .user_entity import UserEntity
from .base import PyObjectId
from .quiz_entity import QuizEntity
from .question_item_entity import QuestionItemEntity
from .user_answer_entity import UserAnswerEntity
__all__ = [
"UserEntity",
"PyObjectId",
"QuizEntity",
"QuestionItemEntity",
"UserAnswerEntity",
]

View File

@ -0,0 +1,11 @@
from pydantic import BaseModel
class AnswerItemEntity(BaseModel):
question_index: int
question: str
answer: str
correct_answer: str
is_correct: bool
duration: int
time_spent: float

View File

@ -1,19 +1,29 @@
from bson import ObjectId
from pydantic import GetCoreSchemaHandler
from pydantic_core import core_schema
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic to handle MongoDB _id"""
"""Custom ObjectId type for Pydantic v2 to handle MongoDB _id"""
@classmethod
def __get_validators__(cls):
yield cls.validate
def __get_pydantic_core_schema__(
cls, source, handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
return core_schema.no_info_after_validator_function(
cls.validate,
core_schema.union_schema(
[
core_schema.str_schema(),
core_schema.is_instance_schema(ObjectId),
]
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
if isinstance(v, ObjectId):
return v
if isinstance(v, str) and ObjectId.is_valid(v):
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
raise ValueError(f"Invalid ObjectId: {v}")

View File

@ -0,0 +1,12 @@
from typing import Optional, List
from pydantic import BaseModel
from datetime import datetime
from .base import PyObjectId
class QuestionItemEntity(BaseModel):
_id: Optional[PyObjectId] = None
question: str
target_answer: str
duration: int
type: str # "isian" | "true_false"

View File

@ -0,0 +1,21 @@
from typing import Optional
from pydantic import BaseModel
from datetime import datetime
from .base import PyObjectId
from .question_item_entity import QuestionItemEntity
class QuizEntity(BaseModel):
_id: Optional[PyObjectId] = None
author_id: Optional[str] = None
title: str
description: Optional[str] = None
is_public: bool = False
date: Optional[datetime] = None
total_quiz: Optional[int] = 0
limit_duration: Optional[int] = 0
question_listings: Optional[list[QuestionItemEntity]] = []
class Config:
arbitrary_types_allowed = True
json_encoders = {PyObjectId: str}

View File

@ -0,0 +1,24 @@
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from bson import ObjectId
from .answer_item import AnswerItemEntity
from .base import PyObjectId
class UserAnswerEntity(BaseModel):
_id: Optional[PyObjectId] = None
session_id: Optional[PyObjectId]
quiz_id: PyObjectId
user_id: str
answered_at: datetime
answers: List[AnswerItemEntity]
total_score: int
total_correct: int
total_questions: int
class Config:
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}

View File

@ -1,13 +1,13 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from pydantic import BaseModel, Field
from datetime import datetime
from .base import PyObjectId
class UserEntity(BaseModel):
_id: Optional[PyObjectId] = None
id: Optional[PyObjectId] = Field(default=None, alias="_id")
google_id: Optional[str] = None
email: EmailStr
email: str
password: Optional[str] = None
name: str
birth_date: Optional[datetime] = None
@ -16,3 +16,7 @@ class UserEntity(BaseModel):
locale: str = "en-US"
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
populate_by_name = True
json_encoders = {PyObjectId: str}

View File

@ -0,0 +1 @@
from .login_response import UserResponseModel

View File

@ -0,0 +1,20 @@
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class UserResponseModel(BaseModel):
id: Optional[str] = Field(alias="_id")
google_id: Optional[str] = None
email: EmailStr
name: str
birth_date: Optional[datetime] = None
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: str
class Config:
populate_by_name = True
json_encoders = {
datetime: lambda v: v.isoformat(),
}

View File

@ -1 +1,9 @@
from .user_repository import UserRepository
from .quiz_repositroy import QuizRepository
from .answer_repository import UserAnswerRepository
__all__ = [
"UserRepository",
"QuizRepository",
"UserAnswerRepository",
]

View File

@ -0,0 +1,32 @@
from pymongo.collection import Collection
from bson import ObjectId
from typing import Optional, List
from models import UserAnswerEntity
class UserAnswerRepository:
def __init__(self, db):
self.collection: Collection = db.user_answers
def create(self, answer_session: UserAnswerEntity) -> str:
data = answer_session.model_dump(by_alias=True)
result = self.collection.insert_one(data)
return str(result.inserted_id)
def get_by_id(self, id: str) -> Optional[dict]:
result = self.collection.find_one({"_id": ObjectId(id)})
return result
def get_by_user_and_quiz(self, user_id: str, quiz_id: str) -> List[dict]:
result = self.collection.find(
{"user_id": user_id, "quiz_id": ObjectId(quiz_id)}
)
return list(result)
def get_by_session(self, session_id: str) -> List[dict]:
result = self.collection.find({"session_id": ObjectId(session_id)})
return list(result)
def delete_by_id(self, id: str) -> bool:
result = self.collection.delete_one({"_id": ObjectId(id)})
return result.deleted_count > 0

View File

@ -0,0 +1,45 @@
from bson import ObjectId
from typing import List, Optional
from models import QuizEntity
from pymongo.database import Database
class QuizRepository:
def __init__(self, db: Database):
self.collection = db.quiz
def create(self, quiz: QuizEntity) -> str:
quiz_dict = quiz.dict(by_alias=True, exclude_none=True)
result = self.collection.insert_one(quiz_dict)
return str(result.inserted_id)
def get_by_id(self, quiz_id: str) -> Optional[QuizEntity]:
data = self.collection.find_one({"_id": ObjectId(quiz_id)})
if data:
return QuizEntity(**data)
return None
def get_by_user_id(
self, user_id: str, page: int = 1, page_size: int = 10
) -> List[QuizEntity]:
skip = (page - 1) * page_size
cursor = (
self.collection.find({"user_id": ObjectId(user_id)})
.skip(skip)
.limit(page_size)
)
return [QuizEntity(**doc) for doc in cursor]
def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]:
cursor = self.collection.find().skip(skip).limit(limit)
return [QuizEntity(**doc) for doc in cursor]
def update(self, quiz_id: str, update_data: dict) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(quiz_id)}, {"$set": update_data}
)
return result.modified_count > 0
def delete(self, quiz_id: str) -> bool:
result = self.collection.delete_one({"_id": ObjectId(quiz_id)})
return result.deleted_count > 0

View File

@ -4,44 +4,43 @@ from models import UserEntity
class UserRepository:
def __init__(self, db):
self.collection = db.users
def get_all_users(self) -> list[UserEntity]:
"""Mengambil semua user dari database."""
"""Retrieve all users from the database."""
users = list(self.collection.find({}, {"_id": 0}))
return [UserEntity(**user) for user in users]
def get_user_by_email(self, email: str) -> Optional[UserEntity]:
"""Mendapatkan user berdasarkan email."""
user = self.collection.find_one({"email": email}, {"_id": 0})
"""Retrieve a user based on their email address."""
user = self.collection.find_one({"email": email})
return UserEntity(**user) if user else None
def get_user_by_id(self, user_id: str) -> Optional[UserEntity]:
"""Mendapatkan user berdasarkan ID."""
"""Retrieve a user based on their ID."""
object_id = ObjectId(user_id)
user = self.collection.find_one({"_id": object_id})
return UserEntity(**user) if user else None
def get_by_google_id(self, google_id: str) -> Optional[UserEntity]:
"""Retrieve a user based on their Google ID."""
user_data = self.collection.find_one({"google_id": google_id})
return UserEntity(**user_data) if user_data else None
def insert_user(self, user_data: UserEntity) -> str:
"""Menambahkan pengguna baru ke dalam database dan mengembalikan ID pengguna."""
"""Insert a new user into the database and return the user's ID."""
result = self.collection.insert_one(user_data.model_dump())
return str(result.inserted_id)
def update_user(self, user_id: str, update_data: dict) -> bool:
"""Mengupdate seluruh data user berdasarkan ID."""
"""Update all fields of a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one({"_id": object_id}, {"$set": update_data})
return result.modified_count > 0
def update_user_field(self, user_id: str, field: str, value) -> bool:
"""Mengupdate satu field dari user berdasarkan ID."""
"""Update a single field of a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one(
{"_id": object_id}, {"$set": {field: value}}
@ -49,7 +48,7 @@ class UserRepository:
return result.modified_count > 0
def delete_user(self, user_id: str) -> bool:
"""Menghapus user berdasarkan ID."""
"""Delete a user based on their ID."""
object_id = ObjectId(user_id)
result = self.collection.delete_one({"_id": object_id})
return result.deleted_count > 0

View File

@ -1,3 +1,15 @@
from .login_schema import LoginSchema
from .basic_response_schema import ResponseSchema, MetaSchema
from .requests import RegisterSchema
from .response import QuizCreationResponse, QuizGetSchema, QuestionItemSchema
__all__ = [
"LoginSchema",
"ResponseSchema",
"MetaSchema",
"RegisterSchema",
"QuizCreationResponse",
"QuizGetSchema",
"QuestionItemSchema",
]

View File

@ -1 +1,18 @@
from .register_schema import RegisterSchema
from .quiz import (
QuestionItemSchema,
QuizCreateSchema,
)
from .answer.answer_request_schema import UserAnswerSchema
from .answer.answer_item_request_schema import AnswerItemSchema
__all__ = [
"RegisterSchema",
"QuestionItemSchema",
"QuizCreateSchema",
"UserAnswerSchema",
"AnswerItemSchema",
]

View File

@ -0,0 +1,11 @@
from pydantic import BaseModel
class AnswerItemSchema(BaseModel):
question_index: int
question: str
answer: str
correct_answer: str
is_correct: bool
duration: int
time_spent: float

View File

@ -0,0 +1,15 @@
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from .answer_item_request_schema import AnswerItemSchema
class UserAnswerSchema(BaseModel):
session_id: Optional[str] = None
quiz_id: str
user_id: str
answered_at: datetime
total_score: int
total_correct: int
total_questions: int
answers: List[AnswerItemSchema]

View File

@ -0,0 +1,7 @@
from .quiz_item_schema import QuestionItemSchema
from .create_quiz_schema import QuizCreateSchema
__all__ = [
"QuestionItemSchema",
"QuizCreateSchema",
]

View File

@ -0,0 +1,15 @@
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from .quiz_item_schema import QuestionItemSchema
class QuizCreateSchema(BaseModel):
title: str
description: Optional[str] = None
is_public: bool = False
date: Optional[str] = None
total_quiz: Optional[int] = 0
limit_duration: Optional[int] = 0
author_id: Optional[str] = None
question_listings: Optional[List[QuestionItemSchema]] = []

View File

@ -0,0 +1,10 @@
from typing import List, Optional
from pydantic import BaseModel
class QuestionItemSchema(BaseModel):
question: str
target_answer: str
duration: int
type: str
options: Optional[List[str]] = None

View File

@ -0,0 +1,9 @@
from .quiz.quiz_creation_response import QuizCreationResponse
from .quiz.quiz_get_response import QuizGetSchema
from .quiz.question_item_schema import QuestionItemSchema
__all__ = [
"QuizCreationResponse",
"QuizGetSchema",
"QuestionItemSchema",
]

View File

@ -0,0 +1,8 @@
from pydantic import BaseModel
class QuestionItemSchema(BaseModel):
question: str
target_answer: str
duration: int
type: str

View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class QuizCreationResponse(BaseModel):
quiz_id: str

View File

@ -0,0 +1,15 @@
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from .question_item_schema import QuestionItemSchema
class QuizGetSchema(BaseModel):
author_id: str
title: str
description: Optional[str] = None
is_public: bool = False
date: Optional[str] = None
total_quiz: int = 0
limit_duration: int = 0
question_listings: List[QuestionItemSchema] = []

View File

@ -1,2 +1,12 @@
from .auth_service import AuthService
from .user_service import UserService
from .quiz_service import QuizService
from .answer_service import AnswerService
__all__ = [
"AuthService",
"UserService",
"QuizService",
"AnswerService",
]

View File

@ -0,0 +1,22 @@
from repositories import UserAnswerRepository
class AnswerService:
def __init__(self, answer_repository: UserAnswerRepository):
self.answer_repository = answer_repository
def get_answer_by_id(self, answer_id):
return self.answer_repository.get_answer_by_id(answer_id)
def get_answer(self, quiz_id, user_id, session_id):
if quiz_id is not None:
return self.answer_repository
def create_answer(self, answer_data):
return self.answer_repository.create(answer_data)
def update_answer(self, answer_id, answer_data):
return self.answer_repository.update(answer_id, answer_data)
def delete_answer(self, answer_id):
return self.answer_repository.delete_by_id(answer_id)

View File

@ -5,7 +5,7 @@ from google.oauth2 import id_token
from google.auth.transport import requests
from configs import Config
from exception import AuthException
from flask import current_app
from werkzeug.security import check_password_hash
class AuthService:
@ -36,13 +36,12 @@ class AuthService:
return self.user_repository.get_user_by_id(user_id=user_id)
def login(self, data: LoginSchema):
user_data = self.user_repository.get_user_by_email(data.email)
if user_data == None:
if user_data is None:
return None
if user_data.password == data.password:
del user_data.password
if check_password_hash(user_data.password, data.password):
user_data.password = None
return user_data
return None

View File

@ -1,53 +0,0 @@
import numpy as np
class LSTM:
def __init__(self, input_dim, hidden_dim):
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.Wf = np.random.randn(hidden_dim, input_dim + hidden_dim) * 0.01
self.Wi = np.random.randn(hidden_dim, input_dim + hidden_dim) * 0.01
self.Wc = np.random.randn(hidden_dim, input_dim + hidden_dim) * 0.01
self.Wo = np.random.randn(hidden_dim, input_dim + hidden_dim) * 0.01
self.bf = np.zeros((hidden_dim, 1))
self.bi = np.zeros((hidden_dim, 1))
self.bc = np.zeros((hidden_dim, 1))
self.bo = np.zeros((hidden_dim, 1))
self.h = np.zeros((hidden_dim, 1))
self.c = np.zeros((hidden_dim, 1))
def sigmoid(self, x):
return 1 / (1 + np.exp(-x))
def tanh(self, x):
return np.tanh(x)
def forward(self, x_t):
combined = np.vstack((self.h, x_t))
f_t = self.sigmoid(np.dot(self.Wf, combined) + self.bf)
i_t = self.sigmoid(np.dot(self.Wi, combined) + self.bi)
C_tilde_t = self.tanh(np.dot(self.Wc, combined) + self.bc)
self.c = f_t * self.c + i_t * C_tilde_t
o_t = self.sigmoid(np.dot(self.Wo, combined) + self.bo)
self.h = o_t * self.tanh(self.c)
return self.h
def backward(self, x_t, h_t, y_t, learning_rate):
# Your backward pass implementation here
pass
def train(self, X, y, num_epochs, learning_rate):
for epoch in range(num_epochs):
for i in range(len(X)):
x_t = X[i]
y_t = y[i]
# Forward pass
h_t = self.forward(x_t)
# Calculate loss and perform backward pass
loss = np.mean((h_t - y_t) ** 2) # Example loss
self.backward(x_t, h_t, y_t, learning_rate)
if i % 100 == 0: # Print loss every 100 samples
print(f"Epoch {epoch}, Sample {i}, Loss: {loss}")

View File

@ -0,0 +1,61 @@
# from keras.models import load_model
# from tensorflow.keras.preprocessing.sequence import pad_sequences
# from nummpy import nps
# import pickle
# class LSTMService:
# def predict(self, input_data, maxlen=50):
# with open("QC/tokenizers.pkl", "rb") as f:
# tokenizers = pickle.load(f)
# model = load_model("QC/lstm_qg.keras")
# tok_token = tokenizers["token"]
# tok_ner = tokenizers["ner"]
# tok_srl = tokenizers["srl"]
# tok_q = tokenizers["question"]
# tok_a = tokenizers["answer"]
# tok_type = tokenizers["type"]
# # Prepare input
# tokens = input_data["tokens"]
# ner = input_data["ner"]
# srl = input_data["srl"]
# x_tok = pad_sequences(
# [tok_token.texts_to_sequences([tokens])[0]], maxlen=maxlen, padding="post"
# )
# x_ner = pad_sequences(
# [tok_ner.texts_to_sequences([ner])[0]], maxlen=maxlen, padding="post"
# )
# x_srl = pad_sequences(
# [tok_srl.texts_to_sequences([srl])[0]], maxlen=maxlen, padding="post"
# )
# # Predict
# pred_q, pred_a, pred_type = model.predict([x_tok, x_ner, x_srl])
# pred_q_ids = np.argmax(pred_q[0], axis=-1)
# pred_a_ids = np.argmax(pred_a[0], axis=-1)
# pred_type_id = np.argmax(pred_type[0])
# # Decode
# index2word_q = {v: k for k, v in tok_q.word_index.items()}
# index2word_a = {v: k for k, v in tok_a.word_index.items()}
# index2word_q[0] = "<PAD>"
# index2word_a[0] = "<PAD>"
# decoded_q = [index2word_q[i] for i in pred_q_ids if i != 0]
# decoded_a = [index2word_a[i] for i in pred_a_ids if i != 0]
# index2type = {v - 1: k for k, v in tok_type.word_index.items()}
# decoded_type = index2type.get(pred_type_id, "unknown")
# return {
# "question": " ".join(decoded_q),
# "answer": " ".join(decoded_a),
# "type": decoded_type,
# }

View File

@ -0,0 +1,39 @@
from typing import List
from repositories import QuizRepository
from schemas import QuizGetSchema
from exception import DataNotFoundException
from mapper import map_quiz_entity_to_schema
class QuizService:
def __init__(self, quiz_repository=QuizRepository):
self.quiz_repository = quiz_repository
def get_quiz(self, quiz_id) -> QuizGetSchema:
data = self.quiz_repository.get_by_id(quiz_id)
if data is None:
raise DataNotFoundException("Quiz not found")
return map_quiz_entity_to_schema(data)
def get_user_quiz(
self, user_id: str, page: int = 1, page_size: int = 10
) -> List[QuizGetSchema]:
quizzes = self.quiz_repository.get_by_user_id(user_id, page, page_size)
return [QuizGetSchema.model_validate(quiz) for quiz in quizzes]
def create_quiz(self, quiz_data):
return self.quiz_repository.create(quiz_data)
def update_quiz(self, quiz_id, quiz_data):
return self.quiz_repository.update(quiz_id, quiz_data)
def delete_quiz(self, quiz_id):
return self.quiz_repository.delete(quiz_id)
def quiz_recommendation(self):
data = self.quiz_repository
if data is None:
raise DataNotFoundException("Quiz not found")
return map_quiz_entity_to_schema(data)

View File

@ -3,6 +3,7 @@ from repositories import UserRepository
from schemas import RegisterSchema
from mapper import UserMapper
from exception import AlreadyExistException
from werkzeug.security import generate_password_hash
class UserService:
@ -17,5 +18,8 @@ class UserService:
if existData:
raise AlreadyExistException(entity="Email")
encrypted_password = generate_password_hash(user_data.password)
user_data.password = encrypted_password
data = UserMapper.from_register(user_data)
return self.user_repository.insert_user(data)

View File

@ -1,7 +1,9 @@
annotated-types==0.7.0
Authlib==1.5.1
bcrypt==4.3.0
bidict==0.23.1
blinker==1.9.0
cachetools==5.5.2
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
@ -16,23 +18,40 @@ Flask-Bcrypt==1.0.1
Flask-JWT-Extended==4.7.1
Flask-Login==0.6.3
Flask-PyMongo==3.0.1
Flask-SocketIO==5.5.1
flask-swagger-ui==4.11.1
google-auth==2.38.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.1
h11==0.14.0
httplib2==0.22.0
idna==3.10
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
numpy==2.1.2
oauthlib==3.2.2
packaging==24.2
pluggy==1.5.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycparser==2.22
pydantic==2.10.6
pydantic_core==2.27.2
PyJWT==2.10.1
pymongo==4.11.2
pyparsing==3.2.1
pytest==8.3.4
python-dotenv==1.0.1
python-engineio==4.11.2
python-socketio==5.12.1
requests==2.32.3
requests-oauthlib==2.0.0
rsa==4.9
simple-websocket==1.1.0
tomli==2.2.1
typing_extensions==4.12.2
urllib3==2.3.0
Werkzeug==3.1.3
wsproto==1.2.0