refactor: complete rewrite as React+FastAPI dashboard

Replace monolithic Jinja2 template with modern stack:

Backend (FastAPI):
- Modular router/service architecture
- Async PostgreSQL (asyncpg) for news from n8n pipeline
- Live Unraid server stats (2 servers via API)
- Home Assistant, Vikunja tasks, weather (wttr.in)
- WebSocket broadcast for real-time updates (15s)
- TTL cache per endpoint, all config via ENV vars

Frontend (React + Vite + TypeScript):
- Glassmorphism dark theme with Tailwind CSS
- Responsive grid: mobile/tablet/desktop/ultrawide
- Weather cards, hourly forecast, news with category tabs
- Server stats (CPU ring, RAM bar, Docker list)
- Home Assistant controls, task management
- Live clock, WebSocket connection indicator

Infrastructure:
- Multi-stage Dockerfile (node:22-alpine + python:3.11-slim)
- docker-compose with full ENV configuration
- Kaniko CI/CD pipeline for GitLab registry

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sam 2026-03-02 01:48:51 +01:00
parent 4bbc125a67
commit 9f7330e217
48 changed files with 6390 additions and 1461 deletions

View file

View file

@@ -0,0 +1,149 @@
from __future__ import annotations
import httpx
from typing import Any, Dict, List, Optional
def _friendly_name(entity: Dict[str, Any]) -> str:
"""Extract the friendly name from an entity's attributes, falling back to entity_id."""
attrs = entity.get("attributes", {})
return attrs.get("friendly_name", entity.get("entity_id", "unknown"))
def _parse_light(entity: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a light entity: id, name, state, brightness percent, colour mode.

    Brightness arrives on a 0-255 scale and is converted to 0-100; a missing
    or non-numeric value yields ``None``.
    """
    attributes = entity.get("attributes", {})
    raw_brightness = attributes.get("brightness")
    percent: Optional[int] = None
    if raw_brightness is not None:
        try:
            percent = round(int(raw_brightness) / 255 * 100)
        except (TypeError, ValueError):
            percent = None
    return {
        "entity_id": entity.get("entity_id", ""),
        "name": _friendly_name(entity),
        "state": entity.get("state", "unknown"),
        "brightness": percent,
        "color_mode": attributes.get("color_mode"),
    }
def _parse_cover(entity: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a cover entity: id, name, state, and current position (may be None)."""
    attributes = entity.get("attributes", {})
    parsed: Dict[str, Any] = {
        "entity_id": entity.get("entity_id", ""),
        "name": _friendly_name(entity),
        "state": entity.get("state", "unknown"),
        "current_position": attributes.get("current_position"),
    }
    return parsed
def _parse_sensor(entity: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a temperature sensor entity.

    Numeric states are rounded to one decimal; non-numeric states are kept
    verbatim.
    """
    attributes = entity.get("attributes", {})
    value: Any = entity.get("state", "unknown")
    try:
        value = round(float(value), 1)
    except (TypeError, ValueError):
        pass  # keep the raw state string when it is not a number
    return {
        "entity_id": entity.get("entity_id", ""),
        "name": _friendly_name(entity),
        "state": value,
        "unit": attributes.get("unit_of_measurement", ""),
        "device_class": attributes.get("device_class", ""),
    }
async def fetch_ha_data(url: str, token: str) -> Dict[str, Any]:
    """Fetch and categorise entity states from a Home Assistant instance.

    Args:
        url: Base URL of the Home Assistant instance (e.g. ``http://192.168.1.100:8123``).
        token: Long-lived access token for authentication.

    Returns:
        Dictionary containing:
            - ``online``: Whether the HA instance is reachable.
            - ``lights``: List of light entities with state and brightness.
            - ``covers``: List of cover entities with state and position.
            - ``sensors``: List of temperature sensor entities.
            - ``lights_on``: Count of lights currently in the ``on`` state.
            - ``lights_total``: Total number of light entities.
            - ``error``: Error message if the request failed, else ``None``.
    """
    result: Dict[str, Any] = {
        "online": False,
        "lights": [],
        "covers": [],
        "sensors": [],
        "lights_on": 0,
        "lights_total": 0,
        "error": None,
    }
    if not (url and token):
        result["error"] = "Missing Home Assistant URL or token"
        return result

    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    states_url = f"{url.rstrip('/')}/api/states"
    try:
        # NOTE(review): verify=False skips TLS certificate validation --
        # presumably intentional for a self-signed local instance; confirm.
        async with httpx.AsyncClient(timeout=15, verify=False) as client:
            response = await client.get(states_url, headers=request_headers)
            response.raise_for_status()
            entities: List[Dict[str, Any]] = response.json()
    except httpx.HTTPStatusError as exc:
        result["error"] = f"HTTP {exc.response.status_code}"
        return result
    except httpx.RequestError as exc:
        result["error"] = f"Connection failed: {exc}"
        return result
    except Exception as exc:
        result["error"] = str(exc)
        return result

    result["online"] = True
    lights: List[Dict[str, Any]] = []
    covers: List[Dict[str, Any]] = []
    sensors: List[Dict[str, Any]] = []
    for entity in entities:
        # Skip entities Home Assistant itself reports as dead.
        if entity.get("state", "") in ("unavailable", "unknown"):
            continue
        entity_id: str = entity.get("entity_id", "")
        domain = entity_id.split(".")[0] if "." in entity_id else ""
        if domain == "light":
            lights.append(_parse_light(entity))
        elif domain == "cover":
            covers.append(_parse_cover(entity))
        elif domain == "sensor":
            # Only temperature sensors are surfaced.
            if entity.get("attributes", {}).get("device_class", "") == "temperature":
                sensors.append(_parse_sensor(entity))

    result["lights"] = lights
    result["covers"] = covers
    result["sensors"] = sensors
    result["lights_on"] = sum(1 for item in lights if item["state"] == "on")
    result["lights_total"] = len(lights)
    return result

View file

@@ -0,0 +1,153 @@
from __future__ import annotations
import asyncpg
from typing import Any, Dict, List, Optional
# Module-wide connection pool, created by init_pool() and torn down by close_pool().
_pool: Optional[asyncpg.Pool] = None


async def init_pool(
    host: str,
    port: int,
    dbname: str,
    user: str,
    password: str,
) -> None:
    """Create the shared asyncpg pool (1-5 connections).

    Call once at application startup, before any query helper is used.
    """
    global _pool
    pool = await asyncpg.create_pool(
        host=host,
        port=port,
        database=dbname,
        user=user,
        password=password,
        min_size=1,
        max_size=5,
    )
    _pool = pool
async def close_pool() -> None:
    """Close and discard the shared asyncpg pool (no-op if never initialised).

    Call once at application shutdown.
    """
    global _pool
    if _pool is None:
        return
    await _pool.close()
    _pool = None
def _row_to_dict(row: asyncpg.Record) -> Dict[str, Any]:
"""Convert an asyncpg Record to a plain dictionary with JSON-safe values."""
d: Dict[str, Any] = dict(row)
if "published_at" in d and d["published_at"] is not None:
d["published_at"] = d["published_at"].isoformat()
return d
async def get_news(
    limit: int = 20,
    offset: int = 0,
    category: Optional[str] = None,
    max_age_hours: int = 48,
) -> List[Dict[str, Any]]:
    """Fetch recent news articles from the market_news table, newest first.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip (for pagination).
        category: Optional category filter (exact match).
        max_age_hours: Only return articles published within this many hours.

    Returns:
        List of news article dictionaries.

    Raises:
        RuntimeError: If init_pool() has not been called yet.
    """
    if _pool is None:
        raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
    # max_age_hours is coerced to int before interpolation, so the interval
    # clause is injection-safe; category/limit/offset go through placeholders.
    clauses = [
        "SELECT id, source, title, url, category, published_at",
        "FROM market_news",
        f"WHERE published_at > NOW() - INTERVAL '{int(max_age_hours)} hours'",
    ]
    args: List[Any] = []
    if category is not None:
        args.append(category)
        clauses.append(f"AND category = ${len(args)}")
    args.append(limit)
    args.append(offset)
    clauses.append(
        f"ORDER BY published_at DESC LIMIT ${len(args) - 1} OFFSET ${len(args)}"
    )
    query = " ".join(clauses)
    async with _pool.acquire() as conn:
        rows = await conn.fetch(query, *args)
    return [_row_to_dict(r) for r in rows]
async def get_news_count(
    max_age_hours: int = 48,
    category: Optional[str] = None,
) -> int:
    """Return the total count of recent news articles.

    Args:
        max_age_hours: Only count articles published within this many hours.
        category: Optional category filter.

    Returns:
        Integer count (0 when no row is returned).

    Raises:
        RuntimeError: If init_pool() has not been called yet.
    """
    if _pool is None:
        raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
    # max_age_hours is coerced to int, so the interval interpolation is safe.
    sql = (
        "SELECT COUNT(*) AS cnt "
        "FROM market_news "
        f"WHERE published_at > NOW() - INTERVAL '{int(max_age_hours)} hours'"
    )
    args: List[Any] = []
    if category is not None:
        sql += " AND category = $1"
        args = [category]
    async with _pool.acquire() as conn:
        record = await conn.fetchrow(sql, *args)
    if record is None:
        return 0
    return int(record["cnt"])
async def get_categories(max_age_hours: int = 48) -> List[str]:
    """Return distinct categories of recent articles, alphabetically sorted.

    Args:
        max_age_hours: Only consider articles published within this many hours.

    Returns:
        Sorted list of category strings (NULL categories excluded).

    Raises:
        RuntimeError: If init_pool() has not been called yet.
    """
    if _pool is None:
        raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
    # max_age_hours is coerced to int, so the interval interpolation is safe.
    sql = (
        "SELECT DISTINCT category "
        "FROM market_news "
        f"WHERE published_at > NOW() - INTERVAL '{int(max_age_hours)} hours' "
        "AND category IS NOT NULL "
        "ORDER BY category"
    )
    async with _pool.acquire() as conn:
        records = await conn.fetch(sql)
    return [record["category"] for record in records]

View file

@@ -0,0 +1,233 @@
from __future__ import annotations
import asyncio
import httpx
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ServerConfig:
    """Configuration for a single Unraid server."""
    # Display name surfaced in the stats payload returned to callers.
    name: str
    # Hostname or IP address; an empty value makes fetch_server_stats bail out.
    host: str
    # Bearer token for the Unraid API; empty string disables API queries
    # and leaves only the plain connectivity check.
    api_key: str = ""
    # TCP port used for both API requests and the connectivity fallback.
    port: int = 80
def _empty_stats(server: ServerConfig) -> Dict[str, Any]:
"""Return a default stats dictionary for a server that has not yet been queried."""
return {
"name": server.name,
"host": server.host,
"online": False,
"uptime": "",
"cpu": {"usage_pct": 0, "cores": 0, "temp_c": None},
"ram": {"used_gb": 0, "total_gb": 0, "pct": 0},
"array": {"status": "unknown", "disks": []},
"docker": {"running": 0, "containers": []},
"error": None,
}
def _parse_system_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate *result* from a generic ``/api/system`` JSON response."""
result["online"] = True
result["uptime"] = data.get("uptime", "")
cpu_data = data.get("cpu", {})
result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0))
result["cpu"]["cores"] = cpu_data.get("cores", 0)
result["cpu"]["temp_c"] = cpu_data.get("temp_c", cpu_data.get("temp", None))
ram_data = data.get("ram", data.get("memory", {}))
result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2)
result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2)
total = result["ram"]["total_gb"]
if total > 0:
result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1)
else:
result["ram"]["pct"] = 0
def _parse_array_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate array information from an API response."""
array_data = data.get("array", {})
result["array"]["status"] = array_data.get("status", "unknown")
disks_raw: List[Dict[str, Any]] = array_data.get("disks", [])
parsed_disks: List[Dict[str, Any]] = []
for disk in disks_raw:
parsed_disks.append({
"name": disk.get("name", ""),
"status": disk.get("status", "unknown"),
"size": disk.get("size", ""),
"used": disk.get("used", ""),
"temp_c": disk.get("temp_c", None),
})
result["array"]["disks"] = parsed_disks
def _parse_docker_info(data: Dict[str, Any], result: Dict[str, Any]) -> None:
"""Populate Docker container information from an API response."""
docker_data = data.get("docker", {})
containers_raw: List[Dict[str, Any]] = docker_data.get("containers", [])
containers: List[Dict[str, Any]] = []
running_count = 0
for container in containers_raw:
status = container.get("status", "unknown")
is_running = "running" in status.lower() if isinstance(status, str) else False
if is_running:
running_count += 1
containers.append({
"name": container.get("name", ""),
"status": status,
"image": container.get("image", ""),
"running": is_running,
})
result["docker"]["running"] = docker_data.get("running", running_count)
result["docker"]["containers"] = containers
async def _try_api_endpoint(
client: httpx.AsyncClient,
server: ServerConfig,
result: Dict[str, Any],
) -> bool:
"""Attempt to fetch stats via the Unraid OS API.
Returns True if successful, False otherwise.
"""
if not server.api_key:
return False
headers = {"Authorization": f"Bearer {server.api_key}"}
base = f"http://{server.host}:{server.port}"
try:
resp = await client.get(f"{base}/api/system", headers=headers)
if resp.status_code == 200:
data = resp.json()
_parse_system_info(data, result)
_parse_array_info(data, result)
_parse_docker_info(data, result)
return True
except Exception:
pass
# Try individual endpoints if the combined one failed
fetched_any = False
try:
resp = await client.get(f"{base}/api/cpu", headers=headers)
if resp.status_code == 200:
cpu_data = resp.json()
result["cpu"]["usage_pct"] = cpu_data.get("usage_pct", cpu_data.get("usage", 0))
result["cpu"]["cores"] = cpu_data.get("cores", 0)
result["cpu"]["temp_c"] = cpu_data.get("temp_c", None)
result["online"] = True
fetched_any = True
except Exception:
pass
try:
resp = await client.get(f"{base}/api/memory", headers=headers)
if resp.status_code == 200:
ram_data = resp.json()
result["ram"]["used_gb"] = round(ram_data.get("used_gb", ram_data.get("used", 0)), 2)
result["ram"]["total_gb"] = round(ram_data.get("total_gb", ram_data.get("total", 0)), 2)
total = result["ram"]["total_gb"]
if total > 0:
result["ram"]["pct"] = round(result["ram"]["used_gb"] / total * 100, 1)
result["online"] = True
fetched_any = True
except Exception:
pass
try:
resp = await client.get(f"{base}/api/array", headers=headers)
if resp.status_code == 200:
_parse_array_info(resp.json(), result)
result["online"] = True
fetched_any = True
except Exception:
pass
try:
resp = await client.get(f"{base}/api/docker", headers=headers)
if resp.status_code == 200:
_parse_docker_info(resp.json(), result)
result["online"] = True
fetched_any = True
except Exception:
pass
return fetched_any
async def _try_connectivity_check(
    client: httpx.AsyncClient,
    server: ServerConfig,
    result: Dict[str, Any],
) -> None:
    """Mark *result* online based on a plain HTTP GET of the server root.

    Any status below 500 counts as reachable; request errors mark it offline.
    """
    root_url = f"http://{server.host}:{server.port}/"
    try:
        response = await client.get(root_url, follow_redirects=True)
    except Exception:
        result["online"] = False
    else:
        result["online"] = response.status_code < 500
async def fetch_server_stats(server: ServerConfig) -> Dict[str, Any]:
    """Collect system stats for one Unraid server.

    Prefers the authenticated Unraid API (when ``api_key`` is set) and drops
    back to a bare connectivity probe when the API yields nothing.

    Args:
        server: A :class:`ServerConfig` describing the target server.

    Returns:
        Stats dictionary with name, host, online flag, CPU/RAM/array/Docker
        details, and an ``error`` message on total failure.
    """
    stats = _empty_stats(server)
    if not server.host:
        stats["error"] = "No host configured"
        return stats
    try:
        # NOTE(review): verify=False skips TLS validation; presumably fine
        # for LAN servers addressed over plain http anyway.
        async with httpx.AsyncClient(timeout=10, verify=False) as client:
            got_api_data = await _try_api_endpoint(client, server, stats)
            if not (got_api_data or stats["online"]):
                await _try_connectivity_check(client, server, stats)
    except Exception as exc:
        stats["online"] = False
        stats["error"] = str(exc)
    return stats
async def fetch_all_servers(servers: List[ServerConfig]) -> List[Dict[str, Any]]:
    """Query every configured Unraid server concurrently.

    Args:
        servers: List of :class:`ServerConfig` instances.

    Returns:
        One stats dictionary per server, in input order; empty list when no
        servers are configured.
    """
    if not servers:
        return []
    gathered = await asyncio.gather(*(fetch_server_stats(srv) for srv in servers))
    return list(gathered)

View file

@@ -0,0 +1,215 @@
from __future__ import annotations
import asyncio
import httpx
from typing import Any, Dict, List, Optional
# Project ID groupings
PRIVATE_PROJECTS: List[int] = [3, 4] # Haus & Garten, Jugendeinrichtung
SAMS_PROJECTS: List[int] = [2, 5] # OpenClaw AI, Sam's Wunderwelt
# Readable names for known project IDs
PROJECT_NAMES: Dict[int, str] = {
2: "OpenClaw AI",
3: "Haus & Garten",
4: "Jugendeinrichtung",
5: "Sam's Wunderwelt",
}
def _parse_task(task: Dict[str, Any], project_id: int) -> Dict[str, Any]:
"""Normalise a raw Vikunja task into a simplified dictionary."""
return {
"id": task.get("id", 0),
"title": task.get("title", ""),
"done": bool(task.get("done", False)),
"priority": task.get("priority", 0),
"project_id": project_id,
"project_name": PROJECT_NAMES.get(project_id, f"Project {project_id}"),
"due_date": task.get("due_date") or None,
"created": task.get("created") or None,
"updated": task.get("updated") or None,
"labels": [
label.get("title", "")
for label in (task.get("labels") or [])
if label.get("title")
],
}
async def _fetch_project_tasks(
    client: httpx.AsyncClient,
    base_url: str,
    project_id: int,
) -> List[Dict[str, Any]]:
    """Page through every task of one Vikunja project.

    Args:
        client: An authenticated httpx.AsyncClient.
        base_url: Vikunja API base URL.
        project_id: The project ID to query.

    Returns:
        Parsed task dictionaries; whatever was collected so far if a page
        request fails mid-way.
    """
    per_page = 50
    endpoint = f"{base_url}/projects/{project_id}/tasks"
    collected: List[Dict[str, Any]] = []
    page = 1
    while True:
        try:
            response = await client.get(
                endpoint,
                params={"page": page, "per_page": per_page},
            )
            response.raise_for_status()
            batch: List[Dict[str, Any]] = response.json()
        except Exception:
            break
        if not batch:
            break
        collected.extend(_parse_task(item, project_id) for item in batch)
        # A short page means we just consumed the final page.
        if len(batch) < per_page:
            break
        page += 1
    return collected
def _sort_and_split(
tasks: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Split tasks into open/done buckets and sort by priority descending."""
open_tasks = sorted(
[t for t in tasks if not t["done"]],
key=lambda t: t["priority"],
reverse=True,
)
done_tasks = sorted(
[t for t in tasks if t["done"]],
key=lambda t: t["priority"],
reverse=True,
)
return {
"open": open_tasks,
"done": done_tasks,
"open_count": len(open_tasks),
"done_count": len(done_tasks),
}
async def fetch_tasks(base_url: str, token: str) -> Dict[str, Any]:
    """Fetch tasks from all configured Vikunja projects.

    Groups tasks into ``private`` (PRIVATE_PROJECTS) and ``sams``
    (SAMS_PROJECTS); each group carries open/done lists plus counts.

    Args:
        base_url: Vikunja instance base URL (e.g. ``https://tasks.example.com``).
        token: API token for Vikunja authentication.

    Returns:
        Dictionary with ``private`` and ``sams`` keys (each containing
        ``open``, ``done``, ``open_count``, ``done_count``) plus ``error``.
    """
    result: Dict[str, Any] = {
        "private": {"open": [], "done": [], "open_count": 0, "done_count": 0},
        "sams": {"open": [], "done": [], "open_count": 0, "done_count": 0},
        "error": None,
    }
    if not (base_url and token):
        result["error"] = "Missing Vikunja base URL or token"
        return result

    api_root = base_url.rstrip("/")
    auth_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        async with httpx.AsyncClient(timeout=15, headers=auth_headers) as client:
            project_ids = list(set(PRIVATE_PROJECTS + SAMS_PROJECTS))
            outcomes = await asyncio.gather(
                *(_fetch_project_tasks(client, api_root, pid) for pid in project_ids),
                return_exceptions=True,
            )
            # A failed project contributes no tasks instead of failing the
            # whole fetch.
            tasks_by_project: Dict[int, List[Dict[str, Any]]] = {
                pid: ([] if isinstance(outcome, Exception) else outcome)
                for pid, outcome in zip(project_ids, outcomes)
            }

            def _collect(ids: List[int]) -> List[Dict[str, Any]]:
                merged: List[Dict[str, Any]] = []
                for pid in ids:
                    merged.extend(tasks_by_project.get(pid, []))
                return merged

            result["private"] = _sort_and_split(_collect(PRIVATE_PROJECTS))
            result["sams"] = _sort_and_split(_collect(SAMS_PROJECTS))
    except httpx.HTTPStatusError as exc:
        result["error"] = f"HTTP {exc.response.status_code}"
    except httpx.RequestError as exc:
        result["error"] = f"Connection failed: {exc}"
    except Exception as exc:
        result["error"] = str(exc)
    return result
async def fetch_single_project(
    base_url: str,
    token: str,
    project_id: int,
) -> Dict[str, Any]:
    """Fetch and split the tasks of one Vikunja project.

    Args:
        base_url: Vikunja instance base URL.
        token: API token for authentication.
        project_id: The project ID to query.

    Returns:
        Dictionary with ``open``, ``done``, ``open_count``, ``done_count``,
        and ``error`` keys.
    """
    result: Dict[str, Any] = {
        "open": [],
        "done": [],
        "open_count": 0,
        "done_count": 0,
        "error": None,
    }
    if not (base_url and token):
        result["error"] = "Missing Vikunja base URL or token"
        return result

    auth_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        async with httpx.AsyncClient(timeout=15, headers=auth_headers) as client:
            project_tasks = await _fetch_project_tasks(
                client, base_url.rstrip("/"), project_id
            )
            result.update(_sort_and_split(project_tasks))
    except Exception as exc:
        result["error"] = str(exc)
    return result

View file

@@ -0,0 +1,234 @@
from __future__ import annotations
import httpx
from typing import Any, Dict, List, Optional
# Emoji icons keyed by World Weather Online condition code, as returned by
# wttr.in's "weatherCode" field. Codes not listed here fall back to the
# default icon (see _get_weather_icon).
WEATHER_ICONS: Dict[int, str] = {
    113: "\u2600\ufe0f",  # Clear/Sunny
    116: "\u26c5",  # Partly Cloudy
    119: "\u2601\ufe0f",  # Cloudy
    122: "\u2601\ufe0f",  # Overcast
    143: "\ud83c\udf2b\ufe0f",  # Mist
    176: "\ud83c\udf26\ufe0f",  # Patchy rain nearby
    179: "\ud83c\udf28\ufe0f",  # Patchy snow nearby
    182: "\ud83c\udf28\ufe0f",  # Patchy sleet nearby
    185: "\ud83c\udf28\ufe0f",  # Patchy freezing drizzle nearby
    200: "\u26c8\ufe0f",  # Thundery outbreaks nearby
    227: "\ud83c\udf28\ufe0f",  # Blowing snow
    230: "\u2744\ufe0f",  # Blizzard
    248: "\ud83c\udf2b\ufe0f",  # Fog
    260: "\ud83c\udf2b\ufe0f",  # Freezing fog
    263: "\ud83c\udf26\ufe0f",  # Patchy light drizzle
    266: "\ud83c\udf27\ufe0f",  # Light drizzle
    281: "\ud83c\udf28\ufe0f",  # Freezing drizzle
    284: "\ud83c\udf28\ufe0f",  # Heavy freezing drizzle
    293: "\ud83c\udf26\ufe0f",  # Patchy light rain
    296: "\ud83c\udf27\ufe0f",  # Light rain
    299: "\ud83c\udf27\ufe0f",  # Moderate rain at times
    302: "\ud83c\udf27\ufe0f",  # Moderate rain
    305: "\ud83c\udf27\ufe0f",  # Heavy rain at times
    308: "\ud83c\udf27\ufe0f",  # Heavy rain
    311: "\ud83c\udf28\ufe0f",  # Light freezing rain
    314: "\ud83c\udf28\ufe0f",  # Moderate or heavy freezing rain
    317: "\ud83c\udf28\ufe0f",  # Light sleet
    320: "\ud83c\udf28\ufe0f",  # Moderate or heavy sleet
    323: "\ud83c\udf28\ufe0f",  # Patchy light snow
    326: "\u2744\ufe0f",  # Light snow
    329: "\u2744\ufe0f",  # Patchy moderate snow
    332: "\u2744\ufe0f",  # Moderate snow
    335: "\u2744\ufe0f",  # Patchy heavy snow
    338: "\u2744\ufe0f",  # Heavy snow
    350: "\ud83c\udf28\ufe0f",  # Ice pellets
    353: "\ud83c\udf26\ufe0f",  # Light rain shower
    356: "\ud83c\udf27\ufe0f",  # Moderate or heavy rain shower
    359: "\ud83c\udf27\ufe0f",  # Torrential rain shower
    362: "\ud83c\udf28\ufe0f",  # Light sleet showers
    365: "\ud83c\udf28\ufe0f",  # Moderate or heavy sleet showers
    368: "\u2744\ufe0f",  # Light snow showers
    371: "\u2744\ufe0f",  # Moderate or heavy snow showers
    374: "\ud83c\udf28\ufe0f",  # Light showers of ice pellets
    377: "\ud83c\udf28\ufe0f",  # Moderate or heavy showers of ice pellets
    386: "\u26c8\ufe0f",  # Patchy light rain with thunder
    389: "\u26c8\ufe0f",  # Moderate or heavy rain with thunder
    392: "\u26c8\ufe0f",  # Patchy light snow with thunder
    395: "\u26c8\ufe0f",  # Moderate or heavy snow with thunder
}
def _get_weather_icon(code: int) -> str:
    """Look up the emoji for a WWO condition *code*, falling back to a default icon."""
    try:
        return WEATHER_ICONS[code]
    except KeyError:
        return "\ud83c\udf24\ufe0f"
def _parse_current_condition(condition: Dict[str, Any], location: str) -> Dict[str, Any]:
    """Normalise one ``current_condition`` entry from the wttr.in JSON payload."""
    code = int(condition.get("weatherCode", 113))
    desc_entries = condition.get("weatherDesc", [])
    text = desc_entries[0].get("value", "Unknown") if desc_entries else "Unknown"
    return {
        "location": location,
        "temp": int(condition.get("temp_C", 0)),
        "feels_like": int(condition.get("FeelsLikeC", 0)),
        "humidity": int(condition.get("humidity", 0)),
        "wind_kmh": int(condition.get("windspeedKmph", 0)),
        "description": text,
        "icon": _get_weather_icon(code),
    }
def _parse_forecast_day(day: Dict[str, Any]) -> Dict[str, Any]:
    """Summarise one wttr.in forecast day: temps, midday conditions, sun times."""
    astronomy_entries = day.get("astronomy", [])
    if astronomy_entries:
        sunrise = astronomy_entries[0].get("sunrise", "")
        sunset = astronomy_entries[0].get("sunset", "")
    else:
        sunrise = sunset = ""

    # Use the midday hourly slot as the representative condition for the day.
    hourly_slots = day.get("hourly", [])
    if hourly_slots:
        representative = hourly_slots[len(hourly_slots) // 2]
        code = int(representative.get("weatherCode", 113))
        desc_entries = representative.get("weatherDesc", [])
        description = desc_entries[0].get("value", "Unknown") if desc_entries else "Unknown"
    else:
        code = 113
        description = "Unknown"

    return {
        "date": day.get("date", ""),
        "max_temp": int(day.get("maxtempC", 0)),
        "min_temp": int(day.get("mintempC", 0)),
        "icon": _get_weather_icon(code),
        "description": description,
        "sunrise": sunrise,
        "sunset": sunset,
    }
async def fetch_weather(location: str) -> Dict[str, Any]:
    """Fetch current conditions plus a 3-day forecast from wttr.in.

    Args:
        location: City name or coordinates (e.g. "Berlin" or "52.52,13.405").

    Returns:
        Dictionary with current conditions, ``forecast_3day``, and ``error``
        (None on success); a placeholder payload when the service fails.
    """
    fallback: Dict[str, Any] = {
        "location": location,
        "temp": 0,
        "feels_like": 0,
        "humidity": 0,
        "wind_kmh": 0,
        "description": "Unavailable",
        "icon": "\u2753",
        "forecast_3day": [],
        "error": None,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(
                f"https://wttr.in/{location}",
                params={"format": "j1"},
                headers={"Accept": "application/json"},
            )
            response.raise_for_status()
            payload = response.json()
    except httpx.HTTPStatusError as exc:
        fallback["error"] = f"HTTP {exc.response.status_code}"
        return fallback
    except httpx.RequestError as exc:
        fallback["error"] = f"Request failed: {exc}"
        return fallback
    except Exception as exc:
        fallback["error"] = str(exc)
        return fallback

    conditions = payload.get("current_condition", [])
    if not conditions:
        fallback["error"] = "No current condition data"
        return fallback

    report = _parse_current_condition(conditions[0], location)
    report["forecast_3day"] = [
        _parse_forecast_day(day) for day in payload.get("weather", [])[:3]
    ]
    report["error"] = None
    return report
async def fetch_hourly_forecast(location: str) -> List[Dict[str, Any]]:
    """Fetch the next 8 hourly forecast slots from wttr.in.

    Scans today's and tomorrow's hourly entries so that late-evening requests
    still return a full window: slots on a later day count as upcoming even
    though their hour value is numerically smaller than the current hour.

    Args:
        location: City name or coordinates.

    Returns:
        Up to 8 dicts with time, temp, icon, description, precip_chance, and
        wind_kmh; empty list on any failure.
    """
    # Local import keeps the module's top-level dependencies unchanged.
    from datetime import datetime

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(
                f"https://wttr.in/{location}",
                params={"format": "j1"},
                headers={"Accept": "application/json"},
            )
            resp.raise_for_status()
            data = resp.json()
    except Exception:
        return []

    weather_days = data.get("weather", [])
    if not weather_days:
        return []

    now_hour = datetime.now().hour
    upcoming: List[Dict[str, Any]] = []
    for day_index, day in enumerate(weather_days[:2]):
        for entry in day.get("hourly", []):
            # wttr.in encodes slot times as "0", "300", ..., "2100".
            time_value = int(entry.get("time", "0"))
            hours, minutes = divmod(time_value, 100)
            # BUG FIX: the previous latch-based filter required a same-day
            # slot with hour >= now before accepting anything, which dropped
            # all next-day slots for requests after the last slot of the day.
            # Day-0 slots earlier than the current hour are in the past;
            # anything on a later day is upcoming by definition.
            if day_index == 0 and hours < now_hour:
                continue
            weather_code = int(entry.get("weatherCode", 113))
            descs = entry.get("weatherDesc", [])
            description = descs[0].get("value", "Unknown") if descs else "Unknown"
            upcoming.append({
                "time": f"{hours:02d}:{minutes:02d}",
                "temp": int(entry.get("tempC", 0)),
                "icon": _get_weather_icon(weather_code),
                "description": description,
                "precip_chance": int(entry.get("chanceofrain", 0)),
                "wind_kmh": int(entry.get("windspeedKmph", 0)),
            })
            if len(upcoming) >= 8:
                return upcoming
    return upcoming