Luxx/luxx/api/auth.py

165 lines
4.8 KiB
Python

"""Authentication routes"""
from datetime import timedelta
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from pydantic import BaseModel
from luxx.core.database import get_db
from luxx.models.user import User
from luxx.utils.helpers import (
hash_password,
verify_password,
create_access_token,
decode_access_token,
success_response,
error_response
)
# Router collecting the authentication endpoints defined in this module.
# Every route below is registered under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Bearer-token extractor used as a dependency by get_current_user().
# NOTE(review): tokenUrl is "/api/auth/login" while the router prefix is
# "/auth" -- presumably the router is mounted under "/api" elsewhere; confirm.
# NOTE(review): the login route in this module takes a JSON UserLogin body,
# not an OAuth2 password form -- confirm the tokenUrl flow works as intended.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
class UserRegister(BaseModel):
    """Request body accepted by the registration endpoint."""
    # Login name; register() rejects it when a user with this name exists.
    username: str
    # Optional e-mail address; register() only checks it for duplicates
    # when a value is supplied.
    email: str | None = None
    # Password as submitted by the client; register() passes it through
    # hash_password() before storing it.
    password: str
class UserLogin(BaseModel):
    """Request body accepted by the login endpoint."""
    # Name used by login() to look up the account.
    username: str
    # Password as submitted; login() checks it with verify_password().
    password: str
# NOTE(review): no route visible in this module references UserResponse
# (the routes declare response_model=dict) -- confirm whether it is used
# elsewhere.
class UserResponse(BaseModel):
    """Schema describing the user fields exposed to clients."""
    id: int
    username: str
    # Required field that may be null (no default is declared).
    email: str | None
    role: str
    # Numeric privilege level; require_admin() treats values >= 4 as admin.
    permission_level: int
class UserPermissionUpdate(BaseModel):
    """Request body accepted by the update-user endpoint."""
    # New privilege level; update_user() accepts 1 through 4 and rejects
    # anything outside that range.
    permission_level: int
# NOTE(review): no route visible in this module references TokenResponse;
# login() returns the token inside a success_response() payload instead --
# confirm whether this model is used elsewhere.
class TokenResponse(BaseModel):
    """Schema holding an access token and its type."""
    access_token: str
    token_type: str
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the request's bearer token to the matching ``User`` row.

    The token is decoded with ``decode_access_token``; its ``sub`` claim is
    interpreted as the integer primary key of the user.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session provided by ``get_db``.

    Returns:
        The active ``User`` identified by the token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no usable
            ``sub`` claim, or names a user that does not exist; 403 when the
            account is disabled.
    """
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    subject = payload.get("sub")
    if not subject:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    # A "sub" claim that is not an integer is a malformed token, not a
    # server error: answer 401 instead of letting int() raise.
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from None
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    # login() refuses disabled accounts; apply the same rule here so a token
    # issued before the account was disabled stops granting access.
    if not user.is_active:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled")
    return user
def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that only lets users with permission level 4 or above pass.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user object when the permission check succeeds.

    Raises:
        HTTPException: 403 when ``permission_level`` is below 4.
    """
    if current_user.permission_level >= 4:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin permission required",
    )
@router.post("/register", response_model=dict)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
    """Create a new user account.

    The username, and the e-mail address when one is given, must not already
    be present in the users table. The password is stored only as the value
    returned by ``hash_password``.

    Args:
        user_data: Registration payload (username, optional email, password).
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` carrying the new user's id and username, or the
        result of ``error_response`` (code 400) when the username or e-mail
        is already taken.
    """
    # Function-scope import keeps this block self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    existing_user = db.query(User).filter(User.username == user_data.username).first()
    if existing_user:
        return error_response("Username already exists", 400)
    if user_data.email:
        existing_email = db.query(User).filter(User.email == user_data.email).first()
        if existing_email:
            return error_response("Email already registered", 400)
    user = User(
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # The lookups above are not atomic with the insert: a concurrent
        # request can register the same username/e-mail in between.
        # Presumably the table enforces uniqueness on those columns (verify
        # against the User model); report the conflict instead of a 500 and
        # roll back so the session is usable again.
        db.rollback()
        return error_response("Username or email already exists", 400)
    db.refresh(user)
    return success_response(
        data={"id": user.id, "username": user.username},
        message="Registration successful"
    )
@router.post("/login", response_model=dict)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
    """Check a username/password pair and issue an access token.

    Args:
        user_data: Login payload (username and password).
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` carrying the access token, the token type and
        the user's ``to_dict()`` data; otherwise the result of
        ``error_response`` with code 401 (bad credentials) or 403 (account
        disabled).
    """
    account = db.query(User).filter(User.username == user_data.username).first()
    # An unknown username and a wrong password produce the same response.
    if not account:
        return error_response("Invalid username or password", 401)
    # A missing stored hash is verified against the empty string.
    stored_hash = account.password_hash or ""
    if not verify_password(user_data.password, stored_hash):
        return error_response("Invalid username or password", 401)
    if not account.is_active:
        return error_response("User account is disabled", 403)

    # The token identifies the user by primary key and is valid for 7 days.
    claims = {"sub": str(account.id)}
    access_token = create_access_token(data=claims, expires_delta=timedelta(days=7))
    body = {
        "access_token": access_token,
        "token_type": "bearer",
        "user": account.to_dict(),
    }
    return success_response(data=body, message="Login successful")
@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user.

    The handler itself changes no state: it only requires a valid token
    (through ``get_current_user``) and returns a success message.
    """
    acknowledgement = success_response(message="Logout successful")
    return acknowledgement
@router.get("/me", response_model=dict)
def get_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's own data.

    The payload is the user's ``to_dict()`` output wrapped by
    ``success_response``.
    """
    profile = current_user.to_dict()
    return success_response(data=profile)
@router.get("/users", response_model=dict)
def get_users(admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
    """List every user (admin only).

    Args:
        admin_user: Caller, already checked by ``require_admin``.
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` whose data holds a ``"users"`` list with each
        user's ``to_dict()`` output.
    """
    serialized = [account.to_dict() for account in db.query(User).all()]
    return success_response(data={"users": serialized})
@router.put("/users/{user_id}", response_model=dict)
def update_user(user_id: int, data: UserPermissionUpdate, admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Change a user's permission level (admin only).

    Args:
        user_id: Primary key of the user to modify.
        data: Payload carrying the new permission level.
        admin_user: Caller, already checked by ``require_admin``.
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` with the updated user's ``to_dict()`` output, or
        the result of ``error_response`` with code 404 (no such user) or 400
        (level outside 1-4).
    """
    target = db.query(User).filter(User.id == user_id).first()
    # The existence check comes first, so an unknown id reports 404 even
    # when the requested level is also invalid.
    if not target:
        return error_response("User not found", 404)
    requested_level = data.permission_level
    if not 1 <= requested_level <= 4:
        return error_response("Invalid permission level (1-4)", 400)
    target.permission_level = requested_level
    db.commit()
    return success_response(data=target.to_dict(), message="User permission updated")