Coverage for src/lilbee/crawler/api.py: 100%

204 statements  

coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Thin orchestration layer: builds specs from ``cfg``, drives a :class:`WebFetcher`. 

2 

3No crawl4ai imports. All backend-specific knowledge lives in 

4:mod:`lilbee.crawler.crawl4ai_fetcher`; this module only decides 

5*what* to crawl (depth/pages/filters/concurrency) and *where* to 

6put the bytes (per-page flush + metadata). Callers (CLI, MCP, 

7HTTP, TUI) import these functions via the package façade in 

8``lilbee.crawler.__init__``. 

9""" 

10 

11from __future__ import annotations 

12 

13import asyncio 

14import inspect 

15import logging 

16import math 

17import threading 

18import time 

19from collections.abc import Callable 

20from datetime import UTC, datetime 

21from pathlib import Path 

22from typing import Any 

23 

24from lilbee.config import cfg 

25from lilbee.crawler import bootstrap, save, sitemap 

26from lilbee.crawler.bootstrap import CrawlerBrowserMissing 

27from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher 

28from lilbee.crawler.models import ( 

29 ConcurrencySpec, 

30 CrawlResult, 

31 FetchedPage, 

32 FilterSpec, 

33) 

34from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta 

35from lilbee.crawler.url_filter import validate_crawl_url 

36from lilbee.progress import ( 

37 CrawlDoneEvent, 

38 CrawlPageEvent, 

39 CrawlStartEvent, 

40 DetailedProgressCallback, 

41 EventType, 

42) 

43 

44log = logging.getLogger(__name__) 

45 

46 

47class CrawlerState: 

48 """Per-process mutable state for the crawler (semaphore, periodic sync tracking). 

49 

50 Encapsulates state that would otherwise live as bare module-level globals. 

51 A single module-level instance (``_state``) is used because this state is 

52 inherently per-process (threading primitives, asyncio tasks tied to the 

53 running loop). Test isolation is via :meth:`reset`. 

54 """ 

55 

56 def __init__(self) -> None: 

57 self.semaphore: asyncio.Semaphore | None = None 

58 self.semaphore_limit: int = 0 

59 self.last_sync_time: float = 0.0 

60 self.sync_running: threading.Lock = threading.Lock() 

61 self.background_tasks: set[asyncio.Task[None]] = set() 

62 

63 def reset(self) -> None: 

64 """Reset all state (useful for testing).""" 

65 self.semaphore = None 

66 self.semaphore_limit = 0 

67 self.last_sync_time = 0.0 

68 self.sync_running = threading.Lock() 

69 self.background_tasks = set() 

70 

71 

72_state = CrawlerState() 
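
# Illustrative sketch (not part of the module): the class docstring above points
# to :meth:`CrawlerState.reset` for test isolation. Assuming the test suite uses
# pytest, a fixture could reset the shared instance around each test:
#
#     import pytest
#     from lilbee.crawler import api
#
#     @pytest.fixture(autouse=True)
#     def fresh_crawler_state():
#         api._state.reset()   # clean semaphore / sync-tracking state
#         yield
#         api._state.reset()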



def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Return an asyncio semaphore for crawl concurrency, or None if unlimited (0)."""
    limit = cfg.crawl_max_concurrent
    if limit <= 0:
        return None
    if _state.semaphore is None or _state.semaphore_limit != limit:
        _state.semaphore = asyncio.Semaphore(limit)
        _state.semaphore_limit = limit
    return _state.semaphore


def _resolve_limit(value: int | None, cfg_ceiling: int | None) -> int | None:
    """Resolve a caller-provided crawl limit to the number the fetcher consumes.

    None   -> cfg_ceiling (itself may be None; ``None`` means unbounded)
    n > 0  -> n (explicit caller intent; cfg is not a ceiling here)
    n <= 0 -> ValueError (use None for unbounded, not 0)
    """
    effective = value if value is not None else cfg_ceiling
    if effective is None:
        return None
    if effective <= 0:
        raise ValueError("crawl limit must be a positive int or None")
    return effective
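
# Illustrative mapping (not part of the module), following the rules in the
# docstring above:
#
#     _resolve_limit(None, 50)   -> 50          (fall back to the cfg ceiling)
#     _resolve_limit(None, None) -> None        (unbounded)
#     _resolve_limit(10, 50)     -> 10          (explicit caller intent wins)
#     _resolve_limit(0, 50)      -> ValueError  (use None for unbounded, not 0)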



def _build_concurrency_spec() -> ConcurrencySpec:
    """Snapshot the crawl-concurrency settings from ``cfg`` into a spec."""
    return ConcurrencySpec(
        semaphore_count=cfg.crawl_concurrent_requests,
        mean_delay=cfg.crawl_mean_delay,
        max_delay_range=cfg.crawl_max_delay_range,
        retry_on_rate_limit=cfg.crawl_retry_on_rate_limit,
        retry_base_delay_min=cfg.crawl_retry_base_delay_min,
        retry_base_delay_max=cfg.crawl_retry_base_delay_max,
        retry_max_backoff=cfg.crawl_retry_max_backoff,
        retry_max_attempts=cfg.crawl_retry_max_attempts,
    )


def _build_filter_spec(*, include_subdomains: bool) -> FilterSpec:
    """Snapshot the filter settings from ``cfg`` + caller flags."""
    return FilterSpec(
        exclude_patterns=list(cfg.crawl_exclude_patterns),
        include_subdomains=include_subdomains,
    )


def _fetched_to_result(page: FetchedPage) -> CrawlResult:
    """Translate the fetcher's value type to the public ``CrawlResult`` shape."""
    return CrawlResult(
        url=page.url,
        markdown=page.markdown,
        success=page.success,
        error=page.error,
    )


async def crawl_single(url: str, *, quiet: bool = False) -> CrawlResult:
    """Fetch a single URL.

    Raises :class:`CrawlerBackendMissing` if the crawler extra isn't installed.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(page)
    except CrawlerBrowserMissing:
        raise
    except Exception as exc:
        log.warning("Failed to crawl %s: %s", url, exc)
        return CrawlResult(url=url, success=False, error=str(exc))


def _pages_cap(pages: int | None) -> float:
    """Return the per-result counter ceiling for visible progress.

    ``None`` (unbounded) maps to ``math.inf`` so the streaming loop's hard
    cap check is a pure numeric compare with no branching.
    """
    return math.inf if pages is None else pages
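
# Illustrative mapping (not part of the module):
#
#     _pages_cap(None) -> math.inf   (``counter >= cap`` can never be true)
#     _pages_cap(25)   -> 25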



async def _drain_page_stream(
    page_stream: Any,
    *,
    on_progress: DetailedProgressCallback | None,
    on_result: Callable[[CrawlResult], Any] | None,
    sitemap_total: int,
    pages_cap: float,
    cancel: threading.Event | None,
) -> list[CrawlResult]:
    """Consume a fetcher's page stream, emitting events and flushing per page.

    Returns the accumulated ``CrawlResult`` list. The stream is closed
    deterministically by the caller; this helper only iterates.
    """
    results: list[CrawlResult] = []
    counter = 0

    def _should_cancel() -> bool:
        return cancel is not None and cancel.is_set()

    async for page in page_stream:
        if _should_cancel():
            break
        counter += 1
        if on_progress:
            on_progress(
                EventType.CRAWL_PAGE,
                CrawlPageEvent(url=page.url, current=counter, total=sitemap_total),
            )
        new_result = _fetched_to_result(page)
        results.append(new_result)
        if on_result is not None:
            try:
                rv = on_result(new_result)
                if inspect.isawaitable(rv):
                    await rv
            except OSError:
                # A disk-side flush failure must not masquerade as a crawl
                # failure. Log and keep streaming; the caller still sees the
                # result in its returned list.
                log.exception("Flush callback failed for %s", new_result.url)
        # Hard cap on visible progress. The BFS may emit failed / redirected
        # pages that push the per-result counter past the cap even after the
        # strategy has stopped dispatching. Break explicitly so the
        # user-visible count never exceeds the number the caller asked for.
        if counter >= pages_cap:
            break
    return results
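
# Illustrative sketch (not part of the module): any async iterable of objects
# exposing ``url`` / ``markdown`` / ``success`` / ``error`` can be drained, and
# ``pages_cap=2`` stops iteration after two results even if the stream would
# yield more. ``SimpleNamespace`` stands in for ``FetchedPage`` here.
#
#     from types import SimpleNamespace
#
#     async def fake_stream():
#         for n in range(5):
#             yield SimpleNamespace(
#                 url=f"https://example.com/{n}", markdown="# hi", success=True, error=None
#             )
#
#     results = await _drain_page_stream(
#         fake_stream(),
#         on_progress=None,
#         on_result=None,
#         sitemap_total=-1,
#         pages_cap=2,
#         cancel=None,
#     )
#     assert len(results) == 2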



def _handle_crawl_teardown_error(
    url: str,
    exc: Exception,
    *,
    cancel: threading.Event | None,
    results: list[CrawlResult],
) -> None:
    """Classify a recursive-crawl exception: cancel-teardown vs real failure.

    After cancel, crawl4ai may raise BrowserContext teardown errors as
    in-flight URLs bail. That's expected noise, not a failure worth
    surfacing. Otherwise, log and append a synthetic error result (only
    when nothing was produced so callers always see at least one entry).
    """
    cancelled = cancel is not None and cancel.is_set()
    if cancelled:
        log.debug("Recursive crawl of %s ended during cancel teardown: %s", url, exc)
        return
    log.warning("Recursive crawl of %s failed: %s", url, exc)
    if not results:
        results.append(CrawlResult(url=url, success=False, error=str(exc)))


async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
) -> list[CrawlResult]:
    """Crawl a URL recursively using BFS, streaming per-page progress.

    None values for ``max_depth`` / ``max_pages`` mean unbounded (constrained
    only by whatever ceiling the user has set in ``cfg.crawl_max_{depth,pages}``,
    if any). Positive ints are explicit caps. ``CRAWL_PAGE`` events fire as
    each page completes; total is ``CRAWL_TOTAL_UNKNOWN`` by default and
    promoted to the sitemap count when available.

    By default the crawl is scoped to the exact starting host so a Wikipedia
    article doesn't wander into other language editions. Pass
    ``include_subdomains=True`` to broaden scope to the starting host plus any
    subdomains (e.g. ``en.wikipedia.org`` plus ``af.wikipedia.org``).

    If ``on_result`` is provided, it's called for each streamed ``CrawlResult``
    the moment it arrives (before the next page yields). Callers use this to
    flush pages to disk incrementally so a cancelled crawl keeps its partial
    output.
    """
    validate_crawl_url(url)
    depth = _resolve_limit(max_depth, cfg.crawl_max_depth)
    pages = _resolve_limit(max_pages, cfg.crawl_max_pages)

    # Fail fast when the ``crawler`` extra wasn't installed so SSE
    # callers see ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Fail fast before pulling in backend submodules so callers get a clean
    # CrawlerBrowserMissing instead of a Playwright install banner.
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserMissing(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    # Best-effort sitemap lookup so the TUI / CLI can render a real page-count
    # denominator instead of [n/-1]. Falls back to CRAWL_TOTAL_UNKNOWN on any
    # failure; off the hot path so a slow/missing sitemap never blocks the crawl.
    sitemap_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency = _build_concurrency_spec()
    filters = _build_filter_spec(include_subdomains=include_subdomains)

    results: list[CrawlResult] = []
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            # Hold an explicit reference to the generator so we can aclose
            # it deterministically on break. Without this, the generator's
            # finally block (which also short-circuits the BFS strategy) only
            # runs at gc time, which is too late for callers that expect the
            # strategy to stop the moment we hit ``max_pages``.
            page_stream = fetcher.fetch_recursive(
                url,
                depth=depth,
                max_pages=pages,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency,
                filters=filters,
                cancel=cancel,
            )
            try:
                results = await _drain_page_stream(
                    page_stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=sitemap_total,
                    pages_cap=_pages_cap(pages),
                    cancel=cancel,
                )
            finally:
                await page_stream.aclose()
    except CrawlerBrowserMissing:
        raise
    except Exception as exc:
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)

    return results
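
# Illustrative usage sketch (not part of the module), assuming the ``crawler``
# extra and Chromium are installed; the URL is a placeholder. ``on_result`` is
# invoked per page, before the next page yields, as described in the docstring:
#
#     saved_urls: list[str] = []
#
#     async def remember(result: CrawlResult) -> None:
#         if result.success:
#             saved_urls.append(result.url)
#
#     results = await crawl_recursive(
#         "https://docs.example.org",
#         max_depth=2,
#         max_pages=25,
#         include_subdomains=False,
#         on_result=remember,
#     )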



async def _maybe_periodic_sync() -> None:
    """Fire off a background sync if the ``crawl_sync_interval`` has elapsed.

    Skips if a sync is already running or periodic sync is disabled
    (``interval=0``). Uses a ``threading.Lock`` to avoid asyncio
    event-loop binding issues when called from different loops.
    """
    interval = cfg.crawl_sync_interval
    if interval <= 0 or not _state.sync_running.acquire(blocking=False):
        return

    now = time.monotonic()
    if now - _state.last_sync_time < interval:
        _state.sync_running.release()
        return

    _state.last_sync_time = now

    async def _run_sync() -> None:
        try:
            from lilbee.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)
        finally:
            _state.sync_running.release()

    task = asyncio.create_task(_run_sync())
    _state.background_tasks.add(task)
    task.add_done_callback(_state.background_tasks.discard)


def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Build a per-result flush closure that batches metadata writes.

    Filesystem work runs through ``asyncio.to_thread`` so the streaming
    event loop isn't blocked by per-page writes on slow filesystems.

    ``counter`` is a single-entry dict used as a mutable int so the closure
    can share counter state with the caller without nonlocal gymnastics.
    """

    def _sync_flush(result: CrawlResult) -> Path | None:
        outcome = save._save_single_result(result, meta)
        if outcome is None:
            return None
        save._update_single_metadata(meta, result.url, outcome, datetime.now(UTC).isoformat())
        counter["pending"] += 1
        if counter["pending"] >= METADATA_FLUSH_INTERVAL:
            save.save_crawl_metadata(meta)
            counter["pending"] = 0
        return outcome.path

    async def flush_page(result: CrawlResult) -> Path | None:
        path = await asyncio.to_thread(_sync_flush, result)
        if path is not None:
            written_paths.append(path)
        return path

    return flush_page


async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``: ``None`` = whole-site unbounded recursion (default). ``0`` =
    single URL, no recursion. ``N > 0`` = max link-follow depth.
    ``max_pages``: ``None`` = no limit. Positive int = cap.
    ``cfg.crawl_max_{depth,pages}`` act as user-opted-in ceilings applied only
    when ``depth``/``max_pages`` are ``None``.

    When recursing, the crawl is scoped to the exact starting host by default.
    Set ``include_subdomains=True`` to also follow links into sibling
    subdomains of the starting host.

    Uses hash-based change detection: always fetches, but only saves files
    whose content has changed (or is new). Pages are flushed to disk as they
    stream so a cancelled crawl preserves the pages already fetched instead
    of discarding them.
    """
    # Reject early when the crawler extra isn't installed. Runs before the
    # Chromium bootstrap so a user without [crawler] doesn't pay the ~160 MB
    # download just to hit the same error afterward.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendMissing(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Auto-bootstrap Chromium on first use so every crawl entry point works
    # on a fresh install without a separate setup step. ``bootstrap_chromium``
    # short-circuits when Chromium is already installed. Any progress is
    # forwarded through the same ``on_progress`` callback so downstream UIs
    # surface a 'setup' stage before the crawl events.
    if not bootstrap.chromium_installed():
        await bootstrap.bootstrap_chromium(on_progress=on_progress)

    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    try:
        if on_progress:
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        pages_seen = 0
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        if depth == 0:
            result = await crawl_single(url, quiet=quiet)
            pages_seen = 1
            try:
                await flush_page(result)
            except OSError:
                log.exception("Flush failed for %s", result.url)
            if on_progress:
                on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
        else:
            results = await crawl_recursive(
                url,
                max_depth=depth,
                max_pages=max_pages,
                on_progress=on_progress,
                cancel=cancel,
                quiet=quiet,
                include_subdomains=include_subdomains,
                on_result=flush_page,
            )
            pages_seen = len(results)

        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync()

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        if sem is not None:
            sem.release()
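
# Illustrative end-to-end sketch (not part of the module): callers normally go
# through the ``lilbee.crawler`` façade, but the functions are defined here.
# Per the docstring above, ``depth=None`` recurses whole-site (subject to any
# cfg ceilings), ``depth=0`` fetches a single page, ``depth=N`` caps the
# link-follow depth. The URL is a placeholder.
#
#     import asyncio
#     from lilbee.crawler.api import crawl_and_save
#
#     async def main() -> None:
#         written = await crawl_and_save(
#             "https://docs.example.org",
#             depth=1,
#             max_pages=10,
#         )
#         print(f"wrote {len(written)} file(s)")
#
#     asyncio.run(main())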