daily-briefing/server/services/vikunja_service.py

256 lines
7.5 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import asyncio
import httpx
from typing import Any, Dict, List, Optional
# Project ID groupings
PRIVATE_PROJECTS: List[int] = [3, 4] # Haus & Garten, Jugendeinrichtung
SAMS_PROJECTS: List[int] = [2, 5] # OpenClaw AI, Sam's Wunderwelt
# Readable names for known project IDs
PROJECT_NAMES: Dict[int, str] = {
2: "OpenClaw AI",
3: "Haus & Garten",
4: "Jugendeinrichtung",
5: "Sam's Wunderwelt",
}
def _parse_task(task: Dict[str, Any], project_id: int) -> Dict[str, Any]:
"""Normalise a raw Vikunja task into a simplified dictionary."""
return {
"id": task.get("id", 0),
"title": task.get("title", ""),
"done": bool(task.get("done", False)),
"priority": task.get("priority", 0),
"project_id": project_id,
"project_name": PROJECT_NAMES.get(project_id, f"Project {project_id}"),
"due_date": task.get("due_date") or None,
"created": task.get("created") or None,
"updated": task.get("updated") or None,
"labels": [
label.get("title", "")
for label in (task.get("labels") or [])
if label.get("title")
],
}
async def _fetch_project_tasks(
client: httpx.AsyncClient,
base_url: str,
project_id: int,
) -> List[Dict[str, Any]]:
"""Fetch all tasks for a single Vikunja project.
Args:
client: An authenticated httpx.AsyncClient.
base_url: Vikunja API base URL.
project_id: The project ID to query.
Returns:
List of parsed task dictionaries.
"""
all_tasks: List[Dict[str, Any]] = []
page = 1
per_page = 50
while True:
try:
resp = await client.get(
f"{base_url}/projects/{project_id}/tasks",
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
tasks_page: List[Dict[str, Any]] = resp.json()
except Exception:
break
if not tasks_page:
break
for raw_task in tasks_page:
all_tasks.append(_parse_task(raw_task, project_id))
if len(tasks_page) < per_page:
break
page += 1
return all_tasks
def _sort_and_split(
tasks: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Split tasks into open/done buckets and sort by priority descending."""
open_tasks = sorted(
[t for t in tasks if not t["done"]],
key=lambda t: t["priority"],
reverse=True,
)
done_tasks = sorted(
[t for t in tasks if t["done"]],
key=lambda t: t["priority"],
reverse=True,
)
return {
"open": open_tasks,
"done": done_tasks,
"open_count": len(open_tasks),
"done_count": len(done_tasks),
}
async def fetch_tasks(base_url: str, token: str) -> Dict[str, Any]:
"""Fetch tasks from all configured Vikunja projects.
Groups tasks into ``private`` (PRIVATE_PROJECTS) and ``sams`` (SAMS_PROJECTS).
Args:
base_url: Vikunja instance base URL (e.g. ``https://tasks.example.com``).
token: API token for Vikunja authentication.
Returns:
Dictionary with ``private`` and ``sams`` keys, each containing
``open``, ``done``, ``open_count``, and ``done_count``.
"""
result: Dict[str, Any] = {
"private": {"open": [], "done": [], "open_count": 0, "done_count": 0},
"sams": {"open": [], "done": [], "open_count": 0, "done_count": 0},
"error": None,
}
if not base_url or not token:
result["error"] = "Missing Vikunja base URL or token"
return result
clean_url = base_url.rstrip("/")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(
timeout=15,
headers=headers,
) as client:
all_project_ids = list(set(PRIVATE_PROJECTS + SAMS_PROJECTS))
coros = [
_fetch_project_tasks(client, clean_url, pid)
for pid in all_project_ids
]
results_by_project = await asyncio.gather(*coros, return_exceptions=True)
project_tasks_map: Dict[int, List[Dict[str, Any]]] = {}
for pid, tasks_or_exc in zip(all_project_ids, results_by_project):
if isinstance(tasks_or_exc, Exception):
project_tasks_map[pid] = []
else:
project_tasks_map[pid] = tasks_or_exc
private_tasks: List[Dict[str, Any]] = []
for pid in PRIVATE_PROJECTS:
private_tasks.extend(project_tasks_map.get(pid, []))
sams_tasks: List[Dict[str, Any]] = []
for pid in SAMS_PROJECTS:
sams_tasks.extend(project_tasks_map.get(pid, []))
result["private"] = _sort_and_split(private_tasks)
result["sams"] = _sort_and_split(sams_tasks)
except httpx.HTTPStatusError as exc:
result["error"] = f"HTTP {exc.response.status_code}"
except httpx.RequestError as exc:
result["error"] = f"Connection failed: {exc}"
except Exception as exc:
result["error"] = str(exc)
return result
async def toggle_task_done(
base_url: str,
token: str,
task_id: int,
done: bool,
) -> Dict[str, Any]:
"""Toggle the done state of a single Vikunja task.
Args:
base_url: Vikunja instance base URL.
token: API token for authentication.
task_id: The task ID to update.
done: New done state.
Returns:
Dictionary with ``ok`` and optionally ``error``.
"""
if not base_url or not token:
return {"ok": False, "error": "Missing Vikunja base URL or token"}
clean_url = base_url.rstrip("/")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=10, headers=headers) as client:
resp = await client.post(
f"{clean_url}/tasks/{task_id}",
json={"done": done},
)
resp.raise_for_status()
return {"ok": True, "task": {"id": task_id, "done": done}}
except httpx.HTTPStatusError as exc:
return {"ok": False, "error": f"HTTP {exc.response.status_code}"}
except Exception as exc:
return {"ok": False, "error": str(exc)}
async def fetch_single_project(
base_url: str,
token: str,
project_id: int,
) -> Dict[str, Any]:
"""Fetch tasks for a single Vikunja project.
Args:
base_url: Vikunja instance base URL.
token: API token for authentication.
project_id: The project ID to query.
Returns:
Dictionary with ``open``, ``done``, ``open_count``, ``done_count``, and ``error``.
"""
result: Dict[str, Any] = {
"open": [],
"done": [],
"open_count": 0,
"done_count": 0,
"error": None,
}
if not base_url or not token:
result["error"] = "Missing Vikunja base URL or token"
return result
clean_url = base_url.rstrip("/")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=15, headers=headers) as client:
tasks = await _fetch_project_tasks(client, clean_url, project_id)
split = _sort_and_split(tasks)
result.update(split)
except Exception as exc:
result["error"] = str(exc)
return result