"""JWT authentication for admin routes.""" from __future__ import annotations import logging import os import secrets from datetime import datetime, timedelta, timezone from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from passlib.context import CryptContext logger = logging.getLogger(__name__) JWT_SECRET = os.getenv("JWT_SECRET") or secrets.token_urlsafe(32) JWT_ALGORITHM = "HS256" JWT_EXPIRE_HOURS = 24 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") bearer_scheme = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """Hash a plain-text password with bcrypt.""" return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: """Verify a plain-text password against its bcrypt hash.""" return pwd_context.verify(plain, hashed) def create_access_token(subject: str) -> str: """Create a JWT access token for the given subject (username).""" expire = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS) return jwt.encode( {"sub": subject, "exp": expire}, JWT_SECRET, algorithm=JWT_ALGORITHM, ) async def require_admin( credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme), ) -> str: """FastAPI dependency that validates the JWT and returns the username. Use as: ``admin_user: str = Depends(require_admin)`` """ if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) username: Optional[str] = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token payload") return username except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, )