feat: add MQTT integration for real-time entity updates
- aiomqtt async client with auto-reconnect and topic store
- MQTT router: GET /api/mqtt, GET /api/mqtt/topic/{path}, POST /api/mqtt/publish
- MQTT entities included in /api/all + WebSocket broadcast
- MqttCard frontend component with category filters, entity list
- Configurable via ENV: MQTT_HOST, MQTT_PORT, MQTT_USERNAME,
MQTT_PASSWORD, MQTT_TOPICS (comma-separated or JSON array)
- Gracefully disabled when MQTT_HOST is not set
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9f7330e217
commit
89ed0c6d0a
11 changed files with 542 additions and 1 deletions
222
server/services/mqtt_service.py
Normal file
222
server/services/mqtt_service.py
Normal file
|
|
@ -0,0 +1,222 @@
|
|||
"""MQTT client service for real-time entity updates.
|
||||
|
||||
Connects to an MQTT broker, subscribes to configured topics, and stores
|
||||
the latest messages per topic. The dashboard WebSocket pushes MQTT
|
||||
state changes to connected clients.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
import aiomqtt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MqttMessage:
|
||||
"""Single retained MQTT message."""
|
||||
|
||||
topic: str
|
||||
payload: Any
|
||||
timestamp: float = field(default_factory=time.time)
|
||||
|
||||
|
||||
class MqttService:
|
||||
"""Async MQTT client that maintains a live topic→value store."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._store: Dict[str, MqttMessage] = {}
|
||||
self._client: Optional[aiomqtt.Client] = None
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._listeners: List[Callable[[MqttMessage], Any]] = []
|
||||
self._connected = False
|
||||
self._config: Dict[str, Any] = {}
|
||||
|
||||
@property
|
||||
def connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
@property
|
||||
def store(self) -> Dict[str, MqttMessage]:
|
||||
return self._store
|
||||
|
||||
def on_message(self, callback: Callable[[MqttMessage], Any]) -> None:
|
||||
"""Register a callback fired on every incoming MQTT message."""
|
||||
self._listeners.append(callback)
|
||||
|
||||
async def start(
|
||||
self,
|
||||
host: str,
|
||||
port: int = 1883,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
topics: Optional[List[str]] = None,
|
||||
client_id: str = "daily-briefing",
|
||||
) -> None:
|
||||
"""Connect to the broker and start listening in the background."""
|
||||
if not host:
|
||||
logger.info("MQTT disabled — no MQTT_HOST configured")
|
||||
return
|
||||
|
||||
self._config = dict(
|
||||
host=host,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password,
|
||||
topics=topics or ["#"],
|
||||
client_id=client_id,
|
||||
)
|
||||
self._task = asyncio.create_task(self._run_loop())
|
||||
logger.info("MQTT background task started (broker %s:%d)", host, port)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Disconnect and cancel the background task."""
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._connected = False
|
||||
logger.info("MQTT service stopped")
|
||||
|
||||
async def publish(
|
||||
self,
|
||||
topic: str,
|
||||
payload: Any,
|
||||
retain: bool = False,
|
||||
) -> None:
|
||||
"""Publish a message to the broker (e.g. for controlling devices)."""
|
||||
if not self._client or not self._connected:
|
||||
raise RuntimeError("MQTT not connected")
|
||||
msg = json.dumps(payload) if not isinstance(payload, (str, bytes)) else payload
|
||||
await self._client.publish(topic, msg, retain=retain)
|
||||
logger.debug("MQTT published → %s", topic)
|
||||
|
||||
def get_state(self) -> Dict[str, Any]:
|
||||
"""Return all stored MQTT states for the dashboard API."""
|
||||
result: Dict[str, Any] = {}
|
||||
for topic, msg in self._store.items():
|
||||
result[topic] = {
|
||||
"value": msg.payload,
|
||||
"timestamp": msg.timestamp,
|
||||
}
|
||||
return result
|
||||
|
||||
def get_entities(self) -> List[Dict[str, Any]]:
|
||||
"""Return a flat list of MQTT entities grouped for the frontend."""
|
||||
entities: List[Dict[str, Any]] = []
|
||||
for topic, msg in self._store.items():
|
||||
entities.append({
|
||||
"topic": topic,
|
||||
"value": msg.payload,
|
||||
"timestamp": msg.timestamp,
|
||||
"name": _topic_to_name(topic),
|
||||
"category": _topic_to_category(topic),
|
||||
})
|
||||
return entities
|
||||
|
||||
# -- internal ---
|
||||
|
||||
async def _run_loop(self) -> None:
|
||||
"""Reconnecting event loop."""
|
||||
cfg = self._config
|
||||
backoff = 1
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with aiomqtt.Client(
|
||||
hostname=cfg["host"],
|
||||
port=cfg["port"],
|
||||
username=cfg.get("username"),
|
||||
password=cfg.get("password"),
|
||||
identifier=cfg["client_id"],
|
||||
) as client:
|
||||
self._client = client
|
||||
self._connected = True
|
||||
backoff = 1
|
||||
logger.info(
|
||||
"MQTT connected to %s:%d — subscribing to %d topic(s)",
|
||||
cfg["host"],
|
||||
cfg["port"],
|
||||
len(cfg["topics"]),
|
||||
)
|
||||
|
||||
for t in cfg["topics"]:
|
||||
await client.subscribe(t)
|
||||
|
||||
async for message in client.messages:
|
||||
await self._handle_message(message)
|
||||
|
||||
except aiomqtt.MqttError as exc:
|
||||
self._connected = False
|
||||
self._client = None
|
||||
logger.warning(
|
||||
"MQTT connection lost (%s) — reconnecting in %ds",
|
||||
exc,
|
||||
backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
backoff = min(backoff * 2, 60)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
self._connected = False
|
||||
self._client = None
|
||||
break
|
||||
|
||||
except Exception:
|
||||
self._connected = False
|
||||
self._client = None
|
||||
logger.exception("Unexpected MQTT error — reconnecting in %ds", backoff)
|
||||
await asyncio.sleep(backoff)
|
||||
backoff = min(backoff * 2, 60)
|
||||
|
||||
async def _handle_message(self, message: aiomqtt.Message) -> None:
|
||||
topic = str(message.topic)
|
||||
raw = message.payload
|
||||
|
||||
# Try JSON decode
|
||||
if isinstance(raw, (bytes, bytearray)):
|
||||
raw = raw.decode("utf-8", errors="replace")
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
payload = raw
|
||||
|
||||
msg = MqttMessage(topic=topic, payload=payload)
|
||||
self._store[topic] = msg
|
||||
|
||||
# Notify listeners
|
||||
for cb in self._listeners:
|
||||
try:
|
||||
result = cb(msg)
|
||||
if asyncio.iscoroutine(result):
|
||||
await result
|
||||
except Exception:
|
||||
logger.exception("MQTT listener error on topic %s", topic)
|
||||
|
||||
|
||||
def _topic_to_name(topic: str) -> str:
|
||||
"""Derive a human-readable name from the MQTT topic."""
|
||||
parts = topic.rstrip("/").split("/")
|
||||
return parts[-1].replace("_", " ").replace("-", " ").title() if parts else topic
|
||||
|
||||
|
||||
def _topic_to_category(topic: str) -> str:
|
||||
"""Derive a category from the first topic segment."""
|
||||
parts = topic.strip("/").split("/")
|
||||
if len(parts) >= 2:
|
||||
return parts[0]
|
||||
return "other"
|
||||
|
||||
|
||||
# Singleton
|
||||
mqtt_service = MqttService()
|
||||
Loading…
Add table
Add a link
Reference in a new issue