"""Unraid server stats — MQTT-only data source. Reads all server data directly from the MQTT message store, which is populated by the Unraid MQTT Agent running on each server. This eliminates the need for GraphQL/REST API keys or HTTP polling. MQTT topics used per server (prefix = e.g. "Adriahub" or "unraid-daddelolymp"): {prefix}/system — CPU, RAM, uptime, temps, hostname, version {prefix}/docker/containers — Docker container list with per-container stats {prefix}/shares — Share names, free/used/total bytes {prefix}/disks — Disk info with temps, SMART, model {prefix}/array — Array state {prefix}/availability — "online" / "offline" """ from __future__ import annotations import logging import time from dataclasses import dataclass from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) @dataclass class ServerConfig: """Configuration for a single Unraid server.""" name: str host: str = "" mqtt_prefix: str = "" # Deprecated — kept for backward compat with old DB rows api_key: str = "" port: int = 80 def _empty_stats(name: str, host: str = "") -> Dict[str, Any]: """Return a default stats dictionary for a server with no data yet.""" return { "name": name, "host": host, "online": False, "uptime": "", "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None, "threads": 0, "brand": ""}, "ram": {"used_gb": 0, "total_gb": 0, "pct": 0}, "array": {"status": "unknown", "disks": []}, "docker": {"running": 0, "containers": []}, "shares": [], "disks": [], "error": None, } def _format_uptime(seconds: int) -> str: """Convert uptime seconds to a human-readable string.""" days = seconds // 86400 hours = (seconds % 86400) // 3600 if days > 0: return f"{days}d {hours}h" return f"{hours}h" def _parse_system(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Parse the ``{prefix}/system`` MQTT payload.""" # CPU cpu_pct = data.get("cpu_usage_percent") if cpu_pct is not None: result["cpu"]["usage_pct"] = round(float(cpu_pct), 1) result["cpu"]["cores"] = data.get("cpu_cores", 0) result["cpu"]["threads"] = data.get("cpu_threads", 0) result["cpu"]["brand"] = data.get("cpu_model", "") cpu_temp = data.get("cpu_temp_celsius") if cpu_temp is not None: result["cpu"]["temp_c"] = cpu_temp mb_temp = data.get("motherboard_temp_celsius") if mb_temp is not None: result["cpu"]["mb_temp_c"] = mb_temp # RAM ram_pct = data.get("ram_usage_percent") if ram_pct is not None: result["ram"]["pct"] = round(float(ram_pct), 1) ram_total = data.get("ram_total_bytes") if ram_total: result["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1) ram_used = data.get("ram_used_bytes") if ram_used: result["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1) # Uptime uptime_secs = data.get("uptime_seconds") if uptime_secs: result["uptime"] = _format_uptime(int(uptime_secs)) # Extra metadata result["version"] = data.get("version", "") result["kernel"] = data.get("kernel_version", "") result["motherboard"] = data.get("server_model", "") result["online"] = True def _parse_docker(containers_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: """Parse the ``{prefix}/docker/containers`` MQTT payload.""" containers: List[Dict[str, Any]] = [] running_count = 0 for c in containers_list: state = c.get("state", "unknown") is_running = state == "running" if is_running: running_count += 1 containers.append({ "name": c.get("name", "unknown"), "status": c.get("status", ""), "image": c.get("image", ""), "running": is_running, }) result["docker"]["running"] = running_count result["docker"]["containers"] = containers def _parse_shares(shares_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: """Parse the ``{prefix}/shares`` MQTT payload.""" shares: List[Dict[str, Any]] = [] for s in shares_list: free_bytes = s.get("free_bytes", 0) total_bytes = s.get("total_bytes", 0) shares.append({ "name": s.get("name", ""), "free_gb": round(free_bytes / (1024 ** 3), 1) if free_bytes else 0, "total_gb": round(total_bytes / (1024 ** 3), 1) if total_bytes else 0, }) result["shares"] = shares def _parse_disks(disks_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None: """Parse the ``{prefix}/disks`` MQTT payload.""" disks: List[Dict[str, Any]] = [] for d in disks_list: # Skip placeholder disks (empty parity slots etc.) if d.get("status") == "DISK_NP": continue disks.append({ "name": d.get("name", ""), "model": d.get("model", ""), "temp_c": d.get("temperature_celsius", None), "size_gb": round(d.get("size_bytes", 0) / (1024 ** 3), 1) if d.get("size_bytes") else 0, "smart_status": d.get("smart_status", ""), "role": d.get("role", ""), }) result["disks"] = disks def _parse_array(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Parse the ``{prefix}/array`` MQTT payload.""" result["array"]["status"] = data.get("state", "unknown").lower() result["array"]["num_disks"] = data.get("num_disks", 0) def fetch_server_from_mqtt( server: ServerConfig, store: Dict[str, Any], ) -> Dict[str, Any]: """Build complete server stats from the MQTT message store. Args: server: Server configuration with ``mqtt_prefix``. store: The ``mqtt_service.store`` dict (topic → MqttMessage). Returns: Server stats dictionary ready for the API response. """ prefix = server.mqtt_prefix or server.name result = _empty_stats(server.name, server.host) def _get(topic: str) -> Optional[Any]: msg = store.get(topic) if msg is None: return None return msg.payload # --- Availability --- avail = _get(f"{prefix}/availability") if avail == "online": result["online"] = True elif avail == "offline": result["online"] = False result["error"] = "Server offline (MQTT availability)" # --- System (CPU, RAM, uptime, temps) --- system_data = _get(f"{prefix}/system") if system_data and isinstance(system_data, dict): _parse_system(system_data, result) else: # No system data means MQTT agent isn't reporting if result["online"] is False: result["error"] = "No MQTT data available" # --- Docker containers --- docker_data = _get(f"{prefix}/docker/containers") if docker_data and isinstance(docker_data, list): _parse_docker(docker_data, result) # --- Shares --- shares_data = _get(f"{prefix}/shares") if shares_data and isinstance(shares_data, list): _parse_shares(shares_data, result) # --- Disks --- disks_data = _get(f"{prefix}/disks") if disks_data and isinstance(disks_data, list): _parse_disks(disks_data, result) # --- Array --- array_data = _get(f"{prefix}/array") if array_data and isinstance(array_data, dict): _parse_array(array_data, result) # Check data freshness (system topic timestamp) sys_msg = store.get(f"{prefix}/system") if sys_msg: age = time.time() - sys_msg.timestamp if age > 120: # More than 2 minutes old result["stale"] = True logger.warning( "[UNRAID] %s: MQTT data is %.0fs old (stale)", server.name, age, ) logger.debug( "[UNRAID] %s: MQTT — CPU %.1f%% (%d°C), RAM %.1f%%, Docker %d/%d", server.name, result["cpu"].get("usage_pct", 0), result["cpu"].get("temp_c", 0) or 0, result["ram"].get("pct", 0), result["docker"]["running"], len(result["docker"]["containers"]), ) return result def fetch_all_servers_mqtt( servers: List[ServerConfig], store: Dict[str, Any], ) -> List[Dict[str, Any]]: """Fetch stats for all configured servers from the MQTT store. This is synchronous — no HTTP calls, just reading in-memory data. """ if not servers: return [] return [fetch_server_from_mqtt(srv, store) for srv in servers]