Coverage for src/lilbee/crawler/crawl4ai_fetcher.py: 100% (138 statements)
1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`.
3THIS IS THE ONLY FILE IN THE PROJECT THAT IMPORTS ``crawl4ai``.
5Swapping to a different web-fetching SDK is a one-file change:
6delete this module, add a replacement that implements
7:class:`lilbee.crawler.fetcher.WebFetcher`, and update the one
8import in :mod:`lilbee.crawler.api`.
9"""

from __future__ import annotations

import contextlib
import inspect
import io
import logging
import math
from collections.abc import AsyncGenerator, AsyncIterator
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from lilbee.crawler import bootstrap
from lilbee.crawler.bootstrap import CrawlerBrowserMissing
from lilbee.crawler.models import (
    CancelToken,
    ConcurrencySpec,
    FetchedPage,
    FilterSpec,
)

if TYPE_CHECKING:
    from lilbee.crawler.fetcher import WebFetcher

log = logging.getLogger(__name__)


def _build_rate_limited_dispatcher(concurrency: ConcurrencySpec) -> Any:
    """Build a SemaphoreDispatcher + RateLimiter from a ConcurrencySpec, or None.

    BFSDeepCrawlStrategy calls ``crawler.arun_many()`` without a dispatcher
    kwarg, so per-domain rate limiting is only reachable by threading a
    dispatcher through AsyncWebCrawler itself. This helper centralizes the
    spec read so the TUI / CLI / server all get identical behavior.
    """
    if not concurrency.retry_on_rate_limit:
        return None

    from crawl4ai.async_dispatcher import RateLimiter, SemaphoreDispatcher
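
    # Assumption about crawl4ai's RateLimiter semantics: base_delay=(min, max)
    # is the random backoff window, max_delay caps backoff growth, and
    # max_retries bounds attempts on rate-limited responses (the 429/503
    # handling noted on _LilbeeAsyncCrawler below).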
    rate_limiter = RateLimiter(
        base_delay=(concurrency.retry_base_delay_min, concurrency.retry_base_delay_max),
        max_delay=concurrency.retry_max_backoff,
        max_retries=concurrency.retry_max_attempts,
    )
    return SemaphoreDispatcher(
        semaphore_count=concurrency.semaphore_count,
        rate_limiter=rate_limiter,
    )


class _LilbeeAsyncCrawler:
    """AsyncWebCrawler wrapper that injects a default dispatcher on arun_many.

    crawl4ai's BFSDeepCrawlStrategy hard-codes crawler.arun_many(urls, config)
    without a dispatcher kwarg, so per-domain rate limiting and 429/503 retries
    can't be wired via CrawlerRunConfig. By giving the crawler a default
    dispatcher, every strategy-originated arun_many picks it up. An explicit
    dispatcher= on the call still wins.
    """

    def __init__(self, *, verbose: bool, dispatcher: Any) -> None:
        from crawl4ai import AsyncWebCrawler

        self._inner = AsyncWebCrawler(verbose=verbose)
        self._dispatcher = dispatcher

    async def __aenter__(self) -> _LilbeeAsyncCrawler:
        await self._inner.__aenter__()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
        return await self._inner.__aexit__(exc_type, exc, tb)

    async def arun(self, *args: Any, **kwargs: Any) -> Any:
        return await self._inner.arun(*args, **kwargs)

    async def arun_many(
        self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any
    ) -> Any:
        return await self._inner.arun_many(
            urls,
            config=config,
            dispatcher=dispatcher if dispatcher is not None else self._dispatcher,
            **kwargs,
        )


@contextlib.asynccontextmanager
async def _open_crawler(*, quiet: bool = False, dispatcher: Any = None) -> AsyncIterator[Any]:
    """Open a crawler.

    Raises :class:`CrawlerBrowserMissing` early if the Chromium binary
    hasn't been downloaded. Without this guard Playwright prints a full
    ASCII install banner that leaks into the TUI.

    When *dispatcher* is provided, wrap AsyncWebCrawler in _LilbeeAsyncCrawler
    so every strategy-originated arun_many call picks it up. The single-URL
    path (crawl_single) doesn't need a dispatcher because arun() doesn't accept
    one, so it passes None and gets a bare AsyncWebCrawler.
    """
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserMissing(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    from crawl4ai import AsyncWebCrawler
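
    # When quiet, capture crawl4ai / Playwright startup chatter in throwaway
    # buffers so nothing leaks into the TUI; otherwise leave stdout/stderr alone.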
    stdout_ctx = contextlib.redirect_stdout(io.StringIO()) if quiet else contextlib.nullcontext()
    stderr_ctx = contextlib.redirect_stderr(io.StringIO()) if quiet else contextlib.nullcontext()
    with stdout_ctx, stderr_ctx:
        if dispatcher is not None:
            async with _LilbeeAsyncCrawler(verbose=not quiet, dispatcher=dispatcher) as crawler:
                yield crawler
        else:
            async with AsyncWebCrawler(verbose=not quiet) as crawler:
                yield crawler


def _safe_strategy_cancel(strategy: Any) -> None:
    """Call ``strategy.cancel()`` if available, swallowing the known failure shapes.

    BFSDeepCrawlStrategy has ``.cancel()`` in crawl4ai 0.8.6. Older versions or
    third-party strategies may not. Belt-and-suspenders: should_cancel already
    gates between BFS levels, but ``cancel()`` also short-circuits arun_many.

    Narrow catch: ``AttributeError`` covers the rare case where ``cancel``
    exists but accesses a missing attribute mid-call; ``RuntimeError`` covers
    cancel-on-closed-strategy. Anything else propagates.
    """
    cancel_method = getattr(strategy, "cancel", None)
    if callable(cancel_method):
        try:
            cancel_method()
        except (AttributeError, RuntimeError) as exc:
            log.debug("strategy.cancel() raised: %s", exc)


async def _safe_aclose(stream: Any) -> None:
    """Close an async-generator stream if that is what it is.

    ``_iter_crawl_stream`` normalizes over async-generator / list / single-result
    shapes; only the generator shape has an ``aclose()`` to call. A list or
    single object is a no-op.
    """
    if stream is None:
        return
    if inspect.isasyncgen(stream):
        with contextlib.suppress(Exception):
            await stream.aclose()


async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]:
    """Normalize crawl4ai's ``arun()`` return to an async iterator.

    With ``stream=True`` on CrawlerRunConfig, crawl4ai 0.8 returns an async
    generator. Older call sites and some crawl4ai code paths return a list
    (batch mode) or a single CrawlResult. Accept all three shapes so tests
    that mock ``arun()`` with a plain list keep working.
    """
    if inspect.isasyncgen(stream):
        async for item in stream:
            yield item
        return
    # A list is the batch-mode shape; iterate and yield each item.
    if isinstance(stream, list):
        for item in stream:
            yield item
        return
    yield stream


def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Build a URLFilter that scopes a crawl to the starting URL's host.

    Default behavior (``include_subdomains=False``) restricts link-following to
    the exact host of *start_url*. For ``https://en.wikipedia.org/...`` this
    excludes ``af.wikipedia.org`` and every other language subdomain.

    When ``include_subdomains=True``, crawl4ai's DomainFilter matches the host
    plus any of its subdomains (``foo.example.com`` matches ``example.com``),
    which is the loose "whole registrable domain" behavior users may want.
    """
    from crawl4ai.deep_crawling.filters import DomainFilter, URLFilter

    host = (urlparse(start_url).hostname or "").lower()
    if include_subdomains:
        return DomainFilter(allowed_domains=host) if host else None

    class _ExactHostFilter(URLFilter):  # type: ignore[misc]
        def __init__(self, allowed_host: str) -> None:
            super().__init__()
            self._host = allowed_host

        def apply(self, url: str) -> bool:
            link_host = (urlparse(url).hostname or "").lower()
            ok = link_host == self._host
            self._update_stats(ok)
            return ok

    return _ExactHostFilter(host) if host else None
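
# A quick illustration of the scoping above (hypothetical URLs, behavior as
# described in the docstring):
#   _host_scope_filter("https://en.wikipedia.org/wiki/Bee", include_subdomains=False)
#       -> accepts only en.wikipedia.org links; af.wikipedia.org is rejected.
#   _host_scope_filter("https://example.com/", include_subdomains=True)
#       -> DomainFilter that also matches foo.example.com.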


class Crawl4aiFetcher:
    """:class:`WebFetcher` implementation backed by crawl4ai.

    Migrating off crawl4ai means replacing this class with another
    :class:`WebFetcher` implementor (e.g. a ``KreuzcrawlFetcher``) and
    updating the one construction site in :mod:`lilbee.crawler.api`.
    """

    def __init__(self, *, quiet: bool = False) -> None:
        self._quiet = quiet

    async def __aenter__(self) -> Crawl4aiFetcher:
        # Crawl4aiFetcher opens a fresh ``AsyncWebCrawler`` per operation
        # because ``fetch_recursive`` needs a per-call dispatcher (which depends
        # on the :class:`ConcurrencySpec` for that call). Nothing to set up here.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch a single URL via crawl4ai's ``arun``."""
        from crawl4ai import CrawlerRunConfig
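
        # crawl4ai's page_timeout is in milliseconds; this API takes seconds,
        # hence the * 1000 below.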
        config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet) as crawler:
            result = await crawler.arun(url=url, config=config)
            markdown = (result.markdown or "").strip()
            if markdown:
                return FetchedPage(url=url, markdown=markdown, success=True)
            return FetchedPage(
                url=url,
                success=False,
                error=result.error_message or "No content extracted",
            )

    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Stream pages discovered by crawl4ai's native BFS.

        ``depth`` / ``max_pages`` of ``None`` mean unbounded; the adapter
        translates to ``math.inf`` for crawl4ai's BFSDeepCrawlStrategy, which
        is the sentinel it understands.
        """

        def _should_cancel() -> bool:
            return cancel is not None and cancel.is_set()

        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        filter_chain_items: list[Any] = []
        host_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if host_filter is not None:
            filter_chain_items.append(host_filter)
        if filters.exclude_patterns:
            filter_chain_items.append(
                URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            )
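        # reverse=True inverts the pattern match, turning exclude_patterns into
        # a block list; an empty FilterChain is assumed to impose no filtering
        # (both are assumptions about crawl4ai's filter semantics).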
        filter_chain = FilterChain(filter_chain_items) if filter_chain_items else FilterChain()

        strategy = BFSDeepCrawlStrategy(
            max_depth=math.inf if depth is None else depth,
            max_pages=math.inf if max_pages is None else max_pages,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )
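        # With stream=True, arun() hands results back as an async generator as
        # pages finish, rather than one batch list at the end (normalized by
        # _iter_crawl_stream above).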

        dispatcher = _build_rate_limited_dispatcher(concurrency)
        stream: Any = None
        strategy_cancelled = False
        # Exceptions propagate to the orchestration layer, which decides
        # whether to log cancel-teardown noise at debug level or to surface a
        # real failure. The adapter's only housekeeping is stream close + BFS
        # strategy cancel so Playwright tears down in order.
        async with _open_crawler(quiet=self._quiet, dispatcher=dispatcher) as crawler:
            stream = await crawler.arun(url=seed_url, config=config)
            try:
                async for cr in _iter_crawl_stream(stream):
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if cr.success:
                        yield FetchedPage(url=cr.url, markdown=cr.markdown or "")
                    else:
                        yield FetchedPage(
                            url=cr.url,
                            success=False,
                            error=cr.error_message or "Unknown error",
                        )
            finally:
                # If the consumer breaks out before we saw a cancel, still
                # short-circuit the BFS strategy so any in-flight arun_many
                # batch stops dispatching. Mirrors the orchestrator's
                # previous "hard cap on visible counter" behavior now that
                # the strategy object lives inside the adapter.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # Close the async generator (if it is one) before the
                # crawler context exits, so Playwright tears down
                # in-flight URLs in order. Skipping this is what produced
                # the "BrowserContext.new_page: Connection closed" spam
                # on cancel.
                await _safe_aclose(stream)


# Protocol conformance check: Crawl4aiFetcher is structurally a WebFetcher.
# We don't instantiate at import time, so the check stays purely structural.
if TYPE_CHECKING:
    _: WebFetcher = Crawl4aiFetcher()


def crawler_available() -> bool:
    """Check if the crawl4ai backend is importable (i.e. the extra is installed)."""
    try:
        import crawl4ai  # noqa: F401

        return True
    except ImportError:
        return False