Complete admin backend with login, where all integrations (weather, news, Home Assistant, Vikunja, Unraid, MQTT) can be configured via web UI instead of ENV variables. Two-layer config: ENV seeds DB on first start, then DB is source of truth. Auto-migration system on startup. Backend: db.py shared pool, auth.py JWT, settings_service CRUD, seed_service, admin router (protected), test_connections per integration, config.py rewrite. Frontend: react-router v6, login page, admin layout with sidebar, 8 settings pages (General, Weather, News, HA, Vikunja, Unraid, MQTT, ChangePassword), shared IntegrationForm + TestButton components. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
"""Integration connection testing functions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
TIMEOUT = 10.0
|
|
|
|
|
|
async def test_weather(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test weather service by fetching current conditions."""
|
|
location = config.get("location", "Leverkusen")
|
|
try:
|
|
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
|
r = await client.get(f"https://wttr.in/{location}?format=j1")
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
temp = data["current_condition"][0]["temp_C"]
|
|
return {"success": True, "message": f"Verbunden — {location}: {temp}°C"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
|
|
async def test_ha(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test Home Assistant connection."""
|
|
url = config.get("url", "")
|
|
token = config.get("token", "")
|
|
if not url or not token:
|
|
return {"success": False, "message": "URL und Token sind erforderlich"}
|
|
try:
|
|
async with httpx.AsyncClient(timeout=TIMEOUT, verify=False) as client:
|
|
r = await client.get(
|
|
f"{url.rstrip('/')}/api/",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
return {"success": True, "message": f"Verbunden — {data.get('message', 'OK')}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
|
|
async def test_vikunja(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test Vikunja API connection."""
|
|
url = config.get("url", "")
|
|
token = config.get("token", "")
|
|
if not url or not token:
|
|
return {"success": False, "message": "URL und Token sind erforderlich"}
|
|
try:
|
|
base = url.rstrip("/")
|
|
# Try to reach the info or user endpoint
|
|
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
|
r = await client.get(
|
|
f"{base}/user",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
return {"success": True, "message": f"Verbunden als {data.get('username', 'OK')}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
|
|
async def test_unraid(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test Unraid server connectivity."""
|
|
servers = config.get("servers", [])
|
|
if not servers:
|
|
return {"success": False, "message": "Keine Server konfiguriert"}
|
|
|
|
results = []
|
|
for srv in servers:
|
|
name = srv.get("name", srv.get("host", "?"))
|
|
host = srv.get("host", "")
|
|
port = srv.get("port", 80)
|
|
if not host:
|
|
results.append(f"{name}: Kein Host")
|
|
continue
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
|
r = await client.get(f"http://{host}:{port}/")
|
|
results.append(f"{name}: Online ({r.status_code})")
|
|
except Exception as exc:
|
|
results.append(f"{name}: Offline ({exc})")
|
|
|
|
all_ok = all("Online" in r for r in results)
|
|
return {
|
|
"success": all_ok,
|
|
"message": " | ".join(results),
|
|
}
|
|
|
|
|
|
async def test_mqtt(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test MQTT broker connection."""
|
|
host = config.get("host", "")
|
|
port = int(config.get("port", 1883))
|
|
username = config.get("username") or None
|
|
password = config.get("password") or None
|
|
|
|
if not host:
|
|
return {"success": False, "message": "MQTT Host ist erforderlich"}
|
|
|
|
try:
|
|
import aiomqtt
|
|
|
|
async with aiomqtt.Client(
|
|
hostname=host,
|
|
port=port,
|
|
username=username,
|
|
password=password,
|
|
identifier="daily-briefing-test",
|
|
) as client:
|
|
# If we get here, connection succeeded
|
|
pass
|
|
return {"success": True, "message": f"Verbunden mit {host}:{port}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
|
|
async def test_news_db(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Test that market_news table is accessible."""
|
|
try:
|
|
from server.db import get_pool
|
|
|
|
pool = await get_pool()
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow("SELECT COUNT(*) AS cnt FROM market_news")
|
|
count = row["cnt"] if row else 0
|
|
return {"success": True, "message": f"Verbunden — {count} Artikel in der Datenbank"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
|
|
# Map integration type → test function
|
|
TEST_FUNCTIONS = {
|
|
"weather": test_weather,
|
|
"ha": test_ha,
|
|
"vikunja": test_vikunja,
|
|
"unraid": test_unraid,
|
|
"mqtt": test_mqtt,
|
|
"news": test_news_db,
|
|
}
|