"""Integration connection testing functions.""" from __future__ import annotations import asyncio import logging from typing import Any, Dict import httpx logger = logging.getLogger(__name__) TIMEOUT = 10.0 async def test_weather(config: Dict[str, Any]) -> Dict[str, Any]: """Test weather service by fetching current conditions.""" location = config.get("location", "Leverkusen") try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: r = await client.get(f"https://wttr.in/{location}?format=j1") r.raise_for_status() data = r.json() temp = data["current_condition"][0]["temp_C"] return {"success": True, "message": f"Verbunden — {location}: {temp}°C"} except Exception as exc: return {"success": False, "message": str(exc)} async def test_ha(config: Dict[str, Any]) -> Dict[str, Any]: """Test Home Assistant connection.""" url = config.get("url", "") token = config.get("token", "") if not url or not token: return {"success": False, "message": "URL und Token sind erforderlich"} try: async with httpx.AsyncClient(timeout=TIMEOUT, verify=False) as client: r = await client.get( f"{url.rstrip('/')}/api/", headers={"Authorization": f"Bearer {token}"}, ) r.raise_for_status() data = r.json() return {"success": True, "message": f"Verbunden — {data.get('message', 'OK')}"} except Exception as exc: return {"success": False, "message": str(exc)} async def test_vikunja(config: Dict[str, Any]) -> Dict[str, Any]: """Test Vikunja API connection.""" url = config.get("url", "") token = config.get("token", "") if not url or not token: return {"success": False, "message": "URL und Token sind erforderlich"} try: base = url.rstrip("/") # Try to reach the info or user endpoint async with httpx.AsyncClient(timeout=TIMEOUT) as client: r = await client.get( f"{base}/user", headers={"Authorization": f"Bearer {token}"}, ) r.raise_for_status() data = r.json() return {"success": True, "message": f"Verbunden als {data.get('username', 'OK')}"} except Exception as exc: return {"success": False, "message": str(exc)} async def test_unraid(config: Dict[str, Any]) -> Dict[str, Any]: """Test Unraid server connectivity.""" servers = config.get("servers", []) if not servers: return {"success": False, "message": "Keine Server konfiguriert"} results = [] for srv in servers: name = srv.get("name", srv.get("host", "?")) host = srv.get("host", "") port = srv.get("port", 80) if not host: results.append(f"{name}: Kein Host") continue try: async with httpx.AsyncClient(timeout=5.0) as client: r = await client.get(f"http://{host}:{port}/") results.append(f"{name}: Online ({r.status_code})") except Exception as exc: results.append(f"{name}: Offline ({exc})") all_ok = all("Online" in r for r in results) return { "success": all_ok, "message": " | ".join(results), } async def test_mqtt(config: Dict[str, Any]) -> Dict[str, Any]: """Test MQTT broker connection.""" host = config.get("host", "") port = int(config.get("port", 1883)) username = config.get("username") or None password = config.get("password") or None if not host: return {"success": False, "message": "MQTT Host ist erforderlich"} try: import aiomqtt async with aiomqtt.Client( hostname=host, port=port, username=username, password=password, identifier="daily-briefing-test", ) as client: # If we get here, connection succeeded pass return {"success": True, "message": f"Verbunden mit {host}:{port}"} except Exception as exc: return {"success": False, "message": str(exc)} async def test_news_db(config: Dict[str, Any]) -> Dict[str, Any]: """Test that market_news table is accessible.""" try: from server.db import get_pool pool = await get_pool() async with pool.acquire() as conn: row = await conn.fetchrow("SELECT COUNT(*) AS cnt FROM market_news") count = row["cnt"] if row else 0 return {"success": True, "message": f"Verbunden — {count} Artikel in der Datenbank"} except Exception as exc: return {"success": False, "message": str(exc)} async def test_n8n(config: Dict[str, Any]) -> Dict[str, Any]: """Test n8n connection by fetching workflows.""" url = config.get("url", "") api_key = config.get("api_key", "") if not url or not api_key: return {"success": False, "message": "URL und API Key sind erforderlich"} try: base = url.rstrip("/") async with httpx.AsyncClient(timeout=TIMEOUT) as client: r = await client.get( f"{base}/api/v1/workflows", headers={"X-N8N-API-KEY": api_key}, ) r.raise_for_status() data = r.json() count = len(data.get("data", [])) return {"success": True, "message": f"Verbunden — {count} Workflows gefunden"} except Exception as exc: return {"success": False, "message": str(exc)} # Map integration type → test function TEST_FUNCTIONS = { "weather": test_weather, "ha": test_ha, "vikunja": test_vikunja, "unraid": test_unraid, "mqtt": test_mqtt, "news": test_news_db, "n8n": test_n8n, }