diff --git a/server/config.py b/server/config.py index b2cdd74..cb388f9 100644 --- a/server/config.py +++ b/server/config.py @@ -19,9 +19,10 @@ logger = logging.getLogger(__name__) @dataclass class UnraidServer: name: str - host: str - api_key: str = "" - port: int = 80 + host: str = "" + mqtt_prefix: str = "" # MQTT topic prefix, e.g. "Adriahub" or "unraid-daddelolymp" + api_key: str = "" # Deprecated — kept for backward compat + port: int = 80 # Deprecated — kept for backward compat @dataclass @@ -122,11 +123,12 @@ class Settings: UnraidServer( name=srv.get("name", f"Server {i+1}"), host=srv.get("host", ""), + mqtt_prefix=srv.get("mqtt_prefix", ""), api_key=srv.get("api_key", ""), port=int(srv.get("port", 80)), ) for i, srv in enumerate(servers_data) - if srv.get("host") + if srv.get("name") or srv.get("host") ] s.unraid_enabled = len(s.unraid_servers) > 0 except (json.JSONDecodeError, TypeError): @@ -172,11 +174,12 @@ class Settings: UnraidServer( name=s.get("name", ""), host=s.get("host", ""), + mqtt_prefix=s.get("mqtt_prefix", ""), api_key=s.get("api_key", ""), port=int(s.get("port", 80)), ) for s in servers - if s.get("host") + if s.get("name") or s.get("host") ] self.unraid_enabled = enabled diff --git a/server/routers/servers.py b/server/routers/servers.py index 3ecea6c..138cc6d 100644 --- a/server/routers/servers.py +++ b/server/routers/servers.py @@ -1,4 +1,4 @@ -"""Unraid servers status router.""" +"""Unraid servers status router — MQTT-only data source.""" from __future__ import annotations @@ -7,151 +7,45 @@ from typing import Any, Dict, List from fastapi import APIRouter -from server.cache import cache from server.config import get_settings from server.services.mqtt_service import mqtt_service -from server.services.unraid_service import ServerConfig, fetch_all_servers +from server.services.unraid_service import ServerConfig, fetch_all_servers_mqtt logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["servers"]) -CACHE_KEY = "servers" - - -# --------------------------------------------------------------------------- -# MQTT enrichment — overlay live system metrics from MQTT topics -# --------------------------------------------------------------------------- - -def _enrich_from_mqtt(servers: List[Dict[str, Any]]) -> None: - """Merge live CPU/RAM data from MQTT ``/system`` topics. - - The Unraid MQTT Agent plugin publishes JSON payloads to topics like - ``unraid-daddelolymp/system`` or ``Adriahub/system`` every ~15 s. - These contain live ``cpu_usage_percent``, ``ram_usage_percent``, etc. - that the GraphQL API does not expose. - """ - - store = mqtt_service.store - - for srv in servers: - name = srv.get("name", "") - if not name: - continue - - # Try common topic patterns - system_data: Dict[str, Any] | None = None - for pattern in ( - f"{name}/system", # "Adriahub/system" - f"unraid-{name.lower()}/system", # "unraid-daddelolymp/system" - f"unraid-{name}/system", # "unraid-Daddelolymp/system" - ): - msg = store.get(pattern) - if msg is not None and isinstance(msg.payload, dict): - system_data = msg.payload - break - - if not system_data: - continue - - # --- CPU --- - cpu_pct = system_data.get("cpu_usage_percent") - if cpu_pct is not None: - srv["cpu"]["usage_pct"] = round(float(cpu_pct), 1) - - cpu_model = system_data.get("cpu_model") - if cpu_model: - srv["cpu"]["brand"] = cpu_model - - cpu_temp = system_data.get("cpu_temp_celsius") - if cpu_temp is not None: - srv["cpu"]["temp_c"] = cpu_temp - - cores = system_data.get("cpu_cores") - if cores: - srv["cpu"]["cores"] = cores - - threads = system_data.get("cpu_threads") - if threads: - srv["cpu"]["threads"] = threads - - # --- RAM --- - ram_pct = system_data.get("ram_usage_percent") - if ram_pct is not None: - srv["ram"]["pct"] = round(float(ram_pct), 1) - - ram_total = system_data.get("ram_total_bytes") - if ram_total: - srv["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1) - - ram_used = system_data.get("ram_used_bytes") - if ram_used: - srv["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1) - - # --- Uptime --- - uptime_secs = system_data.get("uptime_seconds") - if uptime_secs: - days = uptime_secs // 86400 - hours = (uptime_secs % 86400) // 3600 - srv["uptime"] = f"{days}d {hours}h" - - srv["online"] = True - - logger.debug( - "[UNRAID] %s: MQTT enriched — CPU %.1f%% %.0f°C, RAM %.1f%%", - name, - srv["cpu"].get("usage_pct", 0), - srv["cpu"].get("temp_c", 0) or 0, - srv["ram"].get("pct", 0), - ) - @router.get("/servers") async def get_servers() -> Dict[str, Any]: """Return status information for all configured Unraid servers. - Response shape:: - - { - "servers": [ ... server dicts ... ] - } + All data comes from the MQTT message store — no HTTP polling, + no API keys, no cache needed (MQTT data is always fresh). """ - # --- cache hit? ----------------------------------------------------------- - cached = await cache.get(CACHE_KEY) - if cached is not None: - # Always overlay fresh MQTT data even on cache hits - _enrich_from_mqtt(cached.get("servers", [])) - return cached + settings = get_settings() - # --- cache miss ----------------------------------------------------------- server_configs: List[ServerConfig] = [ ServerConfig( name=srv.name, host=srv.host, - api_key=srv.api_key, - port=srv.port, + mqtt_prefix=getattr(srv, "mqtt_prefix", "") or srv.name, ) - for srv in get_settings().unraid_servers + for srv in settings.unraid_servers ] - servers_data: List[Dict[str, Any]] = [] + if not server_configs: + return {"servers": []} + try: - servers_data = await fetch_all_servers(server_configs) + servers_data = fetch_all_servers_mqtt(server_configs, mqtt_service.store) except Exception as exc: - logger.exception("Failed to fetch Unraid server data") + logger.exception("Failed to read Unraid server data from MQTT") return { "servers": [], "error": True, "message": str(exc), } - # Overlay live MQTT system metrics - _enrich_from_mqtt(servers_data) - - payload: Dict[str, Any] = { - "servers": servers_data, - } - - await cache.set(CACHE_KEY, payload, get_settings().unraid_cache_ttl) - return payload + return {"servers": servers_data} diff --git a/server/services/unraid_service.py b/server/services/unraid_service.py index 85ebe6a..f4341a3 100644 --- a/server/services/unraid_service.py +++ b/server/services/unraid_service.py @@ -1,421 +1,256 @@ +"""Unraid server stats — MQTT-only data source. + +Reads all server data directly from the MQTT message store, which is +populated by the Unraid MQTT Agent running on each server. This +eliminates the need for GraphQL/REST API keys or HTTP polling. + +MQTT topics used per server (prefix = e.g. "Adriahub" or "unraid-daddelolymp"): + {prefix}/system — CPU, RAM, uptime, temps, hostname, version + {prefix}/docker/containers — Docker container list with per-container stats + {prefix}/shares — Share names, free/used/total bytes + {prefix}/disks — Disk info with temps, SMART, model + {prefix}/array — Array state + {prefix}/availability — "online" / "offline" +""" + from __future__ import annotations -import asyncio import logging -import httpx -from dataclasses import dataclass, field +import time +from dataclasses import dataclass from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# GraphQL query for the Unraid built-in API (6.12+) -# --------------------------------------------------------------------------- - -_GRAPHQL_QUERY = """ -{ - online - info { - os { hostname uptime } - cpu { model cores threads manufacturer brand } - memory { layout { size type } } - } - docker { - containers { names state status image } - } - array { - state - capacity { kilobytes { free used total } } - } - shares { name free size } -} -""".strip() - @dataclass class ServerConfig: """Configuration for a single Unraid server.""" name: str - host: str + host: str = "" + mqtt_prefix: str = "" + # Deprecated — kept for backward compat with old DB rows api_key: str = "" port: int = 80 -def _empty_stats(server: ServerConfig) -> Dict[str, Any]: - """Return a default stats dictionary for a server that has not yet been queried.""" +def _empty_stats(name: str, host: str = "") -> Dict[str, Any]: + """Return a default stats dictionary for a server with no data yet.""" return { - "name": server.name, - "host": server.host, + "name": name, + "host": host, "online": False, "uptime": "", - "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None}, + "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None, "threads": 0, "brand": ""}, "ram": {"used_gb": 0, "total_gb": 0, "pct": 0}, "array": {"status": "unknown", "disks": []}, "docker": {"running": 0, "containers": []}, + "shares": [], + "disks": [], "error": None, } -# --------------------------------------------------------------------------- -# GraphQL parser (Unraid 6.12+ built-in API) -# --------------------------------------------------------------------------- +def _format_uptime(seconds: int) -> str: + """Convert uptime seconds to a human-readable string.""" + days = seconds // 86400 + hours = (seconds % 86400) // 3600 + if days > 0: + return f"{days}d {hours}h" + return f"{hours}h" -def _parse_graphql_response(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Parse a successful GraphQL response into the standard result dict.""" - result["online"] = data.get("online", True) - # --- Info --- - info = data.get("info", {}) - os_info = info.get("os", {}) - result["uptime"] = os_info.get("uptime", "") +def _parse_system(data: Dict[str, Any], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/system`` MQTT payload.""" + # CPU + cpu_pct = data.get("cpu_usage_percent") + if cpu_pct is not None: + result["cpu"]["usage_pct"] = round(float(cpu_pct), 1) + result["cpu"]["cores"] = data.get("cpu_cores", 0) + result["cpu"]["threads"] = data.get("cpu_threads", 0) + result["cpu"]["brand"] = data.get("cpu_model", "") + cpu_temp = data.get("cpu_temp_celsius") + if cpu_temp is not None: + result["cpu"]["temp_c"] = cpu_temp + mb_temp = data.get("motherboard_temp_celsius") + if mb_temp is not None: + result["cpu"]["mb_temp_c"] = mb_temp - cpu_info = info.get("cpu", {}) - result["cpu"]["cores"] = cpu_info.get("cores", 0) - result["cpu"]["threads"] = cpu_info.get("threads", 0) - result["cpu"]["brand"] = cpu_info.get("brand", "") - # GraphQL API doesn't expose CPU usage % — keep 0 + # RAM + ram_pct = data.get("ram_usage_percent") + if ram_pct is not None: + result["ram"]["pct"] = round(float(ram_pct), 1) + ram_total = data.get("ram_total_bytes") + if ram_total: + result["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1) + ram_used = data.get("ram_used_bytes") + if ram_used: + result["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1) - # Memory: sum layout slots for total GB - mem_layout = info.get("memory", {}).get("layout", []) - total_bytes = sum(slot.get("size", 0) for slot in mem_layout) - result["ram"]["total_gb"] = round(total_bytes / (1024 ** 3), 1) - # GraphQL API doesn't expose used memory — keep 0 + # Uptime + uptime_secs = data.get("uptime_seconds") + if uptime_secs: + result["uptime"] = _format_uptime(int(uptime_secs)) - # --- Docker --- - docker_data = data.get("docker", {}) - containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) + # Extra metadata + result["version"] = data.get("version", "") + result["kernel"] = data.get("kernel_version", "") + result["motherboard"] = data.get("server_model", "") + + result["online"] = True + + +def _parse_docker(containers_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/docker/containers`` MQTT payload.""" containers: List[Dict[str, Any]] = [] running_count = 0 - for c in containers_raw: - names = c.get("names", []) - name = names[0].lstrip("/") if names else "unknown" + + for c in containers_list: state = c.get("state", "unknown") - is_running = state == "RUNNING" + is_running = state == "running" if is_running: running_count += 1 containers.append({ - "name": name, + "name": c.get("name", "unknown"), "status": c.get("status", ""), "image": c.get("image", ""), "running": is_running, }) + result["docker"]["running"] = running_count result["docker"]["containers"] = containers - # --- Array --- - array_data = data.get("array", {}) - result["array"]["status"] = array_data.get("state", "unknown").lower() - cap = array_data.get("capacity", {}).get("kilobytes", {}) - total_kb = int(cap.get("total", 0)) - used_kb = int(cap.get("used", 0)) - if total_kb > 0: - result["array"]["total_tb"] = round(total_kb / (1024 ** 2), 1) # KB → TB - result["array"]["used_tb"] = round(used_kb / (1024 ** 2), 1) - - # --- Shares (expose as top-level) --- - shares_raw = data.get("shares", []) +def _parse_shares(shares_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/shares`` MQTT payload.""" shares: List[Dict[str, Any]] = [] - for s in shares_raw: - free_kb = s.get("free", 0) + for s in shares_list: + free_bytes = s.get("free_bytes", 0) + total_bytes = s.get("total_bytes", 0) shares.append({ "name": s.get("name", ""), - "free_gb": round(free_kb / (1024 ** 2), 1), + "free_gb": round(free_bytes / (1024 ** 3), 1) if free_bytes else 0, + "total_gb": round(total_bytes / (1024 ** 3), 1) if total_bytes else 0, }) result["shares"] = shares -async def _try_graphql_endpoint( - client: httpx.AsyncClient, - server: ServerConfig, - result: Dict[str, Any], -) -> bool: - """Attempt to fetch stats via the Unraid GraphQL API (6.12+). - - Returns True if successful, False otherwise. - """ - if not server.api_key: - return False - - base = f"http://{server.host}:{server.port}" - headers = { - "x-api-key": server.api_key, - "Content-Type": "application/json", - "Origin": base, - } - - try: - resp = await client.post( - f"{base}/graphql", - headers=headers, - json={"query": _GRAPHQL_QUERY}, - ) - - if resp.status_code == 403: - # 403 means the endpoint exists but auth failed - logger.warning("[UNRAID] %s (%s): GraphQL 403 — invalid API key?", - server.name, server.host) - return False - - if resp.status_code != 200: - return False - - body = resp.json() - - # Check for GraphQL-level errors - errors = body.get("errors") - if errors and not body.get("data"): - first_msg = errors[0].get("message", "") if errors else "" - logger.warning("[UNRAID] %s (%s): GraphQL error: %s", - server.name, server.host, first_msg) - return False - - data = body.get("data") - if not data: - return False - - _parse_graphql_response(data, result) - logger.info( - "[UNRAID] %s (%s): GraphQL OK — %d containers (%d running), %s cores", - server.name, server.host, - len(result["docker"]["containers"]), - result["docker"]["running"], - result["cpu"]["cores"], - ) - return True - - except Exception as exc: - logger.debug("[UNRAID] %s (%s): GraphQL failed: %s", - server.name, server.host, exc) - return False - - -# --------------------------------------------------------------------------- -# Legacy REST parser (custom Unraid API plugins) -# --------------------------------------------------------------------------- - -def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate *result* from a generic ``/api/system`` JSON response.""" - result["online"] = True - result["uptime"] = data.get("uptime", "") - - cpu_data = data.get("cpu", {}) - result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0)) - result["cpu"]["cores"] = cpu_data.get("cores", 0) - result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None)) - - ram_data = data.get("ram", data.get("memory", {})) - result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2) - result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2) - total = result["ram"]["total_gb"] - if total > 0: - result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1) - else: - result["ram"]["pct"] = 0 - - -def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate array information from an API response.""" - array_data = data.get("array", {}) - result["array"]["status"] = array_data.get("status", "unknown") - - disks_raw: List[Dict[str, Any]] = array_data.get("disks", []) - parsed_disks: List[Dict[str, Any]] = [] - for disk in disks_raw: - parsed_disks.append({ - "name": disk.get("name", ""), - "status": disk.get("status", "unknown"), - "size": disk.get("size", ""), - "used": disk.get("used", ""), - "temp_c": disk.get("temp_c", None), +def _parse_disks(disks_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/disks`` MQTT payload.""" + disks: List[Dict[str, Any]] = [] + for d in disks_list: + # Skip placeholder disks (empty parity slots etc.) + if d.get("status") == "DISK_NP": + continue + disks.append({ + "name": d.get("name", ""), + "model": d.get("model", ""), + "temp_c": d.get("temperature_celsius", None), + "size_gb": round(d.get("size_bytes", 0) / (1024 ** 3), 1) if d.get("size_bytes") else 0, + "smart_status": d.get("smart_status", ""), + "role": d.get("role", ""), }) - result["array"]["disks"] = parsed_disks + result["disks"] = disks -def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: - """Populate Docker container information from an API response.""" - docker_data = data.get("docker", {}) - containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) - - containers: List[Dict[str, Any]] = [] - running_count = 0 - for container in containers_raw: - status = container.get("status", "unknown") - is_running = "running" in status.lower() if isinstance(status, str) else False - if is_running: - running_count += 1 - containers.append({ - "name": container.get("name", ""), - "status": status, - "image": container.get("image", ""), - "running": is_running, - }) - - result["docker"]["running"] = docker_data.get("running", running_count) - result["docker"]["containers"] = containers +def _parse_array(data: Dict[str, Any], result: Dict[str, Any]) -> None: + """Parse the ``{prefix}/array`` MQTT payload.""" + result["array"]["status"] = data.get("state", "unknown").lower() + result["array"]["num_disks"] = data.get("num_disks", 0) -async def _try_rest_endpoint( - client: httpx.AsyncClient, +def fetch_server_from_mqtt( server: ServerConfig, - result: Dict[str, Any], -) -> bool: - """Attempt to fetch stats via legacy REST API endpoints. - - Returns True if successful, False otherwise. - """ - if not server.api_key: - return False - - headers = {"Authorization": f"Bearer {server.api_key}"} - base = f"http://{server.host}:{server.port}" - - try: - resp = await client.get(f"{base}/api/system", headers=headers) - if resp.status_code == 200: - data = resp.json() - _parse_system_info(data, result) - _parse_array_info(data, result) - _parse_docker_info(data, result) - logger.info("[UNRAID] %s (%s): REST API OK", server.name, server.host) - return True - else: - logger.debug("[UNRAID] %s (%s): /api/system returned HTTP %d", - server.name, server.host, resp.status_code) - except Exception as exc: - logger.debug("[UNRAID] %s (%s): /api/system failed: %s", - server.name, server.host, exc) - - # Try individual endpoints if the combined one failed - fetched_any = False - - for endpoint, parser in [ - ("/api/cpu", lambda d: ( - result["cpu"].update({ - "usage_pct": d.get("usage_pct", d.get("usage", 0)), - "cores": d.get("cores", 0), - "temp_c": d.get("temp_c", None), - }), - )), - ("/api/memory", lambda d: ( - result["ram"].update({ - "used_gb": round(d.get("used_gb", d.get("used", 0)), 2), - "total_gb": round(d.get("total_gb", d.get("total", 0)), 2), - }), - )), - ]: - try: - resp = await client.get(f"{base}{endpoint}", headers=headers) - if resp.status_code == 200: - parser(resp.json()) - result["online"] = True - fetched_any = True - except Exception: - pass - - try: - resp = await client.get(f"{base}/api/array", headers=headers) - if resp.status_code == 200: - _parse_array_info(resp.json(), result) - result["online"] = True - fetched_any = True - except Exception: - pass - - try: - resp = await client.get(f"{base}/api/docker", headers=headers) - if resp.status_code == 200: - _parse_docker_info(resp.json(), result) - result["online"] = True - fetched_any = True - except Exception: - pass - - return fetched_any - - -# --------------------------------------------------------------------------- -# Connectivity fallback -# --------------------------------------------------------------------------- - -async def _try_connectivity_check( - client: httpx.AsyncClient, - server: ServerConfig, - result: Dict[str, Any], -) -> None: - """Perform a basic HTTP connectivity check as a fallback.""" - try: - resp = await client.get( - f"http://{server.host}:{server.port}/", - follow_redirects=True, - ) - result["online"] = resp.status_code < 500 - except Exception: - result["online"] = False - - -# --------------------------------------------------------------------------- -# Main fetch function -# --------------------------------------------------------------------------- - -async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]: - """Fetch system stats from an Unraid server. - - Strategy: - 1. Try Unraid GraphQL API (built-in since 6.12, uses ``x-api-key`` header) - 2. Fall back to legacy REST API (custom plugins, uses ``Bearer`` token) - 3. Fall back to simple HTTP connectivity check + store: Dict[str, Any], +) -> Dict[str, Any]: + """Build complete server stats from the MQTT message store. Args: - server: A :class:`ServerConfig` describing the target server. + server: Server configuration with ``mqtt_prefix``. + store: The ``mqtt_service.store`` dict (topic → MqttMessage). Returns: - Dictionary with server name, host, online status, and detailed stats - for CPU, RAM, array, and Docker containers. + Server stats dictionary ready for the API response. """ - result = _empty_stats(server) + prefix = server.mqtt_prefix or server.name + result = _empty_stats(server.name, server.host) - if not server.host: - result["error"] = "No host configured" - return result + def _get(topic: str) -> Optional[Any]: + msg = store.get(topic) + if msg is None: + return None + return msg.payload - try: - async with httpx.AsyncClient(timeout=10, verify=False) as client: - # 1) Try GraphQL first (modern Unraid 6.12+) - api_ok = await _try_graphql_endpoint(client, server, result) - - # 2) Fall back to REST - if not api_ok: - api_ok = await _try_rest_endpoint(client, server, result) - - # 3) Fall back to connectivity check - if not api_ok and not result["online"]: - logger.info("[UNRAID] %s: APIs failed, trying connectivity check", server.name) - await _try_connectivity_check(client, server, result) - - except Exception as exc: + # --- Availability --- + avail = _get(f"{prefix}/availability") + if avail == "online": + result["online"] = True + elif avail == "offline": result["online"] = False - result["error"] = str(exc) - logger.error("[UNRAID] %s (%s): connection failed: %s", server.name, server.host, exc) + result["error"] = "Server offline (MQTT availability)" - if not result["online"]: - logger.warning("[UNRAID] %s (%s): offline (error=%s)", server.name, server.host, result.get("error")) + # --- System (CPU, RAM, uptime, temps) --- + system_data = _get(f"{prefix}/system") + if system_data and isinstance(system_data, dict): + _parse_system(system_data, result) + else: + # No system data means MQTT agent isn't reporting + if result["online"] is False: + result["error"] = "No MQTT data available" + + # --- Docker containers --- + docker_data = _get(f"{prefix}/docker/containers") + if docker_data and isinstance(docker_data, list): + _parse_docker(docker_data, result) + + # --- Shares --- + shares_data = _get(f"{prefix}/shares") + if shares_data and isinstance(shares_data, list): + _parse_shares(shares_data, result) + + # --- Disks --- + disks_data = _get(f"{prefix}/disks") + if disks_data and isinstance(disks_data, list): + _parse_disks(disks_data, result) + + # --- Array --- + array_data = _get(f"{prefix}/array") + if array_data and isinstance(array_data, dict): + _parse_array(array_data, result) + + # Check data freshness (system topic timestamp) + sys_msg = store.get(f"{prefix}/system") + if sys_msg: + age = time.time() - sys_msg.timestamp + if age > 120: # More than 2 minutes old + result["stale"] = True + logger.warning( + "[UNRAID] %s: MQTT data is %.0fs old (stale)", + server.name, age, + ) + + logger.debug( + "[UNRAID] %s: MQTT — CPU %.1f%% (%d°C), RAM %.1f%%, Docker %d/%d", + server.name, + result["cpu"].get("usage_pct", 0), + result["cpu"].get("temp_c", 0) or 0, + result["ram"].get("pct", 0), + result["docker"]["running"], + len(result["docker"]["containers"]), + ) return result -async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]: - """Fetch stats from all configured Unraid servers in parallel. +def fetch_all_servers_mqtt( + servers: List[ServerConfig], + store: Dict[str, Any], +) -> List[Dict[str, Any]]: + """Fetch stats for all configured servers from the MQTT store. - Args: - servers: List of :class:`ServerConfig` instances. - - Returns: - List of stats dictionaries, one per server. + This is synchronous — no HTTP calls, just reading in-memory data. """ if not servers: return [] - - tasks = [fetch_server_stats(srv) for srv in servers] - return list(await asyncio.gather(*tasks)) + return [fetch_server_from_mqtt(srv, store) for srv in servers]