"""JWT authentication for admin routes.""" from __future__ import annotations import logging import os import secrets from datetime import datetime, timedelta, timezone from typing import Optional import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt logger = logging.getLogger(__name__) JWT_SECRET = os.getenv("JWT_SECRET") or secrets.token_urlsafe(32) JWT_ALGORITHM = "HS256" JWT_EXPIRE_HOURS = 24 bearer_scheme = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """Hash a plain-text password with bcrypt.""" return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: """Verify a plain-text password against its bcrypt hash.""" return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) def create_access_token(subject: str) -> str: """Create a JWT access token for the given subject (username).""" expire = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS) return jwt.encode( {"sub": subject, "exp": expire}, JWT_SECRET, algorithm=JWT_ALGORITHM, ) async def require_admin( credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme), ) -> str: """FastAPI dependency that validates the JWT and returns the username. Use as: ``admin_user: str = Depends(require_admin)`` """ if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) username: Optional[str] = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token payload") return username except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, )