Coverage for src/lilbee/crawl_task.py: 100% (88 statements)
coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Background crawl task management — start, track, and query crawl operations."""
3import asyncio
4import logging
5import uuid
6from dataclasses import dataclass, field
7from datetime import UTC, datetime
8from enum import StrEnum
10from lilbee.crawler import crawl_and_save
11from lilbee.progress import CrawlPageEvent, DetailedProgressCallback, EventType, ProgressEvent
13log = logging.getLogger(__name__)
15# Maximum completed tasks to retain in memory before evicting oldest.
16_MAX_COMPLETED_TASKS = 100
19class TaskStatus(StrEnum):
20 """Lifecycle states for a crawl task."""
22 PENDING = "pending"
23 RUNNING = "running"
24 DONE = "done"
25 FAILED = "failed"
28@dataclass
29class CrawlTask:
30 """Tracks a single crawl operation.
32 depth / max_pages follow the crawl_and_save three-state convention: None =
33 unbounded, 0 (depth only) = single URL, positive int = explicit cap.
34 """
36 task_id: str
37 url: str
38 depth: int | None
39 max_pages: int | None
40 status: TaskStatus = TaskStatus.PENDING
41 pages_crawled: int = 0
42 pages_total: int | None = None
43 error: str | None = None
44 started_at: str = ""
45 finished_at: str = ""
46 _async_task: asyncio.Task[None] | None = field(default=None, repr=False, init=False)
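

# Illustrative sketch only (not part of the module's API, never called at
# import time): the three depth/max_pages states described in the CrawlTask
# docstring above. The URL is a placeholder.
def _depth_convention_examples() -> list[CrawlTask]:
    url = "https://example.com"
    return [
        CrawlTask(task_id="a", url=url, depth=None, max_pages=None),  # unbounded recursion
        CrawlTask(task_id="b", url=url, depth=0, max_pages=None),  # single URL only
        CrawlTask(task_id="c", url=url, depth=2, max_pages=100),  # explicit caps
    ]
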
class TaskRegistry:
    """In-memory registry of active and completed crawl tasks.

    A single module-level instance (_registry) is used because task tracking
    is inherently per-process state (asyncio.Task references, etc.).
    """

    def __init__(self) -> None:
        self.tasks: dict[str, CrawlTask] = {}

    def clear(self) -> None:
        """Remove all tasks from the registry."""
        self.tasks.clear()


_registry = TaskRegistry()


def now_iso() -> str:
    """Current UTC time as ISO 8601 string."""
    return datetime.now(UTC).isoformat()


def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
    """Return a progress callback that updates task fields from crawl events."""

    def _on_progress(event_type: EventType, data: ProgressEvent) -> None:
        if event_type == EventType.CRAWL_PAGE:
            if not isinstance(data, CrawlPageEvent):
                raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
            task.pages_crawled = data.current
            task.pages_total = data.total

    return _on_progress
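

# Illustrative sketch only: the callback returned above mutates the task in
# place. The CrawlPageEvent keyword arguments are an assumption about the
# lilbee.progress constructor (only .current and .total are read by
# _on_progress); real events come from crawl_and_save.
def _progress_updater_example() -> CrawlTask:
    task = CrawlTask(task_id="demo", url="https://example.com", depth=0, max_pages=None)
    on_progress = make_progress_updater(task)
    # Simulate a single page event. Constructor signature is assumed.
    on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(current=3, total=10))
    assert (task.pages_crawled, task.pages_total) == (3, 10)
    return task
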
async def run_crawl(task: CrawlTask) -> None:
    """Execute the crawl, save results, and trigger sync."""
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)
    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
        )
        task.status = TaskStatus.DONE
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.ingest import sync

            await sync(quiet=True)
        except Exception:
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error = str(exc)
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s — %s", task.url, exc)
    finally:
        task._async_task = None
def _evict_completed() -> None:
    """Remove completed tasks with the earliest ``finished_at`` when over cap.

    Finish order diverges from start order whenever short tasks complete
    while a long one is still running, so sort on ``finished_at``
    rather than dict insertion order.
    """
    done_statuses = (TaskStatus.DONE, TaskStatus.FAILED)
    tasks = _registry.tasks
    completed = [(tid, t) for tid, t in tasks.items() if t.status in done_statuses]
    excess = len(completed) - _MAX_COMPLETED_TASKS
    if excess <= 0:
        return
    completed.sort(key=lambda pair: pair[1].finished_at)
    for tid, _ in completed[:excess]:
        del tasks[tid]
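

# Illustrative sketch only: eviction keys on finished_at, not insertion order,
# so the first-inserted task survives here because it finished last. The
# synthetic ISO timestamps are chosen so lexicographic order matches time
# order (microsecond counter).
def _eviction_order_example() -> None:
    _registry.clear()
    n = _MAX_COMPLETED_TASKS + 1
    for i in range(n):
        t = CrawlTask(task_id=f"t{i}", url="https://example.com", depth=0, max_pages=None)
        t.status = TaskStatus.DONE
        # Later-inserted tasks get earlier finish times.
        t.finished_at = f"2026-01-01T00:00:00.{n - 1 - i:06d}+00:00"
        _registry.tasks[t.task_id] = t
    _evict_completed()
    assert "t0" in _registry.tasks  # finished last, so retained
    assert f"t{n - 1}" not in _registry.tasks  # earliest finish, so evicted
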
def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass depth=0 for a single URL.
    Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
    )
    _registry.tasks[task_id] = task
    task._async_task = asyncio.create_task(run_crawl(task))
    return task_id


def get_task(task_id: str) -> CrawlTask | None:
    """Look up a crawl task by ID."""
    return _registry.tasks.get(task_id)


def list_tasks() -> list[CrawlTask]:
    """Return all tracked crawl tasks (active and completed)."""
    return list(_registry.tasks.values())


def clear_tasks() -> None:
    """Remove all tasks from the registry (for testing)."""
    _registry.clear()
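

# Usage sketch (illustrative, never called here): start a crawl and poll its
# status. start_crawl must run under an event loop because it calls
# asyncio.create_task; the URL and polling interval are placeholders.
async def _usage_example() -> None:
    task_id = start_crawl("https://example.com", depth=1, max_pages=50)
    while (task := get_task(task_id)) and task.status in (
        TaskStatus.PENDING,
        TaskStatus.RUNNING,
    ):
        await asyncio.sleep(0.5)
    if task is not None:
        log.info("Task %s finished: %s (%d pages)", task_id, task.status, task.pages_crawled)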