from __future__ import annotations import asyncio import logging import httpx from dataclasses import dataclass, field from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) @dataclass class ServerConfig: """Configuration for a single Unraid server.""" name: str host: str api_key: str = "" port: int = 80 def _empty_stats(server: ServerConfig) -> Dict[str, Any]: """Return a default stats dictionary for a server that has not yet been queried.""" return { "name": server.name, "host": server.host, "online": False, "uptime": "", "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None}, "ram": {"used_gb": 0, "total_gb": 0, "pct": 0}, "array": {"status": "unknown", "disks": []}, "docker": {"running": 0, "containers": []}, "error": None, } def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate *result* from a generic ``/api/system`` JSON response.""" result["online"] = True result["uptime"] = data.get("uptime", "") cpu_data = data.get("cpu", {}) result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0)) result["cpu"]["cores"] = cpu_data.get("cores", 0) result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None)) ram_data = data.get("ram", data.get("memory", {})) result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2) result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2) total = result["ram"]["total_gb"] if total > 0: result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1) else: result["ram"]["pct"] = 0 def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate array information from an API response.""" array_data = data.get("array", {}) result["array"]["status"] = array_data.get("status", "unknown") disks_raw: List[Dict[str, Any]] = array_data.get("disks", []) parsed_disks: List[Dict[str, Any]] = [] for disk in disks_raw: parsed_disks.append({ "name": disk.get("name", ""), "status": disk.get("status", "unknown"), "size": disk.get("size", ""), "used": disk.get("used", ""), "temp_c": disk.get("temp_c", None), }) result["array"]["disks"] = parsed_disks def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate Docker container information from an API response.""" docker_data = data.get("docker", {}) containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) containers: List[Dict[str, Any]] = [] running_count = 0 for container in containers_raw: status = container.get("status", "unknown") is_running = "running" in status.lower() if isinstance(status, str) else False if is_running: running_count += 1 containers.append({ "name": container.get("name", ""), "status": status, "image": container.get("image", ""), "running": is_running, }) result["docker"]["running"] = docker_data.get("running", running_count) result["docker"]["containers"] = containers async def _try_api_endpoint( client: httpx.AsyncClient, server: ServerConfig, result: Dict[str, Any], ) -> bool: """Attempt to fetch stats via the Unraid OS API. Returns True if successful, False otherwise. """ if not server.api_key: return False headers = {"Authorization": f"Bearer {server.api_key}"} base = f"http://{server.host}:{server.port}" try: resp = await client.get(f"{base}/api/system", headers=headers) if resp.status_code == 200: data = resp.json() _parse_system_info(data, result) _parse_array_info(data, result) _parse_docker_info(data, result) logger.info("[UNRAID] %s (%s): API OK", server.name, server.host) return True else: logger.warning("[UNRAID] %s (%s): /api/system returned HTTP %d", server.name, server.host, resp.status_code) except Exception as exc: logger.warning("[UNRAID] %s (%s): /api/system failed: %s", server.name, server.host, exc) # Try individual endpoints if the combined one failed fetched_any = False try: resp = await client.get(f"{base}/api/cpu", headers=headers) if resp.status_code == 200: cpu_data = resp.json() result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0)) result["cpu"]["cores"] = cpu_data.get("cores", 0) result["cpu"]["temp_c"] = cpu_data.get("temp_c", None) result["online"] = True fetched_any = True except Exception: pass try: resp = await client.get(f"{base}/api/memory", headers=headers) if resp.status_code == 200: ram_data = resp.json() result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2) result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2) total = result["ram"]["total_gb"] if total > 0: result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1) result["online"] = True fetched_any = True except Exception: pass try: resp = await client.get(f"{base}/api/array", headers=headers) if resp.status_code == 200: _parse_array_info(resp.json(), result) result["online"] = True fetched_any = True except Exception: pass try: resp = await client.get(f"{base}/api/docker", headers=headers) if resp.status_code == 200: _parse_docker_info(resp.json(), result) result["online"] = True fetched_any = True except Exception: pass return fetched_any async def _try_connectivity_check( client: httpx.AsyncClient, server: ServerConfig, result: Dict[str, Any], ) -> None: """Perform a basic HTTP connectivity check as a fallback.""" try: resp = await client.get( f"http://{server.host}:{server.port}/", follow_redirects=True, ) result["online"] = resp.status_code < 500 except Exception: result["online"] = False async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]: """Fetch system stats from an Unraid server. Tries the Unraid API first (if ``api_key`` is configured), then falls back to a simple HTTP connectivity check. Args: server: A :class:`ServerConfig` describing the target server. Returns: Dictionary with server name, host, online status, and detailed stats for CPU, RAM, array, and Docker containers. """ result = _empty_stats(server) if not server.host: result["error"] = "No host configured" return result try: async with httpx.AsyncClient(timeout=10, verify=False) as client: api_ok = await _try_api_endpoint(client, server, result) if not api_ok and not result["online"]: logger.info("[UNRAID] %s: API failed, trying connectivity check", server.name) await _try_connectivity_check(client, server, result) except Exception as exc: result["online"] = False result["error"] = str(exc) logger.error("[UNRAID] %s (%s): connection failed: %s", server.name, server.host, exc) if not result["online"]: logger.warning("[UNRAID] %s (%s): offline (error=%s)", server.name, server.host, result.get("error")) return result async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]: """Fetch stats from all configured Unraid servers in parallel. Args: servers: List of :class:`ServerConfig` instances. Returns: List of stats dictionaries, one per server. """ if not servers: return [] tasks = [fetch_server_stats(srv) for srv in servers] return list(await asyncio.gather(*tasks))