From c6db0ab5699e834547b87260f0c17a57ebeef72d Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 2 Mar 2026 23:25:57 +0100 Subject: [PATCH] refactor: replace GraphQL/REST with MQTT-only for Unraid server data All server stats (CPU, RAM, Docker, shares, disks, array) now come directly from MQTT topics published by the Unraid MQTT Agent. This eliminates the need for API keys, HTTP polling, and the GraphQL/REST fallback chain. - Rewrote unraid_service.py to read from MQTT store (no httpx needed) - Simplified servers router (no cache, no enrichment hack) - Added mqtt_prefix field to UnraidServer config - Updated DB: both Daddelolymp and Adriahub with mqtt_prefix, no api_key - Data is always fresh (MQTT pushes every ~15s) Co-Authored-By: Claude Opus 4.6 --- server/config.py | 13 +- server/routers/servers.py | 132 +------- server/services/unraid_service.py | 525 ++++++++++-------------------- 3 files changed, 201 insertions(+), 469 deletions(-) diff --git a/server/config.py b/server/config.py index b2cdd74..cb388f9 100644 --- a/server/config.py +++ b/server/config.py @@ -19,9 +19,10 @@ logger = logging.getLogger(__name__) @dataclass class UnraidServer: name: str - host: str - api_key: str = "" - port: int = 80 + host: str = "" + mqtt_prefix: str = "" # MQTT topic prefix, e.g. "Adriahub" or "unraid-daddelolymp" + api_key: str = "" # Deprecated — kept for backward compat + port: int = 80 # Deprecated — kept for backward compat @dataclass @@ -122,11 +123,12 @@ class Settings: UnraidServer( name=srv.get("name", f"Server {i+1}"), host=srv.get("host", ""), + mqtt_prefix=srv.get("mqtt_prefix", ""), api_key=srv.get("api_key", ""), port=int(srv.get("port", 80)), ) for i, srv in enumerate(servers_data) - if srv.get("host") + if srv.get("name") or srv.get("host") ] s.unraid_enabled = len(s.unraid_servers) > 0 except (json.JSONDecodeError, TypeError): @@ -172,11 +174,12 @@ class Settings: UnraidServer( name=s.get("name", ""), host=s.get("host", ""), + mqtt_prefix=s.get("mqtt_prefix", ""), api_key=s.get("api_key", ""), port=int(s.get("port", 80)), ) for s in servers - if s.get("host") + if s.get("name") or s.get("host") ] self.unraid_enabled = enabled diff --git a/server/routers/servers.py b/server/routers/servers.py index 3ecea6c..138cc6d 100644 --- a/server/routers/servers.py +++ b/server/routers/servers.py @@ -1,4 +1,4 @@ -"""Unraid servers status router.""" +"""Unraid servers status router — MQTT-only data source.""" from __future__ import annotations @@ -7,151 +7,45 @@ from typing import Any, Dict, List from fastapi import APIRouter -from server.cache import cache from server.config import get_settings from server.services.mqtt_service import mqtt_service -from server.services.unraid_service import ServerConfig, fetch_all_servers +from server.services.unraid_service import ServerConfig, fetch_all_servers_mqtt logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["servers"]) -CACHE_KEY = "servers" - - -# --------------------------------------------------------------------------- -# MQTT enrichment — overlay live system metrics from MQTT topics -# --------------------------------------------------------------------------- - -def _enrich_from_mqtt(servers: List[Dict[str, Any]]) -> None: - """Merge live CPU/RAM data from MQTT ``/system`` topics. - - The Unraid MQTT Agent plugin publishes JSON payloads to topics like - ``unraid-daddelolymp/system`` or ``Adriahub/system`` every ~15 s. - These contain live ``cpu_usage_percent``, ``ram_usage_percent``, etc. - that the GraphQL API does not expose. - """ - - store = mqtt_service.store - - for srv in servers: - name = srv.get("name", "") - if not name: - continue - - # Try common topic patterns - system_data: Dict[str, Any] | None = None - for pattern in ( - f"{name}/system", # "Adriahub/system" - f"unraid-{name.lower()}/system", # "unraid-daddelolymp/system" - f"unraid-{name}/system", # "unraid-Daddelolymp/system" - ): - msg = store.get(pattern) - if msg is not None and isinstance(msg.payload, dict): - system_data = msg.payload - break - - if not system_data: - continue - - # --- CPU --- - cpu_pct = system_data.get("cpu_usage_percent") - if cpu_pct is not None: - srv["cpu"]["usage_pct"] = round(float(cpu_pct), 1) - - cpu_model = system_data.get("cpu_model") - if cpu_model: - srv["cpu"]["brand"] = cpu_model - - cpu_temp = system_data.get("cpu_temp_celsius") - if cpu_temp is not None: - srv["cpu"]["temp_c"] = cpu_temp - - cores = system_data.get("cpu_cores") - if cores: - srv["cpu"]["cores"] = cores - - threads = system_data.get("cpu_threads") - if threads: - srv["cpu"]["threads"] = threads - - # --- RAM --- - ram_pct = system_data.get("ram_usage_percent") - if ram_pct is not None: - srv["ram"]["pct"] = round(float(ram_pct), 1) - - ram_total = system_data.get("ram_total_bytes") - if ram_total: - srv["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1) - - ram_used = system_data.get("ram_used_bytes") - if ram_used: - srv["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1) - - # --- Uptime --- - uptime_secs = system_data.get("uptime_seconds") - if uptime_secs: - days = uptime_secs // 86400 - hours = (uptime_secs % 86400) // 3600 - srv["uptime"] = f"{days}d {hours}h" - - srv["online"] = True - - logger.debug( - "[UNRAID] %s: MQTT enriched — CPU %.1f%% %.0f°C, RAM %.1f%%", - name, - srv["cpu"].get("usage_pct", 0), - srv["cpu"].get("temp_c", 0) or 0, - srv["ram"].get("pct", 0), - ) - @router.get("/servers") async def get_servers() -> Dict[str, Any]: """Return status information for all configured Unraid servers. - Response shape:: - - { - "servers": [ ... server dicts ... ] - } + All data comes from the MQTT message store — no HTTP polling, + no API keys, no cache needed (MQTT data is always fresh). """ - # --- cache hit? ----------------------------------------------------------- - cached = await cache.get(CACHE_KEY) - if cached is not None: - # Always overlay fresh MQTT data even on cache hits - _enrich_from_mqtt(cached.get("servers", [])) - return cached + settings = get_settings() - # --- cache miss ----------------------------------------------------------- server_configs: List[ServerConfig] = [ ServerConfig( name=srv.name, host=srv.host, - api_key=srv.api_key, - port=srv.port, + mqtt_prefix=getattr(srv, "mqtt_prefix", "") or srv.name, ) - for srv in get_settings().unraid_servers + for srv in settings.unraid_servers ] - servers_data: List[Dict[str, Any]] = [] + if not server_configs: + return {"servers": []} + try: - servers_data = await fetch_all_servers(server_configs) + servers_data = fetch_all_servers_mqtt(server_configs, mqtt_service.store) except Exception as exc: - logger.exception("Failed to fetch Unraid server data") + logger.exception("Failed to read Unraid server data from MQTT") return { "servers": [], "error": True, "message": str(exc), } - # Overlay live MQTT system metrics - _enrich_from_mqtt(servers_data) - - payload: Dict[str, Any] = { - "servers": servers_data, - } - - await cache.set(CACHE_KEY, payload, get_settings().unraid_cache_ttl) - return payload + return {"servers": servers_data} diff --git a/server/services/unraid_service.py b/server/services/unraid_service.py index 85ebe6a..f4341a3 100644 --- a/server/services/unraid_service.py +++ b/server/services/unraid_service.py @@ -1,421 +1,256 @@ +"""Unraid server stats — MQTT-only data source. + +Reads all server data directly from the MQTT message store, which is +populated by the Unraid MQTT Agent running on each server. This +eliminates the need for GraphQL/REST API keys or HTTP polling. + +MQTT topics used per server (prefix = e.g. "Adriahub" or "unraid-daddelolymp"): + {prefix}/system — CPU, RAM, uptime, temps, hostname, version + {prefix}/docker/containers — Docker container list with per-container stats + {prefix}/shares — Share names, free/used/total bytes + {prefix}/disks — Disk info with temps, SMART, model + {prefix}/array — Array state + {prefix}/availability — "online" / "offline" +""" + from __future__ import annotations -import asyncio import logging -import httpx -from dataclasses import dataclass, field +import time +from dataclasses import dataclass from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# GraphQL query for the Unraid built-in API (6.12+) -# --------------------------------------------------------------------------- - -_GRAPHQL_QUERY = """ -{ - online - info { - os { hostname uptime } - cpu { model cores threads manufacturer brand } - memory { layout { size type } } - } - docker { - containers { names state status image } - } - array { - state - capacity { kilobytes { free used total } } - } - shares { name free size } -} -""".strip() - @dataclass class ServerConfig: """Configuration for a single Unraid server.""" name: str - host: str + host: str = "" + mqtt_prefix: str = "" + # Deprecated — kept for backward compat with old DB rows api_key: str = "" port: int = 80 -def _empty_stats(server: ServerConfig) -> Dict[str, Any]: - """Return a default stats dictionary for a server that has not yet been queried.""" +def _empty_stats(name: str, host: str = "") -> Dict[str, Any]: + """Return a default stats dictionary for a server with no data yet.""" return { - "name": server.name, - "host": server.host, + "name": name, + "host": host, "online": False, "uptime": "", - "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None}, + "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None, "threads": 0, "brand": ""}, "ram": {"used_gb": 0, "total_gb": 0, "pct": 0}, "array": {"status": "unknown", "disks": []}, "docker": {"running": 0, "containers": []}, + "shares": [], + "disks": [], "error": None, } -# --------------------------------------------------------------------------- -# GraphQL parser (Unraid 6.12+ built-in API) -# --------------------------------------------------------------------------- +def _format_uptime(seconds: int) -> str: + """Convert uptime seconds to a human-readable string.""" + days = seconds // 86400 + hours = (seconds % 86400) // 3600 + if days > 0: + return f"{days}d {hours}h" + return f"{hours}h" -def _parse_graphql_response(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Parse a successful GraphQL response into the standard result dict.""" - result["online"] = data.get("online", True) - # --- Info --- - info = data.get("info", {}) - os_info = info.get("os", {}) - result["uptime"] = os_info.get("uptime", "") +def _parse_system(data: Dict[str, Any], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/system`` MQTT payload.""" + # CPU + cpu_pct = data.get("cpu_usage_percent") + if cpu_pct is not None: + result["cpu"]["usage_pct"] = round(float(cpu_pct), 1) + result["cpu"]["cores"] = data.get("cpu_cores", 0) + result["cpu"]["threads"] = data.get("cpu_threads", 0) + result["cpu"]["brand"] = data.get("cpu_model", "") + cpu_temp = data.get("cpu_temp_celsius") + if cpu_temp is not None: + result["cpu"]["temp_c"] = cpu_temp + mb_temp = data.get("motherboard_temp_celsius") + if mb_temp is not None: + result["cpu"]["mb_temp_c"] = mb_temp - cpu_info = info.get("cpu", {}) - result["cpu"]["cores"] = cpu_info.get("cores", 0) - result["cpu"]["threads"] = cpu_info.get("threads", 0) - result["cpu"]["brand"] = cpu_info.get("brand", "") - # GraphQL API doesn't expose CPU usage % — keep 0 + # RAM + ram_pct = data.get("ram_usage_percent") + if ram_pct is not None: + result["ram"]["pct"] = round(float(ram_pct), 1) + ram_total = data.get("ram_total_bytes") + if ram_total: + result["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1) + ram_used = data.get("ram_used_bytes") + if ram_used: + result["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1) - # Memory: sum layout slots for total GB - mem_layout = info.get("memory", {}).get("layout", []) - total_bytes = sum(slot.get("size", 0) for slot in mem_layout) - result["ram"]["total_gb"] = round(total_bytes / (1024 ** 3), 1) - # GraphQL API doesn't expose used memory — keep 0 + # Uptime + uptime_secs = data.get("uptime_seconds") + if uptime_secs: + result["uptime"] = _format_uptime(int(uptime_secs)) - # --- Docker --- - docker_data = data.get("docker", {}) - containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) + # Extra metadata + result["version"] = data.get("version", "") + result["kernel"] = data.get("kernel_version", "") + result["motherboard"] = data.get("server_model", "") + + result["online"] = True + + +def _parse_docker(containers_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/docker/containers`` MQTT payload.""" containers: List[Dict[str, Any]] = [] running_count = 0 - for c in containers_raw: - names = c.get("names", []) - name = names[0].lstrip("/") if names else "unknown" + + for c in containers_list: state = c.get("state", "unknown") - is_running = state == "RUNNING" + is_running = state == "running" if is_running: running_count += 1 containers.append({ - "name": name, + "name": c.get("name", "unknown"), "status": c.get("status", ""), "image": c.get("image", ""), "running": is_running, }) + result["docker"]["running"] = running_count result["docker"]["containers"] = containers - # --- Array --- - array_data = data.get("array", {}) - result["array"]["status"] = array_data.get("state", "unknown").lower() - cap = array_data.get("capacity", {}).get("kilobytes", {}) - total_kb = int(cap.get("total", 0)) - used_kb = int(cap.get("used", 0)) - if total_kb > 0: - result["array"]["total_tb"] = round(total_kb / (1024 ** 2), 1) # KB → TB - result["array"]["used_tb"] = round(used_kb / (1024 ** 2), 1) - - # --- Shares (expose as top-level) --- - shares_raw = data.get("shares", []) +def _parse_shares(shares_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/shares`` MQTT payload.""" shares: List[Dict[str, Any]] = [] - for s in shares_raw: - free_kb = s.get("free", 0) + for s in shares_list: + free_bytes = s.get("free_bytes", 0) + total_bytes = s.get("total_bytes", 0) shares.append({ "name": s.get("name", ""), - "free_gb": round(free_kb / (1024 ** 2), 1), + "free_gb": round(free_bytes / (1024 ** 3), 1) if free_bytes else 0, + "total_gb": round(total_bytes / (1024 ** 3), 1) if total_bytes else 0, }) result["shares"] = shares -async def _try_graphql_endpoint( - client: httpx.AsyncClient, - server: ServerConfig, - result: Dict[str, Any], -) -> bool: - """Attempt to fetch stats via the Unraid GraphQL API (6.12+). - - Returns True if successful, False otherwise. - """ - if not server.api_key: - return False - - base = f"http://{server.host}:{server.port}" - headers = { - "x-api-key": server.api_key, - "Content-Type": "application/json", - "Origin": base, - } - - try: - resp = await client.post( - f"{base}/graphql", - headers=headers, - json={"query": _GRAPHQL_QUERY}, - ) - - if resp.status_code == 403: - # 403 means the endpoint exists but auth failed - logger.warning("[UNRAID] %s (%s): GraphQL 403 — invalid API key?", - server.name, server.host) - return False - - if resp.status_code != 200: - return False - - body = resp.json() - - # Check for GraphQL-level errors - errors = body.get("errors") - if errors and not body.get("data"): - first_msg = errors[0].get("message", "") if errors else "" - logger.warning("[UNRAID] %s (%s): GraphQL error: %s", - server.name, server.host, first_msg) - return False - - data = body.get("data") - if not data: - return False - - _parse_graphql_response(data, result) - logger.info( - "[UNRAID] %s (%s): GraphQL OK — %d containers (%d running), %s cores", - server.name, server.host, - len(result["docker"]["containers"]), - result["docker"]["running"], - result["cpu"]["cores"], - ) - return True - - except Exception as exc: - logger.debug("[UNRAID] %s (%s): GraphQL failed: %s", - server.name, server.host, exc) - return False - - -# --------------------------------------------------------------------------- -# Legacy REST parser (custom Unraid API plugins) -# --------------------------------------------------------------------------- - -def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate *result* from a generic ``/api/system`` JSON response.""" - result["online"] = True - result["uptime"] = data.get("uptime", "") - - cpu_data = data.get("cpu", {}) - result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0)) - result["cpu"]["cores"] = cpu_data.get("cores", 0) - result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None)) - - ram_data = data.get("ram", data.get("memory", {})) - result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2) - result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2) - total = result["ram"]["total_gb"] - if total > 0: - result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1) - else: - result["ram"]["pct"] = 0 - - -def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate array information from an API response.""" - array_data = data.get("array", {}) - result["array"]["status"] = array_data.get("status", "unknown") - - disks_raw: List[Dict[str, Any]] = array_data.get("disks", []) - parsed_disks: List[Dict[str, Any]] = [] - for disk in disks_raw: - parsed_disks.append({ - "name": disk.get("name", ""), - "status": disk.get("status", "unknown"), - "size": disk.get("size", ""), - "used": disk.get("used", ""), - "temp_c": disk.get("temp_c", None), +def _parse_disks(disks_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/disks`` MQTT payload.""" + disks: List[Dict[str, Any]] = [] + for d in disks_list: + # Skip placeholder disks (empty parity slots etc.) + if d.get("status") == "DISK_NP": + continue + disks.append({ + "name": d.get("name", ""), + "model": d.get("model", ""), + "temp_c": d.get("temperature_celsius", None), + "size_gb": round(d.get("size_bytes", 0) / (1024 ** 3), 1) if d.get("size_bytes") else 0, + "smart_status": d.get("smart_status", ""), + "role": d.get("role", ""), }) - result["array"]["disks"] = parsed_disks + result["disks"] = disks -def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate Docker container information from an API response.""" - docker_data = data.get("docker", {}) - containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) - - containers: List[Dict[str, Any]] = [] - running_count = 0 - for container in containers_raw: - status = container.get("status", "unknown") - is_running = "running" in status.lower() if isinstance(status, str) else False - if is_running: - running_count += 1 - containers.append({ - "name": container.get("name", ""), - "status": status, - "image": container.get("image", ""), - "running": is_running, - }) - - result["docker"]["running"] = docker_data.get("running", running_count) - result["docker"]["containers"] = containers +def _parse_array(data: Dict[str, Any], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/array`` MQTT payload.""" + result["array"]["status"] = data.get("state", "unknown").lower() + result["array"]["num_disks"] = data.get("num_disks", 0) -async def _try_rest_endpoint( - client: httpx.AsyncClient, +def fetch_server_from_mqtt( server: ServerConfig, - result: Dict[str, Any], -) -> bool: - """Attempt to fetch stats via legacy REST API endpoints. - - Returns True if successful, False otherwise. - """ - if not server.api_key: - return False - - headers = {"Authorization": f"Bearer {server.api_key}"} - base = f"http://{server.host}:{server.port}" - - try: - resp = await client.get(f"{base}/api/system", headers=headers) - if resp.status_code == 200: - data = resp.json() - _parse_system_info(data, result) - _parse_array_info(data, result) - _parse_docker_info(data, result) - logger.info("[UNRAID] %s (%s): REST API OK", server.name, server.host) - return True - else: - logger.debug("[UNRAID] %s (%s): /api/system returned HTTP %d", - server.name, server.host, resp.status_code) - except Exception as exc: - logger.debug("[UNRAID] %s (%s): /api/system failed: %s", - server.name, server.host, exc) - - # Try individual endpoints if the combined one failed - fetched_any = False - - for endpoint, parser in [ - ("/api/cpu", lambda d: ( - result["cpu"].update({ - "usage_pct": d.get("usage_pct", d.get("usage", 0)), - "cores": d.get("cores", 0), - "temp_c": d.get("temp_c", None), - }), - )), - ("/api/memory", lambda d: ( - result["ram"].update({ - "used_gb": round(d.get("used_gb", d.get("used", 0)), 2), - "total_gb": round(d.get("total_gb", d.get("total", 0)), 2), - }), - )), - ]: - try: - resp = await client.get(f"{base}{endpoint}", headers=headers) - if resp.status_code == 200: - parser(resp.json()) - result["online"] = True - fetched_any = True - except Exception: - pass - - try: - resp = await client.get(f"{base}/api/array", headers=headers) - if resp.status_code == 200: - _parse_array_info(resp.json(), result) - result["online"] = True - fetched_any = True - except Exception: - pass - - try: - resp = await client.get(f"{base}/api/docker", headers=headers) - if resp.status_code == 200: - _parse_docker_info(resp.json(), result) - result["online"] = True - fetched_any = True - except Exception: - pass - - return fetched_any - - -# --------------------------------------------------------------------------- -# Connectivity fallback -# --------------------------------------------------------------------------- - -async def _try_connectivity_check( - client: httpx.AsyncClient, - server: ServerConfig, - result: Dict[str, Any], -) -> None: - """Perform a basic HTTP connectivity check as a fallback.""" - try: - resp = await client.get( - f"http://{server.host}:{server.port}/", - follow_redirects=True, - ) - result["online"] = resp.status_code < 500 - except Exception: - result["online"] = False - - -# --------------------------------------------------------------------------- -# Main fetch function -# --------------------------------------------------------------------------- - -async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]: - """Fetch system stats from an Unraid server. - - Strategy: - 1. Try Unraid GraphQL API (built-in since 6.12, uses ``x-api-key`` header) - 2. Fall back to legacy REST API (custom plugins, uses ``Bearer`` token) - 3. Fall back to simple HTTP connectivity check + store: Dict[str, Any], +) -> Dict[str, Any]: + """Build complete server stats from the MQTT message store. Args: - server: A :class:`ServerConfig` describing the target server. + server: Server configuration with ``mqtt_prefix``. + store: The ``mqtt_service.store`` dict (topic → MqttMessage). Returns: - Dictionary with server name, host, online status, and detailed stats - for CPU, RAM, array, and Docker containers. + Server stats dictionary ready for the API response. """ - result = _empty_stats(server) + prefix = server.mqtt_prefix or server.name + result = _empty_stats(server.name, server.host) - if not server.host: - result["error"] = "No host configured" - return result + def _get(topic: str) -> Optional[Any]: + msg = store.get(topic) + if msg is None: + return None + return msg.payload - try: - async with httpx.AsyncClient(timeout=10, verify=False) as client: - # 1) Try GraphQL first (modern Unraid 6.12+) - api_ok = await _try_graphql_endpoint(client, server, result) - - # 2) Fall back to REST - if not api_ok: - api_ok = await _try_rest_endpoint(client, server, result) - - # 3) Fall back to connectivity check - if not api_ok and not result["online"]: - logger.info("[UNRAID] %s: APIs failed, trying connectivity check", server.name) - await _try_connectivity_check(client, server, result) - - except Exception as exc: + # --- Availability --- + avail = _get(f"{prefix}/availability") + if avail == "online": + result["online"] = True + elif avail == "offline": result["online"] = False - result["error"] = str(exc) - logger.error("[UNRAID] %s (%s): connection failed: %s", server.name, server.host, exc) + result["error"] = "Server offline (MQTT availability)" - if not result["online"]: - logger.warning("[UNRAID] %s (%s): offline (error=%s)", server.name, server.host, result.get("error")) + # --- System (CPU, RAM, uptime, temps) --- + system_data = _get(f"{prefix}/system") + if system_data and isinstance(system_data, dict): + _parse_system(system_data, result) + else: + # No system data means MQTT agent isn't reporting + if result["online"] is False: + result["error"] = "No MQTT data available" + + # --- Docker containers --- + docker_data = _get(f"{prefix}/docker/containers") + if docker_data and isinstance(docker_data, list): + _parse_docker(docker_data, result) + + # --- Shares --- + shares_data = _get(f"{prefix}/shares") + if shares_data and isinstance(shares_data, list): + _parse_shares(shares_data, result) + + # --- Disks --- + disks_data = _get(f"{prefix}/disks") + if disks_data and isinstance(disks_data, list): + _parse_disks(disks_data, result) + + # --- Array --- + array_data = _get(f"{prefix}/array") + if array_data and isinstance(array_data, dict): + _parse_array(array_data, result) + + # Check data freshness (system topic timestamp) + sys_msg = store.get(f"{prefix}/system") + if sys_msg: + age = time.time() - sys_msg.timestamp + if age > 120: # More than 2 minutes old + result["stale"] = True + logger.warning( + "[UNRAID] %s: MQTT data is %.0fs old (stale)", + server.name, age, + ) + + logger.debug( + "[UNRAID] %s: MQTT — CPU %.1f%% (%d°C), RAM %.1f%%, Docker %d/%d", + server.name, + result["cpu"].get("usage_pct", 0), + result["cpu"].get("temp_c", 0) or 0, + result["ram"].get("pct", 0), + result["docker"]["running"], + len(result["docker"]["containers"]), + ) return result -async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]: - """Fetch stats from all configured Unraid servers in parallel. +def fetch_all_servers_mqtt( + servers: List[ServerConfig], + store: Dict[str, Any], +) -> List[Dict[str, Any]]: + """Fetch stats for all configured servers from the MQTT store. - Args: - servers: List of :class:`ServerConfig` instances. - - Returns: - List of stats dictionaries, one per server. + This is synchronous — no HTTP calls, just reading in-memory data. """ if not servers: return [] - - tasks = [fetch_server_stats(srv) for srv in servers] - return list(await asyncio.gather(*tasks)) + return [fetch_server_from_mqtt(srv, store) for srv in servers]