daily-briefing/server/routers/dashboard.py

136 lines
4.5 KiB
Python
Raw Permalink Normal View History

"""Dashboard aggregate router -- combined endpoint and WebSocket push."""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from server.routers.homeassistant import get_ha
from server.routers.news import get_news_articles
from server.routers.servers import get_servers
from server.routers.tasks import get_tasks
from server.routers.weather import get_weather
from server.services.mqtt_service import mqtt_service
logger = logging.getLogger(__name__)
router = APIRouter(tags=["dashboard"])
# Connected WebSocket clients
clients: List[WebSocket] = []
@router.get("/api/all")
async def get_all() -> Dict[str, Any]:
"""Fetch every data source in parallel and return a single combined dict.
Response shape::
{
"weather": { ... },
"news": { ... },
"servers": { ... },
"ha": { ... },
"tasks": { ... },
"timestamp": "ISO-8601 string"
}
Individual sections that fail will contain ``{"error": true, "message": "..."}``.
"""
results = await asyncio.gather(
_safe(get_weather, "weather"),
_safe(lambda: get_news_articles(limit=20, offset=0, category=None), "news"),
_safe(get_servers, "servers"),
_safe(get_ha, "ha"),
_safe(get_tasks, "tasks"),
)
weather_data, news_data, servers_data, ha_data, tasks_data = results
# Log a concise summary of what worked and what failed
sections = {"weather": weather_data, "news": news_data, "servers": servers_data,
"ha": ha_data, "tasks": tasks_data}
errors = [k for k, v in sections.items() if isinstance(v, dict) and v.get("error")]
if errors:
logger.warning("[DASHBOARD] Sections with errors: %s", ", ".join(errors))
return {
"weather": weather_data,
"news": news_data,
"servers": servers_data,
"ha": ha_data,
"tasks": tasks_data,
"mqtt": {
"connected": mqtt_service.connected,
"entities": mqtt_service.get_entities(),
},
"timestamp": datetime.now(timezone.utc).isoformat(),
}
@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket) -> None:
"""WebSocket that pushes fresh dashboard data on every client ping.
The client should send periodic text messages (e.g. ``"ping"``) to request
an update. If no message arrives within 20 seconds the server sends a
refresh anyway, keeping the connection alive.
"""
await ws.accept()
clients.append(ws)
logger.info("WebSocket client connected (%d total)", len(clients))
try:
while True:
# Wait for a client ping / keepalive; refresh on timeout too.
try:
_msg = await asyncio.wait_for(ws.receive_text(), timeout=12.0)
except asyncio.TimeoutError:
pass
# Build and push the latest data
try:
data = await get_all()
await ws.send_json(data)
except Exception as exc:
logger.exception("Error sending WebSocket payload")
# Try to send a lightweight error frame; if that also fails the
# outer handler will close the connection.
try:
await ws.send_json({"error": True, "message": str(exc)})
except Exception:
break
except WebSocketDisconnect:
logger.info("WebSocket client disconnected")
except Exception as exc:
logger.exception("Unexpected WebSocket error")
finally:
if ws in clients:
clients.remove(ws)
logger.info("WebSocket clients remaining: %d", len(clients))
# -- internal helpers ---------------------------------------------------------
async def _safe(coro_or_callable: Any, label: str) -> Dict[str, Any]:
"""Call an async function and return its result, or an error dict."""
try:
if asyncio.iscoroutinefunction(coro_or_callable):
return await coro_or_callable()
# Support lambdas that return coroutines
result = coro_or_callable()
if asyncio.iscoroutine(result):
return await result
return result
except Exception as exc:
logger.exception("Failed to fetch %s data for dashboard", label)
return {"error": True, "message": str(exc)}