94 lines
2.9 KiB
Python
94 lines
2.9 KiB
Python
import os
|
|
import bcrypt
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from config.db import conn
|
|
from models.index import users
|
|
from dotenv import load_dotenv
|
|
from sqlalchemy.sql import select
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Secret key untuk JWT
|
|
SECRET_KEY = os.getenv("SECRET_KEY", "supersecretkey")
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60 # Token berlaku selama 1 jam
|
|
|
|
# Router
|
|
auth_router = APIRouter(
|
|
prefix="/auth",
|
|
tags=["Auth"]
|
|
)
|
|
|
|
# Schema untuk login request
|
|
class LoginRequest(BaseModel):
|
|
email: str
|
|
password: str
|
|
|
|
@auth_router.post("/login")
|
|
async def login(data: LoginRequest):
|
|
# Cek apakah user dengan email ini ada di database
|
|
user = conn.execute(users.select().where(users.c.email == data.email)).fetchone()
|
|
if not user:
|
|
raise HTTPException(status_code=400, detail="Invalid email or password")
|
|
|
|
# Verifikasi password
|
|
if not bcrypt.checkpw(data.password.encode("utf-8"), user.password.encode("utf-8")):
|
|
raise HTTPException(status_code=400, detail="Invalid email or password")
|
|
|
|
# Buat token JWT
|
|
payload = {
|
|
"sub": user.email,
|
|
"role": user.role,
|
|
"exp": datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
}
|
|
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
# Kembalikan token beserta data user
|
|
return {
|
|
"access_token": token,
|
|
"token_type": "bearer",
|
|
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
|
|
"user": {
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"role": user.role,
|
|
}
|
|
}
|
|
|
|
class ForgotPasswordRequest(BaseModel):
|
|
email: str
|
|
new_password: str
|
|
|
|
@auth_router.put("/forgot-password")
|
|
async def forgot_password(data: ForgotPasswordRequest):
|
|
# Cek apakah user dengan email ini ada di database
|
|
user = conn.execute(users.select().where(users.c.email == data.email)).fetchone()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="Email tidak ditemukan")
|
|
|
|
# Hash password baru
|
|
hashed_password = bcrypt.hashpw(data.new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
|
|
|
# Update password di database
|
|
conn.execute(users.update().where(users.c.email == data.email).values(password=hashed_password))
|
|
conn.commit()
|
|
|
|
return {"message": "Password berhasil diperbarui"}
|
|
|
|
class CekEmail(BaseModel):
|
|
email: str
|
|
|
|
|
|
@auth_router.get("/check-email")
|
|
async def check_email(email: str = Query(..., description="Email yang akan dicek")):
|
|
query = select(users).where(users.c.email == email)
|
|
result = conn.execute(query).fetchone()
|
|
|
|
if result:
|
|
return {"message": "true"}
|
|
else:
|
|
raise HTTPException(status_code=400, detail="false") |