80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
|
|
"""Auth router — login and password management."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
from typing import Any, Dict
|
||
|
|
|
||
|
|
from fastapi import APIRouter, Depends, HTTPException
|
||
|
|
from pydantic import BaseModel
|
||
|
|
|
||
|
|
from server.auth import (
|
||
|
|
create_access_token,
|
||
|
|
hash_password,
|
||
|
|
require_admin,
|
||
|
|
verify_password,
|
||
|
|
)
|
||
|
|
from server.services import settings_service
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
||
|
|
|
||
|
|
|
||
|
|
class LoginRequest(BaseModel):
|
||
|
|
username: str
|
||
|
|
password: str
|
||
|
|
|
||
|
|
|
||
|
|
class ChangePasswordRequest(BaseModel):
|
||
|
|
current_password: str
|
||
|
|
new_password: str
|
||
|
|
|
||
|
|
|
||
|
|
@router.post("/login")
|
||
|
|
async def login(body: LoginRequest) -> Dict[str, Any]:
|
||
|
|
"""Authenticate admin and return a JWT."""
|
||
|
|
user = await settings_service.get_admin_user()
|
||
|
|
if user is None:
|
||
|
|
raise HTTPException(status_code=503, detail="No admin user configured")
|
||
|
|
|
||
|
|
if body.username != user["username"]:
|
||
|
|
raise HTTPException(status_code=401, detail="Invalid credentials")
|
||
|
|
|
||
|
|
if not verify_password(body.password, user["password_hash"]):
|
||
|
|
raise HTTPException(status_code=401, detail="Invalid credentials")
|
||
|
|
|
||
|
|
token = create_access_token(user["username"])
|
||
|
|
return {
|
||
|
|
"token": token,
|
||
|
|
"username": user["username"],
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
@router.get("/me")
|
||
|
|
async def get_me(admin_user: str = Depends(require_admin)) -> Dict[str, str]:
|
||
|
|
"""Return the authenticated admin username. Used to verify token validity."""
|
||
|
|
return {"username": admin_user}
|
||
|
|
|
||
|
|
|
||
|
|
@router.put("/password")
|
||
|
|
async def change_password(
|
||
|
|
body: ChangePasswordRequest,
|
||
|
|
admin_user: str = Depends(require_admin),
|
||
|
|
) -> Dict[str, str]:
|
||
|
|
"""Change the admin password."""
|
||
|
|
user = await settings_service.get_admin_user()
|
||
|
|
if user is None:
|
||
|
|
raise HTTPException(status_code=500, detail="Admin user not found")
|
||
|
|
|
||
|
|
if not verify_password(body.current_password, user["password_hash"]):
|
||
|
|
raise HTTPException(status_code=400, detail="Current password is incorrect")
|
||
|
|
|
||
|
|
if len(body.new_password) < 6:
|
||
|
|
raise HTTPException(status_code=400, detail="New password must be at least 6 characters")
|
||
|
|
|
||
|
|
await settings_service.update_admin_password(
|
||
|
|
user["id"], hash_password(body.new_password)
|
||
|
|
)
|
||
|
|
return {"status": "ok", "message": "Password changed successfully"}
|