TIF_E41211115_Genso_quiz_ba.../app/services/auth_service.py

64 lines
2.2 KiB
Python

from schemas import LoginSchema
from repositories import UserRepository
from mapper import UserMapper
from google.oauth2 import id_token
from google.auth.transport import requests
from configs import Config
from core import AuthException
from flask import current_app
class AuthService:
def __init__(self, userRepository: UserRepository):
self.user_repository = userRepository
def verify_google_id_token(self, id_token_str):
# Verifikasi token Google
payload = id_token.verify_oauth2_token(
id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID
)
if not payload:
return AuthException("Invalid Google ID Token")
google_id = payload.get("sub")
email = payload.get("email")
existing_user = self.user_repository.get_by_google_id(google_id)
if existing_user:
current_app.logger.info(f"User {existing_user.email} already exists ")
if existing_user.email == email:
return existing_user
return AuthException("Email not match")
new_user = UserMapper.from_google_payload(google_id, email, payload)
user_id = self.user_repository.insert_user(user_data=new_user)
return self.user_repository.get_user_by_id(user_id=user_id)
def login(self, data: LoginSchema):
try:
user_data = self.user_repository.get_user_by_email(data.email)
if user_data == None:
# return ApiResponse(success=False, message="User not found", data=None)
return None
if user_data["password"] == data.password:
del user_data["password"]
# return ApiResponse(
# success=True, message="Login success", data=user_data
# )
return None
# return ApiResponse(success=False, message="Invalid password", data=None)
return None
except Exception as e:
print(f"the issue is {e}")
# return ApiResponse(
# success=False, message="Internal server error", data=None
# )
return None