Coverage for src / lilbee / cli / tui / task_queue.py: 100%

204 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Per-type concurrent task queue for background operations (downloads, syncs, crawls). 

2 

3Each task type (download, sync, crawl) gets its own independent queue, so a long 

4download does not block a sync from starting. Within a type, tasks run sequentially. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import threading 

11import time 

12import uuid 

13from collections.abc import Callable 

14from dataclasses import dataclass 

15from enum import StrEnum 

16 

17log = logging.getLogger(__name__) 

18 

19 

20class TaskStatus(StrEnum): 

21 """Lifecycle states for a queued task.""" 

22 

23 QUEUED = "queued" 

24 ACTIVE = "active" 

25 DONE = "done" 

26 FAILED = "failed" 

27 CANCELLED = "cancelled" 

28 

29 

30class TaskType(StrEnum): 

31 """Canonical task types. Replaces raw string literals at call sites.""" 

32 

33 DOWNLOAD = "download" 

34 SYNC = "sync" 

35 CRAWL = "crawl" 

36 WIKI = "wiki" 

37 ADD = "add" 

38 REMOVE = "remove" 

39 SETUP = "setup" 

40 

41 

# Single-glyph marker shown for each lifecycle state; one entry per
# TaskStatus member.
STATUS_ICONS: dict[TaskStatus, str] = {
    TaskStatus.QUEUED: "⏳",
    TaskStatus.ACTIVE: "▶",
    TaskStatus.DONE: "✓",
    TaskStatus.FAILED: "✗",
    TaskStatus.CANCELLED: "⊘",
}

49 

50 

@dataclass
class Task:
    """Record for one unit of queued work, including its display state."""

    # Identity and payload. ``fn`` is the zero-argument callable that
    # performs the work.
    task_id: str
    name: str
    task_type: str
    fn: Callable[[], None]

    # Lifecycle and progress-display state; every task starts out QUEUED.
    status: TaskStatus = TaskStatus.QUEUED
    progress: float = 0.0
    detail: str = ""
    indeterminate: bool = False

    # Monotonic-clock reading taken when the task became ACTIVE; None while
    # it is still QUEUED. The Task Center row uses it to show elapsed time.
    started_at: float | None = None

    # Monotonic-clock reading taken when the task reached a terminal state
    # (DONE / FAILED / CANCELLED); None while it is still running. It
    # freezes the elapsed-time display so it stops ticking during the
    # 2-second post-finish flash.
    completed_at: float | None = None

71 

72 

class TaskQueue:
    """Per-type concurrent task queue.

    Thread-safe: all bookkeeping is guarded by a single non-reentrant lock.
    Each task type (download, sync, crawl, etc.) has its own independent
    FIFO queue, so a long download does not block a sync. Within a type, at
    most ``capacity[type]`` tasks (1 when the type is not listed) are active
    at the same time.

    Callers receive a *task_id* they can use to update progress, cancel, or
    query status. Once a task reaches DONE, FAILED or CANCELLED its outcome
    is final: later ``complete_task`` / ``fail_task`` / ``cancel`` calls for
    the same id leave it untouched.
    """

    def __init__(
        self,
        *,
        on_change: Callable[[], None] | None = None,
        capacity: dict[str, int] | None = None,
    ) -> None:
        """Create an empty queue.

        *on_change*, when given, is registered as the first change
        subscriber. *capacity* maps a task type to the maximum number of
        simultaneously active tasks of that type; the mapping is copied.
        """
        self._lock = threading.Lock()
        # Every known task (queued, active, or finished-but-not-cleared),
        # keyed by task id.
        self._tasks: dict[str, Task] = {}
        # Per-type FIFO of task ids that are still waiting to run.
        self._queues: dict[str, list[str]] = {}
        # Per-type set of currently-active task ids. A "type" here means
        # sync/crawl/download/wiki; each has its own FIFO and own active slots.
        self._active_ids: dict[str, set[str]] = {}
        # Max concurrent active tasks per type. Defaults to 1 (single-active).
        # Callers override per type (e.g. "download": 2 to allow two concurrent
        # model downloads). Types absent from the map implicitly cap at 1.
        self._capacity: dict[str, int] = dict(capacity or {})
        self._on_change: list[Callable[[], None]] = []
        if on_change:
            self._on_change.append(on_change)
        # Finished tasks in the order they reached a terminal state.
        self._history: list[Task] = []

    def _capacity_for(self, task_type: str) -> int:
        """Return the active-slot limit for *task_type* (1 if unconfigured)."""
        return self._capacity.get(task_type, 1)

    @staticmethod
    def _is_terminal(task: Task) -> bool:
        """Return True when *task* is already DONE, FAILED or CANCELLED."""
        return task.status in (TaskStatus.DONE, TaskStatus.FAILED, TaskStatus.CANCELLED)

    def subscribe(self, callback: Callable[[], None]) -> None:
        """Subscribe to task queue changes. Callback is called on any queue update.

        Registering the same callback twice has no additional effect.
        """
        with self._lock:
            if callback not in self._on_change:
                self._on_change.append(callback)

    def unsubscribe(self, callback: Callable[[], None]) -> None:
        """Unsubscribe from task queue changes. Unknown callbacks are ignored."""
        with self._lock:
            if callback in self._on_change:
                self._on_change.remove(callback)

    @property
    def active_task(self) -> Task | None:
        """Return any one active task (for backward compat). Prefer active_tasks."""
        with self._lock:
            for ids in self._active_ids.values():
                for tid in ids:
                    task = self._tasks.get(tid)
                    if task is not None:
                        return task
        return None

    @property
    def active_tasks(self) -> list[Task]:
        """Return all currently active tasks across all types."""
        with self._lock:
            return [
                task
                for ids in self._active_ids.values()
                for tid in ids
                if (task := self._tasks.get(tid)) is not None
            ]

    @property
    def queued_tasks(self) -> list[Task]:
        """Return all tasks still waiting to run, grouped by type in FIFO order."""
        with self._lock:
            return [
                task
                for tids in self._queues.values()
                for tid in tids
                if (task := self._tasks.get(tid)) is not None
            ]

    @property
    def history(self) -> list[Task]:
        """Return a copy of the finished-task list (the Task objects are shared)."""
        with self._lock:
            return list(self._history)

    @property
    def is_empty(self) -> bool:
        """Return True when no task of any type is active or queued."""
        with self._lock:
            has_active = any(self._active_ids.values())
            has_queued = any(self._queues.values())
            return not has_active and not has_queued

    def get_task(self, task_id: str) -> Task | None:
        """Look up a task by ID. Returns None if not found."""
        with self._lock:
            return self._tasks.get(task_id)

    def enqueue(
        self,
        fn: Callable[[], None],
        name: str,
        task_type: str,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Add a task to the per-type queue. Returns a task_id.

        The task starts in the QUEUED state; it only becomes ACTIVE when
        ``advance`` picks it up.
        """
        with self._lock:
            # Ids are only 8 hex digits, so pick one under the lock and
            # retry on the (unlikely) clash with a task that is still
            # tracked; otherwise the new record would silently replace it.
            task_id = uuid.uuid4().hex[:8]
            while task_id in self._tasks:
                task_id = uuid.uuid4().hex[:8]
            self._tasks[task_id] = Task(
                task_id=task_id, name=name, task_type=task_type, fn=fn, indeterminate=indeterminate
            )
            self._queues.setdefault(task_type, []).append(task_id)
        self._notify()
        return task_id

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task.

        When *indeterminate* is True the task's progress bar renders as a
        pulsing indeterminate bar instead of a percentage. When explicitly
        False it returns to determinate mode. ``None`` leaves the flag as-is
        so incremental progress updates don't clobber the caller's intent.

        Unknown ids are ignored; subscribers are notified either way.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is not None:
                task.progress = progress
                task.detail = detail
                if indeterminate is not None:
                    task.indeterminate = indeterminate
        self._notify()

    def complete_task(self, task_id: str) -> None:
        """Mark a task as done and append it to history.

        The task record stays in ``_tasks`` so callers can still look it
        up by id. Bulk removal happens in ``clear_history`` or targeted
        removal via ``remove_task``.

        A task that already reached a terminal state (for example, it was
        cancelled while its worker was still running) is left unchanged so
        its outcome is not overwritten and it is not added to history twice.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is not None and not self._is_terminal(task):
                task.progress = 100
                task.indeterminate = False
                self._finish_locked(task, TaskStatus.DONE)
        self._notify()

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task as failed and append it to history.

        The task record stays in ``_tasks`` so callers can still inspect
        its detail. Bulk removal happens in ``clear_history`` or
        targeted removal via ``remove_task``.

        A task that already reached a terminal state is left unchanged,
        including its detail text.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is not None and not self._is_terminal(task):
                task.detail = detail
                self._finish_locked(task, TaskStatus.FAILED)
        self._notify()

    def cancel(self, task_id: str) -> bool:
        """Cancel a queued or active task. Returns True if cancelled.

        Terminal rows (DONE / FAILED / CANCELLED) are immutable: a cancel
        call against an already-finished task is a no-op and returns
        False so callers that key off the return value (e.g. UI actions)
        don't treat it as success. Unknown ids also return False.

        Appends the cancelled task to ``_history`` so the Task Center
        renders it as a lingering ``cancelled`` row, matching the
        post-flash contract for DONE and FAILED.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None or self._is_terminal(task):
                return False
            self._finish_locked(task, TaskStatus.CANCELLED)
        self._notify()
        return True

    def advance(self, task_type: str | None = None) -> Task | None:
        """Pop the next queued task of this type and mark it active.

        If *task_type* is given, only advance that type's queue.
        If omitted, advance any type that still has a free slot.
        Respects the per-type capacity: returns None once all slots are full.

        At most one task is started per call. Subscribers are notified only
        when a task was actually started.
        """
        advanced: Task | None = None
        with self._lock:
            types = [task_type] if task_type else list(self._queues)
            for tt in types:
                active = self._active_ids.setdefault(tt, set())
                if len(active) >= self._capacity_for(tt):
                    continue
                queue = self._queues.get(tt, [])
                if not queue:
                    continue
                tid = queue.pop(0)
                task = self._tasks.get(tid)
                if task is not None:
                    task.status = TaskStatus.ACTIVE
                    task.started_at = time.monotonic()
                    active.add(tid)
                    advanced = task
                    break
        if advanced is not None:
            self._notify()
        return advanced

    def remove_task(self, task_id: str) -> None:
        """Remove a task from both live tracking and history.

        Most callers shouldn't need this in normal flow. Completed rows
        linger in the Task Center on purpose so users can review recent
        work, and ``clear_history()`` handles bulk pruning. This stays
        for tests and administrative paths that need to drop a specific
        id.
        """
        with self._lock:
            task = self._tasks.pop(task_id, None)
            if task is not None:
                self._remove_from_active_locked(task_id, task.task_type)
                self._remove_from_queue_locked(task_id, task.task_type)
            self._history = [t for t in self._history if t.task_id != task_id]
        self._notify()

    def clear_history(self) -> int:
        """Drop all DONE/FAILED/CANCELLED entries from history.

        Returns the number of rows cleared. The Task Center binds this
        to ``C`` so the user can tidy up once they're done inspecting.
        Subscribers are notified only when at least one row was cleared.
        """
        with self._lock:
            cleared = len(self._history)
            self._history = []
            # Also drop the backing task records so memory is actually freed.
            self._tasks = {
                tid: t
                for tid, t in self._tasks.items()
                if t.status in (TaskStatus.QUEUED, TaskStatus.ACTIVE)
            }
        if cleared:
            self._notify()
        return cleared

    def _finish_locked(self, task: Task, status: TaskStatus) -> None:
        """Move *task* to terminal *status* and record it. Caller must hold _lock."""
        task.status = status
        task.completed_at = time.monotonic()
        self._history.append(task)
        self._remove_from_active_locked(task.task_id, task.task_type)
        self._remove_from_queue_locked(task.task_id, task.task_type)

    def _remove_from_active_locked(self, task_id: str, task_type: str) -> None:
        """Remove a task from active tracking. Caller must hold _lock."""
        active = self._active_ids.get(task_type)
        if active is not None:
            active.discard(task_id)

    def _remove_from_queue_locked(self, task_id: str, task_type: str) -> None:
        """Remove a task from its type queue. Caller must hold _lock."""
        queue = self._queues.get(task_type)
        if queue:
            self._queues[task_type] = [tid for tid in queue if tid != task_id]

    def _notify(self) -> None:
        """Invoke every subscribed callback, in registration order."""
        # Snapshot under the lock so subscribe/unsubscribe from another thread
        # (or from inside a callback) cannot mutate the list mid-iteration.
        # Callbacks run outside the lock so synchronous subscribers that
        # re-enter the queue (e.g. TaskBar refreshing from active_tasks)
        # do not deadlock on the non-reentrant lock.
        with self._lock:
            callbacks = list(self._on_change)
        for callback in callbacks:
            callback()