Coverage for src/lilbee/crawler/crawl4ai_fetcher.py: 100%

138 statements  

coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`. 

2 

3THIS IS THE ONLY FILE IN THE PROJECT THAT IMPORTS ``crawl4ai``. 

4 

5Swapping to a different web-fetching SDK is a one-file change: 

6delete this module, add a replacement that implements 

7:class:`lilbee.crawler.fetcher.WebFetcher`, and update the one 

8import in :mod:`lilbee.crawler.api`. 

9""" 

10 

11from __future__ import annotations 

12 

13import contextlib 

14import inspect 

15import io 

16import logging 

17import math 

18from collections.abc import AsyncGenerator, AsyncIterator 

19from typing import TYPE_CHECKING, Any 

20from urllib.parse import urlparse 

21 

22from lilbee.crawler import bootstrap 

23from lilbee.crawler.bootstrap import CrawlerBrowserMissing 

24from lilbee.crawler.models import ( 

25 CancelToken, 

26 ConcurrencySpec, 

27 FetchedPage, 

28 FilterSpec, 

29) 

30 

31if TYPE_CHECKING: 

32 from lilbee.crawler.fetcher import WebFetcher 

33 

34log = logging.getLogger(__name__) 

35 

36 

37def _build_rate_limited_dispatcher(concurrency: ConcurrencySpec) -> Any: 

38 """Build a SemaphoreDispatcher + RateLimiter from a ConcurrencySpec, or None. 

39 

40 BFSDeepCrawlStrategy calls ``crawler.arun_many()`` without a dispatcher 

41 kwarg, so per-domain rate limiting is only reachable by threading a 

42 dispatcher through AsyncWebCrawler itself. This helper centralizes the 

43 spec read so the TUI / CLI / server all get identical behavior. 

44 """ 

45 if not concurrency.retry_on_rate_limit: 

46 return None 

47 from crawl4ai.async_dispatcher import RateLimiter, SemaphoreDispatcher 

48 

49 rate_limiter = RateLimiter( 

50 base_delay=(concurrency.retry_base_delay_min, concurrency.retry_base_delay_max), 

51 max_delay=concurrency.retry_max_backoff, 

52 max_retries=concurrency.retry_max_attempts, 

53 ) 

54 return SemaphoreDispatcher( 

55 semaphore_count=concurrency.semaphore_count, 

56 rate_limiter=rate_limiter, 

57 ) 
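
# Illustrative sketch (not part of the module): how a retry-enabled
# ConcurrencySpec flows through _build_rate_limited_dispatcher. The field
# names come from the spec reads above; the literal values, the keyword-style
# construction of ConcurrencySpec, and whether the omitted fields have
# defaults are assumptions for the example only.
#
#     spec = ConcurrencySpec(
#         retry_on_rate_limit=True,
#         retry_base_delay_min=1.0,
#         retry_base_delay_max=3.0,
#         retry_max_backoff=60.0,
#         retry_max_attempts=3,
#         semaphore_count=5,
#     )
#     dispatcher = _build_rate_limited_dispatcher(spec)
#     # -> SemaphoreDispatcher(semaphore_count=5,
#     #        rate_limiter=RateLimiter(base_delay=(1.0, 3.0),
#     #                                 max_delay=60.0, max_retries=3))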


class _LilbeeAsyncCrawler:
    """AsyncWebCrawler wrapper that injects a default dispatcher on arun_many.

    crawl4ai's BFSDeepCrawlStrategy hard-codes crawler.arun_many(urls, config)
    without a dispatcher kwarg, so per-domain rate limiting and 429/503 retries
    can't be wired via CrawlerRunConfig. By giving the crawler a default
    dispatcher, every strategy-originated arun_many picks it up. An explicit
    dispatcher= on the call still wins.
    """

    def __init__(self, *, verbose: bool, dispatcher: Any) -> None:
        from crawl4ai import AsyncWebCrawler

        self._inner = AsyncWebCrawler(verbose=verbose)
        self._dispatcher = dispatcher

    async def __aenter__(self) -> _LilbeeAsyncCrawler:
        await self._inner.__aenter__()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
        return await self._inner.__aexit__(exc_type, exc, tb)

    async def arun(self, *args: Any, **kwargs: Any) -> Any:
        return await self._inner.arun(*args, **kwargs)

    async def arun_many(
        self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any
    ) -> Any:
        return await self._inner.arun_many(
            urls,
            config=config,
            dispatcher=dispatcher if dispatcher is not None else self._dispatcher,
            **kwargs,
        )
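
# Illustrative sketch of the precedence rule implemented above (``urls``,
# ``cfg`` and ``other`` are placeholders, not names from this project):
#
#     await crawler.arun_many(urls, config=cfg)                    # uses self._dispatcher
#     await crawler.arun_many(urls, config=cfg, dispatcher=other)  # explicit `other` wins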


@contextlib.asynccontextmanager
async def _open_crawler(*, quiet: bool = False, dispatcher: Any = None) -> AsyncIterator[Any]:
    """Open a crawler.

    Raises :class:`CrawlerBrowserMissing` early if the Chromium binary
    hasn't been downloaded. Without this guard Playwright prints a full
    ASCII install banner that leaks into the TUI.

    When *dispatcher* is provided, wrap AsyncWebCrawler in _LilbeeAsyncCrawler
    so every strategy-originated arun_many call picks it up. The single-URL
    path (crawl_single) doesn't need a dispatcher because arun() doesn't accept
    one, so it passes None and gets a bare AsyncWebCrawler.
    """
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserMissing(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    from crawl4ai import AsyncWebCrawler

    stdout_ctx = contextlib.redirect_stdout(io.StringIO()) if quiet else contextlib.nullcontext()
    stderr_ctx = contextlib.redirect_stderr(io.StringIO()) if quiet else contextlib.nullcontext()
    with stdout_ctx, stderr_ctx:
        if dispatcher is not None:
            async with _LilbeeAsyncCrawler(verbose=not quiet, dispatcher=dispatcher) as crawler:
                yield crawler
        else:
            async with AsyncWebCrawler(verbose=not quiet) as crawler:
                yield crawler
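
# Illustrative usage (a sketch mirroring how fetch_single / fetch_recursive
# below open the crawler; `dispatcher` and `config` are placeholders):
#
#     async with _open_crawler(quiet=True, dispatcher=dispatcher) as crawler:
#         ...  # strategy-originated arun_many calls pick up `dispatcher`
#
#     async with _open_crawler(quiet=True) as crawler:
#         result = await crawler.arun(url="https://example.com/", config=config)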


def _safe_strategy_cancel(strategy: Any) -> None:
    """Call ``strategy.cancel()`` if available, swallowing the known shapes.

    BFSDeepCrawlStrategy has ``.cancel()`` in crawl4ai 0.8.6. Older versions or
    third-party strategies may not. Belt-and-suspenders: should_cancel already
    gates between BFS levels, but ``cancel()`` also short-circuits arun_many.

    Narrow catch: ``AttributeError`` covers the rare case where ``cancel``
    exists but accesses a missing attribute mid-call; ``RuntimeError`` covers
    cancel-on-closed-strategy. Anything else propagates.
    """
    cancel_method = getattr(strategy, "cancel", None)
    if callable(cancel_method):
        try:
            cancel_method()
        except (AttributeError, RuntimeError) as exc:
            log.debug("strategy.cancel() raised: %s", exc)


async def _safe_aclose(stream: Any) -> None:
    """Close an async generator stream if that is what it is.

    ``_iter_crawl_stream`` normalizes over async-generator / list / single-result
    shapes; only the generator shape has an ``aclose()`` to call. A list or
    single object is a no-op.
    """
    if stream is None:
        return
    if inspect.isasyncgen(stream):
        with contextlib.suppress(Exception):
            await stream.aclose()


async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]:
    """Normalize crawl4ai's ``arun()`` return to an async iterator.

    With ``stream=True`` on CrawlerRunConfig, crawl4ai 0.8 returns an async
    generator. Older call sites and some crawl4ai code paths return a list
    (batch mode) or a single CrawlResult. Accept all three shapes so tests
    that mock ``arun()`` with a plain list keep working.
    """
    # Three possible shapes from crawl4ai's arun(): async generator (stream=True),
    # plain list (batch), or a single CrawlResult. Tests mock arun() with any of
    # the three, so normalize here rather than in each caller.
    if inspect.isasyncgen(stream):
        async for item in stream:
            yield item
        return
    # A list is the batch-mode shape; iterate and yield each item.
    if isinstance(stream, list):
        for item in stream:
            yield item
        return
    yield stream
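
# Illustrative sketch of the three accepted shapes (``async_gen``, ``cr1``,
# ``cr2`` and ``single_result`` are placeholders); all three iterate the same
# way from the caller's point of view:
#
#     async for cr in _iter_crawl_stream(async_gen): ...      # stream=True path
#     async for cr in _iter_crawl_stream([cr1, cr2]): ...     # batch-mode list
#     async for cr in _iter_crawl_stream(single_result): ...  # single CrawlResult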


def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Build a URLFilter that scopes a crawl to the starting URL's host.

    Default behavior (``include_subdomains=False``) restricts link-following to
    the exact host of *start_url*. For ``https://en.wikipedia.org/...`` this
    excludes ``af.wikipedia.org`` and every other language subdomain.

    When ``include_subdomains=True``, crawl4ai's DomainFilter matches the host
    plus any of its subdomains (``foo.example.com`` matches ``example.com``),
    which is the loose "whole registrable domain" behavior users may want.
    """
    from crawl4ai.deep_crawling.filters import DomainFilter, URLFilter

    host = (urlparse(start_url).hostname or "").lower()
    if include_subdomains:
        return DomainFilter(allowed_domains=host) if host else None

    class _ExactHostFilter(URLFilter):  # type: ignore[misc]
        def __init__(self, allowed_host: str) -> None:
            super().__init__()
            self._host = allowed_host

        def apply(self, url: str) -> bool:
            link_host = (urlparse(url).hostname or "").lower()
            ok = link_host == self._host
            self._update_stats(ok)
            return ok

    return _ExactHostFilter(host) if host else None
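
# Illustrative sketch of the two scoping modes (the example URLs are made up),
# for a seed of https://docs.example.com/start:
#
#     f = _host_scope_filter("https://docs.example.com/start", include_subdomains=False)
#     f.apply("https://docs.example.com/guide")   # True  (exact host)
#     f.apply("https://api.example.com/guide")    # False (different host)
#
#     f = _host_scope_filter("https://docs.example.com/start", include_subdomains=True)
#     # DomainFilter(allowed_domains="docs.example.com"): the host and its
#     # subdomains (e.g. v2.docs.example.com) pass; sibling hosts do not.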


class Crawl4aiFetcher:
    """:class:`WebFetcher` implementation backed by crawl4ai.

    Migrating off crawl4ai means replacing this class with another
    :class:`WebFetcher` implementor (e.g. a ``KreuzcrawlFetcher``) and
    updating the one construction site in :mod:`lilbee.crawler.api`.
    """

    def __init__(self, *, quiet: bool = False) -> None:
        self._quiet = quiet

    async def __aenter__(self) -> Crawl4aiFetcher:
        # Crawl4aiFetcher opens a fresh ``AsyncWebCrawler`` per operation
        # because ``fetch_recursive`` needs a per-call dispatcher (which
        # depends on the :class:`ConcurrencySpec` for that call). Nothing to
        # set up here.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch a single URL via crawl4ai's ``arun``."""
        from crawl4ai import CrawlerRunConfig

        config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet) as crawler:
            result = await crawler.arun(url=url, config=config)
            markdown = (result.markdown or "").strip()
            if markdown:
                return FetchedPage(url=url, markdown=markdown, success=True)
            return FetchedPage(
                url=url,
                success=False,
                error=result.error_message or "No content extracted",
            )
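
    # Illustrative usage (a sketch; the URL is a placeholder and
    # do_something_with() is hypothetical):
    #
    #     async with Crawl4aiFetcher(quiet=True) as fetcher:
    #         page = await fetcher.fetch_single("https://example.com/", timeout=30.0)
    #         if page.success:
    #             do_something_with(page.markdown)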


    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Stream pages discovered by crawl4ai's native BFS.

        ``depth`` / ``max_pages`` of ``None`` mean unbounded; the adapter
        translates to ``math.inf`` for crawl4ai's BFSDeepCrawlStrategy, which
        is the sentinel it understands.
        """

        def _should_cancel() -> bool:
            return cancel is not None and cancel.is_set()

        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        filter_chain_items: list[Any] = []
        host_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if host_filter is not None:
            filter_chain_items.append(host_filter)
        if filters.exclude_patterns:
            filter_chain_items.append(
                URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            )
        filter_chain = FilterChain(filter_chain_items) if filter_chain_items else FilterChain()

        strategy = BFSDeepCrawlStrategy(
            max_depth=math.inf if depth is None else depth,
            max_pages=math.inf if max_pages is None else max_pages,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )

        dispatcher = _build_rate_limited_dispatcher(concurrency)
        stream: Any = None
        strategy_cancelled = False
        # Exceptions propagate to the orchestration layer, which decides
        # whether to log cancel-teardown noise at debug vs surface a real
        # failure. The adapter's only housekeeping is stream close + BFS
        # strategy cancel so Playwright tears down in order.
        async with _open_crawler(quiet=self._quiet, dispatcher=dispatcher) as crawler:
            stream = await crawler.arun(url=seed_url, config=config)
            try:
                async for cr in _iter_crawl_stream(stream):
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if cr.success:
                        yield FetchedPage(url=cr.url, markdown=cr.markdown or "")
                    else:
                        yield FetchedPage(
                            url=cr.url,
                            success=False,
                            error=cr.error_message or "Unknown error",
                        )
            finally:
                # If the consumer breaks out before we saw a cancel, still
                # short-circuit the BFS strategy so any in-flight arun_many
                # batch stops dispatching. Mirrors the orchestrator's
                # previous "hard cap on visible counter" behavior now that
                # the strategy object lives inside the adapter.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # Close the async generator (if it is one) before the
                # crawler context exits, so Playwright tears down
                # in-flight URLs in order. Skipping this is what produced
                # the "BrowserContext.new_page: Connection closed" spam
                # on cancel.
                await _safe_aclose(stream)
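
    # Illustrative usage (a sketch; whether ConcurrencySpec() and FilterSpec()
    # construct with no arguments is an assumption, as are the seed URL and
    # handle()):
    #
    #     async with Crawl4aiFetcher() as fetcher:
    #         async for page in fetcher.fetch_recursive(
    #             "https://docs.example.com/",
    #             depth=2,
    #             max_pages=50,
    #             timeout=30.0,
    #             concurrency=ConcurrencySpec(),
    #             filters=FilterSpec(),
    #         ):
    #             handle(page)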


# Protocol conformance check: Crawl4aiFetcher is structurally a WebFetcher.
# We don't instantiate at import time so the check stays purely structural.
if TYPE_CHECKING:
    _: WebFetcher = Crawl4aiFetcher()
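
# Hedged sketch (not shipped code): the surface a replacement implementor would
# provide when migrating off crawl4ai, inferred from Crawl4aiFetcher above. The
# ``KreuzcrawlFetcher`` name is the hypothetical example from the class docstring.
#
#     class KreuzcrawlFetcher:
#         async def __aenter__(self) -> "KreuzcrawlFetcher": ...
#         async def __aexit__(self, exc_type, exc, tb) -> None: ...
#         async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage: ...
#         async def fetch_recursive(
#             self, seed_url, *, depth, max_pages, timeout, concurrency, filters, cancel=None
#         ) -> AsyncGenerator[FetchedPage, None]: ...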


def crawler_available() -> bool:
    """Check if the crawl4ai backend is importable (i.e. the extra is installed)."""
    try:
        import crawl4ai  # noqa: F401

        return True
    except ImportError:
        return False