97 lines
3.6 KiB
Python
97 lines
3.6 KiB
Python
from datetime import datetime
|
|
from app.repositories import UserRepository
|
|
from app.schemas import RegisterSchema
|
|
from app.schemas.requests import ProfileUpdateSchema
|
|
from app.schemas.response import UserResponseSchema
|
|
from app.mapper import UserMapper
|
|
from app.exception import AlreadyExistException, DataNotFoundException
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from app.helpers import DatetimeUtil
|
|
|
|
|
|
class UserService:
|
|
def __init__(self, user_repository: UserRepository):
|
|
self.user_repository = user_repository
|
|
|
|
def get_all_users(self):
|
|
return self.user_repository.get_all_users()
|
|
|
|
def register_user(self, user_data: RegisterSchema):
|
|
existData = self.user_repository.get_user_by_email(user_data.email)
|
|
if existData:
|
|
raise AlreadyExistException(entity="Email")
|
|
|
|
encrypted_password = generate_password_hash(user_data.password)
|
|
user_data.password = encrypted_password
|
|
|
|
data = UserMapper.from_register(user_data)
|
|
return self.user_repository.insert_user(data)
|
|
|
|
def update_profile(self, new_profile: ProfileUpdateSchema):
|
|
user = self.user_repository.get_user_by_id(new_profile.id)
|
|
if not user:
|
|
raise DataNotFoundException(entity="User")
|
|
|
|
update_data = {}
|
|
if new_profile.name is not None:
|
|
update_data["name"] = new_profile.name
|
|
if new_profile.birth_date is not None:
|
|
update_data["birth_date"] = DatetimeUtil.from_string(
|
|
new_profile.birth_date, fmt="%d-%m-%Y"
|
|
)
|
|
if new_profile.locale is not None:
|
|
update_data["locale"] = new_profile.locale
|
|
if new_profile.phone is not None:
|
|
update_data["phone"] = new_profile.phone
|
|
|
|
if not update_data:
|
|
return True
|
|
|
|
update_data["updated_at"] = DatetimeUtil.now_iso()
|
|
|
|
return self.user_repository.update_user(new_profile.id, update_data)
|
|
|
|
def change_password(self, user_id: str, current_password: str, new_password: str):
|
|
|
|
user = self.user_repository.get_user_by_id(user_id)
|
|
if not user:
|
|
raise DataNotFoundException(entity="User")
|
|
|
|
if not user.password or not check_password_hash(
|
|
user.password, current_password
|
|
):
|
|
raise ValueError("Current password is incorrect")
|
|
|
|
encrypted_password = generate_password_hash(new_password)
|
|
|
|
update_data = {
|
|
"password": encrypted_password,
|
|
"updated_at": DatetimeUtil.now_iso(),
|
|
}
|
|
return self.user_repository.update_user(user_id, update_data)
|
|
|
|
def get_user_by_id(self, user_id: str):
|
|
user = self.user_repository.get_user_by_id(user_id)
|
|
if not user:
|
|
raise DataNotFoundException(entity="User")
|
|
user_dict = user.model_dump()
|
|
|
|
if "password" in user_dict:
|
|
del user_dict["password"]
|
|
|
|
if "id" in user_dict:
|
|
user_dict["id"] = str(user.id)
|
|
|
|
if "birth_date" in user_dict and user_dict["birth_date"]:
|
|
user_dict["birth_date"] = DatetimeUtil.to_string(
|
|
user_dict["birth_date"], fmt="%d-%m-%Y"
|
|
)
|
|
|
|
if "created_at" in user_dict and user_dict["created_at"]:
|
|
user_dict["created_at"] = DatetimeUtil.to_string(user_dict["created_at"])
|
|
|
|
if "updated_at" in user_dict and user_dict["updated_at"]:
|
|
user_dict["updated_at"] = DatetimeUtil.to_string(user_dict["updated_at"])
|
|
|
|
return UserResponseSchema(**user_dict)
|