daily-briefing/server/services/unraid_service.py

257 lines
8.4 KiB
Python
Raw Permalink Normal View History

"""Unraid server stats — MQTT-only data source.

Reads all server data directly from the MQTT message store, which is
populated by the Unraid MQTT Agent running on each server. This
eliminates the need for GraphQL/REST API keys or HTTP polling.

MQTT topics used per server (prefix = e.g. "Adriahub" or "unraid-daddelolymp"):

    {prefix}/system             — CPU, RAM, uptime, temps, hostname, version
    {prefix}/docker/containers  — Docker container list with per-container stats
    {prefix}/shares             — Share names, free/used/total bytes
    {prefix}/disks              — Disk info with temps, SMART, model
    {prefix}/array              — Array state
    {prefix}/availability       — "online" / "offline"
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
    """Settings that identify one Unraid server.

    Only ``name`` is required; every other field defaults to an empty
    value (or ``80`` for ``port``). Field order is part of the positional
    constructor signature and must not be changed.
    """

    # Server name (required).
    name: str
    # Host of the server; empty when not configured.
    host: str = ""
    # Topic prefix under which this server's MQTT messages are published.
    mqtt_prefix: str = ""
    # Deprecated — kept for backward compat with old DB rows
    api_key: str = ""
    port: int = 80
def _empty_stats(name: str, host: str = "") -> Dict[str, Any]:
"""Return a default stats dictionary for a server with no data yet."""
return {
"name": name,
"host": host,
"online": False,
"uptime": "",
"cpu": {"usage_pct": 0, "cores": 0, "temp_c": None, "threads": 0, "brand": ""},
"ram": {"used_gb": 0, "total_gb": 0, "pct": 0},
"array": {"status": "unknown", "disks": []},
"docker": {"running": 0, "containers": []},
"shares": [],
"disks": [],
"error": None,
}
def _format_uptime(seconds: int) -> str:
"""Convert uptime seconds to a human-readable string."""
days = seconds // 86400
hours = (seconds % 86400) // 3600
if days > 0:
return f"{days}d {hours}h"
return f"{hours}h"
def _parse_system(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/system`` MQTT payload."""
# CPU
cpu_pct = data.get("cpu_usage_percent")
if cpu_pct is not None:
result["cpu"]["usage_pct"] = round(float(cpu_pct), 1)
result["cpu"]["cores"] = data.get("cpu_cores", 0)
result["cpu"]["threads"] = data.get("cpu_threads", 0)
result["cpu"]["brand"] = data.get("cpu_model", "")
cpu_temp = data.get("cpu_temp_celsius")
if cpu_temp is not None:
result["cpu"]["temp_c"] = cpu_temp
mb_temp = data.get("motherboard_temp_celsius")
if mb_temp is not None:
result["cpu"]["mb_temp_c"] = mb_temp
# RAM
ram_pct = data.get("ram_usage_percent")
if ram_pct is not None:
result["ram"]["pct"] = round(float(ram_pct), 1)
ram_total = data.get("ram_total_bytes")
if ram_total:
result["ram"]["total_gb"] = round(ram_total / (1024 ** 3), 1)
ram_used = data.get("ram_used_bytes")
if ram_used:
result["ram"]["used_gb"] = round(ram_used / (1024 ** 3), 1)
# Uptime
uptime_secs = data.get("uptime_seconds")
if uptime_secs:
result["uptime"] = _format_uptime(int(uptime_secs))
# Extra metadata
result["version"] = data.get("version", "")
result["kernel"] = data.get("kernel_version", "")
result["motherboard"] = data.get("server_model", "")
result["online"] = True
def _parse_docker(containers_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/docker/containers`` MQTT payload."""
containers: List[Dict[str, Any]] = []
running_count = 0
for c in containers_list:
state = c.get("state", "unknown")
is_running = state == "running"
if is_running:
running_count += 1
containers.append({
"name": c.get("name", "unknown"),
"status": c.get("status", ""),
"image": c.get("image", ""),
"running": is_running,
})
result["docker"]["running"] = running_count
result["docker"]["containers"] = containers
def _parse_shares(shares_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/shares`` MQTT payload."""
shares: List[Dict[str, Any]] = []
for s in shares_list:
free_bytes = s.get("free_bytes", 0)
total_bytes = s.get("total_bytes", 0)
shares.append({
"name": s.get("name", ""),
"free_gb": round(free_bytes / (1024 ** 3), 1) if free_bytes else 0,
"total_gb": round(total_bytes / (1024 ** 3), 1) if total_bytes else 0,
})
result["shares"] = shares
def _parse_disks(disks_list: List[Dict[str, Any]], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/disks`` MQTT payload."""
disks: List[Dict[str, Any]] = []
for d in disks_list:
# Skip placeholder disks (empty parity slots etc.)
if d.get("status") == "DISK_NP":
continue
disks.append({
"name": d.get("name", ""),
"model": d.get("model", ""),
"temp_c": d.get("temperature_celsius", None),
"size_gb": round(d.get("size_bytes", 0) / (1024 ** 3), 1) if d.get("size_bytes") else 0,
"smart_status": d.get("smart_status", ""),
"role": d.get("role", ""),
})
result["disks"] = disks
def _parse_array(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Parse the ``{prefix}/array`` MQTT payload."""
result["array"]["status"] = data.get("state", "unknown").lower()
result["array"]["num_disks"] = data.get("num_disks", 0)
def fetch_server_from_mqtt(
    server: ServerConfig,
    store: Dict[str, Any],
) -> Dict[str, Any]:
    """Build complete server stats from the MQTT message store.

    Args:
        server: Server configuration. ``mqtt_prefix`` selects the topic
            prefix; ``name`` is used instead when the prefix is empty.
        store: The ``mqtt_service.store`` dict (topic → MqttMessage). Each
            message is read through its ``payload`` and ``timestamp``
            attributes.

    Returns:
        Server stats dictionary ready for the API response. An explicit
        ``"offline"`` availability report always yields ``online=False``,
        even when older system data is still present in the store. A
        ``"stale"`` key is added when the system message is more than two
        minutes old.
    """
    stale_after_seconds = 120  # More than 2 minutes old counts as stale

    prefix = server.mqtt_prefix or server.name
    result = _empty_stats(server.name, server.host)

    def _get(topic: str) -> Optional[Any]:
        msg = store.get(topic)
        return None if msg is None else msg.payload

    # --- Availability ---
    avail = _get(f"{prefix}/availability")
    reported_offline = avail == "offline"
    if avail == "online":
        result["online"] = True
    elif reported_offline:
        result["online"] = False
        result["error"] = "Server offline (MQTT availability)"

    # --- System (CPU, RAM, uptime, temps) ---
    # Look the message up once so the payload and the timestamp used for the
    # freshness check below come from the same message object.
    sys_msg = store.get(f"{prefix}/system")
    system_data = None if sys_msg is None else sys_msg.payload
    if system_data and isinstance(system_data, dict):
        _parse_system(system_data, result)
        # _parse_system marks the server online unconditionally. A system
        # message left in the store must not override an explicit offline
        # report, otherwise the result says online=True with an offline error.
        if reported_offline:
            result["online"] = False
    elif result["online"] is False:
        # No system data means MQTT agent isn't reporting
        result["error"] = "No MQTT data available"

    # --- Docker containers ---
    docker_data = _get(f"{prefix}/docker/containers")
    if docker_data and isinstance(docker_data, list):
        _parse_docker(docker_data, result)

    # --- Shares ---
    shares_data = _get(f"{prefix}/shares")
    if shares_data and isinstance(shares_data, list):
        _parse_shares(shares_data, result)

    # --- Disks ---
    disks_data = _get(f"{prefix}/disks")
    if disks_data and isinstance(disks_data, list):
        _parse_disks(disks_data, result)

    # --- Array ---
    array_data = _get(f"{prefix}/array")
    if array_data and isinstance(array_data, dict):
        _parse_array(array_data, result)

    # Check data freshness (system topic timestamp)
    if sys_msg:
        age = time.time() - sys_msg.timestamp
        if age > stale_after_seconds:
            result["stale"] = True
            logger.warning(
                "[UNRAID] %s: MQTT data is %.0fs old (stale)",
                server.name, age,
            )

    logger.debug(
        "[UNRAID] %s: MQTT — CPU %.1f%% (%d°C), RAM %.1f%%, Docker %d/%d",
        server.name,
        result["cpu"].get("usage_pct", 0),
        result["cpu"].get("temp_c", 0) or 0,  # temp_c defaults to None
        result["ram"].get("pct", 0),
        result["docker"]["running"],
        len(result["docker"]["containers"]),
    )
    return result
def fetch_all_servers_mqtt(
    servers: List[ServerConfig],
    store: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Collect stats for every configured server from the MQTT store.

    This is synchronous — no HTTP calls, just reading in-memory data.

    Args:
        servers: Server configurations to report on; an empty or missing
            value yields an empty list.
        store: MQTT message store passed through to
            ``fetch_server_from_mqtt`` for each server.

    Returns:
        One stats dictionary per server, in the order given.
    """
    if servers:
        return [fetch_server_from_mqtt(config, store) for config in servers]
    return []