Coverage for src/lilbee/crawler/url_filter.py: 100% (36 statements)


1"""URL validation, blocked-network checks, and host-scope helpers. 

2 

3Backend-agnostic: no crawl4ai or Playwright imports. The crawl4ai 

4adapter reads :class:`lilbee.crawler.models.FilterSpec` and builds 

5its own ``URLFilter`` / ``DomainFilter`` objects from these inputs, 

6so a future adapter can reuse the same specs directly. 

7""" 

from __future__ import annotations

import ipaddress
import socket
from urllib.parse import urlparse

_BLOCKED_NETWORKS: tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...] = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("10.0.0.0/8"),  # RFC 1918 private
    ipaddress.ip_network("172.16.0.0/12"),  # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("169.254.0.0/16"),  # link-local (RFC 3927)
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)


def get_blocked_networks() -> tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...]:
    """Return the blocked-network list. Override in tests via monkeypatch."""
    return _BLOCKED_NETWORKS
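
# A minimal sketch of the test override the docstring above suggests,
# assuming pytest's monkeypatch fixture; the test name and loopback URL
# are illustrative, not taken from this project's test suite:
#
#     def test_allows_loopback(monkeypatch):
#         monkeypatch.setattr(
#             "lilbee.crawler.url_filter.get_blocked_networks",
#             lambda: (),
#         )
#         # With an empty blocklist, loopback now passes validation.
#         validate_crawl_url("http://127.0.0.1:8000/")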


def is_url(value: str) -> bool:
    """Check if a string is an HTTP/HTTPS URL."""
    return value.startswith(("http://", "https://"))


def validate_crawl_url(url: str) -> None:
    """Validate a URL for crawling. Raises ValueError for unsafe URLs.

    Rejects non-HTTP(S) schemes, unresolvable hostnames, and hosts that
    resolve to private, loopback, or link-local addresses.
    """
    parsed = urlparse(url)
    scheme = parsed.scheme.lower()
    if scheme not in ("http", "https"):
        raise ValueError(f"Only http:// and https:// URLs are allowed, got {scheme}://")

    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")

    try:
        addr_infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname: {hostname}") from exc

    # Check every resolved address (A and AAAA records), so a public
    # hostname that points at a private IP is still rejected.
    for _family, _type, _proto, _canonname, sockaddr in addr_infos:
        ip = ipaddress.ip_address(sockaddr[0])
        for network in get_blocked_networks():
            if ip in network:
                raise ValueError(f"Crawling private/reserved IP {ip} is not allowed")
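
# A minimal sketch of the three failure modes, assuming DNS resolution is
# available for the public host; all URLs here are illustrative only:
#
#     validate_crawl_url("https://example.com/")       # passes (public host)
#     validate_crawl_url("ftp://example.com/file")     # ValueError: scheme
#     validate_crawl_url("http://192.168.1.10/admin")  # ValueError: private IP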


def require_valid_crawl_url(url: str) -> None:
    """Validate a URL for crawling: prefix check first, then full validation."""
    if not is_url(url):
        raise ValueError("URL must start with http:// or https://")
    validate_crawl_url(url)
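
# Sketch: the prefix check rejects bare hostnames before any DNS lookup
# happens, so callers get a clear error for non-URL input:
#
#     require_valid_crawl_url("example.com")
#     # ValueError: URL must start with http:// or https://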


def host_in_scope(link_host: str, host: str, *, include_subdomains: bool) -> bool:
    """Return True when ``link_host`` should be followed during a whole-site crawl."""
    if not link_host:
        return False
    if link_host == host:
        return True
    return include_subdomains and link_host.endswith(f".{host}")
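
# Sketch of the scoping rules with illustrative hostnames; the leading dot
# in the suffix check stops lookalike domains such as notexample.com:
#
#     host_in_scope("docs.example.com", "example.com", include_subdomains=True)   # True
#     host_in_scope("docs.example.com", "example.com", include_subdomains=False)  # False
#     host_in_scope("notexample.com", "example.com", include_subdomains=True)     # False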