refactor: replace GraphQL/REST with MQTT-only for Unraid server data

All server stats (CPU, RAM, Docker, shares, disks, array) now come
directly from MQTT topics published by the Unraid MQTT Agent. This
eliminates the need for API keys, HTTP polling, and the GraphQL/REST
fallback chain.

- Rewrote unraid_service.py to read from MQTT store (no httpx needed)
- Simplified servers router (no cache, no enrichment hack)
- Added mqtt_prefix field to UnraidServer config
- Updated DB: both Daddelolymp and Adriahub with mqtt_prefix, no api_key
- Data is always fresh (MQTT pushes every ~15s)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sam 2026-03-02 23:25:57 +01:00
parent 5d3d4f4015
commit c6db0ab569
3 changed files with 201 additions and 469 deletions

View file

@@ -19,9 +19,10 @@ logger = logging.getLogger(__name__)
@dataclass
class UnraidServer:
name: str
host: str
api_key: str = ""
port: int = 80
host: str = ""
mqtt_prefix: str = "" # MQTT topic prefix, e.g. "Adriahub" or "unraid-daddelolymp"
api_key: str = "" # Deprecated — kept for backward compat
port: int = 80 # Deprecated — kept for backward compat
@dataclass
@@ -122,11 +123,12 @@ class Settings:
UnraidServer(
name=srv.get("name", f"Server {i+1}"),
host=srv.get("host", ""),
mqtt_prefix=srv.get("mqtt_prefix", ""),
api_key=srv.get("api_key", ""),
port=int(srv.get("port", 80)),
)
for i, srv in enumerate(servers_data)
if srv.get("host")
if srv.get("name") or srv.get("host")
]
s.unraid_enabled = len(s.unraid_servers) > 0
except (json.JSONDecodeError, TypeError):
@@ -172,11 +174,12 @@ class Settings:
UnraidServer(
name=s.get("name", ""),
host=s.get("host", ""),
mqtt_prefix=s.get("mqtt_prefix", ""),
api_key=s.get("api_key", ""),
port=int(s.get("port", 80)),
)
for s in servers
if s.get("host")
if s.get("name") or s.get("host")
]
self.unraid_enabled = enabled

View file

@@ -1,4 +1,4 @@
"""Unraid servers status router."""
"""Unraid servers status router — MQTT-only data source."""
from __future__ import annotations
@@ -7,151 +7,45 @@ from typing import Any, Dict, List
from fastapi import APIRouter
from server.cache import cache
from server.config import get_settings
from server.services.mqtt_service import mqtt_service
from server.services.unraid_service import ServerConfig, fetch_all_servers
from server.services.unraid_service import ServerConfig, fetch_all_servers_mqtt
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["servers"])
CACHE_KEY = "servers"
# ---------------------------------------------------------------------------
# MQTT enrichment — overlay live system metrics from MQTT topics
# ---------------------------------------------------------------------------
def _enrich_from_mqtt(servers: List[Dict[str, Any]]) -> None:
"""Merge live CPU/RAM data from MQTT ``<prefix>/system`` topics.
The Unraid MQTT Agent plugin publishes JSON payloads to topics like
``unraid-daddelolymp/system`` or ``Adriahub/system`` every ~15 s.
These contain live ``cpu_usage_percent``, ``ram_usage_percent``, etc.
that the GraphQL API does not expose.
"""
store = mqtt_service.store
for srv in servers:
name = srv.get("name", "")
if not name:
continue
# Try common topic patterns
system_data: Dict[str, Any] | None = None
for pattern in (
f"{name}/system", # "Adriahub/system"
f"unraid-{name.lower()}/system", # "unraid-daddelolymp/system"
f"unraid-{name}/system", # "unraid-Daddelolymp/system"
):
msg = store.get(pattern)
if msg is not None and isinstance(msg.payload, dict):
system_data = msg.payload
break
if not system_data:
continue
# --- CPU ---
cpu_pct = system_data.get("cpu_usage_percent")
if cpu_pct is not None:
srv["cpu"]["usage_pct"] = round(float(cpu_pct), 1)
cpu_model = system_data.get("cpu_model")
if cpu_model:
srv["cpu"]["brand"] = cpu_model
cpu_temp = system_data.get("cpu_temp_celsius")
if cpu_temp is not None:
srv["cpu"]["temp_c"] = cpu_temp
cores = system_data.get("cpu_cores")
if cores:
srv["cpu"]["cores"] = cores
threads = system_data.get("cpu_threads")
if threads:
srv["cpu"]["threads"] = threads
# --- RAM ---
ram_pct = system_data.get("ram_usage_percent")
if ram_pct is not None:
srv["ram"]["pct"] = round(float(ram_pct), 1)
ram_total = system_data.get("ram_total_bytes")
if ram_total:
srv["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1)
ram_used = system_data.get("ram_used_bytes")
if ram_used:
srv["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1)
# --- Uptime ---
uptime_secs = system_data.get("uptime_seconds")
if uptime_secs:
days = uptime_secs // 86400
hours = (uptime_secs % 86400) // 3600
srv["uptime"] = f"{days}d {hours}h"
srv["online"] = True
logger.debug(
"[UNRAID] %s: MQTT enriched — CPU %.1f%% %.0f°C, RAM %.1f%%",
name,
srv["cpu"].get("usage_pct", 0),
srv["cpu"].get("temp_c", 0) or 0,
srv["ram"].get("pct", 0),
)
@router.get("/servers")
async def get_servers() -> Dict[str, Any]:
"""Return status information for all configured Unraid servers.
Response shape::
{
"servers": [ ... server dicts ... ]
}
All data comes from the MQTT message store — no HTTP polling,
no API keys, no cache needed (MQTT data is always fresh).
"""
# --- cache hit? -----------------------------------------------------------
cached = await cache.get(CACHE_KEY)
if cached is not None:
# Always overlay fresh MQTT data even on cache hits
_enrich_from_mqtt(cached.get("servers", []))
return cached
settings = get_settings()
# --- cache miss -----------------------------------------------------------
server_configs: List[ServerConfig] = [
ServerConfig(
name=srv.name,
host=srv.host,
api_key=srv.api_key,
port=srv.port,
mqtt_prefix=getattr(srv, "mqtt_prefix", "") or srv.name,
)
for srv in get_settings().unraid_servers
for srv in settings.unraid_servers
]
servers_data: List[Dict[str, Any]] = []
if not server_configs:
return {"servers": []}
try:
servers_data = await fetch_all_servers(server_configs)
servers_data = fetch_all_servers_mqtt(server_configs, mqtt_service.store)
except Exception as exc:
logger.exception("Failed to fetch Unraid server data")
logger.exception("Failed to read Unraid server data from MQTT")
return {
"servers": [],
"error": True,
"message": str(exc),
}
# Overlay live MQTT system metrics
_enrich_from_mqtt(servers_data)
payload: Dict[str, Any] = {
"servers": servers_data,
}
await cache.set(CACHE_KEY, payload, get_settings().unraid_cache_ttl)
return payload
return {"servers": servers_data}

View file

@@ -1,421 +1,256 @@
"""Unraid server stats — MQTT-only data source.
Reads all server data directly from the MQTT message store, which is
populated by the Unraid MQTT Agent running on each server. This
eliminates the need for GraphQL/REST API keys or HTTP polling.
MQTT topics used per server (prefix = e.g. "Adriahub" or "unraid-daddelolymp"):
{prefix}/system CPU, RAM, uptime, temps, hostname, version
{prefix}/docker/containers Docker container list with per-container stats
{prefix}/shares Share names, free/used/total bytes
{prefix}/disks Disk info with temps, SMART, model
{prefix}/array Array state
{prefix}/availability "online" / "offline"
"""
from __future__ import annotations
import asyncio
import logging
import httpx
from dataclasses import dataclass, field
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# GraphQL query for the Unraid built-in API (6.12+)
# ---------------------------------------------------------------------------
_GRAPHQL_QUERY = """
{
online
info {
os { hostname uptime }
cpu { model cores threads manufacturer brand }
memory { layout { size type } }
}
docker {
containers { names state status image }
}
array {
state
capacity { kilobytes { free used total } }
}
shares { name free size }
}
""".strip()
@dataclass
class ServerConfig:
"""Configuration for a single Unraid server."""
name: str
host: str
host: str = ""
mqtt_prefix: str = ""
# Deprecated — kept for backward compat with old DB rows
api_key: str = ""
port: int = 80
def _empty_stats(server: ServerConfig) -> Dict[str, Any]:
"""Return a default stats dictionary for a server that has not yet been queried."""
def _empty_stats(name: str, host: str = "") -> Dict[str, Any]:
"""Return a default stats dictionary for a server with no data yet."""
return {
"name": server.name,
"host": server.host,
"name": name,
"host": host,
"online": False,
"uptime": "",
"cpu": {"usage_pct": 0, "cores": 0, "temp_c": None},
"cpu": {"usage_pct": 0, "cores": 0, "temp_c": None, "threads": 0, "brand": ""},
"ram": {"used_gb": 0, "total_gb": 0, "pct": 0},
"array": {"status": "unknown", "disks": []},
"docker": {"running": 0, "containers": []},
"shares": [],
"disks": [],
"error": None,
}
# ---------------------------------------------------------------------------
# GraphQL parser (Unraid 6.12+ built-in API)
# ---------------------------------------------------------------------------
def _format_uptime(seconds: int) -> str:
"""Convert uptime seconds to a human-readable string."""
days = seconds // 86400
hours = (seconds % 86400) // 3600
if days > 0:
return f"{days}d {hours}h"
return f"{hours}h"
def _parse_graphql_response(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Parse a successful GraphQL response into the standard result dict."""
result["online"] = data.get("online", True)
# --- Info ---
info = data.get("info", {})
os_info = info.get("os", {})
result["uptime"] = os_info.get("uptime", "")
def _parse_system(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/system`` MQTT payload."""
# CPU
cpu_pct = data.get("cpu_usage_percent")
if cpu_pct is not None:
result["cpu"]["usage_pct"] = round(float(cpu_pct), 1)
result["cpu"]["cores"] = data.get("cpu_cores", 0)
result["cpu"]["threads"] = data.get("cpu_threads", 0)
result["cpu"]["brand"] = data.get("cpu_model", "")
cpu_temp = data.get("cpu_temp_celsius")
if cpu_temp is not None:
result["cpu"]["temp_c"] = cpu_temp
mb_temp = data.get("motherboard_temp_celsius")
if mb_temp is not None:
result["cpu"]["mb_temp_c"] = mb_temp
cpu_info = info.get("cpu", {})
result["cpu"]["cores"] = cpu_info.get("cores", 0)
result["cpu"]["threads"] = cpu_info.get("threads", 0)
result["cpu"]["brand"] = cpu_info.get("brand", "")
# GraphQL API doesn't expose CPU usage % — keep 0
# RAM
ram_pct = data.get("ram_usage_percent")
if ram_pct is not None:
result["ram"]["pct"] = round(float(ram_pct), 1)
ram_total = data.get("ram_total_bytes")
if ram_total:
result["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1)
ram_used = data.get("ram_used_bytes")
if ram_used:
result["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1)
# Memory: sum layout slots for total GB
mem_layout = info.get("memory", {}).get("layout", [])
total_bytes = sum(slot.get("size", 0) for slot in mem_layout)
result["ram"]["total_gb"] = round(total_bytes / (1024 ** 3), 1)
# GraphQL API doesn't expose used memory — keep 0
# Uptime
uptime_secs = data.get("uptime_seconds")
if uptime_secs:
result["uptime"] = _format_uptime(int(uptime_secs))
# --- Docker ---
docker_data = data.get("docker", {})
containers_raw: List[Dict[str, Any]] = docker_data.get("containers", [])
# Extra metadata
result["version"] = data.get("version", "")
result["kernel"] = data.get("kernel_version", "")
result["motherboard"] = data.get("server_model", "")
result["online"] = True
def _parse_docker(containers_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/docker/containers`` MQTT payload."""
containers: List[Dict[str, Any]] = []
running_count = 0
for c in containers_raw:
names = c.get("names", [])
name = names[0].lstrip("/") if names else "unknown"
for c in containers_list:
state = c.get("state", "unknown")
is_running = state == "RUNNING"
is_running = state == "running"
if is_running:
running_count += 1
containers.append({
"name": name,
"name": c.get("name", "unknown"),
"status": c.get("status", ""),
"image": c.get("image", ""),
"running": is_running,
})
result["docker"]["running"] = running_count
result["docker"]["containers"] = containers
# --- Array ---
array_data = data.get("array", {})
result["array"]["status"] = array_data.get("state", "unknown").lower()
cap = array_data.get("capacity", {}).get("kilobytes", {})
total_kb = int(cap.get("total", 0))
used_kb = int(cap.get("used", 0))
if total_kb > 0:
result["array"]["total_tb"] = round(total_kb / (1024 ** 2), 1) # KB → TB
result["array"]["used_tb"] = round(used_kb / (1024 ** 2), 1)
# --- Shares (expose as top-level) ---
shares_raw = data.get("shares", [])
def _parse_shares(shares_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/shares`` MQTT payload."""
shares: List[Dict[str, Any]] = []
for s in shares_raw:
free_kb = s.get("free", 0)
for s in shares_list:
free_bytes = s.get("free_bytes", 0)
total_bytes = s.get("total_bytes", 0)
shares.append({
"name": s.get("name", ""),
"free_gb": round(free_kb / (1024 ** 2), 1),
"free_gb": round(free_bytes / (1024 ** 3), 1) if free_bytes else 0,
"total_gb": round(total_bytes / (1024 ** 3), 1) if total_bytes else 0,
})
result["shares"] = shares
async def _try_graphql_endpoint(
client: httpx.AsyncClient,
server: ServerConfig,
result: Dict[str, Any],
) -> bool:
"""Attempt to fetch stats via the Unraid GraphQL API (6.12+).
Returns True if successful, False otherwise.
"""
if not server.api_key:
return False
base = f"http://{server.host}:{server.port}"
headers = {
"x-api-key": server.api_key,
"Content-Type": "application/json",
"Origin": base,
}
try:
resp = await client.post(
f"{base}/graphql",
headers=headers,
json={"query": _GRAPHQL_QUERY},
)
if resp.status_code == 403:
# 403 means the endpoint exists but auth failed
logger.warning("[UNRAID] %s (%s): GraphQL 403 — invalid API key?",
server.name, server.host)
return False
if resp.status_code != 200:
return False
body = resp.json()
# Check for GraphQL-level errors
errors = body.get("errors")
if errors and not body.get("data"):
first_msg = errors[0].get("message", "") if errors else ""
logger.warning("[UNRAID] %s (%s): GraphQL error: %s",
server.name, server.host, first_msg)
return False
data = body.get("data")
if not data:
return False
_parse_graphql_response(data, result)
logger.info(
"[UNRAID] %s (%s): GraphQL OK — %d containers (%d running), %s cores",
server.name, server.host,
len(result["docker"]["containers"]),
result["docker"]["running"],
result["cpu"]["cores"],
)
return True
except Exception as exc:
logger.debug("[UNRAID] %s (%s): GraphQL failed: %s",
server.name, server.host, exc)
return False
# ---------------------------------------------------------------------------
# Legacy REST parser (custom Unraid API plugins)
# ---------------------------------------------------------------------------
def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate *result* from a generic ``/api/system`` JSON response."""
result["online"] = True
result["uptime"] = data.get("uptime", "")
cpu_data = data.get("cpu", {})
result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0))
result["cpu"]["cores"] = cpu_data.get("cores", 0)
result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None))
ram_data = data.get("ram", data.get("memory", {}))
result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2)
result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2)
total = result["ram"]["total_gb"]
if total > 0:
result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1)
else:
result["ram"]["pct"] = 0
def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate array information from an API response."""
array_data = data.get("array", {})
result["array"]["status"] = array_data.get("status", "unknown")
disks_raw: List[Dict[str, Any]] = array_data.get("disks", [])
parsed_disks: List[Dict[str, Any]] = []
for disk in disks_raw:
parsed_disks.append({
"name": disk.get("name", ""),
"status": disk.get("status", "unknown"),
"size": disk.get("size", ""),
"used": disk.get("used", ""),
"temp_c": disk.get("temp_c", None),
def _parse_disks(disks_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/disks`` MQTT payload."""
disks: List[Dict[str, Any]] = []
for d in disks_list:
# Skip placeholder disks (empty parity slots etc.)
if d.get("status") == "DISK_NP":
continue
disks.append({
"name": d.get("name", ""),
"model": d.get("model", ""),
"temp_c": d.get("temperature_celsius", None),
"size_gb": round(d.get("size_bytes", 0) / (1024 ** 3), 1) if d.get("size_bytes") else 0,
"smart_status": d.get("smart_status", ""),
"role": d.get("role", ""),
})
result["array"]["disks"] = parsed_disks
result["disks"] = disks
def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate Docker container information from an API response."""
docker_data = data.get("docker", {})
containers_raw: List[Dict[str, Any]] = docker_data.get("containers", [])
containers: List[Dict[str, Any]] = []
running_count = 0
for container in containers_raw:
status = container.get("status", "unknown")
is_running = "running" in status.lower() if isinstance(status, str) else False
if is_running:
running_count += 1
containers.append({
"name": container.get("name", ""),
"status": status,
"image": container.get("image", ""),
"running": is_running,
})
result["docker"]["running"] = docker_data.get("running", running_count)
result["docker"]["containers"] = containers
def _parse_array(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/array`` MQTT payload."""
result["array"]["status"] = data.get("state", "unknown").lower()
result["array"]["num_disks"] = data.get("num_disks", 0)
async def _try_rest_endpoint(
client: httpx.AsyncClient,
def fetch_server_from_mqtt(
server: ServerConfig,
result: Dict[str, Any],
) -> bool:
"""Attempt to fetch stats via legacy REST API endpoints.
Returns True if successful, False otherwise.
"""
if not server.api_key:
return False
headers = {"Authorization": f"Bearer {server.api_key}"}
base = f"http://{server.host}:{server.port}"
try:
resp = await client.get(f"{base}/api/system", headers=headers)
if resp.status_code == 200:
data = resp.json()
_parse_system_info(data, result)
_parse_array_info(data, result)
_parse_docker_info(data, result)
logger.info("[UNRAID] %s (%s): REST API OK", server.name, server.host)
return True
else:
logger.debug("[UNRAID] %s (%s): /api/system returned HTTP %d",
server.name, server.host, resp.status_code)
except Exception as exc:
logger.debug("[UNRAID] %s (%s): /api/system failed: %s",
server.name, server.host, exc)
# Try individual endpoints if the combined one failed
fetched_any = False
for endpoint, parser in [
("/api/cpu", lambda d: (
result["cpu"].update({
"usage_pct": d.get("usage_pct", d.get("usage", 0)),
"cores": d.get("cores", 0),
"temp_c": d.get("temp_c", None),
}),
)),
("/api/memory", lambda d: (
result["ram"].update({
"used_gb": round(d.get("used_gb", d.get("used", 0)), 2),
"total_gb": round(d.get("total_gb", d.get("total", 0)), 2),
}),
)),
]:
try:
resp = await client.get(f"{base}{endpoint}", headers=headers)
if resp.status_code == 200:
parser(resp.json())
result["online"] = True
fetched_any = True
except Exception:
pass
try:
resp = await client.get(f"{base}/api/array", headers=headers)
if resp.status_code == 200:
_parse_array_info(resp.json(), result)
result["online"] = True
fetched_any = True
except Exception:
pass
try:
resp = await client.get(f"{base}/api/docker", headers=headers)
if resp.status_code == 200:
_parse_docker_info(resp.json(), result)
result["online"] = True
fetched_any = True
except Exception:
pass
return fetched_any
# ---------------------------------------------------------------------------
# Connectivity fallback
# ---------------------------------------------------------------------------
async def _try_connectivity_check(
client: httpx.AsyncClient,
server: ServerConfig,
result: Dict[str, Any],
) -> None:
"""Perform a basic HTTP connectivity check as a fallback."""
try:
resp = await client.get(
f"http://{server.host}:{server.port}/",
follow_redirects=True,
)
result["online"] = resp.status_code < 500
except Exception:
result["online"] = False
# ---------------------------------------------------------------------------
# Main fetch function
# ---------------------------------------------------------------------------
async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]:
"""Fetch system stats from an Unraid server.
Strategy:
1. Try Unraid GraphQL API (built-in since 6.12, uses ``x-api-key`` header)
2. Fall back to legacy REST API (custom plugins, uses ``Bearer`` token)
3. Fall back to simple HTTP connectivity check
store: Dict[str, Any],
) -> Dict[str, Any]:
"""Build complete server stats from the MQTT message store.
Args:
server: A :class:`ServerConfig` describing the target server.
server: Server configuration with ``mqtt_prefix``.
store: The ``mqtt_service.store`` dict (topic → MqttMessage).
Returns:
Dictionary with server name, host, online status, and detailed stats
for CPU, RAM, array, and Docker containers.
Server stats dictionary ready for the API response.
"""
result = _empty_stats(server)
prefix = server.mqtt_prefix or server.name
result = _empty_stats(server.name, server.host)
if not server.host:
result["error"] = "No host configured"
return result
def _get(topic: str) -> Optional[Any]:
msg = store.get(topic)
if msg is None:
return None
return msg.payload
try:
async with httpx.AsyncClient(timeout=10, verify=False) as client:
# 1) Try GraphQL first (modern Unraid 6.12+)
api_ok = await _try_graphql_endpoint(client, server, result)
# 2) Fall back to REST
if not api_ok:
api_ok = await _try_rest_endpoint(client, server, result)
# 3) Fall back to connectivity check
if not api_ok and not result["online"]:
logger.info("[UNRAID] %s: APIs failed, trying connectivity check", server.name)
await _try_connectivity_check(client, server, result)
except Exception as exc:
# --- Availability ---
avail = _get(f"{prefix}/availability")
if avail == "online":
result["online"] = True
elif avail == "offline":
result["online"] = False
result["error"] = str(exc)
logger.error("[UNRAID] %s (%s): connection failed: %s", server.name, server.host, exc)
result["error"] = "Server offline (MQTT availability)"
if not result["online"]:
logger.warning("[UNRAID] %s (%s): offline (error=%s)", server.name, server.host, result.get("error"))
# --- System (CPU, RAM, uptime, temps) ---
system_data = _get(f"{prefix}/system")
if system_data and isinstance(system_data, dict):
_parse_system(system_data, result)
else:
# No system data means MQTT agent isn't reporting
if result["online"] is False:
result["error"] = "No MQTT data available"
# --- Docker containers ---
docker_data = _get(f"{prefix}/docker/containers")
if docker_data and isinstance(docker_data, list):
_parse_docker(docker_data, result)
# --- Shares ---
shares_data = _get(f"{prefix}/shares")
if shares_data and isinstance(shares_data, list):
_parse_shares(shares_data, result)
# --- Disks ---
disks_data = _get(f"{prefix}/disks")
if disks_data and isinstance(disks_data, list):
_parse_disks(disks_data, result)
# --- Array ---
array_data = _get(f"{prefix}/array")
if array_data and isinstance(array_data, dict):
_parse_array(array_data, result)
# Check data freshness (system topic timestamp)
sys_msg = store.get(f"{prefix}/system")
if sys_msg:
age = time.time() - sys_msg.timestamp
if age > 120: # More than 2 minutes old
result["stale"] = True
logger.warning(
"[UNRAID] %s: MQTT data is %.0fs old (stale)",
server.name, age,
)
logger.debug(
"[UNRAID] %s: MQTT — CPU %.1f%% (%d°C), RAM %.1f%%, Docker %d/%d",
server.name,
result["cpu"].get("usage_pct", 0),
result["cpu"].get("temp_c", 0) or 0,
result["ram"].get("pct", 0),
result["docker"]["running"],
len(result["docker"]["containers"]),
)
return result
async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]:
"""Fetch stats from all configured Unraid servers in parallel.
def fetch_all_servers_mqtt(
servers: List[ServerConfig],
store: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""Fetch stats for all configured servers from the MQTT store.
Args:
servers: List of :class:`ServerConfig` instances.
Returns:
List of stats dictionaries, one per server.
This is synchronous — no HTTP calls, just reading in-memory data.
"""
if not servers:
return []
tasks = [fetch_server_stats(srv) for srv in servers]
return list(await asyncio.gather(*tasks))
return [fetch_server_from_mqtt(srv, store) for srv in servers]