From 89ed0c6d0a1ca9d06e976dfc042937b1fa3e2c4f Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 2 Mar 2026 10:13:50 +0100 Subject: [PATCH] feat: add MQTT integration for real-time entity updates - aiomqtt async client with auto-reconnect and topic store - MQTT router: GET /api/mqtt, GET /api/mqtt/topic/{path}, POST /api/mqtt/publish - MQTT entities included in /api/all + WebSocket broadcast - MqttCard frontend component with category filters, entity list - Configurable via ENV: MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPICS (comma-separated or JSON array) - Gracefully disabled when MQTT_HOST is not set Co-Authored-By: Claude Opus 4.6 --- docker-compose.yml | 6 + requirements.txt | 1 + server/config.py | 22 ++++ server/main.py | 22 +++- server/routers/dashboard.py | 5 + server/routers/mqtt.py | 54 ++++++++ server/services/mqtt_service.py | 222 ++++++++++++++++++++++++++++++++ web/src/App.tsx | 10 ++ web/src/api.ts | 15 +++ web/src/components/MqttCard.tsx | 175 +++++++++++++++++++++++++ web/src/mockData.ts | 11 ++ 11 files changed, 542 insertions(+), 1 deletion(-) create mode 100644 server/routers/mqtt.py create mode 100644 server/services/mqtt_service.py create mode 100644 web/src/components/MqttCard.tsx diff --git a/docker-compose.yml b/docker-compose.yml index 772f881..93b3b55 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,12 @@ services: - VIKUNJA_TOKEN=${VIKUNJA_TOKEN} # Unraid Servers (JSON array) - UNRAID_SERVERS=${UNRAID_SERVERS:-[]} + # MQTT (optional) + - MQTT_HOST=${MQTT_HOST:-} + - MQTT_PORT=${MQTT_PORT:-1883} + - MQTT_USERNAME=${MQTT_USERNAME:-} + - MQTT_PASSWORD=${MQTT_PASSWORD:-} + - MQTT_TOPICS=${MQTT_TOPICS:-#} extra_hosts: - "host.docker.internal:host-gateway" restart: always diff --git a/requirements.txt b/requirements.txt index 5148df0..cb95e39 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ httpx==0.28.1 asyncpg==0.30.0 jinja2==3.1.5 websockets==14.2 +aiomqtt==2.3.0 diff --git a/server/config.py b/server/config.py index d316173..8a5a46d 100644 --- a/server/config.py +++ b/server/config.py @@ -48,6 +48,14 @@ class Settings: news_cache_ttl: int = 300 # 5 min news_max_age_hours: int = 48 + # --- MQTT --- + mqtt_host: str = "" + mqtt_port: int = 1883 + mqtt_username: str = "" + mqtt_password: str = "" + mqtt_topics: List[str] = field(default_factory=lambda: ["#"]) + mqtt_client_id: str = "daily-briefing" + # --- Server --- host: str = "0.0.0.0" port: int = 8080 @@ -73,6 +81,20 @@ class Settings: s.vikunja_url = os.getenv("VIKUNJA_URL", s.vikunja_url) s.vikunja_token = os.getenv("VIKUNJA_TOKEN", s.vikunja_token) + s.mqtt_host = os.getenv("MQTT_HOST", s.mqtt_host) + s.mqtt_port = int(os.getenv("MQTT_PORT", str(s.mqtt_port))) + s.mqtt_username = os.getenv("MQTT_USERNAME", s.mqtt_username) + s.mqtt_password = os.getenv("MQTT_PASSWORD", s.mqtt_password) + s.mqtt_client_id = os.getenv("MQTT_CLIENT_ID", s.mqtt_client_id) + + # Parse MQTT_TOPICS (comma-separated or JSON array) + raw_topics = os.getenv("MQTT_TOPICS", "") + if raw_topics: + try: + s.mqtt_topics = json.loads(raw_topics) + except (json.JSONDecodeError, TypeError): + s.mqtt_topics = [t.strip() for t in raw_topics.split(",") if t.strip()] + s.debug = os.getenv("DEBUG", "").lower() in ("1", "true", "yes") # Parse UNRAID_SERVERS JSON diff --git a/server/main.py b/server/main.py index df49131..457c332 100644 --- a/server/main.py +++ b/server/main.py @@ -12,6 +12,7 @@ from fastapi.staticfiles import StaticFiles from server.config import settings from server.services import news_service +from server.services.mqtt_service import mqtt_service logger = logging.getLogger("daily-briefing") logging.basicConfig( @@ -42,10 +43,28 @@ async def lifespan(app: FastAPI): except Exception: logger.exception("Failed to initialize database pool — news will be unavailable") + # Start MQTT service + if settings.mqtt_host: + try: + await mqtt_service.start( + host=settings.mqtt_host, + port=settings.mqtt_port, + username=settings.mqtt_username or None, + password=settings.mqtt_password or None, + topics=settings.mqtt_topics, + client_id=settings.mqtt_client_id, + ) + logger.info("MQTT service started (broker %s:%d)", settings.mqtt_host, settings.mqtt_port) + except Exception: + logger.exception("Failed to start MQTT service — MQTT will be unavailable") + else: + logger.info("MQTT disabled — set MQTT_HOST to enable") + yield # Shutdown logger.info("Shutting down...") + await mqtt_service.stop() await news_service.close_pool() @@ -65,13 +84,14 @@ app.add_middleware( ) # --- Register Routers --- -from server.routers import dashboard, homeassistant, news, servers, tasks, weather # noqa: E402 +from server.routers import dashboard, homeassistant, mqtt, news, servers, tasks, weather # noqa: E402 app.include_router(weather.router) app.include_router(news.router) app.include_router(servers.router) app.include_router(homeassistant.router) app.include_router(tasks.router) +app.include_router(mqtt.router) app.include_router(dashboard.router) # --- Serve static frontend (production) --- diff --git a/server/routers/dashboard.py b/server/routers/dashboard.py index c10f9ce..5a872ce 100644 --- a/server/routers/dashboard.py +++ b/server/routers/dashboard.py @@ -15,6 +15,7 @@ from server.routers.news import get_news_articles from server.routers.servers import get_servers from server.routers.tasks import get_tasks from server.routers.weather import get_weather +from server.services.mqtt_service import mqtt_service logger = logging.getLogger(__name__) @@ -58,6 +59,10 @@ async def get_all() -> Dict[str, Any]: "servers": servers_data, "ha": ha_data, "tasks": tasks_data, + "mqtt": { + "connected": mqtt_service.connected, + "entities": mqtt_service.get_entities(), + }, "timestamp": datetime.now(timezone.utc).isoformat(), } diff --git a/server/routers/mqtt.py b/server/routers/mqtt.py new file mode 100644 index 0000000..573ab77 --- /dev/null +++ b/server/routers/mqtt.py @@ -0,0 +1,54 @@ +"""MQTT router — exposes MQTT entity state and publish endpoint.""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from server.services.mqtt_service import mqtt_service + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/mqtt", tags=["mqtt"]) + + +class PublishRequest(BaseModel): + topic: str + payload: Any + retain: bool = False + + +@router.get("") +async def get_mqtt_state() -> Dict[str, Any]: + """Return all stored MQTT entities.""" + return { + "connected": mqtt_service.connected, + "entities": mqtt_service.get_entities(), + "topic_count": len(mqtt_service.store), + } + + +@router.get("/topic/{topic:path}") +async def get_mqtt_topic(topic: str) -> Dict[str, Any]: + """Return value for a specific MQTT topic.""" + msg = mqtt_service.store.get(topic) + if not msg: + raise HTTPException(status_code=404, detail=f"Topic '{topic}' not found") + return { + "topic": msg.topic, + "value": msg.payload, + "timestamp": msg.timestamp, + } + + +@router.post("/publish") +async def publish_mqtt(req: PublishRequest) -> Dict[str, str]: + """Publish a message to the MQTT broker.""" + try: + await mqtt_service.publish(req.topic, req.payload, retain=req.retain) + return {"status": "ok", "topic": req.topic} + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) diff --git a/server/services/mqtt_service.py b/server/services/mqtt_service.py new file mode 100644 index 0000000..a711165 --- /dev/null +++ b/server/services/mqtt_service.py @@ -0,0 +1,222 @@ +"""MQTT client service for real-time entity updates. + +Connects to an MQTT broker, subscribes to configured topics, and stores +the latest messages per topic. The dashboard WebSocket pushes MQTT +state changes to connected clients. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +import aiomqtt + +logger = logging.getLogger(__name__) + + +@dataclass +class MqttMessage: + """Single retained MQTT message.""" + + topic: str + payload: Any + timestamp: float = field(default_factory=time.time) + + +class MqttService: + """Async MQTT client that maintains a live topic→value store.""" + + def __init__(self) -> None: + self._store: Dict[str, MqttMessage] = {} + self._client: Optional[aiomqtt.Client] = None + self._task: Optional[asyncio.Task] = None + self._listeners: List[Callable[[MqttMessage], Any]] = [] + self._connected = False + self._config: Dict[str, Any] = {} + + @property + def connected(self) -> bool: + return self._connected + + @property + def store(self) -> Dict[str, MqttMessage]: + return self._store + + def on_message(self, callback: Callable[[MqttMessage], Any]) -> None: + """Register a callback fired on every incoming MQTT message.""" + self._listeners.append(callback) + + async def start( + self, + host: str, + port: int = 1883, + username: Optional[str] = None, + password: Optional[str] = None, + topics: Optional[List[str]] = None, + client_id: str = "daily-briefing", + ) -> None: + """Connect to the broker and start listening in the background.""" + if not host: + logger.info("MQTT disabled — no MQTT_HOST configured") + return + + self._config = dict( + host=host, + port=port, + username=username, + password=password, + topics=topics or ["#"], + client_id=client_id, + ) + self._task = asyncio.create_task(self._run_loop()) + logger.info("MQTT background task started (broker %s:%d)", host, port) + + async def stop(self) -> None: + """Disconnect and cancel the background task.""" + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._connected = False + logger.info("MQTT service stopped") + + async def publish( + self, + topic: str, + payload: Any, + retain: bool = False, + ) -> None: + """Publish a message to the broker (e.g. for controlling devices).""" + if not self._client or not self._connected: + raise RuntimeError("MQTT not connected") + msg = json.dumps(payload) if not isinstance(payload, (str, bytes)) else payload + await self._client.publish(topic, msg, retain=retain) + logger.debug("MQTT published → %s", topic) + + def get_state(self) -> Dict[str, Any]: + """Return all stored MQTT states for the dashboard API.""" + result: Dict[str, Any] = {} + for topic, msg in self._store.items(): + result[topic] = { + "value": msg.payload, + "timestamp": msg.timestamp, + } + return result + + def get_entities(self) -> List[Dict[str, Any]]: + """Return a flat list of MQTT entities grouped for the frontend.""" + entities: List[Dict[str, Any]] = [] + for topic, msg in self._store.items(): + entities.append({ + "topic": topic, + "value": msg.payload, + "timestamp": msg.timestamp, + "name": _topic_to_name(topic), + "category": _topic_to_category(topic), + }) + return entities + + # -- internal --- + + async def _run_loop(self) -> None: + """Reconnecting event loop.""" + cfg = self._config + backoff = 1 + + while True: + try: + async with aiomqtt.Client( + hostname=cfg["host"], + port=cfg["port"], + username=cfg.get("username"), + password=cfg.get("password"), + identifier=cfg["client_id"], + ) as client: + self._client = client + self._connected = True + backoff = 1 + logger.info( + "MQTT connected to %s:%d — subscribing to %d topic(s)", + cfg["host"], + cfg["port"], + len(cfg["topics"]), + ) + + for t in cfg["topics"]: + await client.subscribe(t) + + async for message in client.messages: + await self._handle_message(message) + + except aiomqtt.MqttError as exc: + self._connected = False + self._client = None + logger.warning( + "MQTT connection lost (%s) — reconnecting in %ds", + exc, + backoff, + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, 60) + + except asyncio.CancelledError: + self._connected = False + self._client = None + break + + except Exception: + self._connected = False + self._client = None + logger.exception("Unexpected MQTT error — reconnecting in %ds", backoff) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, 60) + + async def _handle_message(self, message: aiomqtt.Message) -> None: + topic = str(message.topic) + raw = message.payload + + # Try JSON decode + if isinstance(raw, (bytes, bytearray)): + raw = raw.decode("utf-8", errors="replace") + try: + payload = json.loads(raw) + except (json.JSONDecodeError, TypeError): + payload = raw + + msg = MqttMessage(topic=topic, payload=payload) + self._store[topic] = msg + + # Notify listeners + for cb in self._listeners: + try: + result = cb(msg) + if asyncio.iscoroutine(result): + await result + except Exception: + logger.exception("MQTT listener error on topic %s", topic) + + +def _topic_to_name(topic: str) -> str: + """Derive a human-readable name from the MQTT topic.""" + parts = topic.rstrip("/").split("/") + return parts[-1].replace("_", " ").replace("-", " ").title() if parts else topic + + +def _topic_to_category(topic: str) -> str: + """Derive a category from the first topic segment.""" + parts = topic.strip("/").split("/") + if len(parts) >= 2: + return parts[0] + return "other" + + +# Singleton +mqtt_service = MqttService() diff --git a/web/src/App.tsx b/web/src/App.tsx index f08fc85..2576538 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -6,6 +6,7 @@ import NewsGrid from "./components/NewsGrid"; import ServerCard from "./components/ServerCard"; import HomeAssistant from "./components/HomeAssistant"; import TasksCard from "./components/TasksCard"; +import MqttCard from "./components/MqttCard"; import { RefreshCw, Wifi, WifiOff, AlertTriangle } from "lucide-react"; export default function App() { @@ -77,6 +78,15 @@ export default function App() { + {/* Row 2.5: MQTT (only show if connected or has entities) */} + {(data.mqtt?.connected || (data.mqtt?.entities?.length ?? 0) > 0) && ( +
+
+ +
+
+ )} + {/* Row 3: News (full width) */}
diff --git a/web/src/api.ts b/web/src/api.ts index d9be6cb..c8f3514 100644 --- a/web/src/api.ts +++ b/web/src/api.ts @@ -97,12 +97,26 @@ export interface TasksResponse { error?: boolean; } +export interface MqttEntity { + topic: string; + value: any; + timestamp: number; + name: string; + category: string; +} + +export interface MqttData { + connected: boolean; + entities: MqttEntity[]; +} + export interface DashboardData { weather: WeatherResponse; news: NewsResponse; servers: ServersResponse; ha: HAData; tasks: TasksResponse; + mqtt: MqttData; timestamp: string; } @@ -117,4 +131,5 @@ export const fetchNews = (limit = 20, offset = 0, category?: string) => { export const fetchServers = () => fetchJSON("/servers"); export const fetchHA = () => fetchJSON("/ha"); export const fetchTasks = () => fetchJSON("/tasks"); +export const fetchMqtt = () => fetchJSON("/mqtt"); export const fetchAll = () => fetchJSON("/all"); diff --git a/web/src/components/MqttCard.tsx b/web/src/components/MqttCard.tsx new file mode 100644 index 0000000..f2146ac --- /dev/null +++ b/web/src/components/MqttCard.tsx @@ -0,0 +1,175 @@ +import { useState, useMemo } from "react"; +import { Radio, ChevronDown, ChevronUp, Zap, ZapOff } from "lucide-react"; +import type { MqttData, MqttEntity } from "../api"; + +interface Props { + data: MqttData; +} + +export default function MqttCard({ data }: Props) { + const [expanded, setExpanded] = useState(false); + const [filter, setFilter] = useState(null); + + // Group entities by category + const grouped = useMemo(() => { + const map: Record = {}; + for (const e of data.entities) { + const cat = e.category || "other"; + if (!map[cat]) map[cat] = []; + map[cat].push(e); + } + return map; + }, [data.entities]); + + const categories = Object.keys(grouped).sort(); + const filtered = filter ? grouped[filter] || [] : data.entities; + const shown = expanded ? filtered : filtered.slice(0, 8); + + return ( +
+ {/* Header */} +
+
+ +

MQTT

+ + {data.entities.length} Entit{data.entities.length === 1 ? "ät" : "äten"} + +
+
+ {data.connected ? ( + + ) : ( + + )} + + {data.connected ? "Verbunden" : "Getrennt"} + +
+
+ + {/* Category filter tabs */} + {categories.length > 1 && ( +
+ + {categories.map((cat) => ( + + ))} +
+ )} + + {/* Entity list */} + {data.entities.length === 0 ? ( +
+ +

+ {data.connected + ? "Warte auf MQTT-Nachrichten..." + : "MQTT nicht konfiguriert"} +

+
+ ) : ( +
+ {shown.map((entity) => ( + + ))} +
+ )} + + {/* Expand/collapse */} + {filtered.length > 8 && ( + + )} +
+ ); +} + +function EntityRow({ entity }: { entity: MqttEntity }) { + const age = Math.round((Date.now() / 1000 - entity.timestamp)); + const ageStr = + age < 60 + ? `${age}s` + : age < 3600 + ? `${Math.floor(age / 60)}m` + : `${Math.floor(age / 3600)}h`; + + // Format value for display + const displayValue = formatValue(entity.value); + const isNumeric = typeof entity.value === "number"; + + return ( +
+
+

{entity.name}

+

{entity.topic}

+
+
+ + {displayValue} + + {ageStr} +
+
+ ); +} + +function formatValue(value: any): string { + if (value === null || value === undefined) return "—"; + if (typeof value === "boolean") return value ? "ON" : "OFF"; + if (typeof value === "number") { + return Number.isInteger(value) ? value.toString() : value.toFixed(1); + } + if (typeof value === "object") { + return JSON.stringify(value).slice(0, 40); + } + const str = String(value); + // Common HA/MQTT states → nicer display + if (str === "on" || str === "ON") return "ON"; + if (str === "off" || str === "OFF") return "OFF"; + if (str === "online") return "Online"; + if (str === "offline") return "Offline"; + return str.length > 30 ? str.slice(0, 30) + "…" : str; +} diff --git a/web/src/mockData.ts b/web/src/mockData.ts index 6d03263..8478b19 100644 --- a/web/src/mockData.ts +++ b/web/src/mockData.ts @@ -146,6 +146,17 @@ export const MOCK_DATA: DashboardData = { lights_on: 3, lights_total: 8, }, + mqtt: { + connected: true, + entities: [ + { topic: "unraid/daddelolymp/cpu_temp", value: 52, timestamp: Date.now() / 1000 - 5, name: "Cpu Temp", category: "unraid" }, + { topic: "unraid/daddelolymp/array_status", value: "normal", timestamp: Date.now() / 1000 - 12, name: "Array Status", category: "unraid" }, + { topic: "unraid/moneyboy/cpu_temp", value: 41, timestamp: Date.now() / 1000 - 8, name: "Cpu Temp", category: "unraid" }, + { topic: "zigbee2mqtt/wohnzimmer/temperature", value: 21.5, timestamp: Date.now() / 1000 - 30, name: "Temperature", category: "zigbee2mqtt" }, + { topic: "zigbee2mqtt/wohnzimmer/humidity", value: 48, timestamp: Date.now() / 1000 - 30, name: "Humidity", category: "zigbee2mqtt" }, + { topic: "frigate/events/front_door", value: "person", timestamp: Date.now() / 1000 - 120, name: "Front Door", category: "frigate" }, + ], + }, tasks: { private: { open: [