Complete admin backend with login, where all integrations (weather, news, Home Assistant, Vikunja, Unraid, MQTT) can be configured via web UI instead of ENV variables. Two-layer config: ENV seeds DB on first start, then DB is source of truth. Auto-migration system on startup. Backend: db.py shared pool, auth.py JWT, settings_service CRUD, seed_service, admin router (protected), test_connections per integration, config.py rewrite. Frontend: react-router v6, login page, admin layout with sidebar, 8 settings pages (General, Weather, News, HA, Vikunja, Unraid, MQTT, ChangePassword), shared IntegrationForm + TestButton components. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
79 lines
2.3 KiB
Python
79 lines
2.3 KiB
Python
"""Auth router — login and password management."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Dict
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from server.auth import (
|
|
create_access_token,
|
|
hash_password,
|
|
require_admin,
|
|
verify_password,
|
|
)
|
|
from server.services import settings_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
|
|
|
|
|
class LoginRequest(BaseModel):
    """Request body accepted by ``POST /api/auth/login``."""

    # Username submitted by the client; the login handler matches it against
    # the stored admin username.
    username: str
    # Plain-text password submitted by the client; the login handler checks it
    # against the stored hash via ``verify_password``.
    password: str
|
|
|
|
|
|
class ChangePasswordRequest(BaseModel):
    """Request body accepted by ``PUT /api/auth/password``."""

    # The password currently in effect; re-verified by the handler before any
    # change is made.
    current_password: str
    # The replacement password; the handler rejects values shorter than
    # 6 characters.
    new_password: str
|
|
|
|
|
|
@router.post("/login")
async def login(body: LoginRequest) -> Dict[str, Any]:
    """Authenticate the admin and return a JWT.

    Args:
        body: The submitted username and password.

    Returns:
        A dict with ``token`` (the value produced by ``create_access_token``)
        and ``username`` (the stored admin username).

    Raises:
        HTTPException: 503 when no admin user exists yet; 401 with the generic
            detail ``"Invalid credentials"`` when either the username or the
            password does not match.
    """
    # Function-scope import: only this handler needs it, and it keeps the
    # module's import block untouched.
    import hmac

    user = await settings_service.get_admin_user()
    if user is None:
        raise HTTPException(status_code=503, detail="No admin user configured")

    # Evaluate BOTH checks unconditionally before deciding. Returning early on
    # a wrong username would skip the password hash check, so a wrong username
    # would be answered measurably faster than a wrong password, letting a
    # caller discover the admin username by timing.
    # compare_digest needs bytes for non-ASCII text; "surrogatepass" keeps
    # lone surrogates from raising instead of simply failing the comparison.
    # Assumes the stored username is a str -- TODO confirm against the schema.
    username_ok = hmac.compare_digest(
        body.username.encode("utf-8", "surrogatepass"),
        user["username"].encode("utf-8", "surrogatepass"),
    )
    password_ok = verify_password(body.password, user["password_hash"])

    if not (username_ok and password_ok):
        # Deliberately omit the submitted values: users sometimes type their
        # password into the username field, and neither belongs in logs.
        logger.warning("Failed admin login attempt")
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token(user["username"])
    return {
        "token": token,
        "username": user["username"],
    }
|
|
|
|
|
|
@router.get("/me")
async def get_me(admin_user: str = Depends(require_admin)) -> Dict[str, str]:
    """Echo back the username supplied by the ``require_admin`` dependency.

    Clients call this endpoint to check that their token is still accepted.

    Args:
        admin_user: Value injected by ``require_admin`` (presumably the
            username taken from a valid token -- confirm in ``server.auth``).

    Returns:
        A dict with the single key ``username``.
    """
    identity: Dict[str, str] = {"username": admin_user}
    return identity
|
|
|
|
|
|
@router.put("/password")
async def change_password(
    body: ChangePasswordRequest,
    admin_user: str = Depends(require_admin),
) -> Dict[str, str]:
    """Replace the stored admin password after re-checking the current one.

    Args:
        body: The current password and the desired new password.
        admin_user: Value injected by ``require_admin``; it is not read in the
            body and serves only to protect the route.

    Returns:
        A dict with ``status`` set to ``"ok"`` and a confirmation ``message``.

    Raises:
        HTTPException: 500 when no admin record is found; 400 when the current
            password does not verify or the new one is shorter than
            6 characters.
    """
    account = await settings_service.get_admin_user()
    if account is None:
        raise HTTPException(status_code=500, detail="Admin user not found")

    current_matches = verify_password(
        body.current_password, account["password_hash"]
    )
    if not current_matches:
        raise HTTPException(status_code=400, detail="Current password is incorrect")

    # The length check intentionally comes after the current-password check:
    # when both fail, the client sees the "incorrect" message.
    candidate = body.new_password
    if len(candidate) < 6:
        raise HTTPException(
            status_code=400,
            detail="New password must be at least 6 characters",
        )

    account_id = account["id"]
    new_hash = hash_password(candidate)
    await settings_service.update_admin_password(account_id, new_hash)

    return {"status": "ok", "message": "Password changed successfully"}
|