87 lines
2.9 KiB
Python
87 lines
2.9 KiB
Python
from datetime import datetime
|
|
import sys
|
|
import traceback
|
|
from schemas import LoginSchema
|
|
from repositories import UserRepository
|
|
|
|
# from models import ApiResponse
|
|
from google.oauth2 import id_token
|
|
from google.auth.transport import requests
|
|
from configs import Config
|
|
from models import UserEntity
|
|
|
|
|
|
class AuthService:
|
|
def __init__(self, userRepository: UserRepository):
|
|
self.user_repository = userRepository
|
|
|
|
def verify_google_id_token(self, id_token_str):
|
|
try:
|
|
# Verifikasi token Google
|
|
payload = id_token.verify_oauth2_token(
|
|
id_token_str, requests.Request(), Config.GOOGLE_CLIENT_ID
|
|
)
|
|
|
|
if not payload:
|
|
print("Invalid token", file=sys.stderr)
|
|
return None
|
|
|
|
google_id = payload.get("sub")
|
|
email = payload.get("email")
|
|
|
|
existing_user = self.user_repository.get_by_google_id(google_id)
|
|
if existing_user:
|
|
if existing_user.email == email:
|
|
return existing_user
|
|
|
|
new_user = UserEntity(
|
|
id=str(google_id),
|
|
google_id=google_id,
|
|
email=email,
|
|
name=payload.get("name"),
|
|
pic_url=payload.get("picture"),
|
|
birth_date=None,
|
|
phone=None,
|
|
role="user",
|
|
is_active=True,
|
|
address=None,
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now(),
|
|
verification_token=None,
|
|
)
|
|
|
|
# Simpan user ke database
|
|
user_id = self.user_repository.insert_user(user_data=new_user)
|
|
|
|
# Ambil user yang baru dibuat berdasarkan ID
|
|
return self.user_repository.get_user_by_id(user_id=user_id)
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
print(f"Error verifying Google ID token: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
def login(self, data: LoginSchema):
|
|
try:
|
|
|
|
user_data = self.user_repository.get_user_by_email(data.email)
|
|
|
|
if user_data == None:
|
|
# return ApiResponse(success=False, message="User not found", data=None)
|
|
return None
|
|
|
|
if user_data["password"] == data.password:
|
|
del user_data["password"]
|
|
# return ApiResponse(
|
|
# success=True, message="Login success", data=user_data
|
|
# )
|
|
return None
|
|
# return ApiResponse(success=False, message="Invalid password", data=None)
|
|
return None
|
|
except Exception as e:
|
|
print(f"the issue is {e}")
|
|
# return ApiResponse(
|
|
# success=False, message="Internal server error", data=None
|
|
# )
|
|
return None
|