"""Dashboard aggregate router -- combined endpoint and WebSocket push.""" from __future__ import annotations import asyncio import json import logging from datetime import datetime, timezone from typing import Any, Dict, List from fastapi import APIRouter, WebSocket, WebSocketDisconnect from server.routers.homeassistant import get_ha from server.routers.news import get_news_articles from server.routers.servers import get_servers from server.routers.tasks import get_tasks from server.routers.weather import get_weather from server.services.mqtt_service import mqtt_service logger = logging.getLogger(__name__) router = APIRouter(tags=["dashboard"]) # Connected WebSocket clients clients: List[WebSocket] = [] @router.get("/api/all") async def get_all() -> Dict[str, Any]: """Fetch every data source in parallel and return a single combined dict. Response shape:: { "weather": { ... }, "news": { ... }, "servers": { ... }, "ha": { ... }, "tasks": { ... }, "timestamp": "ISO-8601 string" } Individual sections that fail will contain ``{"error": true, "message": "..."}``. """ results = await asyncio.gather( _safe(get_weather, "weather"), _safe(lambda: get_news_articles(limit=20, offset=0, category=None), "news"), _safe(get_servers, "servers"), _safe(get_ha, "ha"), _safe(get_tasks, "tasks"), ) weather_data, news_data, servers_data, ha_data, tasks_data = results # Log a concise summary of what worked and what failed sections = {"weather": weather_data, "news": news_data, "servers": servers_data, "ha": ha_data, "tasks": tasks_data} errors = [k for k, v in sections.items() if isinstance(v, dict) and v.get("error")] if errors: logger.warning("[DASHBOARD] Sections with errors: %s", ", ".join(errors)) return { "weather": weather_data, "news": news_data, "servers": servers_data, "ha": ha_data, "tasks": tasks_data, "mqtt": { "connected": mqtt_service.connected, "entities": mqtt_service.get_entities(), }, "timestamp": datetime.now(timezone.utc).isoformat(), } @router.websocket("/ws") async def ws_endpoint(ws: WebSocket) -> None: """WebSocket that pushes fresh dashboard data on every client ping. The client should send periodic text messages (e.g. ``"ping"``) to request an update. If no message arrives within 20 seconds the server sends a refresh anyway, keeping the connection alive. """ await ws.accept() clients.append(ws) logger.info("WebSocket client connected (%d total)", len(clients)) try: while True: # Wait for a client ping / keepalive; refresh on timeout too. try: _msg = await asyncio.wait_for(ws.receive_text(), timeout=20.0) except asyncio.TimeoutError: pass # Build and push the latest data try: data = await get_all() await ws.send_json(data) except Exception as exc: logger.exception("Error sending WebSocket payload") # Try to send a lightweight error frame; if that also fails the # outer handler will close the connection. try: await ws.send_json({"error": True, "message": str(exc)}) except Exception: break except WebSocketDisconnect: logger.info("WebSocket client disconnected") except Exception as exc: logger.exception("Unexpected WebSocket error") finally: if ws in clients: clients.remove(ws) logger.info("WebSocket clients remaining: %d", len(clients)) # -- internal helpers --------------------------------------------------------- async def _safe(coro_or_callable: Any, label: str) -> Dict[str, Any]: """Call an async function and return its result, or an error dict.""" try: if asyncio.iscoroutinefunction(coro_or_callable): return await coro_or_callable() # Support lambdas that return coroutines result = coro_or_callable() if asyncio.iscoroutine(result): return await result return result except Exception as exc: logger.exception("Failed to fetch %s data for dashboard", label) return {"error": True, "message": str(exc)}