from app.repositories import UserRepository, UserAnswerRepository, QuizRepository
from app.schemas import RegisterSchema
from app.schemas.requests import ProfileUpdateSchema
from app.schemas.response import UserResponseSchema
from app.models.entities import UserAnswerEntity
from app.mapper import UserMapper
from app.exception import AlreadyExistException, DataNotFoundException
from werkzeug.security import generate_password_hash, check_password_hash
from app.helpers import DatetimeUtil


class UserService:
    """User-related operations that delegate persistence to injected repositories."""

    def __init__(
        self,
        user_repository: UserRepository,
        answer_repository: UserAnswerRepository,
        quiz_repository: QuizRepository,
    ):
        """Store the repositories this service delegates to."""
        self.user_repository = user_repository
        self.answer_repository = answer_repository
        self.quiz_repository = quiz_repository

    def get_all_users(self):
        """Return whatever the user repository yields for all users."""
        return self.user_repository.get_all_users()

    def register_user(self, user_data: RegisterSchema):
        """Hash the password and insert a new user.

        NOTE: ``user_data.password`` is overwritten in place with its hash.

        Raises:
            AlreadyExistException: a user with the same email is already stored.
        """
        existing = self.user_repository.get_user_by_email(user_data.email)
        if existing:
            raise AlreadyExistException(entity="Email")

        user_data.password = generate_password_hash(user_data.password)
        return self.user_repository.insert_user(UserMapper.from_register(user_data))

    def update_profile(self, new_profile: ProfileUpdateSchema):
        """Apply the non-``None`` fields of ``new_profile`` to the stored user.

        ``birth_date`` is parsed with the ``%d-%m-%Y`` format before being
        stored. When no field is supplied nothing is written and ``True`` is
        returned; otherwise the result of the repository update is returned.

        Raises:
            DataNotFoundException: no user exists with ``new_profile.id``.
        """
        if not self.user_repository.get_user_by_id(new_profile.id):
            raise DataNotFoundException(entity="User")

        changes = {}
        for field in ("name", "birth_date", "locale", "phone"):
            value = getattr(new_profile, field)
            if value is None:
                continue
            if field == "birth_date":
                value = DatetimeUtil.from_string(value, fmt="%d-%m-%Y")
            changes[field] = value

        # Nothing to persist: skip the write and the timestamp bump.
        if not changes:
            return True

        changes["updated_at"] = DatetimeUtil.now_iso()
        return self.user_repository.update_user(new_profile.id, changes)

    def change_password(self, user_id: str, current_password: str, new_password: str):
        """Replace the user's password after verifying the current one.

        Raises:
            DataNotFoundException: no user exists with ``user_id``.
            ValueError: the user has no stored password, or ``current_password``
                does not match the stored hash.
        """
        account = self.user_repository.get_user_by_id(user_id)
        if not account:
            raise DataNotFoundException(entity="User")

        # Short-circuits: the hash check only runs when a password is stored.
        verified = account.password and check_password_hash(
            account.password, current_password
        )
        if not verified:
            raise ValueError("Current password is incorrect")

        return self.user_repository.update_user(
            user_id,
            {
                "password": generate_password_hash(new_password),
                "updated_at": DatetimeUtil.now_iso(),
            },
        )

    def get_user_by_id(self, user_id: str):
        """Return the user as a ``UserResponseSchema``.

        The password is dropped, the id is stringified, and any truthy
        ``birth_date`` / ``created_at`` / ``updated_at`` values are converted
        to strings via ``DatetimeUtil.to_string``.

        Raises:
            DataNotFoundException: no user exists with ``user_id``.
        """
        account = self.user_repository.get_user_by_id(user_id)
        if not account:
            raise DataNotFoundException(entity="User")

        payload = account.model_dump()
        payload.pop("password", None)

        if "id" in payload:
            payload["id"] = str(account.id)

        if payload.get("birth_date"):
            payload["birth_date"] = DatetimeUtil.to_string(
                payload["birth_date"], fmt="%d-%m-%Y"
            )

        # Both timestamps use the helper's default format.
        for stamp in ("created_at", "updated_at"):
            if payload.get(stamp):
                payload[stamp] = DatetimeUtil.to_string(payload[stamp])

        return UserResponseSchema(**payload)

    def get_user_status(self, user_id):
        """Summarise the user's answers and quiz count.

        Returns:
            ``None`` when the user has no answers; otherwise a dict with
            ``avg_score`` (sum of ``total_score`` divided by the number of
            answers, rounded to two decimals), ``total_solve`` (number of
            answers) and ``total_quiz`` (count from the quiz repository).
        """
        answers: list[UserAnswerEntity] = self.answer_repository.get_by_user(user_id)
        quiz_count = self.quiz_repository.count_by_user_id(user_id)

        if not answers:
            return None

        solved = len(answers)
        score_sum = sum(entry.total_score for entry in answers)

        return {
            "avg_score": round(score_sum / solved, 2),
            "total_solve": solved,
            "total_quiz": quiz_count,
        }