Coverage for src/lilbee/crawler/api.py: 100% (204 statements)
coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Thin orchestration layer: builds specs from ``cfg``, drives a :class:`WebFetcher`.
3No crawl4ai imports. All backend-specific knowledge lives in
4:mod:`lilbee.crawler.crawl4ai_fetcher`; this module only decides
5*what* to crawl (depth/pages/filters/concurrency) and *where* to
6put the bytes (per-page flush + metadata). Callers (CLI, MCP,
7HTTP, TUI) import these functions via the package façade in
8``lilbee.crawler.__init__``.
9"""

from __future__ import annotations

import asyncio
import inspect
import logging
import math
import threading
import time
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from lilbee.config import cfg
from lilbee.crawler import bootstrap, save, sitemap
from lilbee.crawler.bootstrap import CrawlerBrowserMissing
from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher
from lilbee.crawler.models import (
    ConcurrencySpec,
    CrawlResult,
    FetchedPage,
    FilterSpec,
)
from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta
from lilbee.crawler.url_filter import validate_crawl_url
from lilbee.progress import (
    CrawlDoneEvent,
    CrawlPageEvent,
    CrawlStartEvent,
    DetailedProgressCallback,
    EventType,
)

log = logging.getLogger(__name__)


class CrawlerState:
    """Per-process mutable state for the crawler (semaphore, periodic sync tracking).

    Encapsulates state that would otherwise live as bare module-level globals.
    A single module-level instance (``_state``) is used because this state is
    inherently per-process (threading primitives, asyncio tasks tied to the
    running loop). Test isolation is via :meth:`reset`.
    """

    def __init__(self) -> None:
        self.semaphore: asyncio.Semaphore | None = None
        self.semaphore_limit: int = 0
        self.last_sync_time: float = 0.0
        self.sync_running: threading.Lock = threading.Lock()
        self.background_tasks: set[asyncio.Task[None]] = set()

    def reset(self) -> None:
        """Reset all state (useful for testing)."""
        self.semaphore = None
        self.semaphore_limit = 0
        self.last_sync_time = 0.0
        self.sync_running = threading.Lock()
        self.background_tasks = set()


_state = CrawlerState()
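

# Test-isolation sketch (hypothetical; the fixture name is invented here).
# ``reset`` exists so tests can wipe the module-level ``_state`` between runs
# instead of monkeypatching attributes one by one:
#
#     import pytest
#     from lilbee.crawler import api
#
#     @pytest.fixture(autouse=True)
#     def _fresh_crawler_state():
#         api._state.reset()
#         yield
#         api._state.reset()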


def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Return an asyncio semaphore for crawl concurrency, or None if unlimited (0)."""
    limit = cfg.crawl_max_concurrent
    if limit <= 0:
        return None
    if _state.semaphore is None or _state.semaphore_limit != limit:
        _state.semaphore = asyncio.Semaphore(limit)
        _state.semaphore_limit = limit
    return _state.semaphore
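

# Behavior sketch (illustrative; assumes ``cfg.crawl_max_concurrent`` is a
# plain writable setting): the cached semaphore is reused while the limit is
# stable and rebuilt when it changes, so a config edit applies to the next
# crawl without a restart.
#
#     cfg.crawl_max_concurrent = 2
#     a = _get_crawl_semaphore()
#     assert _get_crawl_semaphore() is a        # cached: limit unchanged
#     cfg.crawl_max_concurrent = 4
#     assert _get_crawl_semaphore() is not a    # rebuilt for the new limit
#     cfg.crawl_max_concurrent = 0
#     assert _get_crawl_semaphore() is None     # 0 means unlimited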


def _resolve_limit(value: int | None, cfg_ceiling: int | None) -> int | None:
    """Resolve a caller-provided crawl limit to the number the fetcher consumes.

    None -> cfg_ceiling (itself may be None; ``None`` means unbounded)
    n > 0 -> n (explicit caller intent; cfg is not a ceiling here)
    n <= 0 -> ValueError (use None for unbounded, not 0)
    """
    effective = value if value is not None else cfg_ceiling
    if effective is None:
        return None
    if effective <= 0:
        raise ValueError("crawl limit must be a positive int or None")
    return effective
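

# Doctest-style sketch of the three cases above (illustrative):
#
#     >>> _resolve_limit(None, 50)    # no caller value -> cfg ceiling applies
#     50
#     >>> _resolve_limit(10, 50)      # explicit caller intent wins outright
#     10
#     >>> _resolve_limit(None, None)  # unbounded end to end -> None
#     >>> _resolve_limit(0, 50)       # 0 is rejected; use None for unbounded
#     Traceback (most recent call last):
#         ...
#     ValueError: crawl limit must be a positive int or None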


def _build_concurrency_spec() -> ConcurrencySpec:
    """Snapshot the crawl-concurrency settings from ``cfg`` into a spec."""
    return ConcurrencySpec(
        semaphore_count=cfg.crawl_concurrent_requests,
        mean_delay=cfg.crawl_mean_delay,
        max_delay_range=cfg.crawl_max_delay_range,
        retry_on_rate_limit=cfg.crawl_retry_on_rate_limit,
        retry_base_delay_min=cfg.crawl_retry_base_delay_min,
        retry_base_delay_max=cfg.crawl_retry_base_delay_max,
        retry_max_backoff=cfg.crawl_retry_max_backoff,
        retry_max_attempts=cfg.crawl_retry_max_attempts,
    )


def _build_filter_spec(*, include_subdomains: bool) -> FilterSpec:
    """Snapshot the filter settings from ``cfg`` + caller flags."""
    return FilterSpec(
        exclude_patterns=list(cfg.crawl_exclude_patterns),
        include_subdomains=include_subdomains,
    )


def _fetched_to_result(page: FetchedPage) -> CrawlResult:
    """Translate the fetcher's value type to the public ``CrawlResult`` shape."""
    return CrawlResult(
        url=page.url,
        markdown=page.markdown,
        success=page.success,
        error=page.error,
    )


async def crawl_single(url: str, *, quiet: bool = False) -> CrawlResult:
    """Fetch a single URL.

    Raises :class:`CrawlerBackendMissing` if the crawler extra isn't installed.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(page)
    except CrawlerBrowserMissing:
        raise
    except Exception as exc:
        log.warning("Failed to crawl %s: %s", url, exc)
        return CrawlResult(url=url, success=False, error=str(exc))
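

# Usage sketch (hypothetical caller; assumes the ``crawler`` extra and
# Chromium are installed). Fetch failures come back as an error-carrying
# ``CrawlResult``; only the missing-backend/browser cases raise:
#
#     import asyncio
#
#     result = asyncio.run(crawl_single("https://example.com", quiet=True))
#     if result.success:
#         print((result.markdown or "")[:200])
#     else:
#         print(f"failed: {result.error}")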


def _pages_cap(pages: int | None) -> float:
    """Return the per-result counter ceiling for visible progress.

    ``None`` (unbounded) maps to ``math.inf`` so the streaming loop's hard
    cap check is a pure numeric compare with no branching.
    """
    return math.inf if pages is None else pages
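

# Sketch of the cap arithmetic (doctest-style, illustrative):
#
#     >>> _pages_cap(25)
#     25
#     >>> 3 >= _pages_cap(None)  # inf: unbounded crawls never trip the break
#     False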


async def _drain_page_stream(
    page_stream: Any,
    *,
    on_progress: DetailedProgressCallback | None,
    on_result: Callable[[CrawlResult], Any] | None,
    sitemap_total: int,
    pages_cap: float,
    cancel: threading.Event | None,
) -> list[CrawlResult]:
    """Consume a fetcher's page stream, emitting events and flushing per page.

    Returns the accumulated ``CrawlResult`` list. The stream is closed
    deterministically by the caller; this helper only iterates.
    """
    results: list[CrawlResult] = []
    counter = 0

    def _should_cancel() -> bool:
        return cancel is not None and cancel.is_set()

    async for page in page_stream:
        if _should_cancel():
            break
        counter += 1
        if on_progress:
            on_progress(
                EventType.CRAWL_PAGE,
                CrawlPageEvent(url=page.url, current=counter, total=sitemap_total),
            )
        new_result = _fetched_to_result(page)
        results.append(new_result)
        if on_result is not None:
            try:
                rv = on_result(new_result)
                if inspect.isawaitable(rv):
                    await rv
            except OSError:
                # A disk-side flush failure must not masquerade as a crawl
                # failure. Log and keep streaming; the caller still sees the
                # result in its returned list.
                log.exception("Flush callback failed for %s", new_result.url)
        # Hard cap on visible progress. The BFS may emit failed / redirected
        # pages that push the per-result counter past the cap even after the
        # strategy has stopped dispatching. Break explicitly so the
        # user-visible count never exceeds the number the caller asked for.
        if counter >= pages_cap:
            break
    return results
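

# ``on_result`` may be sync or async: the ``inspect.isawaitable`` check above
# awaits whatever a coroutine-function callback returns and ignores plain
# return values. Minimal sketch of two equally valid callbacks (hypothetical,
# illustration only):
#
#     def log_result(r: CrawlResult) -> None:            # sync: rv is None,
#         log.info("fetched %s ok=%s", r.url, r.success) # nothing is awaited
#
#     async def store_result(r: CrawlResult) -> None:    # async: the returned
#         await asyncio.sleep(0)                         # coroutine is awaited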


def _handle_crawl_teardown_error(
    url: str,
    exc: Exception,
    *,
    cancel: threading.Event | None,
    results: list[CrawlResult],
) -> None:
    """Classify a recursive-crawl exception: cancel-teardown vs real failure.

    After cancel, crawl4ai may raise BrowserContext teardown errors as
    in-flight URLs bail. That's expected noise, not a failure worth
    surfacing. Otherwise, log and append a synthetic error result (only
    when nothing was produced, so callers always see at least one entry).
    """
    cancelled = cancel is not None and cancel.is_set()
    if cancelled:
        log.debug("Recursive crawl of %s ended during cancel teardown: %s", url, exc)
        return
    log.warning("Recursive crawl of %s failed: %s", url, exc)
    if not results:
        results.append(CrawlResult(url=url, success=False, error=str(exc)))


async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
) -> list[CrawlResult]:
    """Crawl a URL recursively using BFS, streaming per-page progress.

    None values for ``max_depth`` / ``max_pages`` mean unbounded (constrained
    only by whatever ceiling the user has set in ``cfg.crawl_max_{depth,pages}``,
    if any). Positive ints are explicit caps. ``CRAWL_PAGE`` events fire as
    each page completes; the total is ``CRAWL_TOTAL_UNKNOWN`` by default and
    promoted to the sitemap count when available.

    By default the crawl is scoped to the exact starting host so a Wikipedia
    article doesn't wander into other language editions. Pass
    ``include_subdomains=True`` to broaden the scope to sibling subdomains of
    the starting host (e.g. ``en.wikipedia.org`` plus ``af.wikipedia.org``).

    If ``on_result`` is provided, it's called for each streamed ``CrawlResult``
    the moment it arrives (before the next page yields). Callers use this to
    flush pages to disk incrementally so a cancelled crawl keeps its partial
    output.
    """
    validate_crawl_url(url)
    depth = _resolve_limit(max_depth, cfg.crawl_max_depth)
    pages = _resolve_limit(max_pages, cfg.crawl_max_pages)

    # Fail fast when the ``crawler`` extra wasn't installed so SSE
    # callers see ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Fail fast before pulling in backend submodules so callers get a clean
    # CrawlerBrowserMissing instead of a Playwright install banner.
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserMissing(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    # Best-effort sitemap lookup so the TUI / CLI can render a real page-count
    # denominator instead of [n/-1]. Falls back to CRAWL_TOTAL_UNKNOWN on any
    # failure; off the hot path so a slow/missing sitemap never blocks the crawl.
    sitemap_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency = _build_concurrency_spec()
    filters = _build_filter_spec(include_subdomains=include_subdomains)

    results: list[CrawlResult] = []
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            # Hold an explicit reference to the generator so we can aclose
            # it deterministically on break. Without this, the generator's
            # finally block (which also short-circuits the BFS strategy) only
            # runs at gc time, which is too late for callers that expect the
            # strategy to stop the moment we hit ``max_pages``.
            page_stream = fetcher.fetch_recursive(
                url,
                depth=depth,
                max_pages=pages,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency,
                filters=filters,
                cancel=cancel,
            )
            try:
                results = await _drain_page_stream(
                    page_stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=sitemap_total,
                    pages_cap=_pages_cap(pages),
                    cancel=cancel,
                )
            finally:
                await page_stream.aclose()
    except CrawlerBrowserMissing:
        raise
    except Exception as exc:
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)

    return results
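

# Usage sketch (hypothetical caller; the URL and limits are invented for
# illustration): a bounded crawl with a progress printer and cooperative
# cancellation.
#
#     import asyncio
#     import threading
#
#     def show(event_type, event):
#         if event_type is EventType.CRAWL_PAGE:
#             print(f"[{event.current}/{event.total}] {event.url}")
#
#     cancel = threading.Event()  # set() from another thread to stop early
#     results = asyncio.run(
#         crawl_recursive(
#             "https://docs.example.com",
#             max_depth=2,
#             max_pages=40,
#             on_progress=show,
#             cancel=cancel,
#         )
#     )
#     print(sum(r.success for r in results), "of", len(results), "pages ok")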


async def _maybe_periodic_sync() -> None:
    """Fire off a background sync if the ``crawl_sync_interval`` has elapsed.

    Skips if a sync is already running or periodic sync is disabled
    (``interval=0``). Uses a ``threading.Lock`` to avoid asyncio
    event-loop binding issues when called from different loops.
    """
    interval = cfg.crawl_sync_interval
    if interval <= 0 or not _state.sync_running.acquire(blocking=False):
        return

    now = time.monotonic()
    if now - _state.last_sync_time < interval:
        _state.sync_running.release()
        return

    _state.last_sync_time = now

    async def _run_sync() -> None:
        try:
            from lilbee.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)
        finally:
            _state.sync_running.release()

    task = asyncio.create_task(_run_sync())
    _state.background_tasks.add(task)
    task.add_done_callback(_state.background_tasks.discard)
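

# Why the ``background_tasks`` set: the event loop holds only a weak reference
# to tasks, so a fire-and-forget task can be garbage-collected before it runs
# to completion. Keeping a strong reference until the done-callback discards
# it is the standard pattern (generic sketch of the same shape; ``some_coro``
# is hypothetical):
#
#     task = asyncio.create_task(some_coro())
#     pending: set[asyncio.Task[None]] = set()
#     pending.add(task)                        # strong ref until done
#     task.add_done_callback(pending.discard)  # then drop it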


def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Build a per-result flush closure that batches metadata writes.

    Filesystem work runs through ``asyncio.to_thread`` so the streaming
    event loop isn't blocked by per-page writes on slow filesystems.

    ``counter`` is a single-entry dict used as a mutable int so the closure
    can share counter state with the caller without nonlocal gymnastics.
    """

    def _sync_flush(result: CrawlResult) -> Path | None:
        outcome = save._save_single_result(result, meta)
        if outcome is None:
            return None
        save._update_single_metadata(meta, result.url, outcome, datetime.now(UTC).isoformat())
        counter["pending"] += 1
        if counter["pending"] >= METADATA_FLUSH_INTERVAL:
            save.save_crawl_metadata(meta)
            counter["pending"] = 0
        return outcome.path

    async def flush_page(result: CrawlResult) -> Path | None:
        path = await asyncio.to_thread(_sync_flush, result)
        if path is not None:
            written_paths.append(path)
        return path

    return flush_page
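

# Batching sketch (illustrative; the real METADATA_FLUSH_INTERVAL lives in
# ``lilbee.crawler.save``; assume 10 here for the arithmetic): a 25-page crawl
# would persist metadata after pages 10 and 20, leaving ``counter["pending"]``
# at 5 for ``crawl_and_save``'s final ``pending > 0`` flush.
#
#     meta = save.load_crawl_metadata()
#     written_paths: list[Path] = []
#     counter = {"pending": 0}  # shared mutable int, per the docstring
#     flush_page = _make_flush_page(meta, written_paths, counter)
#     # ... await flush_page(result) per streamed page ...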


async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``: ``None`` = whole-site unbounded recursion (default). ``0`` =
    single URL, no recursion. ``N > 0`` = max link-follow depth.
    ``max_pages``: ``None`` = no limit. Positive int = cap.
    ``cfg.crawl_max_{depth,pages}`` act as user-opted-in ceilings applied only
    when ``depth`` / ``max_pages`` are ``None``.

    When recursing, the crawl is scoped to the exact starting host by default.
    Set ``include_subdomains=True`` to also follow links into sibling
    subdomains of the starting host.

    Uses hash-based change detection: always fetches, but only saves files
    whose content has changed (or is new). Pages are flushed to disk as they
    stream so a cancelled crawl preserves the pages already fetched instead
    of discarding them.
    """
    # Reject early when the crawler extra isn't installed. Runs before the
    # Chromium bootstrap so a user without [crawler] doesn't pay the ~160 MB
    # download just to hit the same error afterward.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Auto-bootstrap Chromium on first use so every crawl entry point works
    # on a fresh install without a separate setup step. ``bootstrap_chromium``
    # short-circuits when Chromium is already installed. Any progress is
    # forwarded through the same ``on_progress`` callback so downstream UIs
    # surface a 'setup' stage before the crawl events.
    if not bootstrap.chromium_installed():
        await bootstrap.bootstrap_chromium(on_progress=on_progress)

    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    try:
        if on_progress:
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        pages_seen = 0
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        if depth == 0:
            result = await crawl_single(url, quiet=quiet)
            pages_seen = 1
            try:
                await flush_page(result)
            except OSError:
                log.exception("Flush failed for %s", result.url)
            if on_progress:
                on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
        else:
            results = await crawl_recursive(
                url,
                max_depth=depth,
                max_pages=max_pages,
                on_progress=on_progress,
                cancel=cancel,
                quiet=quiet,
                include_subdomains=include_subdomains,
                on_result=flush_page,
            )
            pages_seen = len(results)

        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync()

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        if sem is not None:
            sem.release()
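

# End-to-end usage sketch (hypothetical caller; URL and cap invented for
# illustration): mirror a docs site to disk, capped at 100 pages, following
# sibling subdomains. The paths returned are the files actually written or
# updated, not every page fetched.
#
#     import asyncio
#
#     paths = asyncio.run(
#         crawl_and_save(
#             "https://docs.example.com",
#             max_pages=100,
#             include_subdomains=True,
#         )
#     )
#     print(f"{len(paths)} files written/updated")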