"""Admin router — protected CRUD for settings, integrations, and MQTT subscriptions.""" from __future__ import annotations import logging from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from server.auth import require_admin from server.services import settings_service from server.services.test_connections import TEST_FUNCTIONS logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/admin", tags=["admin"], dependencies=[Depends(require_admin)], ) # --------------------------------------------------------------------------- # Request/Response Models # --------------------------------------------------------------------------- class SettingsUpdate(BaseModel): settings: Dict[str, Any] class IntegrationUpdate(BaseModel): name: Optional[str] = None config: Optional[Dict[str, Any]] = None enabled: Optional[bool] = None display_order: Optional[int] = None class MqttSubscriptionCreate(BaseModel): topic_pattern: str display_name: str = "" category: str = "other" unit: str = "" widget_type: str = "value" enabled: bool = True display_order: int = 0 class MqttSubscriptionUpdate(BaseModel): topic_pattern: Optional[str] = None display_name: Optional[str] = None category: Optional[str] = None unit: Optional[str] = None widget_type: Optional[str] = None enabled: Optional[bool] = None display_order: Optional[int] = None # --------------------------------------------------------------------------- # Settings # --------------------------------------------------------------------------- @router.get("/settings") async def get_settings() -> Dict[str, Any]: """Return all app settings.""" return await settings_service.get_all_settings() @router.put("/settings") async def update_settings(body: SettingsUpdate) -> Dict[str, str]: """Bulk update settings.""" await settings_service.bulk_set_settings(body.settings) # Reload in-memory settings await _reload_app_settings() return {"status": "ok", 
"message": f"Updated {len(body.settings)} setting(s)"} # --------------------------------------------------------------------------- # Integrations # --------------------------------------------------------------------------- @router.get("/integrations") async def list_integrations() -> List[Dict[str, Any]]: """List all integration configs.""" return await settings_service.get_integrations() @router.get("/integrations/{type_name}") async def get_integration(type_name: str) -> Dict[str, Any]: """Get a single integration config.""" result = await settings_service.get_integration(type_name) if result is None: raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found") return result @router.put("/integrations/{type_name}") async def update_integration(type_name: str, body: IntegrationUpdate) -> Dict[str, Any]: """Update an integration config.""" existing = await settings_service.get_integration(type_name) if existing is None: raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found") result = await settings_service.upsert_integration( type_name=type_name, name=body.name or existing["name"], config=body.config if body.config is not None else existing["config"], enabled=body.enabled if body.enabled is not None else existing["enabled"], display_order=body.display_order if body.display_order is not None else existing["display_order"], ) # Reload in-memory settings await _reload_app_settings() return result @router.post("/integrations/{type_name}/test") async def test_integration(type_name: str) -> Dict[str, Any]: """Test an integration connection.""" integration = await settings_service.get_integration(type_name) if integration is None: raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found") test_fn = TEST_FUNCTIONS.get(type_name) if test_fn is None: return {"success": False, "message": f"No test available for '{type_name}'"} return await test_fn(integration["config"]) # 
# ---------------------------------------------------------------------------
# MQTT Subscriptions
# ---------------------------------------------------------------------------

@router.get("/mqtt/subscriptions")
async def list_mqtt_subscriptions() -> List[Dict[str, Any]]:
    """List all MQTT subscriptions."""
    subscriptions = await settings_service.get_mqtt_subscriptions()
    return subscriptions


@router.post("/mqtt/subscriptions")
async def create_mqtt_subscription(body: MqttSubscriptionCreate) -> Dict[str, Any]:
    """Create a new MQTT subscription."""
    # Forward each request field to the service under the same keyword name.
    new_subscription = {
        "topic_pattern": body.topic_pattern,
        "display_name": body.display_name,
        "category": body.category,
        "unit": body.unit,
        "widget_type": body.widget_type,
        "enabled": body.enabled,
        "display_order": body.display_order,
    }
    return await settings_service.create_mqtt_subscription(**new_subscription)


@router.put("/mqtt/subscriptions/{sub_id}")
async def update_mqtt_subscription(sub_id: int, body: MqttSubscriptionUpdate) -> Dict[str, Any]:
    """Update an MQTT subscription."""
    # Only fields with a non-None value are sent on to the service.
    changes = body.model_dump(exclude_none=True)
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    updated = await settings_service.update_mqtt_subscription(sub_id, **changes)
    if updated is not None:
        return updated
    # The service returned None for this sub_id.
    raise HTTPException(status_code=404, detail="Subscription not found")


@router.delete("/mqtt/subscriptions/{sub_id}")
async def delete_mqtt_subscription(sub_id: int) -> Dict[str, str]:
    """Delete an MQTT subscription."""
    was_deleted = await settings_service.delete_mqtt_subscription(sub_id)
    if was_deleted:
        return {"status": "ok", "message": "Subscription deleted"}
    # A falsy result from the service is reported as "not found".
    raise HTTPException(status_code=404, detail="Subscription not found")


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

async def _reload_app_settings() -> None:
    """Refresh the in-memory settings via ``server.config.reload_settings``.

    Best-effort: any exception raised by the import or the reload is logged
    with its traceback and suppressed, so the calling endpoint still returns
    its normal response.
    """
    try:
        # Imported at call time rather than at module import
        # (presumably to avoid a circular import — TODO confirm).
        from server.config import reload_settings as refresh_settings

        await refresh_settings()
    except Exception:
        logger.exception("Failed to reload settings after admin change")