daily-briefing/server/routers/auth.py

80 lines
2.3 KiB
Python
Raw Normal View History

"""Auth router — login and password management."""
from __future__ import annotations
import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from server.auth import (
create_access_token,
hash_password,
require_admin,
verify_password,
)
from server.services import settings_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
@router.post("/login")
async def login(body: LoginRequest) -> Dict[str, Any]:
"""Authenticate admin and return a JWT."""
user = await settings_service.get_admin_user()
if user is None:
raise HTTPException(status_code=503, detail="No admin user configured")
if body.username != user["username"]:
raise HTTPException(status_code=401, detail="Invalid credentials")
if not verify_password(body.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
token = create_access_token(user["username"])
return {
"token": token,
"username": user["username"],
}
@router.get("/me")
async def get_me(admin_user: str = Depends(require_admin)) -> Dict[str, str]:
"""Return the authenticated admin username. Used to verify token validity."""
return {"username": admin_user}
@router.put("/password")
async def change_password(
body: ChangePasswordRequest,
admin_user: str = Depends(require_admin),
) -> Dict[str, str]:
"""Change the admin password."""
user = await settings_service.get_admin_user()
if user is None:
raise HTTPException(status_code=500, detail="Admin user not found")
if not verify_password(body.current_password, user["password_hash"]):
raise HTTPException(status_code=400, detail="Current password is incorrect")
if len(body.new_password) < 6:
raise HTTPException(status_code=400, detail="New password must be at least 6 characters")
await settings_service.update_admin_password(
user["id"], hash_password(body.new_password)
)
return {"status": "ok", "message": "Password changed successfully"}