Coverage for src/lilbee/crawl_task.py: 100%

88 statements  

coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Background crawl task management — start, track, and query crawl operations.""" 

2 

3import asyncio 

4import logging 

5import uuid 

6from dataclasses import dataclass, field 

7from datetime import UTC, datetime 

8from enum import StrEnum 

9 

10from lilbee.crawler import crawl_and_save 

11from lilbee.progress import CrawlPageEvent, DetailedProgressCallback, EventType, ProgressEvent 

12 

13log = logging.getLogger(__name__) 

14 

15# Maximum completed tasks to retain in memory before evicting oldest. 

16_MAX_COMPLETED_TASKS = 100 

17 

18 

19class TaskStatus(StrEnum): 

20 """Lifecycle states for a crawl task.""" 

21 

22 PENDING = "pending" 

23 RUNNING = "running" 

24 DONE = "done" 

25 FAILED = "failed" 
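
# Lifecycle as driven by this module: start_crawl creates tasks as PENDING,
# run_crawl moves them to RUNNING and then to DONE or FAILED.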



@dataclass
class CrawlTask:
    """Tracks a single crawl operation.

    depth / max_pages follow the crawl_and_save three-state convention: None =
    unbounded, 0 (depth only) = single URL, positive int = explicit cap.
    """

    task_id: str
    url: str
    depth: int | None
    max_pages: int | None
    status: TaskStatus = TaskStatus.PENDING
    pages_crawled: int = 0
    pages_total: int | None = None
    error: str | None = None
    started_at: str = ""
    finished_at: str = ""
    _async_task: asyncio.Task[None] | None = field(default=None, repr=False, init=False)
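
# The three-state depth / max_pages convention documented on CrawlTask, with
# illustrative values (the URL is a placeholder):
#   CrawlTask(task_id=..., url="https://example.com", depth=None, max_pages=None)  # unbounded whole-site crawl
#   CrawlTask(task_id=..., url="https://example.com", depth=0, max_pages=None)     # single URL only
#   CrawlTask(task_id=..., url="https://example.com", depth=2, max_pages=50)       # explicit caps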



class TaskRegistry:
    """In-memory registry of active and completed crawl tasks.

    A single module-level instance (_registry) is used because task tracking
    is inherently per-process state (asyncio.Task references, etc.).
    """

    def __init__(self) -> None:
        self.tasks: dict[str, CrawlTask] = {}

    def clear(self) -> None:
        """Remove all tasks from the registry."""
        self.tasks.clear()


_registry = TaskRegistry()


def now_iso() -> str:
    """Current UTC time as ISO 8601 string."""
    return datetime.now(UTC).isoformat()
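
# now_iso() example value (illustrative): "2026-04-29T19:16:07.123456+00:00"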



def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
    """Return a progress callback that updates task fields from crawl events."""

    def _on_progress(event_type: EventType, data: ProgressEvent) -> None:
        if event_type == EventType.CRAWL_PAGE:
            if not isinstance(data, CrawlPageEvent):
                raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
            task.pages_crawled = data.current
            task.pages_total = data.total

    return _on_progress
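
# Usage sketch (illustrative): the returned callback is passed to crawl_and_save
# as ``on_progress`` and mutates the task in place as pages are fetched, e.g.
#   cb = make_progress_updater(task)
#   cb(EventType.CRAWL_PAGE, CrawlPageEvent(...))  # updates task.pages_crawled / task.pages_total
# Events other than CRAWL_PAGE are ignored by the callback.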



async def run_crawl(task: CrawlTask) -> None:
    """Execute crawl, save results, and trigger sync."""
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)

    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
        )
        task.status = TaskStatus.DONE
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.ingest import sync

            await sync(quiet=True)
        except Exception:
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error = str(exc)
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s — %s", task.url, exc)
    finally:
        task._async_task = None



def _evict_completed() -> None:
    """Remove completed tasks with the earliest ``finished_at`` when over cap.

    Finish order diverges from start order whenever short tasks complete
    while a long one is still running, so sort on ``finished_at``
    rather than dict insertion order.
    """
    done_statuses = (TaskStatus.DONE, TaskStatus.FAILED)
    tasks = _registry.tasks
    completed = [(tid, t) for tid, t in tasks.items() if t.status in done_statuses]
    excess = len(completed) - _MAX_COMPLETED_TASKS
    if excess <= 0:
        return
    completed.sort(key=lambda pair: pair[1].finished_at)
    for tid, _ in completed[:excess]:
        del tasks[tid]
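
# Worked example: with _MAX_COMPLETED_TASKS = 100 and 103 tasks in DONE/FAILED,
# excess is 3, so the three with the earliest ``finished_at`` are deleted;
# pending and running tasks are never evicted.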



def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass depth=0 for single URL.
    Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
    )
    _registry.tasks[task_id] = task
    task._async_task = asyncio.create_task(run_crawl(task))
    return task_id
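
# Polling sketch (illustrative, must run inside an active event loop):
#   task_id = start_crawl("https://example.com", depth=1, max_pages=25)
#   task = get_task(task_id)
#   while task and task.status in (TaskStatus.PENDING, TaskStatus.RUNNING):
#       await asyncio.sleep(0.5)  # run_crawl mutates the same CrawlTask object
#   # afterwards inspect task.status, task.pages_crawled, and task.error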



def get_task(task_id: str) -> CrawlTask | None:
    """Look up a crawl task by ID."""
    return _registry.tasks.get(task_id)


def list_tasks() -> list[CrawlTask]:
    """Return all tracked crawl tasks (active and completed)."""
    return list(_registry.tasks.values())


def clear_tasks() -> None:
    """Remove all tasks from the registry (for testing)."""
    _registry.clear()