feat: add Admin Panel with JWT auth, DB settings, and integration management

Complete admin backend with login, where all integrations (weather, news,
Home Assistant, Vikunja, Unraid, MQTT) can be configured via web UI instead
of ENV variables. Two-layer config: ENV seeds DB on first start, then DB
is source of truth. Auto-migration system on startup.

Backend: db.py shared pool, auth.py JWT, settings_service CRUD, seed_service,
admin router (protected), test_connections per integration, config.py rewrite.

Frontend: react-router v6, login page, admin layout with sidebar, 8 settings
pages (General, Weather, News, HA, Vikunja, Unraid, MQTT, ChangePassword),
shared IntegrationForm + TestButton components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sam 2026-03-02 10:37:30 +01:00
parent 89ed0c6d0a
commit f6a42c2dd2
40 changed files with 3487 additions and 311 deletions

72
server/auth.py Normal file
View file

@ -0,0 +1,72 @@
"""JWT authentication for admin routes."""
from __future__ import annotations
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
logger = logging.getLogger(__name__)
JWT_SECRET = os.getenv("JWT_SECRET") or secrets.token_urlsafe(32)
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_HOURS = 24
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
bearer_scheme = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
"""Hash a plain-text password with bcrypt."""
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
"""Verify a plain-text password against its bcrypt hash."""
return pwd_context.verify(plain, hashed)
def create_access_token(subject: str) -> str:
"""Create a JWT access token for the given subject (username)."""
expire = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS)
return jwt.encode(
{"sub": subject, "exp": expire},
JWT_SECRET,
algorithm=JWT_ALGORITHM,
)
async def require_admin(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> str:
"""FastAPI dependency that validates the JWT and returns the username.
Use as: ``admin_user: str = Depends(require_admin)``
"""
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username: Optional[str] = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token payload")
return username
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)

View file

@ -1,11 +1,19 @@
"""Centralized configuration via environment variables."""
"""Centralized configuration — two-layer system (ENV bootstrap + DB runtime).
On first start, ENV values seed the database.
After that, the database is the source of truth for all integration configs.
ENV is only needed for: DB connection, ADMIN_PASSWORD, JWT_SECRET.
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass, field
from typing import List
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
@dataclass
@ -18,7 +26,7 @@ class UnraidServer:
@dataclass
class Settings:
# --- Database (PostgreSQL) ---
# --- Bootstrap (always from ENV) ---
db_host: str = "10.10.10.10"
db_port: int = 5433
db_name: str = "openclaw"
@ -28,25 +36,31 @@ class Settings:
# --- Weather ---
weather_location: str = "Leverkusen"
weather_location_secondary: str = "Rab,Croatia"
weather_cache_ttl: int = 1800 # 30 min
weather_cache_ttl: int = 1800
# --- Home Assistant ---
ha_url: str = "https://homeassistant.daddelolymp.de"
ha_url: str = ""
ha_token: str = ""
ha_cache_ttl: int = 30
ha_enabled: bool = False
# --- Vikunja Tasks ---
vikunja_url: str = "http://10.10.10.10:3456/api/v1"
vikunja_url: str = ""
vikunja_token: str = ""
vikunja_cache_ttl: int = 60
vikunja_enabled: bool = False
vikunja_private_projects: List[int] = field(default_factory=lambda: [3, 4])
vikunja_sams_projects: List[int] = field(default_factory=lambda: [2, 5])
# --- Unraid Servers ---
unraid_servers: List[UnraidServer] = field(default_factory=list)
unraid_cache_ttl: int = 15
unraid_enabled: bool = False
# --- News ---
news_cache_ttl: int = 300 # 5 min
news_cache_ttl: int = 300
news_max_age_hours: int = 48
news_enabled: bool = True
# --- MQTT ---
mqtt_host: str = ""
@ -55,39 +69,44 @@ class Settings:
mqtt_password: str = ""
mqtt_topics: List[str] = field(default_factory=lambda: ["#"])
mqtt_client_id: str = "daily-briefing"
mqtt_enabled: bool = False
# --- Server ---
host: str = "0.0.0.0"
port: int = 8080
debug: bool = False
# --- WebSocket ---
ws_interval: int = 15
@classmethod
def from_env(cls) -> "Settings":
"""Load bootstrap config from environment variables."""
s = cls()
s.db_host = os.getenv("DB_HOST", s.db_host)
s.db_port = int(os.getenv("DB_PORT", str(s.db_port)))
s.db_name = os.getenv("DB_NAME", s.db_name)
s.db_user = os.getenv("DB_USER", s.db_user)
s.db_password = os.getenv("DB_PASSWORD", s.db_password)
s.debug = os.getenv("DEBUG", "").lower() in ("1", "true", "yes")
# Legacy ENV support — used for first-run seeding
s.weather_location = os.getenv("WEATHER_LOCATION", s.weather_location)
s.weather_location_secondary = os.getenv(
"WEATHER_LOCATION_SECONDARY", s.weather_location_secondary
)
s.weather_location_secondary = os.getenv("WEATHER_LOCATION_SECONDARY", s.weather_location_secondary)
s.ha_url = os.getenv("HA_URL", s.ha_url)
s.ha_token = os.getenv("HA_TOKEN", s.ha_token)
s.ha_enabled = bool(s.ha_url)
s.vikunja_url = os.getenv("VIKUNJA_URL", s.vikunja_url)
s.vikunja_token = os.getenv("VIKUNJA_TOKEN", s.vikunja_token)
s.vikunja_enabled = bool(s.vikunja_url)
s.mqtt_host = os.getenv("MQTT_HOST", s.mqtt_host)
s.mqtt_port = int(os.getenv("MQTT_PORT", str(s.mqtt_port)))
s.mqtt_username = os.getenv("MQTT_USERNAME", s.mqtt_username)
s.mqtt_password = os.getenv("MQTT_PASSWORD", s.mqtt_password)
s.mqtt_client_id = os.getenv("MQTT_CLIENT_ID", s.mqtt_client_id)
s.mqtt_enabled = bool(s.mqtt_host)
# Parse MQTT_TOPICS (comma-separated or JSON array)
# Parse MQTT_TOPICS
raw_topics = os.getenv("MQTT_TOPICS", "")
if raw_topics:
try:
@ -95,8 +114,6 @@ class Settings:
except (json.JSONDecodeError, TypeError):
s.mqtt_topics = [t.strip() for t in raw_topics.split(",") if t.strip()]
s.debug = os.getenv("DEBUG", "").lower() in ("1", "true", "yes")
# Parse UNRAID_SERVERS JSON
raw = os.getenv("UNRAID_SERVERS", "[]")
try:
@ -111,10 +128,103 @@ class Settings:
for i, srv in enumerate(servers_data)
if srv.get("host")
]
s.unraid_enabled = len(s.unraid_servers) > 0
except (json.JSONDecodeError, TypeError):
s.unraid_servers = []
return s
async def load_from_db(self) -> None:
"""Override fields with values from the database."""
try:
from server.services import settings_service
# Load integrations
integrations = await settings_service.get_integrations()
for integ in integrations:
cfg = integ.get("config", {})
enabled = integ.get("enabled", True)
itype = integ["type"]
if itype == "weather":
self.weather_location = cfg.get("location", self.weather_location)
self.weather_location_secondary = cfg.get("location_secondary", self.weather_location_secondary)
elif itype == "news":
self.news_max_age_hours = int(cfg.get("max_age_hours", self.news_max_age_hours))
self.news_enabled = enabled
elif itype == "ha":
self.ha_url = cfg.get("url", self.ha_url)
self.ha_token = cfg.get("token", self.ha_token)
self.ha_enabled = enabled
elif itype == "vikunja":
self.vikunja_url = cfg.get("url", self.vikunja_url)
self.vikunja_token = cfg.get("token", self.vikunja_token)
self.vikunja_private_projects = cfg.get("private_projects", self.vikunja_private_projects)
self.vikunja_sams_projects = cfg.get("sams_projects", self.vikunja_sams_projects)
self.vikunja_enabled = enabled
elif itype == "unraid":
servers = cfg.get("servers", [])
self.unraid_servers = [
UnraidServer(
name=s.get("name", ""),
host=s.get("host", ""),
api_key=s.get("api_key", ""),
port=int(s.get("port", 80)),
)
for s in servers
if s.get("host")
]
self.unraid_enabled = enabled
elif itype == "mqtt":
self.mqtt_host = cfg.get("host", self.mqtt_host)
self.mqtt_port = int(cfg.get("port", self.mqtt_port))
self.mqtt_username = cfg.get("username", self.mqtt_username)
self.mqtt_password = cfg.get("password", self.mqtt_password)
self.mqtt_client_id = cfg.get("client_id", self.mqtt_client_id)
self.mqtt_topics = cfg.get("topics", self.mqtt_topics)
self.mqtt_enabled = enabled
# Load app_settings (cache TTLs, etc.)
all_settings = await settings_service.get_all_settings()
for key, data in all_settings.items():
val = data["value"]
if key == "weather_cache_ttl":
self.weather_cache_ttl = int(val)
elif key == "ha_cache_ttl":
self.ha_cache_ttl = int(val)
elif key == "vikunja_cache_ttl":
self.vikunja_cache_ttl = int(val)
elif key == "unraid_cache_ttl":
self.unraid_cache_ttl = int(val)
elif key == "news_cache_ttl":
self.news_cache_ttl = int(val)
elif key == "ws_interval":
self.ws_interval = int(val)
logger.info("Settings loaded from database (%d integrations)", len(integrations))
except Exception:
logger.exception("Failed to load settings from DB — using ENV defaults")
# --- Module-level singleton ---
settings = Settings.from_env()
def get_settings() -> Settings:
"""Return the current settings. Used by all routers."""
return settings
async def reload_settings() -> None:
"""Reload settings from DB. Called after admin changes."""
global settings
s = Settings.from_env()
await s.load_from_db()
settings = s
logger.info("Settings reloaded from database")

52
server/db.py Normal file
View file

@ -0,0 +1,52 @@
"""Shared asyncpg connection pool."""
from __future__ import annotations
import logging
from typing import Optional
import asyncpg
logger = logging.getLogger(__name__)
_pool: Optional[asyncpg.Pool] = None
async def init_pool(
host: str,
port: int,
dbname: str,
user: str,
password: str,
min_size: int = 1,
max_size: int = 5,
) -> asyncpg.Pool:
"""Create the shared connection pool. Call once during app startup."""
global _pool
_pool = await asyncpg.create_pool(
host=host,
port=port,
database=dbname,
user=user,
password=password,
min_size=min_size,
max_size=max_size,
)
logger.info("Database pool initialized (%s:%d/%s)", host, port, dbname)
return _pool
async def get_pool() -> asyncpg.Pool:
"""Return the shared pool. Raises if not yet initialized."""
if _pool is None:
raise RuntimeError("Database pool not initialized — call init_pool() first")
return _pool
async def close_pool() -> None:
"""Close the shared pool. Call during app shutdown."""
global _pool
if _pool is not None:
await _pool.close()
_pool = None
logger.info("Database pool closed")

View file

@ -8,10 +8,10 @@ from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from server.config import settings
from server.services import news_service
from server.config import get_settings, reload_settings, settings
from server.services.mqtt_service import mqtt_service
logger = logging.getLogger("daily-briefing")
@ -24,15 +24,15 @@ logging.basicConfig(
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup / shutdown lifecycle."""
logger.info("Starting Daily Briefing Dashboard...")
logger.info(
"Unraid servers configured: %d",
len(settings.unraid_servers),
)
from server import db
from server.migrations.runner import run_migrations
from server.services.seed_service import seed_if_empty
# Initialize database pool
logger.info("Starting Daily Briefing Dashboard v2.1...")
# 1. Initialize shared database pool (bootstrap from ENV)
try:
await news_service.init_pool(
pool = await db.init_pool(
host=settings.db_host,
port=settings.db_port,
dbname=settings.db_name,
@ -41,36 +41,64 @@ async def lifespan(app: FastAPI):
)
logger.info("Database pool initialized")
except Exception:
logger.exception("Failed to initialize database pool — news will be unavailable")
logger.exception("Failed to initialize database pool — admin + news will be unavailable")
yield
return
# Start MQTT service
if settings.mqtt_host:
# 2. Run database migrations
try:
await run_migrations(pool)
except Exception:
logger.exception("Migration error — some features may not work")
# 3. Seed database from ENV on first run
try:
await seed_if_empty()
except Exception:
logger.exception("Seeding error — admin panel may need manual setup")
# 4. Load settings from database (overrides ENV defaults)
try:
await reload_settings()
cfg = get_settings()
logger.info(
"Settings loaded from DB — %d Unraid servers, MQTT=%s, HA=%s",
len(cfg.unraid_servers),
"enabled" if cfg.mqtt_enabled else "disabled",
"enabled" if cfg.ha_enabled else "disabled",
)
except Exception:
logger.exception("Failed to load settings from DB — using ENV defaults")
cfg = settings
# 5. Start MQTT service if enabled
if cfg.mqtt_enabled and cfg.mqtt_host:
try:
await mqtt_service.start(
host=settings.mqtt_host,
port=settings.mqtt_port,
username=settings.mqtt_username or None,
password=settings.mqtt_password or None,
topics=settings.mqtt_topics,
client_id=settings.mqtt_client_id,
host=cfg.mqtt_host,
port=cfg.mqtt_port,
username=cfg.mqtt_username or None,
password=cfg.mqtt_password or None,
topics=cfg.mqtt_topics,
client_id=cfg.mqtt_client_id,
)
logger.info("MQTT service started (broker %s:%d)", settings.mqtt_host, settings.mqtt_port)
logger.info("MQTT service started (%s:%d)", cfg.mqtt_host, cfg.mqtt_port)
except Exception:
logger.exception("Failed to start MQTT service — MQTT will be unavailable")
logger.exception("Failed to start MQTT service")
else:
logger.info("MQTT disabled — set MQTT_HOST to enable")
logger.info("MQTT disabled — configure via Admin Panel or MQTT_HOST env")
yield
# Shutdown
logger.info("Shutting down...")
await mqtt_service.stop()
await news_service.close_pool()
await db.close_pool()
app = FastAPI(
title="Daily Briefing",
version="2.0.0",
version="2.1.0",
lifespan=lifespan,
)
@ -84,8 +112,10 @@ app.add_middleware(
)
# --- Register Routers ---
from server.routers import dashboard, homeassistant, mqtt, news, servers, tasks, weather # noqa: E402
from server.routers import admin, auth, dashboard, homeassistant, mqtt, news, servers, tasks, weather # noqa: E402
app.include_router(auth.router)
app.include_router(admin.router)
app.include_router(weather.router)
app.include_router(news.router)
app.include_router(servers.router)
@ -97,6 +127,14 @@ app.include_router(dashboard.router)
# --- Serve static frontend (production) ---
static_dir = Path(__file__).parent.parent / "static"
if static_dir.is_dir():
# SPA fallback: serve index.html for any non-API path
@app.get("/admin/{full_path:path}")
async def admin_spa_fallback(full_path: str):
index = static_dir / "index.html"
if index.exists():
return FileResponse(str(index))
return {"error": "Frontend not built"}
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
logger.info("Serving static frontend from %s", static_dir)
else:
@ -104,6 +142,10 @@ else:
async def root():
return {
"status": "ok",
"message": "Daily Briefing API — Frontend not built yet",
"endpoints": ["/api/all", "/api/weather", "/api/news", "/api/servers", "/api/ha", "/api/tasks"],
"message": "Daily Briefing API v2.1 — Frontend not built yet",
"endpoints": [
"/api/all", "/api/weather", "/api/news", "/api/servers",
"/api/ha", "/api/tasks", "/api/mqtt",
"/api/auth/login", "/api/admin/integrations",
],
}

View file

@ -0,0 +1,53 @@
-- Migration 001: Admin Backend Schema
-- Creates tables for admin user, settings, integrations, and MQTT subscriptions.
-- Single admin user
CREATE TABLE IF NOT EXISTS admin_user (
id SERIAL PRIMARY KEY,
username VARCHAR(100) NOT NULL DEFAULT 'admin',
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- General key/value settings (cache TTLs, preferences, etc.)
CREATE TABLE IF NOT EXISTS app_settings (
key VARCHAR(100) PRIMARY KEY,
value TEXT NOT NULL DEFAULT '',
value_type VARCHAR(20) NOT NULL DEFAULT 'string',
category VARCHAR(50) NOT NULL DEFAULT 'general',
label VARCHAR(200) NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Integration configurations (one row per integration type)
CREATE TABLE IF NOT EXISTS integrations (
id SERIAL PRIMARY KEY,
type VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(200) NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
enabled BOOLEAN NOT NULL DEFAULT true,
display_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- MQTT subscription management
CREATE TABLE IF NOT EXISTS mqtt_subscriptions (
id SERIAL PRIMARY KEY,
topic_pattern VARCHAR(500) NOT NULL,
display_name VARCHAR(200) NOT NULL DEFAULT '',
category VARCHAR(100) NOT NULL DEFAULT 'other',
unit VARCHAR(50) NOT NULL DEFAULT '',
widget_type VARCHAR(50) NOT NULL DEFAULT 'value',
enabled BOOLEAN NOT NULL DEFAULT true,
display_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Record this migration
INSERT INTO schema_version (version, description)
VALUES (1, 'Admin backend: admin_user, app_settings, integrations, mqtt_subscriptions')
ON CONFLICT (version) DO NOTHING;

View file

View file

@ -0,0 +1,58 @@
"""Auto-migration runner. Applies pending SQL migrations on startup."""
from __future__ import annotations
import logging
from pathlib import Path
import asyncpg
logger = logging.getLogger(__name__)
MIGRATIONS_DIR = Path(__file__).parent
async def run_migrations(pool: asyncpg.Pool) -> None:
"""Check schema_version and apply any pending .sql migration files."""
async with pool.acquire() as conn:
# Ensure the version-tracking table exists
await conn.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
description TEXT NOT NULL DEFAULT ''
)
""")
row = await conn.fetchrow(
"SELECT COALESCE(MAX(version), 0) AS v FROM schema_version"
)
current_version: int = row["v"]
logger.info("Current schema version: %d", current_version)
# Discover and sort SQL files by their numeric prefix
sql_files = sorted(
MIGRATIONS_DIR.glob("[0-9]*.sql"),
key=lambda p: int(p.stem.split("_")[0]),
)
applied = 0
for sql_file in sql_files:
version = int(sql_file.stem.split("_")[0])
if version <= current_version:
continue
logger.info("Applying migration %03d: %s", version, sql_file.name)
sql = sql_file.read_text(encoding="utf-8")
# Execute the entire migration in a transaction
async with conn.transaction():
await conn.execute(sql)
logger.info("Migration %03d applied successfully", version)
applied += 1
if applied == 0:
logger.info("No pending migrations")
else:
logger.info("Applied %d migration(s)", applied)

186
server/routers/admin.py Normal file
View file

@ -0,0 +1,186 @@
"""Admin router — protected CRUD for settings, integrations, and MQTT subscriptions."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from server.auth import require_admin
from server.services import settings_service
from server.services.test_connections import TEST_FUNCTIONS
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/admin",
tags=["admin"],
dependencies=[Depends(require_admin)],
)
# ---------------------------------------------------------------------------
# Request/Response Models
# ---------------------------------------------------------------------------
class SettingsUpdate(BaseModel):
settings: Dict[str, Any]
class IntegrationUpdate(BaseModel):
name: Optional[str] = None
config: Optional[Dict[str, Any]] = None
enabled: Optional[bool] = None
display_order: Optional[int] = None
class MqttSubscriptionCreate(BaseModel):
topic_pattern: str
display_name: str = ""
category: str = "other"
unit: str = ""
widget_type: str = "value"
enabled: bool = True
display_order: int = 0
class MqttSubscriptionUpdate(BaseModel):
topic_pattern: Optional[str] = None
display_name: Optional[str] = None
category: Optional[str] = None
unit: Optional[str] = None
widget_type: Optional[str] = None
enabled: Optional[bool] = None
display_order: Optional[int] = None
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@router.get("/settings")
async def get_settings() -> Dict[str, Any]:
"""Return all app settings."""
return await settings_service.get_all_settings()
@router.put("/settings")
async def update_settings(body: SettingsUpdate) -> Dict[str, str]:
"""Bulk update settings."""
await settings_service.bulk_set_settings(body.settings)
# Reload in-memory settings
await _reload_app_settings()
return {"status": "ok", "message": f"Updated {len(body.settings)} setting(s)"}
# ---------------------------------------------------------------------------
# Integrations
# ---------------------------------------------------------------------------
@router.get("/integrations")
async def list_integrations() -> List[Dict[str, Any]]:
"""List all integration configs."""
return await settings_service.get_integrations()
@router.get("/integrations/{type_name}")
async def get_integration(type_name: str) -> Dict[str, Any]:
"""Get a single integration config."""
result = await settings_service.get_integration(type_name)
if result is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
return result
@router.put("/integrations/{type_name}")
async def update_integration(type_name: str, body: IntegrationUpdate) -> Dict[str, Any]:
"""Update an integration config."""
existing = await settings_service.get_integration(type_name)
if existing is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
result = await settings_service.upsert_integration(
type_name=type_name,
name=body.name or existing["name"],
config=body.config if body.config is not None else existing["config"],
enabled=body.enabled if body.enabled is not None else existing["enabled"],
display_order=body.display_order if body.display_order is not None else existing["display_order"],
)
# Reload in-memory settings
await _reload_app_settings()
return result
@router.post("/integrations/{type_name}/test")
async def test_integration(type_name: str) -> Dict[str, Any]:
"""Test an integration connection."""
integration = await settings_service.get_integration(type_name)
if integration is None:
raise HTTPException(status_code=404, detail=f"Integration '{type_name}' not found")
test_fn = TEST_FUNCTIONS.get(type_name)
if test_fn is None:
return {"success": False, "message": f"No test available for '{type_name}'"}
return await test_fn(integration["config"])
# ---------------------------------------------------------------------------
# MQTT Subscriptions
# ---------------------------------------------------------------------------
@router.get("/mqtt/subscriptions")
async def list_mqtt_subscriptions() -> List[Dict[str, Any]]:
"""List all MQTT subscriptions."""
return await settings_service.get_mqtt_subscriptions()
@router.post("/mqtt/subscriptions")
async def create_mqtt_subscription(body: MqttSubscriptionCreate) -> Dict[str, Any]:
"""Create a new MQTT subscription."""
return await settings_service.create_mqtt_subscription(
topic_pattern=body.topic_pattern,
display_name=body.display_name,
category=body.category,
unit=body.unit,
widget_type=body.widget_type,
enabled=body.enabled,
display_order=body.display_order,
)
@router.put("/mqtt/subscriptions/{sub_id}")
async def update_mqtt_subscription(sub_id: int, body: MqttSubscriptionUpdate) -> Dict[str, Any]:
"""Update an MQTT subscription."""
fields = body.model_dump(exclude_none=True)
if not fields:
raise HTTPException(status_code=400, detail="No fields to update")
result = await settings_service.update_mqtt_subscription(sub_id, **fields)
if result is None:
raise HTTPException(status_code=404, detail="Subscription not found")
return result
@router.delete("/mqtt/subscriptions/{sub_id}")
async def delete_mqtt_subscription(sub_id: int) -> Dict[str, str]:
"""Delete an MQTT subscription."""
deleted = await settings_service.delete_mqtt_subscription(sub_id)
if not deleted:
raise HTTPException(status_code=404, detail="Subscription not found")
return {"status": "ok", "message": "Subscription deleted"}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _reload_app_settings() -> None:
"""Reload the in-memory Settings object from the database."""
try:
from server.config import reload_settings
await reload_settings()
except Exception:
logger.exception("Failed to reload settings after admin change")

79
server/routers/auth.py Normal file
View file

@ -0,0 +1,79 @@
"""Auth router — login and password management."""
from __future__ import annotations
import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from server.auth import (
create_access_token,
hash_password,
require_admin,
verify_password,
)
from server.services import settings_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
@router.post("/login")
async def login(body: LoginRequest) -> Dict[str, Any]:
"""Authenticate admin and return a JWT."""
user = await settings_service.get_admin_user()
if user is None:
raise HTTPException(status_code=503, detail="No admin user configured")
if body.username != user["username"]:
raise HTTPException(status_code=401, detail="Invalid credentials")
if not verify_password(body.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
token = create_access_token(user["username"])
return {
"token": token,
"username": user["username"],
}
@router.get("/me")
async def get_me(admin_user: str = Depends(require_admin)) -> Dict[str, str]:
"""Return the authenticated admin username. Used to verify token validity."""
return {"username": admin_user}
@router.put("/password")
async def change_password(
body: ChangePasswordRequest,
admin_user: str = Depends(require_admin),
) -> Dict[str, str]:
"""Change the admin password."""
user = await settings_service.get_admin_user()
if user is None:
raise HTTPException(status_code=500, detail="Admin user not found")
if not verify_password(body.current_password, user["password_hash"]):
raise HTTPException(status_code=400, detail="Current password is incorrect")
if len(body.new_password) < 6:
raise HTTPException(status_code=400, detail="New password must be at least 6 characters")
await settings_service.update_admin_password(
user["id"], hash_password(body.new_password)
)
return {"status": "ok", "message": "Password changed successfully"}

View file

@ -8,7 +8,7 @@ from typing import Any, Dict
from fastapi import APIRouter
from server.cache import cache
from server.config import settings
from server.config import get_settings
from server.services.ha_service import fetch_ha_data
logger = logging.getLogger(__name__)
@ -36,12 +36,12 @@ async def get_ha() -> Dict[str, Any]:
# --- cache miss -----------------------------------------------------------
try:
data: Dict[str, Any] = await fetch_ha_data(
settings.ha_url,
settings.ha_token,
get_settings().ha_url,
get_settings().ha_token,
)
except Exception as exc:
logger.exception("Failed to fetch Home Assistant data")
return {"error": True, "message": str(exc)}
await cache.set(CACHE_KEY, data, settings.ha_cache_ttl)
await cache.set(CACHE_KEY, data, get_settings().ha_cache_ttl)
return data

View file

@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Query
from server.cache import cache
from server.config import settings
from server.config import get_settings
from server.services.news_service import get_news, get_news_count
logger = logging.getLogger(__name__)
@ -50,7 +50,7 @@ async def get_news_articles(
total: int = 0
try:
articles = await get_news(limit=limit, offset=offset, category=category, max_age_hours=settings.news_max_age_hours)
articles = await get_news(limit=limit, offset=offset, category=category, max_age_hours=get_settings().news_max_age_hours)
except Exception as exc:
logger.exception("Failed to fetch news articles")
return {
@ -63,7 +63,7 @@ async def get_news_articles(
}
try:
total = await get_news_count(max_age_hours=settings.news_max_age_hours, category=category)
total = await get_news_count(max_age_hours=get_settings().news_max_age_hours, category=category)
except Exception as exc:
logger.exception("Failed to fetch news count")
# We still have articles -- return them with total = len(articles)
@ -76,5 +76,5 @@ async def get_news_articles(
"offset": offset,
}
await cache.set(key, payload, settings.news_cache_ttl)
await cache.set(key, payload, get_settings().news_cache_ttl)
return payload

View file

@ -8,7 +8,7 @@ from typing import Any, Dict, List
from fastapi import APIRouter
from server.cache import cache
from server.config import settings
from server.config import get_settings
from server.services.unraid_service import ServerConfig, fetch_all_servers
logger = logging.getLogger(__name__)
@ -42,7 +42,7 @@ async def get_servers() -> Dict[str, Any]:
api_key=srv.api_key,
port=srv.port,
)
for srv in settings.unraid_servers
for srv in get_settings().unraid_servers
]
servers_data: List[Dict[str, Any]] = []
@ -60,5 +60,5 @@ async def get_servers() -> Dict[str, Any]:
"servers": servers_data,
}
await cache.set(CACHE_KEY, payload, settings.unraid_cache_ttl)
await cache.set(CACHE_KEY, payload, get_settings().unraid_cache_ttl)
return payload

View file

@ -8,7 +8,7 @@ from typing import Any, Dict
from fastapi import APIRouter
from server.cache import cache
from server.config import settings
from server.config import get_settings
from server.services.vikunja_service import fetch_tasks
logger = logging.getLogger(__name__)
@ -36,12 +36,12 @@ async def get_tasks() -> Dict[str, Any]:
# --- cache miss -----------------------------------------------------------
try:
data: Dict[str, Any] = await fetch_tasks(
settings.vikunja_url,
settings.vikunja_token,
get_settings().vikunja_url,
get_settings().vikunja_token,
)
except Exception as exc:
logger.exception("Failed to fetch Vikunja tasks")
return {"error": True, "message": str(exc)}
await cache.set(CACHE_KEY, data, settings.vikunja_cache_ttl)
await cache.set(CACHE_KEY, data, get_settings().vikunja_cache_ttl)
return data

View file

@ -9,7 +9,7 @@ from typing import Any, Dict, List
from fastapi import APIRouter
from server.cache import cache
from server.config import settings
from server.config import get_settings
from server.services.weather_service import fetch_hourly_forecast, fetch_weather
logger = logging.getLogger(__name__)
@ -43,9 +43,9 @@ async def get_weather() -> Dict[str, Any]:
hourly_data: List[Dict[str, Any]] = []
results = await asyncio.gather(
_safe_fetch_weather(settings.weather_location),
_safe_fetch_weather(settings.weather_location_secondary),
_safe_fetch_hourly(settings.weather_location),
_safe_fetch_weather(get_settings().weather_location),
_safe_fetch_weather(get_settings().weather_location_secondary),
_safe_fetch_hourly(get_settings().weather_location),
return_exceptions=False, # we handle errors inside the helpers
)
@ -59,7 +59,7 @@ async def get_weather() -> Dict[str, Any]:
"hourly": hourly_data,
}
await cache.set(CACHE_KEY, payload, settings.weather_cache_ttl)
await cache.set(CACHE_KEY, payload, get_settings().weather_cache_ttl)
return payload

View file

@ -1,43 +1,11 @@
"""News service — queries market_news from PostgreSQL via shared pool."""
from __future__ import annotations
import asyncpg
from typing import Any, Dict, List, Optional
_pool: Optional[asyncpg.Pool] = None
async def init_pool(
host: str,
port: int,
dbname: str,
user: str,
password: str,
) -> None:
"""Initialise the global asyncpg connection pool.
Call once at application startup.
"""
global _pool
_pool = await asyncpg.create_pool(
host=host,
port=port,
database=dbname,
user=user,
password=password,
min_size=1,
max_size=5,
)
async def close_pool() -> None:
"""Close the global asyncpg connection pool.
Call once at application shutdown.
"""
global _pool
if _pool is not None:
await _pool.close()
_pool = None
from server.db import get_pool
def _row_to_dict(row: asyncpg.Record) -> Dict[str, Any]:
@ -54,19 +22,8 @@ async def get_news(
category: Optional[str] = None,
max_age_hours: int = 48,
) -> List[Dict[str, Any]]:
"""Fetch recent news articles from the market_news table.
Args:
limit: Maximum number of rows to return.
offset: Number of rows to skip (for pagination).
category: Optional category filter (exact match).
max_age_hours: Only return articles published within this many hours.
Returns:
List of news article dictionaries.
"""
if _pool is None:
raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
"""Fetch recent news articles from the market_news table."""
pool = await get_pool()
params: List[Any] = []
param_idx = 1
@ -86,7 +43,7 @@ async def get_news(
params.append(limit)
params.append(offset)
async with _pool.acquire() as conn:
async with pool.acquire() as conn:
rows = await conn.fetch(base_query, *params)
return [_row_to_dict(row) for row in rows]
@ -96,17 +53,8 @@ async def get_news_count(
max_age_hours: int = 48,
category: Optional[str] = None,
) -> int:
"""Return the total count of recent news articles.
Args:
max_age_hours: Only count articles published within this many hours.
category: Optional category filter.
Returns:
Integer count.
"""
if _pool is None:
raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
"""Return the total count of recent news articles."""
pool = await get_pool()
params: List[Any] = []
param_idx = 1
@ -121,23 +69,15 @@ async def get_news_count(
query += f" AND category = ${param_idx}"
params.append(category)
async with _pool.acquire() as conn:
async with pool.acquire() as conn:
row = await conn.fetchrow(query, *params)
return int(row["cnt"]) if row else 0
async def get_categories(max_age_hours: int = 48) -> List[str]:
"""Return distinct categories from recent news articles.
Args:
max_age_hours: Only consider articles published within this many hours.
Returns:
Sorted list of category strings.
"""
if _pool is None:
raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
"""Return distinct categories from recent news articles."""
pool = await get_pool()
query = (
"SELECT DISTINCT category "
@ -147,7 +87,7 @@ async def get_categories(max_age_hours: int = 48) -> List[str]:
"ORDER BY category"
)
async with _pool.acquire() as conn:
async with pool.acquire() as conn:
rows = await conn.fetch(query)
return [row["category"] for row in rows]

View file

@ -0,0 +1,150 @@
"""First-run seeder: populates DB from ENV defaults when tables are empty."""
from __future__ import annotations
import json
import logging
import os
import secrets
from server.auth import hash_password
from server.db import get_pool
from server.services import settings_service
logger = logging.getLogger(__name__)
async def seed_if_empty() -> None:
"""Check if admin tables are empty and seed with ENV-derived values."""
pool = await get_pool()
# ---- Admin User ----
user = await settings_service.get_admin_user()
if user is None:
admin_pw = os.getenv("ADMIN_PASSWORD", "")
if not admin_pw:
admin_pw = secrets.token_urlsafe(16)
logger.warning(
"=" * 60 + "\n"
" No ADMIN_PASSWORD set — generated: %s\n"
" Set ADMIN_PASSWORD env to use your own.\n" +
"=" * 60,
admin_pw,
)
await settings_service.create_admin_user("admin", hash_password(admin_pw))
logger.info("Admin user seeded from ENV")
# ---- Integrations ----
existing = await settings_service.get_integrations()
existing_types = {i["type"] for i in existing}
seed_integrations = [
{
"type": "weather",
"name": "Wetter (wttr.in)",
"config": {
"location": os.getenv("WEATHER_LOCATION", "Leverkusen"),
"location_secondary": os.getenv("WEATHER_LOCATION_SECONDARY", "Rab,Croatia"),
},
"enabled": True,
"display_order": 0,
},
{
"type": "news",
"name": "News (PostgreSQL)",
"config": {
"max_age_hours": int(os.getenv("NEWS_MAX_AGE_HOURS", "48")),
},
"enabled": True,
"display_order": 1,
},
{
"type": "ha",
"name": "Home Assistant",
"config": {
"url": os.getenv("HA_URL", ""),
"token": os.getenv("HA_TOKEN", ""),
},
"enabled": bool(os.getenv("HA_URL")),
"display_order": 2,
},
{
"type": "vikunja",
"name": "Vikunja Tasks",
"config": {
"url": os.getenv("VIKUNJA_URL", ""),
"token": os.getenv("VIKUNJA_TOKEN", ""),
"private_projects": [3, 4],
"sams_projects": [2, 5],
},
"enabled": bool(os.getenv("VIKUNJA_URL")),
"display_order": 3,
},
{
"type": "unraid",
"name": "Unraid Server",
"config": {
"servers": _parse_unraid_env(),
},
"enabled": bool(os.getenv("UNRAID_SERVERS")),
"display_order": 4,
},
{
"type": "mqtt",
"name": "MQTT Broker",
"config": {
"host": os.getenv("MQTT_HOST", ""),
"port": int(os.getenv("MQTT_PORT", "1883")),
"username": os.getenv("MQTT_USERNAME", ""),
"password": os.getenv("MQTT_PASSWORD", ""),
"client_id": os.getenv("MQTT_CLIENT_ID", "daily-briefing"),
"topics": _parse_mqtt_topics(),
},
"enabled": bool(os.getenv("MQTT_HOST")),
"display_order": 5,
},
]
for seed in seed_integrations:
if seed["type"] not in existing_types:
await settings_service.upsert_integration(
type_name=seed["type"],
name=seed["name"],
config=seed["config"],
enabled=seed["enabled"],
display_order=seed["display_order"],
)
logger.info("Seeded integration: %s", seed["type"])
# ---- App Settings ----
existing_settings = await settings_service.get_all_settings()
if not existing_settings:
default_settings = [
("weather_cache_ttl", "1800", "int", "cache", "Wetter Cache TTL", "Sekunden"),
("ha_cache_ttl", "30", "int", "cache", "HA Cache TTL", "Sekunden"),
("vikunja_cache_ttl", "60", "int", "cache", "Vikunja Cache TTL", "Sekunden"),
("unraid_cache_ttl", "15", "int", "cache", "Unraid Cache TTL", "Sekunden"),
("news_cache_ttl", "300", "int", "cache", "News Cache TTL", "Sekunden"),
("ws_interval", "15", "int", "general", "WebSocket Intervall", "Sekunden"),
]
for key, value, vtype, cat, label, desc in default_settings:
await settings_service.set_setting(key, value, vtype, cat, label, desc)
logger.info("Seeded %d default settings", len(default_settings))
def _parse_unraid_env() -> list:
"""Parse UNRAID_SERVERS env var."""
raw = os.getenv("UNRAID_SERVERS", "[]")
try:
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return []
def _parse_mqtt_topics() -> list:
"""Parse MQTT_TOPICS env var."""
raw = os.getenv("MQTT_TOPICS", "#")
try:
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return [t.strip() for t in raw.split(",") if t.strip()]

View file

@ -0,0 +1,297 @@
"""Database-backed settings, integrations, and user management."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from server.db import get_pool
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Admin User
# ---------------------------------------------------------------------------
async def get_admin_user() -> Optional[Dict[str, Any]]:
"""Return the admin user row, or None if not yet created."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM admin_user LIMIT 1")
return dict(row) if row else None
async def create_admin_user(username: str, password_hash: str) -> None:
"""Insert the initial admin user."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"INSERT INTO admin_user (username, password_hash) VALUES ($1, $2)",
username,
password_hash,
)
logger.info("Admin user '%s' created", username)
async def update_admin_password(user_id: int, password_hash: str) -> None:
"""Update the admin user's password."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE admin_user SET password_hash = $1, updated_at = NOW() WHERE id = $2",
password_hash,
user_id,
)
logger.info("Admin password updated (user_id=%d)", user_id)
# ---------------------------------------------------------------------------
# App Settings (key/value)
# ---------------------------------------------------------------------------
async def get_all_settings() -> Dict[str, Any]:
"""Return all settings as a dict, casting values to their declared type."""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM app_settings ORDER BY category, key")
result: Dict[str, Any] = {}
for row in rows:
result[row["key"]] = {
"value": _cast_value(row["value"], row["value_type"]),
"value_type": row["value_type"],
"category": row["category"],
"label": row["label"],
"description": row["description"],
}
return result
async def get_setting(key: str) -> Optional[Any]:
"""Return a single setting's typed value, or None."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value, value_type FROM app_settings WHERE key = $1", key)
if row is None:
return None
return _cast_value(row["value"], row["value_type"])
async def set_setting(
key: str,
value: Any,
value_type: str = "string",
category: str = "general",
label: str = "",
description: str = "",
) -> None:
"""Upsert a single setting."""
pool = await get_pool()
str_value = json.dumps(value) if value_type == "json" else str(value)
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO app_settings (key, value, value_type, category, label, description, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
ON CONFLICT (key) DO UPDATE SET
value = EXCLUDED.value,
value_type = EXCLUDED.value_type,
category = EXCLUDED.category,
label = EXCLUDED.label,
description = EXCLUDED.description,
updated_at = NOW()
""",
key, str_value, value_type, category, label, description,
)
async def bulk_set_settings(settings_dict: Dict[str, Any]) -> None:
"""Bulk upsert settings from a flat key→value dict."""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
for key, val in settings_dict.items():
str_val = str(val)
await conn.execute(
"""
UPDATE app_settings SET value = $1, updated_at = NOW()
WHERE key = $2
""",
str_val, key,
)
def _cast_value(raw: str, value_type: str) -> Any:
"""Cast a stored string value to its declared type."""
if value_type == "int":
try:
return int(raw)
except (ValueError, TypeError):
return 0
elif value_type == "bool":
return raw.lower() in ("1", "true", "yes")
elif value_type == "json":
try:
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return raw
return raw
# ---------------------------------------------------------------------------
# Integrations
# ---------------------------------------------------------------------------
async def get_integrations() -> List[Dict[str, Any]]:
"""Return all integration configs."""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM integrations ORDER BY display_order, type"
)
return [_integration_to_dict(row) for row in rows]
async def get_integration(type_name: str) -> Optional[Dict[str, Any]]:
"""Return a single integration by type name."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM integrations WHERE type = $1", type_name
)
return _integration_to_dict(row) if row else None
async def upsert_integration(
type_name: str,
name: str,
config: Dict[str, Any],
enabled: bool = True,
display_order: int = 0,
) -> Dict[str, Any]:
"""Insert or update an integration config."""
pool = await get_pool()
config_json = json.dumps(config)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO integrations (type, name, config, enabled, display_order, updated_at)
VALUES ($1, $2, $3::jsonb, $4, $5, NOW())
ON CONFLICT (type) DO UPDATE SET
name = EXCLUDED.name,
config = EXCLUDED.config,
enabled = EXCLUDED.enabled,
display_order = EXCLUDED.display_order,
updated_at = NOW()
RETURNING *
""",
type_name, name, config_json, enabled, display_order,
)
return _integration_to_dict(row)
async def toggle_integration(type_name: str, enabled: bool) -> None:
"""Enable or disable an integration."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE integrations SET enabled = $1, updated_at = NOW() WHERE type = $2",
enabled, type_name,
)
def _integration_to_dict(row: Any) -> Dict[str, Any]:
"""Convert an integration row to a dict."""
d = dict(row)
# Ensure config is a dict (asyncpg returns JSONB as dict already)
if isinstance(d.get("config"), str):
d["config"] = json.loads(d["config"])
# Convert datetimes
for k in ("created_at", "updated_at"):
if k in d and d[k] is not None:
d[k] = d[k].isoformat()
return d
# ---------------------------------------------------------------------------
# MQTT Subscriptions
# ---------------------------------------------------------------------------
async def get_mqtt_subscriptions() -> List[Dict[str, Any]]:
"""Return all MQTT subscriptions."""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM mqtt_subscriptions ORDER BY display_order, id"
)
return [_sub_to_dict(row) for row in rows]
async def create_mqtt_subscription(
topic_pattern: str,
display_name: str = "",
category: str = "other",
unit: str = "",
widget_type: str = "value",
enabled: bool = True,
display_order: int = 0,
) -> Dict[str, Any]:
"""Create a new MQTT subscription."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO mqtt_subscriptions
(topic_pattern, display_name, category, unit, widget_type, enabled, display_order)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
""",
topic_pattern, display_name, category, unit, widget_type, enabled, display_order,
)
return _sub_to_dict(row)
async def update_mqtt_subscription(sub_id: int, **fields: Any) -> Optional[Dict[str, Any]]:
"""Update specific fields of an MQTT subscription."""
pool = await get_pool()
allowed = {"topic_pattern", "display_name", "category", "unit", "widget_type", "enabled", "display_order"}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return None
set_parts = []
params = []
for i, (k, v) in enumerate(updates.items(), start=1):
set_parts.append(f"{k} = ${i}")
params.append(v)
params.append(sub_id)
set_clause = ", ".join(set_parts)
async with pool.acquire() as conn:
row = await conn.fetchrow(
f"UPDATE mqtt_subscriptions SET {set_clause}, updated_at = NOW() "
f"WHERE id = ${len(params)} RETURNING *",
*params,
)
return _sub_to_dict(row) if row else None
async def delete_mqtt_subscription(sub_id: int) -> bool:
"""Delete an MQTT subscription. Returns True if deleted."""
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM mqtt_subscriptions WHERE id = $1", sub_id
)
return result == "DELETE 1"
def _sub_to_dict(row: Any) -> Dict[str, Any]:
d = dict(row)
for k in ("created_at", "updated_at"):
if k in d and d[k] is not None:
d[k] = d[k].isoformat()
return d

View file

@ -0,0 +1,147 @@
"""Integration connection testing functions."""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict
import httpx
logger = logging.getLogger(__name__)
TIMEOUT = 10.0
async def test_weather(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test weather service by fetching current conditions."""
location = config.get("location", "Leverkusen")
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
r = await client.get(f"https://wttr.in/{location}?format=j1")
r.raise_for_status()
data = r.json()
temp = data["current_condition"][0]["temp_C"]
return {"success": True, "message": f"Verbunden — {location}: {temp}°C"}
except Exception as exc:
return {"success": False, "message": str(exc)}
async def test_ha(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test Home Assistant connection."""
url = config.get("url", "")
token = config.get("token", "")
if not url or not token:
return {"success": False, "message": "URL und Token sind erforderlich"}
try:
async with httpx.AsyncClient(timeout=TIMEOUT, verify=False) as client:
r = await client.get(
f"{url.rstrip('/')}/api/",
headers={"Authorization": f"Bearer {token}"},
)
r.raise_for_status()
data = r.json()
return {"success": True, "message": f"Verbunden — {data.get('message', 'OK')}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
async def test_vikunja(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test Vikunja API connection."""
url = config.get("url", "")
token = config.get("token", "")
if not url or not token:
return {"success": False, "message": "URL und Token sind erforderlich"}
try:
base = url.rstrip("/")
# Try to reach the info or user endpoint
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
r = await client.get(
f"{base}/user",
headers={"Authorization": f"Bearer {token}"},
)
r.raise_for_status()
data = r.json()
return {"success": True, "message": f"Verbunden als {data.get('username', 'OK')}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
async def test_unraid(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test Unraid server connectivity."""
servers = config.get("servers", [])
if not servers:
return {"success": False, "message": "Keine Server konfiguriert"}
results = []
for srv in servers:
name = srv.get("name", srv.get("host", "?"))
host = srv.get("host", "")
port = srv.get("port", 80)
if not host:
results.append(f"{name}: Kein Host")
continue
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(f"http://{host}:{port}/")
results.append(f"{name}: Online ({r.status_code})")
except Exception as exc:
results.append(f"{name}: Offline ({exc})")
all_ok = all("Online" in r for r in results)
return {
"success": all_ok,
"message": " | ".join(results),
}
async def test_mqtt(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test MQTT broker connection."""
host = config.get("host", "")
port = int(config.get("port", 1883))
username = config.get("username") or None
password = config.get("password") or None
if not host:
return {"success": False, "message": "MQTT Host ist erforderlich"}
try:
import aiomqtt
async with aiomqtt.Client(
hostname=host,
port=port,
username=username,
password=password,
identifier="daily-briefing-test",
) as client:
# If we get here, connection succeeded
pass
return {"success": True, "message": f"Verbunden mit {host}:{port}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
async def test_news_db(config: Dict[str, Any]) -> Dict[str, Any]:
"""Test that market_news table is accessible."""
try:
from server.db import get_pool
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT COUNT(*) AS cnt FROM market_news")
count = row["cnt"] if row else 0
return {"success": True, "message": f"Verbunden — {count} Artikel in der Datenbank"}
except Exception as exc:
return {"success": False, "message": str(exc)}
# Map integration type → test function
TEST_FUNCTIONS = {
"weather": test_weather,
"ha": test_ha,
"vikunja": test_vikunja,
"unraid": test_unraid,
"mqtt": test_mqtt,
"news": test_news_db,
}