TIF_E41211115_Genso_quiz_ba.../app/controllers/user_controller.py

130 lines
5.2 KiB
Python

# app/controllers/user_controller.py
from flask import jsonify, request, current_app
from app.services import UserService
from app.schemas import RegisterSchema
from pydantic import ValidationError
from app.schemas import ResponseSchema
from app.exception import AlreadyExistException, DataNotFoundException
from app.helpers import make_response
from app.schemas.requests import ProfileUpdateSchema
class UserController:
    """Flask controller that forwards user-related requests to ``UserService``.

    Each handler reads its input from the Flask ``request`` (or from its
    arguments), delegates to the injected service, and returns whatever
    ``make_response`` produces. Failures are translated into HTTP status
    codes inside the handlers.
    """

    # Body returned to the client for unexpected failures in most handlers.
    _INTERNAL_ERROR_MESSAGE = (
        "An internal server error occurred. Please try again later."
    )

    def __init__(self, userService: UserService):
        """Store the service that performs the actual user operations.

        Args:
            userService: Service used by every handler on this controller.
        """
        self.user_service = userService

    @staticmethod
    def _json_object():
        """Return the request's JSON body if it is an object, else ``None``.

        ``silent=True`` stops Flask from raising on a missing or malformed
        body; such HTTP errors would otherwise be swallowed by the handlers'
        generic ``except Exception`` and reported as a 500 instead of a 400.
        """
        payload = request.get_json(silent=True)
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _server_error(log_prefix, exc, message=_INTERNAL_ERROR_MESSAGE):
        """Log an unexpected exception with its traceback and build a 500.

        Args:
            log_prefix: Text describing the failed operation; it is written
                to the log followed by the exception text.
            exc: The exception that was caught.
            message: Message placed in the response.

        Returns:
            The ``make_response`` result with ``status_code=500``.
        """
        current_app.logger.error(f"{log_prefix}: {exc}", exc_info=exc)
        return make_response(message=message, status_code=500)

    def register(self):
        """Register a new user from the JSON request body.

        Returns:
            The ``make_response`` result: "Register Success" when the service
            call completes, 400 when the body is not a JSON object or fails
            ``RegisterSchema`` validation, 409 when ``AlreadyExistException``
            is raised, and 500 for any other error.
        """
        try:
            payload = self._json_object()
            if payload is None:
                return make_response("Invalid input", status_code=400)
            self.user_service.register_user(RegisterSchema(**payload))
            return make_response("Register Success")
        except ValidationError as exc:
            current_app.logger.error(f"Validation error: {exc}")
            return make_response("Invalid input", status_code=400)
        except AlreadyExistException:
            return make_response("User already exists", status_code=409)
        except Exception as exc:
            return self._server_error(
                "Error during user registration", exc, "Internal server error"
            )

    def get_user_by_id(self, user_id):
        """Look up a single user.

        Args:
            user_id: Identifier passed through to the service; a falsy value
                is rejected before the service is called.

        Returns:
            The ``make_response`` result: the user as ``data`` when the
            service returns a truthy value, 400 when ``user_id`` is falsy,
            404 when the service returns a falsy value, and 500 on any
            exception.
        """
        try:
            if not user_id:
                return make_response("User ID is required", status_code=400)
            user = self.user_service.get_user_by_id(user_id)
            if not user:
                return make_response("User not found", status_code=404)
            return make_response("User found", data=user)
        except Exception as exc:
            return self._server_error("Error while retrieving user", exc)

    def update_profile(self):
        """Update a user profile from the JSON request body.

        Returns:
            The ``make_response`` result: a success message when the service
            returns a truthy value, 400 when the body is not a JSON object,
            when a ``ValueError`` is raised, or when the service returns a
            falsy value, 404 when ``DataNotFoundException`` is raised, and
            500 for any other error.
        """
        try:
            payload = self._json_object()
            if payload is None:
                return make_response(
                    message="Invalid data provided: request body must be a JSON object",
                    status_code=400,
                )
            updated = self.user_service.update_profile(
                ProfileUpdateSchema(**payload)
            )
            if not updated:
                return make_response(
                    message="Failed to update user profile. Please check the submitted data.",
                    status_code=400,
                )
            return make_response(message="User profile updated successfully.")
        except DataNotFoundException:
            return make_response(message="User data not found.", status_code=404)
        except ValueError as exc:
            # pydantic's ValidationError derives from ValueError, so schema
            # validation failures are reported through this branch as well.
            return make_response(
                message=f"Invalid data provided: {str(exc)}", status_code=400
            )
        except Exception as exc:
            return self._server_error("Error while updating profile", exc)

    def change_password(self):
        """Change a user's password using fields from the JSON request body.

        The body must provide ``id``, ``current_password`` and
        ``new_password``; all three must be truthy.

        Returns:
            The ``make_response`` result: a success message when the service
            returns a truthy value, 400 when a field is missing, when a
            ``ValueError`` is raised, or when the service returns a falsy
            value, 404 when ``DataNotFoundException`` is raised, and 500 for
            any other error.
        """
        try:
            # A missing or non-object body is treated as "no fields supplied"
            # so it yields the same 400 as an incomplete body.
            payload = self._json_object() or {}
            user_id = payload.get("id")
            current_password = payload.get("current_password")
            new_password = payload.get("new_password")
            if not (user_id and current_password and new_password):
                return make_response(
                    message="Missing required fields: id, current_password, new_password",
                    status_code=400,
                )
            changed = self.user_service.change_password(
                user_id, current_password, new_password
            )
            if not changed:
                return make_response(
                    message="Failed to change password.",
                    status_code=400,
                )
            return make_response(message="Password changed successfully.")
        except DataNotFoundException:
            return make_response(message="User data not found.", status_code=404)
        except ValueError as exc:
            return make_response(message=f"{str(exc)}", status_code=400)
        except Exception as exc:
            return self._server_error("Error while changing password", exc)

    def user_stat(self, user_id):
        """Return the status/statistics the service reports for a user.

        Args:
            user_id: Identifier passed through to
                ``UserService.get_user_status``.

        Returns:
            The ``make_response`` result carrying the service result as
            ``data``, or a 500 response on any exception.
        """
        try:
            stats = self.user_service.get_user_status(user_id)
            return make_response(message="Success retrive user stat", data=stats)
        except Exception as exc:
            return self._server_error("Error while retrieving user stat", exc)