from __future__ import annotations

import asyncio
from typing import Any, Dict, List, Optional

import httpx

# Project ID groupings
PRIVATE_PROJECTS: List[int] = [3, 4]  # Haus & Garten, Jugendeinrichtung
SAMS_PROJECTS: List[int] = [2, 5]  # OpenClaw AI, Sam's Wunderwelt

# Readable names for known project IDs
PROJECT_NAMES: Dict[int, str] = {
    2: "OpenClaw AI",
    3: "Haus & Garten",
    4: "Jugendeinrichtung",
    5: "Sam's Wunderwelt",
}

# Page size requested from the task listing endpoint.
_PER_PAGE = 50


def _to_int(value: Any, default: int = 0) -> int:
    """Coerce ``value`` to ``int``, returning ``default`` when that fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _auth_headers(token: str) -> Dict[str, str]:
    """Build the request headers shared by every API call in this module."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }


def _describe_error(exc: BaseException) -> str:
    """Turn an exception into a short, never-empty error message.

    HTTP status failures become ``"HTTP <code>"``, transport failures are
    prefixed with ``"Connection failed: "``, and anything else uses its own
    message. Some httpx exceptions stringify to ``""``; the class name is used
    in that case so a caller testing ``if result["error"]`` still sees the
    failure.
    """
    if isinstance(exc, httpx.HTTPStatusError):
        return f"HTTP {exc.response.status_code}"
    detail = str(exc) or type(exc).__name__
    if isinstance(exc, httpx.RequestError):
        return f"Connection failed: {detail}"
    return detail


def _parse_task(task: Dict[str, Any], project_id: int) -> Dict[str, Any]:
    """Normalise a raw Vikunja task into a simplified dictionary.

    Args:
        task: Raw task object as decoded from the API response.
        project_id: ID of the project the task was fetched from.

    Returns:
        Dictionary with the keys ``id``, ``title``, ``done``, ``priority``,
        ``project_id``, ``project_name``, ``due_date``, ``created``,
        ``updated`` and ``labels`` (a list of non-empty label titles).
    """
    label_titles = [
        label["title"]
        for label in (task.get("labels") or [])
        # Skip malformed entries instead of failing the whole task.
        if isinstance(label, dict) and label.get("title")
    ]
    return {
        "id": task.get("id") or 0,
        "title": task.get("title") or "",
        "done": bool(task.get("done", False)),
        # A JSON ``null`` priority must not reach the sort in
        # ``_sort_and_split``: comparing ``None`` with ``int`` raises TypeError.
        "priority": _to_int(task.get("priority")),
        "project_id": project_id,
        "project_name": PROJECT_NAMES.get(project_id, f"Project {project_id}"),
        "due_date": task.get("due_date") or None,
        "created": task.get("created") or None,
        "updated": task.get("updated") or None,
        "labels": label_titles,
    }


async def _fetch_project_tasks(
    client: httpx.AsyncClient,
    base_url: str,
    project_id: int,
) -> List[Dict[str, Any]]:
    """Fetch all tasks for a single Vikunja project, following pagination.

    Args:
        client: An authenticated httpx.AsyncClient.
        base_url: Vikunja API base URL (without a trailing slash).
        project_id: The project ID to query.

    Returns:
        List of parsed task dictionaries.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
        httpx.RequestError: If the request cannot be completed.
        ValueError: If the response body is not valid JSON or not a JSON list.
    """
    url = f"{base_url}/projects/{project_id}/tasks"
    collected: List[Dict[str, Any]] = []
    page = 1

    while True:
        # Failures propagate on purpose: returning a silently truncated list
        # would make an auth or network error look like "no tasks".
        resp = await client.get(url, params={"page": page, "per_page": _PER_PAGE})
        resp.raise_for_status()
        payload = resp.json()

        if not payload:
            break
        if not isinstance(payload, list):
            raise ValueError(
                f"Unexpected response for project {project_id}: "
                f"expected a list, got {type(payload).__name__}"
            )

        collected.extend(
            _parse_task(raw, project_id) for raw in payload if isinstance(raw, dict)
        )

        # NOTE(review): Vikunja is understood to report the page count in the
        # ``x-pagination-total-pages`` header; prefer it because the server may
        # cap ``per_page`` below the requested size. When the header is absent
        # or unparsable, fall back to the short-page heuristic.
        total_pages = _to_int(resp.headers.get("x-pagination-total-pages"))
        if total_pages > 0:
            if page >= total_pages:
                break
        elif len(payload) < _PER_PAGE:
            break
        page += 1

    return collected


def _sort_and_split(
    tasks: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Split tasks into open/done buckets and sort by priority descending.

    Args:
        tasks: Parsed task dictionaries (each with ``done`` and ``priority``).

    Returns:
        Dictionary with ``open``, ``done``, ``open_count`` and ``done_count``.
        Tasks of equal priority keep their input order.
    """

    def by_priority(bucket: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Treat a missing/None priority as 0 so the sort can never raise.
        return sorted(bucket, key=lambda t: t.get("priority") or 0, reverse=True)

    open_tasks = by_priority([t for t in tasks if not t["done"]])
    done_tasks = by_priority([t for t in tasks if t["done"]])
    return {
        "open": open_tasks,
        "done": done_tasks,
        "open_count": len(open_tasks),
        "done_count": len(done_tasks),
    }


async def fetch_tasks(base_url: str, token: str) -> Dict[str, Any]:
    """Fetch tasks from all configured Vikunja projects.

    Groups tasks into ``private`` (PRIVATE_PROJECTS) and ``sams``
    (SAMS_PROJECTS). Projects are fetched concurrently; a project that fails
    contributes no tasks, and its failure is reported in ``error`` while the
    tasks of the remaining projects are still returned.

    Args:
        base_url: Vikunja base URL. A trailing slash is stripped and
            ``/projects/<id>/tasks`` is appended directly, so any API path
            prefix must already be part of it.
            NOTE(review): Vikunja normally serves its API under ``/api/v1`` —
            confirm what callers pass here.
        token: API token for Vikunja authentication.

    Returns:
        Dictionary with ``private`` and ``sams`` keys, each containing
        ``open``, ``done``, ``open_count``, and ``done_count``, plus an
        ``error`` key that is ``None`` on full success and otherwise a
        message (distinct failures joined with ``"; "``).
    """
    result: Dict[str, Any] = {
        "private": {"open": [], "done": [], "open_count": 0, "done_count": 0},
        "sams": {"open": [], "done": [], "open_count": 0, "done_count": 0},
        "error": None,
    }

    if not base_url or not token:
        result["error"] = "Missing Vikunja base URL or token"
        return result

    clean_url = base_url.rstrip("/")
    # De-duplicate while keeping a deterministic request order.
    project_ids = list(dict.fromkeys(PRIVATE_PROJECTS + SAMS_PROJECTS))

    try:
        async with httpx.AsyncClient(
            timeout=15,
            headers=_auth_headers(token),
        ) as client:
            outcomes = await asyncio.gather(
                *(_fetch_project_tasks(client, clean_url, pid) for pid in project_ids),
                return_exceptions=True,
            )

        tasks_by_project: Dict[int, List[Dict[str, Any]]] = {}
        failures: List[str] = []
        for pid, outcome in zip(project_ids, outcomes):
            # gather() can hand back BaseException instances (for example a
            # cancelled child), so do not test against Exception only.
            if isinstance(outcome, BaseException):
                tasks_by_project[pid] = []
                message = _describe_error(outcome)
                if message not in failures:
                    failures.append(message)
            else:
                tasks_by_project[pid] = outcome

        result["private"] = _sort_and_split(
            [t for pid in PRIVATE_PROJECTS for t in tasks_by_project.get(pid, [])]
        )
        result["sams"] = _sort_and_split(
            [t for pid in SAMS_PROJECTS for t in tasks_by_project.get(pid, [])]
        )
        if failures:
            result["error"] = "; ".join(failures)
    except Exception as exc:
        # Boundary handler: this function reports failures, it never raises.
        result["error"] = _describe_error(exc)

    return result


async def toggle_task_done(
    base_url: str,
    token: str,
    task_id: int,
    done: bool,
) -> Dict[str, Any]:
    """Set the done state of a single Vikunja task.

    Args:
        base_url: Vikunja base URL; ``/tasks/<task_id>`` is appended after
            stripping a trailing slash.
        token: API token for authentication.
        task_id: The task ID to update.
        done: New done state.

    Returns:
        ``{"ok": True, "task": {"id": ..., "done": ...}}`` on success, where
        ``task`` echoes the requested state rather than the server response;
        ``{"ok": False, "error": ...}`` on failure.
    """
    if not base_url or not token:
        return {"ok": False, "error": "Missing Vikunja base URL or token"}

    clean_url = base_url.rstrip("/")

    try:
        async with httpx.AsyncClient(
            timeout=10,
            headers=_auth_headers(token),
        ) as client:
            # NOTE(review): Vikunja's REST API is understood to use POST for
            # updating an existing task — confirm against the API reference.
            resp = await client.post(
                f"{clean_url}/tasks/{task_id}",
                json={"done": done},
            )
            resp.raise_for_status()
    except httpx.HTTPStatusError as exc:
        return {"ok": False, "error": f"HTTP {exc.response.status_code}"}
    except Exception as exc:
        # Some httpx errors have an empty message; never return a blank error.
        return {"ok": False, "error": str(exc) or type(exc).__name__}

    return {"ok": True, "task": {"id": task_id, "done": done}}


async def fetch_single_project(
    base_url: str,
    token: str,
    project_id: int,
) -> Dict[str, Any]:
    """Fetch tasks for a single Vikunja project.

    Args:
        base_url: Vikunja base URL; ``/projects/<id>/tasks`` is appended after
            stripping a trailing slash.
        token: API token for authentication.
        project_id: The project ID to query.

    Returns:
        Dictionary with ``open``, ``done``, ``open_count``, ``done_count``,
        and ``error`` (``None`` on success, otherwise a message; the task
        lists stay empty when the fetch fails).
    """
    result: Dict[str, Any] = {
        "open": [],
        "done": [],
        "open_count": 0,
        "done_count": 0,
        "error": None,
    }

    if not base_url or not token:
        result["error"] = "Missing Vikunja base URL or token"
        return result

    clean_url = base_url.rstrip("/")

    try:
        async with httpx.AsyncClient(
            timeout=15,
            headers=_auth_headers(token),
        ) as client:
            tasks = await _fetch_project_tasks(client, clean_url, project_id)
        result.update(_sort_and_split(tasks))
    except Exception as exc:
        # Boundary handler: report the failure instead of raising.
        result["error"] = _describe_error(exc)

    return result