Replace monolithic Jinja2 template with modern stack: Backend (FastAPI): - Modular router/service architecture - Async PostgreSQL (asyncpg) for news from n8n pipeline - Live Unraid server stats (2 servers via API) - Home Assistant, Vikunja tasks, weather (wttr.in) - WebSocket broadcast for real-time updates (15s) - TTL cache per endpoint, all config via ENV vars Frontend (React + Vite + TypeScript): - Glassmorphism dark theme with Tailwind CSS - Responsive grid: mobile/tablet/desktop/ultrawide - Weather cards, hourly forecast, news with category tabs - Server stats (CPU ring, RAM bar, Docker list) - Home Assistant controls, task management - Live clock, WebSocket connection indicator Infrastructure: - Multi-stage Dockerfile (node:22-alpine + python:3.11-slim) - docker-compose with full ENV configuration - Kaniko CI/CD pipeline for GitLab registry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
233 lines
7.5 KiB
Python
233 lines
7.5 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import httpx
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
@dataclass
class ServerConfig:
    """Connection settings for one Unraid server.

    Attributes:
        name: Label copied into the ``name`` field of the stats dictionary.
        host: Hostname or IP address used to build request URLs.  When it is
            empty, ``fetch_server_stats`` returns early with an error.
        api_key: Token sent as a ``Bearer`` authorization header.  When it is
            empty, the API probe is skipped.
        port: TCP port used in request URLs.
    """

    # Required settings.
    name: str
    host: str

    # Optional settings.
    api_key: str = ""
    port: int = 80
|
|
|
|
|
|
def _empty_stats(server: ServerConfig) -> Dict[str, Any]:
|
|
"""Return a default stats dictionary for a server that has not yet been queried."""
|
|
return {
|
|
"name": server.name,
|
|
"host": server.host,
|
|
"online": False,
|
|
"uptime": "",
|
|
"cpu": {"usage_pct": 0, "cores": 0, "temp_c": None},
|
|
"ram": {"used_gb": 0, "total_gb": 0, "pct": 0},
|
|
"array": {"status": "unknown", "disks": []},
|
|
"docker": {"running": 0, "containers": []},
|
|
"error": None,
|
|
}
|
|
|
|
|
|
def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
|
|
"""Populate *result* from a generic ``/api/system`` JSON response."""
|
|
result["online"] = True
|
|
result["uptime"] = data.get("uptime", "")
|
|
|
|
cpu_data = data.get("cpu", {})
|
|
result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0))
|
|
result["cpu"]["cores"] = cpu_data.get("cores", 0)
|
|
result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None))
|
|
|
|
ram_data = data.get("ram", data.get("memory", {}))
|
|
result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2)
|
|
result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2)
|
|
total = result["ram"]["total_gb"]
|
|
if total > 0:
|
|
result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1)
|
|
else:
|
|
result["ram"]["pct"] = 0
|
|
|
|
|
|
def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
|
|
"""Populate array information from an API response."""
|
|
array_data = data.get("array", {})
|
|
result["array"]["status"] = array_data.get("status", "unknown")
|
|
|
|
disks_raw: List[Dict[str, Any]] = array_data.get("disks", [])
|
|
parsed_disks: List[Dict[str, Any]] = []
|
|
for disk in disks_raw:
|
|
parsed_disks.append({
|
|
"name": disk.get("name", ""),
|
|
"status": disk.get("status", "unknown"),
|
|
"size": disk.get("size", ""),
|
|
"used": disk.get("used", ""),
|
|
"temp_c": disk.get("temp_c", None),
|
|
})
|
|
result["array"]["disks"] = parsed_disks
|
|
|
|
|
|
def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
|
|
"""Populate Docker container information from an API response."""
|
|
docker_data = data.get("docker", {})
|
|
containers_raw: List[Dict[str, Any]] = docker_data.get("containers", [])
|
|
|
|
containers: List[Dict[str, Any]] = []
|
|
running_count = 0
|
|
for container in containers_raw:
|
|
status = container.get("status", "unknown")
|
|
is_running = "running" in status.lower() if isinstance(status, str) else False
|
|
if is_running:
|
|
running_count += 1
|
|
containers.append({
|
|
"name": container.get("name", ""),
|
|
"status": status,
|
|
"image": container.get("image", ""),
|
|
"running": is_running,
|
|
})
|
|
|
|
result["docker"]["running"] = docker_data.get("running", running_count)
|
|
result["docker"]["containers"] = containers
|
|
|
|
|
|
async def _try_api_endpoint(
|
|
client: httpx.AsyncClient,
|
|
server: ServerConfig,
|
|
result: Dict[str, Any],
|
|
) -> bool:
|
|
"""Attempt to fetch stats via the Unraid OS API.
|
|
|
|
Returns True if successful, False otherwise.
|
|
"""
|
|
if not server.api_key:
|
|
return False
|
|
|
|
headers = {"Authorization": f"Bearer {server.api_key}"}
|
|
base = f"http://{server.host}:{server.port}"
|
|
|
|
try:
|
|
resp = await client.get(f"{base}/api/system", headers=headers)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
_parse_system_info(data, result)
|
|
_parse_array_info(data, result)
|
|
_parse_docker_info(data, result)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
|
|
# Try individual endpoints if the combined one failed
|
|
fetched_any = False
|
|
|
|
try:
|
|
resp = await client.get(f"{base}/api/cpu", headers=headers)
|
|
if resp.status_code == 200:
|
|
cpu_data = resp.json()
|
|
result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0))
|
|
result["cpu"]["cores"] = cpu_data.get("cores", 0)
|
|
result["cpu"]["temp_c"] = cpu_data.get("temp_c", None)
|
|
result["online"] = True
|
|
fetched_any = True
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
resp = await client.get(f"{base}/api/memory", headers=headers)
|
|
if resp.status_code == 200:
|
|
ram_data = resp.json()
|
|
result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2)
|
|
result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2)
|
|
total = result["ram"]["total_gb"]
|
|
if total > 0:
|
|
result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1)
|
|
result["online"] = True
|
|
fetched_any = True
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
resp = await client.get(f"{base}/api/array", headers=headers)
|
|
if resp.status_code == 200:
|
|
_parse_array_info(resp.json(), result)
|
|
result["online"] = True
|
|
fetched_any = True
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
resp = await client.get(f"{base}/api/docker", headers=headers)
|
|
if resp.status_code == 200:
|
|
_parse_docker_info(resp.json(), result)
|
|
result["online"] = True
|
|
fetched_any = True
|
|
except Exception:
|
|
pass
|
|
|
|
return fetched_any
|
|
|
|
|
|
async def _try_connectivity_check(
|
|
client: httpx.AsyncClient,
|
|
server: ServerConfig,
|
|
result: Dict[str, Any],
|
|
) -> None:
|
|
"""Perform a basic HTTP connectivity check as a fallback."""
|
|
try:
|
|
resp = await client.get(
|
|
f"http://{server.host}:{server.port}/",
|
|
follow_redirects=True,
|
|
)
|
|
result["online"] = resp.status_code < 500
|
|
except Exception:
|
|
result["online"] = False
|
|
|
|
|
|
async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]:
    """Collect the current stats of one Unraid server.

    Starts from the default payload of ``_empty_stats``.  The Unraid API is
    queried first (``_try_api_endpoint`` skips it when no ``api_key`` is
    configured); if that yields nothing and the server has not been marked
    online, a plain HTTP connectivity check decides the ``online`` flag.

    Args:
        server: A :class:`ServerConfig` describing the target server.

    Returns:
        Dictionary with server name, host, online status, and detailed stats
        for CPU, RAM, array, and Docker containers.  ``error`` is set to
        ``"No host configured"`` when ``server.host`` is empty, or to the
        message of an exception raised while talking to the server.
    """
    stats = _empty_stats(server)

    # Nothing to contact without a host: report the defaults plus an error.
    if not server.host:
        stats["error"] = "No host configured"
        return stats

    try:
        # Requests time out after 10 seconds; TLS certificate verification is
        # disabled on this client.
        # NOTE(review): presumably to tolerate self-signed certificates -- confirm.
        async with httpx.AsyncClient(timeout=10, verify=False) as http:
            got_api_data = await _try_api_endpoint(http, server, stats)

            # Only fall back when the API produced nothing at all.
            if not (got_api_data or stats["online"]):
                await _try_connectivity_check(http, server, stats)
    except Exception as exc:
        stats["online"] = False
        stats["error"] = str(exc)

    return stats
|
|
|
|
|
|
async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]:
    """Query every configured Unraid server concurrently.

    Args:
        servers: List of :class:`ServerConfig` instances.

    Returns:
        List of stats dictionaries, one per server, in the same order as
        *servers*.  An empty input yields an empty list.
    """
    if not servers:
        return []

    # gather() runs the per-server coroutines concurrently and returns their
    # results in the order the coroutines were passed in.
    gathered = await asyncio.gather(
        *(fetch_server_stats(entry) for entry in servers)
    )
    return list(gathered)
|