daily-briefing/server/routers/admin.py

187 lines
6.3 KiB
Python
Raw Permalink Normal View History

"""Admin router — protected CRUD for settings, integrations, and MQTT subscriptions."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from server.auth import require_admin
from server.services import settings_service
from server.services.test_connections import TEST_FUNCTIONS
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/admin",
tags=["admin"],
dependencies=[Depends(require_admin)],
)
# ---------------------------------------------------------------------------
# Request/Response Models
# ---------------------------------------------------------------------------
class SettingsUpdate(BaseModel):
settings: Dict[str, Any]
class IntegrationUpdate(BaseModel):
name: Optional[str] = None
config: Optional[Dict[str, Any]] = None
enabled: Optional[bool] = None
display_order: Optional[int] = None
class MqttSubscriptionCreate(BaseModel):
topic_pattern: str
display_name: str = ""
category: str = "other"
unit: str = ""
widget_type: str = "value"
enabled: bool = True
display_order: int = 0
class MqttSubscriptionUpdate(BaseModel):
topic_pattern: Optional[str] = None
display_name: Optional[str] = None
category: Optional[str] = None
unit: Optional[str] = None
widget_type: Optional[str] = None
enabled: Optional[bool] = None
display_order: Optional[int] = None
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@router.get("/settings")
async def get_settings() -> Dict[str, Any]:
"""Return all app settings."""
return await settings_service.get_all_settings()
@router.put("/settings")
async def update_settings(body: SettingsUpdate) -> Dict[str, str]:
"""Bulk update settings."""
await settings_service.bulk_set_settings(body.settings)
# Reload in-memory settings
await _reload_app_settings()
return {"status": "ok", "message": f"Updated {len(body.settings)} setting(s)"}
# ---------------------------------------------------------------------------
# Integrations
# ---------------------------------------------------------------------------
@router.get("/integrations")
async def list_integrations() -> List[Dict[str, Any]]:
"""List all integration configs."""
return await settings_service.get_integrations()
@router.get("/integrations/{type_name}")
async def get_integration(type_name: str) -> Dict[str, Any]:
"""Get a single integration config."""
result = await settings_service.get_integration(type_name)
if result is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
return result
@router.put("/integrations/{type_name}")
async def update_integration(type_name: str, body: IntegrationUpdate) -> Dict[str, Any]:
"""Update an integration config."""
existing = await settings_service.get_integration(type_name)
if existing is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
result = await settings_service.upsert_integration(
type_name=type_name,
name=body.name or existing["name"],
config=body.config if body.config is not None else existing["config"],
enabled=body.enabled if body.enabled is not None else existing["enabled"],
display_order=body.display_order if body.display_order is not None else existing["display_order"],
)
# Reload in-memory settings
await _reload_app_settings()
return result
@router.post("/integrations/{type_name}/test")
async def test_integration(type_name: str) -> Dict[str, Any]:
"""Test an integration connection."""
integration = await settings_service.get_integration(type_name)
if integration is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
test_fn = TEST_FUNCTIONS.get(type_name)
if test_fn is None:
return {"success": False, "message": f"No test available for '{type_name}'"}
return await test_fn(integration["config"])
# ---------------------------------------------------------------------------
# MQTT Subscriptions
# ---------------------------------------------------------------------------
@router.get("/mqtt/subscriptions")
async def list_mqtt_subscriptions() -> List[Dict[str, Any]]:
"""List all MQTT subscriptions."""
return await settings_service.get_mqtt_subscriptions()
@router.post("/mqtt/subscriptions")
async def create_mqtt_subscription(body: MqttSubscriptionCreate) -> Dict[str, Any]:
"""Create a new MQTT subscription."""
return await settings_service.create_mqtt_subscription(
topic_pattern=body.topic_pattern,
display_name=body.display_name,
category=body.category,
unit=body.unit,
widget_type=body.widget_type,
enabled=body.enabled,
display_order=body.display_order,
)
@router.put("/mqtt/subscriptions/{sub_id}")
async def update_mqtt_subscription(sub_id: int, body: MqttSubscriptionUpdate) -> Dict[str, Any]:
"""Update an MQTT subscription."""
fields = body.model_dump(exclude_none=True)
if not fields:
raise HTTPException(status_code=400, detail="No fields to update")
result = await settings_service.update_mqtt_subscription(sub_id, **fields)
if result is None:
raise HTTPException(status_code=404, detail="Subscription not found")
return result
@router.delete("/mqtt/subscriptions/{sub_id}")
async def delete_mqtt_subscription(sub_id: int) -> Dict[str, str]:
"""Delete an MQTT subscription."""
deleted = await settings_service.delete_mqtt_subscription(sub_id)
if not deleted:
raise HTTPException(status_code=404, detail="Subscription not found")
return {"status": "ok", "message": "Subscription deleted"}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _reload_app_settings() -> None:
"""Reload the in-memory Settings object from the database."""
try:
from server.config import reload_settings
await reload_settings()
except Exception:
logger.exception("Failed to reload settings after admin change")