TIF_E41211115_Genso_quiz_ba.../app/services/user_service.py

125 lines
4.4 KiB
Python

from app.repositories import UserRepository, UserAnswerRepository, QuizRepository
from app.schemas import RegisterSchema
from app.schemas.requests import ProfileUpdateSchema
from app.schemas.response import UserResponseSchema
from app.models.entities import UserAnswerEntity
from app.mapper import UserMapper
from app.exception import AlreadyExistException, DataNotFoundException
from werkzeug.security import generate_password_hash, check_password_hash
from app.helpers import DatetimeUtil
class UserService:
def __init__(
self,
user_repository: UserRepository,
answer_repository: UserAnswerRepository,
quiz_repository: QuizRepository,
):
self.user_repository = user_repository
self.answer_repository = answer_repository
self.quiz_repository = quiz_repository
def get_all_users(self):
return self.user_repository.get_all_users()
def register_user(self, user_data: RegisterSchema):
existData = self.user_repository.get_user_by_email(user_data.email)
if existData:
raise AlreadyExistException(entity="Email")
encrypted_password = generate_password_hash(user_data.password)
user_data.password = encrypted_password
data = UserMapper.from_register(user_data)
return self.user_repository.insert_user(data)
def update_profile(self, new_profile: ProfileUpdateSchema):
user = self.user_repository.get_user_by_id(new_profile.id)
if not user:
raise DataNotFoundException(entity="User")
update_data = {}
if new_profile.name is not None:
update_data["name"] = new_profile.name
if new_profile.birth_date is not None:
update_data["birth_date"] = DatetimeUtil.from_string(
new_profile.birth_date, fmt="%d-%m-%Y"
)
if new_profile.locale is not None:
update_data["locale"] = new_profile.locale
if new_profile.phone is not None:
update_data["phone"] = new_profile.phone
if not update_data:
return True
update_data["updated_at"] = DatetimeUtil.now_iso()
return self.user_repository.update_user(new_profile.id, update_data)
def change_password(self, user_id: str, current_password: str, new_password: str):
user = self.user_repository.get_user_by_id(user_id)
if not user:
raise DataNotFoundException(entity="User")
if not user.password or not check_password_hash(
user.password, current_password
):
raise ValueError("Current password is incorrect")
encrypted_password = generate_password_hash(new_password)
update_data = {
"password": encrypted_password,
"updated_at": DatetimeUtil.now_iso(),
}
return self.user_repository.update_user(user_id, update_data)
def get_user_by_id(self, user_id: str):
user = self.user_repository.get_user_by_id(user_id)
if not user:
raise DataNotFoundException(entity="User")
user_dict = user.model_dump()
if "password" in user_dict:
del user_dict["password"]
if "id" in user_dict:
user_dict["id"] = str(user.id)
if "birth_date" in user_dict and user_dict["birth_date"]:
user_dict["birth_date"] = DatetimeUtil.to_string(
user_dict["birth_date"], fmt="%d-%m-%Y"
)
if "created_at" in user_dict and user_dict["created_at"]:
user_dict["created_at"] = DatetimeUtil.to_string(user_dict["created_at"])
if "updated_at" in user_dict and user_dict["updated_at"]:
user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"])
return UserResponseSchema(**user_dict)
def get_user_status(self, user_id):
user_answers: list[UserAnswerEntity] = self.answer_repository.get_by_user(
user_id
)
total_quiz = self.quiz_repository.count_by_user_id(user_id)
if not user_answers:
return None
total_score = sum(answer.total_score for answer in user_answers)
total_questions = len(user_answers)
percentage = total_score / total_questions
return {
"avg_score": round(percentage, 2),
"total_solve": total_questions,
"total_quiz": total_quiz,
}