from __future__ import annotations import asyncio import logging import httpx from dataclasses import dataclass, field from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # GraphQL query for the Unraid built-in API (6.12+) # --------------------------------------------------------------------------- _GRAPHQL_QUERY = """ { online info { os { hostname uptime } cpu { model cores threads } memory { layout { size type } } } docker { containers { names state status image } } array { state capacity { kilobytes { free used total } } } shares { name free size } } """.strip() @dataclass class ServerConfig: """Configuration for a single Unraid server.""" name: str host: str api_key: str = "" port: int = 80 def _empty_stats(server: ServerConfig) -> Dict[str, Any]: """Return a default stats dictionary for a server that has not yet been queried.""" return { "name": server.name, "host": server.host, "online": False, "uptime": "", "cpu": {"usage_pct": 0, "cores": 0, "temp_c": None}, "ram": {"used_gb": 0, "total_gb": 0, "pct": 0}, "array": {"status": "unknown", "disks": []}, "docker": {"running": 0, "containers": []}, "error": None, } # --------------------------------------------------------------------------- # GraphQL parser (Unraid 6.12+ built-in API) # --------------------------------------------------------------------------- def _parse_graphql_response(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Parse a successful GraphQL response into the standard result dict.""" result["online"] = data.get("online", True) # --- Info --- info = data.get("info", {}) os_info = info.get("os", {}) result["uptime"] = os_info.get("uptime", "") cpu_info = info.get("cpu", {}) result["cpu"]["cores"] = cpu_info.get("cores", 0) # GraphQL API doesn't expose CPU usage % — keep 0 # Store threads for the frontend to show result["cpu"]["threads"] = cpu_info.get("threads", 0) # Memory: sum layout slots for total GB mem_layout = info.get("memory", {}).get("layout", []) total_bytes = sum(slot.get("size", 0) for slot in mem_layout) result["ram"]["total_gb"] = round(total_bytes / (1024 ** 3), 1) # GraphQL API doesn't expose used memory — keep 0 # --- Docker --- docker_data = data.get("docker", {}) containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) containers: List[Dict[str, Any]] = [] running_count = 0 for c in containers_raw: names = c.get("names", []) name = names[0].lstrip("/") if names else "unknown" state = c.get("state", "unknown") is_running = state == "RUNNING" if is_running: running_count += 1 containers.append({ "name": name, "status": c.get("status", ""), "image": c.get("image", ""), "running": is_running, }) result["docker"]["running"] = running_count result["docker"]["containers"] = containers # --- Array --- array_data = data.get("array", {}) result["array"]["status"] = array_data.get("state", "unknown").lower() cap = array_data.get("capacity", {}).get("kilobytes", {}) total_kb = int(cap.get("total", 0)) used_kb = int(cap.get("used", 0)) if total_kb > 0: result["array"]["total_tb"] = round(total_kb / (1024 ** 2), 1) # KB → TB result["array"]["used_tb"] = round(used_kb / (1024 ** 2), 1) # --- Shares (expose as top-level) --- shares_raw = data.get("shares", []) shares: List[Dict[str, Any]] = [] for s in shares_raw: free_kb = s.get("free", 0) shares.append({ "name": s.get("name", ""), "free_gb": round(free_kb / (1024 ** 2), 1), }) result["shares"] = shares async def _try_graphql_endpoint( client: httpx.AsyncClient, server: ServerConfig, result: Dict[str, Any], ) -> bool: """Attempt to fetch stats via the Unraid GraphQL API (6.12+). Returns True if successful, False otherwise. """ if not server.api_key: return False base = f"http://{server.host}:{server.port}" headers = { "x-api-key": server.api_key, "Content-Type": "application/json", "Origin": base, } try: resp = await client.post( f"{base}/graphql", headers=headers, json={"query": _GRAPHQL_QUERY}, ) if resp.status_code == 403: # 403 means the endpoint exists but auth failed logger.warning("[UNRAID] %s (%s): GraphQL 403 — invalid API key?", server.name, server.host) return False if resp.status_code != 200: return False body = resp.json() # Check for GraphQL-level errors errors = body.get("errors") if errors and not body.get("data"): first_msg = errors[0].get("message", "") if errors else "" logger.warning("[UNRAID] %s (%s): GraphQL error: %s", server.name, server.host, first_msg) return False data = body.get("data") if not data: return False _parse_graphql_response(data, result) logger.info( "[UNRAID] %s (%s): GraphQL OK — %d containers (%d running), %s cores", server.name, server.host, len(result["docker"]["containers"]), result["docker"]["running"], result["cpu"]["cores"], ) return True except Exception as exc: logger.debug("[UNRAID] %s (%s): GraphQL failed: %s", server.name, server.host, exc) return False # --------------------------------------------------------------------------- # Legacy REST parser (custom Unraid API plugins) # --------------------------------------------------------------------------- def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate *result* from a generic ``/api/system`` JSON response.""" result["online"] = True result["uptime"] = data.get("uptime", "") cpu_data = data.get("cpu", {}) result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0)) result["cpu"]["cores"] = cpu_data.get("cores", 0) result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None)) ram_data = data.get("ram", data.get("memory", {})) result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2) result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2) total = result["ram"]["total_gb"] if total > 0: result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1) else: result["ram"]["pct"] = 0 def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate array information from an API response.""" array_data = data.get("array", {}) result["array"]["status"] = array_data.get("status", "unknown") disks_raw: List[Dict[str, Any]] = array_data.get("disks", []) parsed_disks: List[Dict[str, Any]] = [] for disk in disks_raw: parsed_disks.append({ "name": disk.get("name", ""), "status": disk.get("status", "unknown"), "size": disk.get("size", ""), "used": disk.get("used", ""), "temp_c": disk.get("temp_c", None), }) result["array"]["disks"] = parsed_disks def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None: """Populate Docker container information from an API response.""" docker_data = data.get("docker", {}) containers_raw: List[Dict[str, Any]] = docker_data.get("containers", []) containers: List[Dict[str, Any]] = [] running_count = 0 for container in containers_raw: status = container.get("status", "unknown") is_running = "running" in status.lower() if isinstance(status, str) else False if is_running: running_count += 1 containers.append({ "name": container.get("name", ""), "status": status, "image": container.get("image", ""), "running": is_running, }) result["docker"]["running"] = docker_data.get("running", running_count) result["docker"]["containers"] = containers async def _try_rest_endpoint( client: httpx.AsyncClient, server: ServerConfig, result: Dict[str, Any], ) -> bool: """Attempt to fetch stats via legacy REST API endpoints. Returns True if successful, False otherwise. """ if not server.api_key: return False headers = {"Authorization": f"Bearer {server.api_key}"} base = f"http://{server.host}:{server.port}" try: resp = await client.get(f"{base}/api/system", headers=headers) if resp.status_code == 200: data = resp.json() _parse_system_info(data, result) _parse_array_info(data, result) _parse_docker_info(data, result) logger.info("[UNRAID] %s (%s): REST API OK", server.name, server.host) return True else: logger.debug("[UNRAID] %s (%s): /api/system returned HTTP %d", server.name, server.host, resp.status_code) except Exception as exc: logger.debug("[UNRAID] %s (%s): /api/system failed: %s", server.name, server.host, exc) # Try individual endpoints if the combined one failed fetched_any = False for endpoint, parser in [ ("/api/cpu", lambda d: ( result["cpu"].update({ "usage_pct": d.get("usage_pct", d.get("usage", 0)), "cores": d.get("cores", 0), "temp_c": d.get("temp_c", None), }), )), ("/api/memory", lambda d: ( result["ram"].update({ "used_gb": round(d.get("used_gb", d.get("used", 0)), 2), "total_gb": round(d.get("total_gb", d.get("total", 0)), 2), }), )), ]: try: resp = await client.get(f"{base}{endpoint}", headers=headers) if resp.status_code == 200: parser(resp.json()) result["online"] = True fetched_any = True except Exception: pass try: resp = await client.get(f"{base}/api/array", headers=headers) if resp.status_code == 200: _parse_array_info(resp.json(), result) result["online"] = True fetched_any = True except Exception: pass try: resp = await client.get(f"{base}/api/docker", headers=headers) if resp.status_code == 200: _parse_docker_info(resp.json(), result) result["online"] = True fetched_any = True except Exception: pass return fetched_any # --------------------------------------------------------------------------- # Connectivity fallback # --------------------------------------------------------------------------- async def _try_connectivity_check( client: httpx.AsyncClient, server: ServerConfig, result: Dict[str, Any], ) -> None: """Perform a basic HTTP connectivity check as a fallback.""" try: resp = await client.get( f"http://{server.host}:{server.port}/", follow_redirects=True, ) result["online"] = resp.status_code < 500 except Exception: result["online"] = False # --------------------------------------------------------------------------- # Main fetch function # --------------------------------------------------------------------------- async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]: """Fetch system stats from an Unraid server. Strategy: 1. Try Unraid GraphQL API (built-in since 6.12, uses ``x-api-key`` header) 2. Fall back to legacy REST API (custom plugins, uses ``Bearer`` token) 3. Fall back to simple HTTP connectivity check Args: server: A :class:`ServerConfig` describing the target server. Returns: Dictionary with server name, host, online status, and detailed stats for CPU, RAM, array, and Docker containers. """ result = _empty_stats(server) if not server.host: result["error"] = "No host configured" return result try: async with httpx.AsyncClient(timeout=10, verify=False) as client: # 1) Try GraphQL first (modern Unraid 6.12+) api_ok = await _try_graphql_endpoint(client, server, result) # 2) Fall back to REST if not api_ok: api_ok = await _try_rest_endpoint(client, server, result) # 3) Fall back to connectivity check if not api_ok and not result["online"]: logger.info("[UNRAID] %s: APIs failed, trying connectivity check", server.name) await _try_connectivity_check(client, server, result) except Exception as exc: result["online"] = False result["error"] = str(exc) logger.error("[UNRAID] %s (%s): connection failed: %s", server.name, server.host, exc) if not result["online"]: logger.warning("[UNRAID] %s (%s): offline (error=%s)", server.name, server.host, result.get("error")) return result async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]: """Fetch stats from all configured Unraid servers in parallel. Args: servers: List of :class:`ServerConfig` instances. Returns: List of stats dictionaries, one per server. """ if not servers: return [] tasks = [fetch_server_stats(srv) for srv in servers] return list(await asyncio.gather(*tasks))