Merge pull request #2 from Akhdanre/feat/auth

Feat/auth
This commit is contained in:
Akhdan Robbani 2025-03-20 18:48:14 +07:00 committed by GitHub
commit cf7091f83d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 367 additions and 106 deletions

3
.gitignore vendored
View File

@ -9,3 +9,6 @@ app/**/*.pyo
.env .env
.venv .venv
logs/

View File

@ -1,5 +1,6 @@
from .default import default_blueprint from .default import default_blueprint
from .auth import auth_blueprint from .auth import auth_blueprint
from .user import user_blueprint
# from .user import user_blueprint # from .user import user_blueprint

View File

@ -1,4 +1,3 @@
import sys
from flask import Blueprint from flask import Blueprint
from controllers import AuthController from controllers import AuthController
from di_container import Container from di_container import Container
@ -8,12 +7,6 @@ from dependency_injector.wiring import inject, Provide
auth_blueprint = Blueprint("auth", __name__) auth_blueprint = Blueprint("auth", __name__)
@auth_blueprint.route("/register", methods=["POST"])
@inject
def register(auth_controller: AuthController = Provide[Container.auth_controller]):
return auth_controller.register()
@auth_blueprint.route("/login", methods=["POST"]) @auth_blueprint.route("/login", methods=["POST"])
@inject @inject
def login(auth_controller: AuthController = Provide[Container.auth_controller]): def login(auth_controller: AuthController = Provide[Container.auth_controller]):

View File

@ -1,12 +1,18 @@
from flask import Blueprint from flask import Blueprint
from controllers import UserController from controllers import UserController
from di_container import container from di_container import Container
from dependency_injector.wiring import inject, Provide
user_blueprint = Blueprint("user", __name__) user_blueprint = Blueprint("user", __name__)
user_controller = UserController(container.user_service)
@user_blueprint.route("/users", methods=["GET"]) @user_blueprint.route("/users", methods=["GET"])
def get_users(): @inject
def get_users(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.get_users() return user_controller.get_users()
@user_blueprint.route("/register", methods=["POST"])
@inject
def register(user_controller: UserController = Provide[Container.user_controller]):
return user_controller.register()

View File

@ -1 +1,2 @@
from .config import Config from .config import Config
from .logger_config import LoggerConfig

View File

@ -0,0 +1,54 @@
import logging
import os
from logging.handlers import RotatingFileHandler
class LoggerConfig:
"""A class to configure logging for the Flask application."""
LOG_DIR = "logs" # Define the log directory
@staticmethod
def init_logger(app):
"""Initializes separate log files for INFO, ERROR, and WARNING levels."""
# Ensure the logs directory exists
if not os.path.exists(LoggerConfig.LOG_DIR):
os.makedirs(LoggerConfig.LOG_DIR)
# Remove default handlers to prevent duplicate logging
for handler in app.logger.handlers[:]:
app.logger.removeHandler(handler)
# Create separate loggers
info_logger = LoggerConfig._setup_logger(
"info_logger", "info.log", logging.INFO, logging.WARNING
)
error_logger = LoggerConfig._setup_logger(
"error_logger", "error.log", logging.ERROR, logging.CRITICAL
)
warning_logger = LoggerConfig._setup_logger(
"warning_logger", "warning.log", logging.WARNING, logging.WARNING
)
# Attach handlers to Flask app logger
app.logger.addHandler(info_logger)
app.logger.addHandler(error_logger)
app.logger.addHandler(warning_logger)
app.logger.setLevel(logging.DEBUG) # Set lowest level to capture all logs
app.logger.info("Logger has been initialized for Flask application.")
@staticmethod
def _setup_logger(name, filename, level, max_level):
"""Helper method to configure loggers for specific levels."""
logger = logging.getLogger(name)
logger.setLevel(level)
log_file_path = os.path.join(LoggerConfig.LOG_DIR, filename)
log_handler = RotatingFileHandler(log_file_path, maxBytes=100000, backupCount=3)
log_handler.setLevel(level)
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter)
log_handler.addFilter(lambda record: level <= record.levelno <= max_level)
logger.addHandler(log_handler)
return log_handler

View File

@ -1,14 +1,10 @@
import logging from flask import jsonify, request, current_app
import sys
from flask import jsonify, request
from pydantic import ValidationError from pydantic import ValidationError
from app.schemas.basic_response_schema import ResponseSchema from schemas.basic_response_schema import ResponseSchema
from app.schemas.google_login_schema import GoogleLoginSchema from schemas.google_login_schema import GoogleLoginSchema
from schemas import LoginSchema from schemas import LoginSchema
from services import UserService, AuthService from services import UserService, AuthService
from exception import AuthException
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AuthController: class AuthController:
@ -17,13 +13,31 @@ class AuthController:
self.auth_service = authService self.auth_service = authService
def login(self): def login(self):
try:
data = request.get_json() data = request.get_json()
dataSchema = LoginSchema(**data) dataSchema = LoginSchema(**data)
response = self.auth_service.login(dataSchema) response = self.auth_service.login(dataSchema)
return (
jsonify(
ResponseSchema(
message="Register success", data=response
).model_dump()
),
200,
)
except ValidationError as e:
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400
if response.success: except Exception as e:
return jsonify(response.to_dict()), 200 current_app.logger.error(
return jsonify(response.to_dict()), 400 f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema(
message="Internal server error", data=None, meta=None
)
return jsonify(response.model_dump()), 500
def google_login(self): def google_login(self):
"""Handles Google Login via ID Token verification""" """Handles Google Login via ID Token verification"""
@ -32,42 +46,42 @@ class AuthController:
# Validasi data dengan Pydantic # Validasi data dengan Pydantic
validated_data = GoogleLoginSchema(**data) validated_data = GoogleLoginSchema(**data)
id_token = validated_data.id_token id_token = validated_data.token_id
# Verifikasi ID Token ke layanan AuthService # Verifikasi ID Token ke layanan AuthService
user_info = self.auth_service.verify_google_id_token(id_token) user_info = self.auth_service.verify_google_id_token(id_token)
if not user_info: if not user_info:
logger.error("Invalid Google ID Token") current_app.logger.error("Invalid Google ID Token")
response = ResponseSchema( response = ResponseSchema(
message="Invalid Google ID Token", data=None, meta=None message="Invalid Google ID Token", data=None, meta=None
) )
return jsonify(response.model_dump()), 401 return jsonify(response.model_dump()), 401
# Jika berhasil, kembalikan data user tanpa meta
response = ResponseSchema( response = ResponseSchema(
message="Login successful", message="Login successful",
data=user_info, data=user_info,
meta=None, # Karena ini single data, tidak ada meta meta=None,
) )
return jsonify(response.model_dump()), 200 return jsonify(response.model_dump()), 200
except ValidationError as e: except ValidationError as e:
logger.error(f"Validation error: {e}") current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None) response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400 return jsonify(response.model_dump()), 400
except AuthException as e:
current_app.logger.error(f"Auth error: {e}")
response = ResponseSchema(message=e, data=None, meta=None)
return jsonify(response.model_dump()), 400
except Exception as e: except Exception as e:
logger.error(f"Error during Google login: {str(e)}", exc_info=True) current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema( response = ResponseSchema(
message="Internal server error", data=None, meta=None message="Internal server error", data=None, meta=None
) )
return jsonify(response.model_dump()), 500 return jsonify(response.model_dump()), 500
def register(self):
return jsonify({"message": "register"}), 200
def logout(self): def logout(self):
return jsonify({"message": "logout"}), 200 return jsonify({"message": "logout"}), 200
def test(self):
return "test"

View File

@ -1,12 +1,41 @@
# /controllers/user_controller.py # /controllers/user_controller.py
from flask import jsonify from flask import jsonify, request, current_app
from services import UserService from services import UserService
from schemas import RegisterSchema
from pydantic import ValidationError
from schemas import ResponseSchema
from exception import AlreadyExistException
class UserController: class UserController:
def __init__(self, userService: UserService): def __init__(self, userService: UserService):
self.user_service = userService self.user_service = userService
def get_users(self): def register(self):
users = self.user_service.get_all_users() try:
return jsonify(users) request_data = request.get_json()
register_data = RegisterSchema(**request_data)
self.user_service.register_user(register_data)
return jsonify(ResponseSchema(message="Register Success").model_dump()), 200
except ValidationError as e:
current_app.logger.error(f"Validation error: {e}")
response = ResponseSchema(message="Invalid input", data=None, meta=None)
return jsonify(response.model_dump()), 400
except AlreadyExistException as e:
return (
jsonify(
ResponseSchema(message=str(e), data=None, meta=None).model_dump()
),
409,
)
except Exception as e:
current_app.logger.error(
f"Error during Google login: {str(e)}", exc_info=True
)
response = ResponseSchema(
message="Internal server error", data=None, meta=None
)
return jsonify(response.model_dump()), 500

View File

@ -1,6 +1,5 @@
from flask_pymongo import PyMongo from flask_pymongo import PyMongo
from flask import Flask from flask import Flask, current_app
from configs import Config
def init_db(app: Flask) -> PyMongo: def init_db(app: Flask) -> PyMongo:
@ -8,10 +7,9 @@ def init_db(app: Flask) -> PyMongo:
mongo = PyMongo(app) mongo = PyMongo(app)
mongo.cx.server_info() mongo.cx.server_info()
print("✅ MongoDB connection successful!") app.logger.info("MongoDB connection established")
return mongo return mongo
except Exception as e: except Exception as e:
print(f"MongoDB connection failed: {e}") app.logger.error(f"MongoDB connection failed: {e}")
return None # Handle failure gracefully return None # Handle failure gracefully

View File

@ -1,4 +1,5 @@
from dependency_injector import containers, providers from dependency_injector import containers, providers
from controllers import UserController
from repositories.user_repository import UserRepository from repositories.user_repository import UserRepository
from services import UserService, AuthService from services import UserService, AuthService
from controllers import AuthController from controllers import AuthController
@ -19,3 +20,4 @@ class Container(containers.DeclarativeContainer):
# controllers # controllers
auth_controller = providers.Factory(AuthController, user_service, auth_service) auth_controller = providers.Factory(AuthController, user_service, auth_service)
user_controller = providers.Factory(UserController, user_service)

View File

@ -0,0 +1,2 @@
from .auth_exception import AuthException
from .already_exist_exception import AlreadyExistException

View File

@ -0,0 +1,6 @@
class AlreadyExistException(Exception):
def __init__(self, entity: str, message: str = None):
if message is None:
message = f"{entity} already exists"
self.message = message
super().__init__(self.message)

View File

@ -0,0 +1,8 @@
from .base_exception import BaseExceptionTemplate
class AuthException(BaseExceptionTemplate):
"""Exception for authentication-related errors"""
def __init__(self, message: str = "Authentication failed"):
super().__init__(message, status_code=401)

View File

@ -0,0 +1,10 @@
class BaseExceptionTemplate(Exception):
"""Base exception template for custom exceptions"""
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(self.message)
def __str__(self):
return f"{self.__class__.__name__}: {self.message}"

View File

@ -1,17 +1,15 @@
import sys
from dotenv import load_dotenv
from blueprints import default_blueprint from blueprints import default_blueprint
from di_container import Container from di_container import Container
from configs import Config from configs import Config, LoggerConfig
from flask import Flask from flask import Flask
from blueprints import auth_blueprint from blueprints import auth_blueprint, user_blueprint
from database import init_db from database import init_db
def createApp() -> Flask: def createApp() -> Flask:
app = Flask(__name__) app = Flask(__name__)
app.config.from_object(Config) app.config.from_object(Config)
LoggerConfig.init_logger(app)
container = Container() container = Container()
@ -22,10 +20,12 @@ def createApp() -> Flask:
container.mongo.override(mongo) container.mongo.override(mongo)
container.wire(modules=["blueprints.auth"]) container.wire(modules=["blueprints.auth"])
container.wire(modules=["blueprints.user"])
# Register Blueprints # Register Blueprints
app.register_blueprint(default_blueprint) app.register_blueprint(default_blueprint)
app.register_blueprint(auth_blueprint, url_prefix="/api") app.register_blueprint(auth_blueprint, url_prefix="/api")
app.register_blueprint(user_blueprint, url_prefix="/api")
return app return app

1
app/mapper/__init__.py Normal file
View File

@ -0,0 +1 @@
from .user_mapper import UserMapper

41
app/mapper/user_mapper.py Normal file
View File

@ -0,0 +1,41 @@
from datetime import datetime
from typing import Dict, Optional
from models import UserEntity
from schemas import RegisterSchema
class UserMapper:
@staticmethod
def from_google_payload(
google_id: str, email: str, payload: Dict[str, Optional[str]]
) -> UserEntity:
return UserEntity(
google_id=google_id,
email=email,
name=payload.get("name"),
pic_url=payload.get("picture"),
birth_date=None,
phone=None,
role="user",
is_active=True,
address=None,
created_at=datetime.now(),
updated_at=datetime.now(),
verification_token=None,
)
@staticmethod
def from_register(data: RegisterSchema) -> UserEntity:
return UserEntity(
email=data.email,
password=data.password,
name=data.name,
birth_date=datetime.strptime(data.birth_date, "%d-%m-%Y").date(),
phone=data.phone,
role="user",
is_active=False,
address=None,
created_at=datetime.now(),
updated_at=datetime.now(),
verification_token=None,
)

View File

@ -1 +1,4 @@
from .dto import ApiResponse # app/models/__init__.py
from .entities import UserEntity
__all__ = ["UserEntity", "UserDTO"]

View File

@ -0,0 +1,7 @@
from .user_entity import UserEntity
from .base import PyObjectId
__all__ = [
"UserEntity",
"PyObjectId",
]

View File

@ -0,0 +1,19 @@
from bson import ObjectId
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic to handle MongoDB _id"""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")

View File

@ -0,0 +1,18 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
from .base import PyObjectId
class UserEntity(BaseModel):
_id: Optional[PyObjectId] = None
google_id: Optional[str] = None
email: EmailStr
password: Optional[str] = None
name: str
birth_date: Optional[datetime] = None
pic_url: Optional[str] = None
phone: Optional[str] = None
locale: str = "en-US"
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None

View File

@ -1,4 +1,6 @@
import sys from typing import Optional
from bson import ObjectId
from models import UserEntity
class UserRepository: class UserRepository:
@ -6,18 +8,48 @@ class UserRepository:
def __init__(self, db): def __init__(self, db):
self.collection = db.users self.collection = db.users
def get_all_users(self): def get_all_users(self) -> list[UserEntity]:
try: """Mengambil semua user dari database."""
users = list(self.collection.find({}, {"_id": 0})) users = list(self.collection.find({}, {"_id": 0}))
return [UserEntity(**user) for user in users]
return users if users else [] def get_user_by_email(self, email: str) -> Optional[UserEntity]:
except Exception as e: """Mendapatkan user berdasarkan email."""
return []
def get_user_by_email(self, email):
try:
user = self.collection.find_one({"email": email}, {"_id": 0}) user = self.collection.find_one({"email": email}, {"_id": 0})
return user if user else None return UserEntity(**user) if user else None
except Exception as e:
return None def get_user_by_id(self, user_id: str) -> Optional[UserEntity]:
"""Mendapatkan user berdasarkan ID."""
object_id = ObjectId(user_id)
user = self.collection.find_one({"_id": object_id})
return UserEntity(**user) if user else None
def get_by_google_id(self, google_id: str) -> Optional[UserEntity]:
user_data = self.collection.find_one({"google_id": google_id})
return UserEntity(**user_data) if user_data else None
def insert_user(self, user_data: UserEntity) -> str:
"""Menambahkan pengguna baru ke dalam database dan mengembalikan ID pengguna."""
result = self.collection.insert_one(user_data.model_dump())
return str(result.inserted_id)
def update_user(self, user_id: str, update_data: dict) -> bool:
"""Mengupdate seluruh data user berdasarkan ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one({"_id": object_id}, {"$set": update_data})
return result.modified_count > 0
def update_user_field(self, user_id: str, field: str, value) -> bool:
"""Mengupdate satu field dari user berdasarkan ID."""
object_id = ObjectId(user_id)
result = self.collection.update_one(
{"_id": object_id}, {"$set": {field: value}}
)
return result.modified_count > 0
def delete_user(self, user_id: str) -> bool:
"""Menghapus user berdasarkan ID."""
object_id = ObjectId(user_id)
result = self.collection.delete_one({"_id": object_id})
return result.deleted_count > 0

View File

@ -1,2 +1,3 @@
from .login_schema import LoginSchema from .login_schema import LoginSchema
from .basic_response_schema import ResponseSchema, MetaSchema from .basic_response_schema import ResponseSchema, MetaSchema
from .requests import RegisterSchema

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
T = TypeVar("T") T = TypeVar("T")
class MetaSchema: class MetaSchema(BaseModel):
total_page: int total_page: int
current_page: int current_page: int
total_data: int total_data: int

View File

@ -2,4 +2,4 @@ from pydantic import BaseModel
class GoogleLoginSchema(BaseModel): class GoogleLoginSchema(BaseModel):
tokenId: str token_id: str

View File

@ -0,0 +1 @@
from .register_schema import RegisterSchema

View File

@ -0,0 +1,10 @@
from pydantic import BaseModel
from typing import Optional
class RegisterSchema(BaseModel):
email: str
password: str
name: str
birth_date: str
phone: Optional[str] = None

View File

@ -1,10 +1,11 @@
import sys
from schemas import LoginSchema from schemas import LoginSchema
from repositories import UserRepository from repositories import UserRepository
from models import ApiResponse from mapper import UserMapper
from google.oauth2 import id_token from google.oauth2 import id_token
from google.auth.transport import requests from google.auth.transport import requests
from configs import Config from configs import Config
from exception import AuthException
from flask import current_app
class AuthService: class AuthService:
@ -12,48 +13,36 @@ class AuthService:
self.user_repository = userRepository self.user_repository = userRepository
def verify_google_id_token(self, id_token_str): def verify_google_id_token(self, id_token_str):
try:
payload = id_token.verify_oauth2_token( payload = id_token.verify_oauth2_token(
id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID
) )
print(f"output verify {payload}", file=sys.stderr) if not payload:
raise AuthException("Invalid Google ID Token")
user_data = { google_id = payload.get("sub")
"_id": payload.get("sub"), email = payload.get("email")
"email": payload.get("email"),
"name": payload.get("name"),
"picture": payload.get("picture"),
"given_name": payload.get("given_name"),
"family_name": payload.get("family_name"),
"locale": payload.get("locale"),
"email_verified": payload.get("email_verified", False),
"iat": payload.get("iat"),
"exp": payload.get("exp"),
}
return payload existing_user = self.user_repository.get_by_google_id(google_id)
except Exception as e: if existing_user:
print(f"issue on the verify {e}", file=sys.stderr) if existing_user.email == email:
return None return existing_user
raise AuthException("Email not match")
new_user = UserMapper.from_google_payload(google_id, email, payload)
user_id = self.user_repository.insert_user(user_data=new_user)
return self.user_repository.get_user_by_id(user_id=user_id)
def login(self, data: LoginSchema): def login(self, data: LoginSchema):
try:
user_data = self.user_repository.get_user_by_email(data.email) user_data = self.user_repository.get_user_by_email(data.email)
if user_data == None: if user_data == None:
return ApiResponse(success=False, message="User not found", data=None) return None
if user_data["password"] == data.password: if user_data.password == data.password:
del user_data["password"] del user_data.password
return ApiResponse( return user_data
success=True, message="Login success", data=user_data return None
)
return ApiResponse(success=False, message="Invalid password", data=None)
except Exception as e:
print(f"the issue is {e}")
return ApiResponse(
success=False, message="Internal server error", data=None
)

View File

@ -1,4 +1,8 @@
from flask import current_app
from repositories import UserRepository from repositories import UserRepository
from schemas import RegisterSchema
from mapper import UserMapper
from exception import AlreadyExistException
class UserService: class UserService:
@ -7,3 +11,11 @@ class UserService:
def get_all_users(self): def get_all_users(self):
return self.user_repository.get_all_users() return self.user_repository.get_all_users()
def register_user(self, user_data: RegisterSchema):
existData = self.user_repository.get_user_by_email(user_data.email)
if existData:
raise AlreadyExistException(entity="Email")
data = UserMapper.from_register(user_data)
return self.user_repository.insert_user(data)