Coverage for src / lilbee / cli / tui / task_queue.py: 100%
204 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Per-type concurrent task queue for background operations (downloads, syncs, crawls).
3Each task type (download, sync, crawl) gets its own independent queue, so a long
4download does not block a sync from starting. Within a type, tasks run sequentially.
5"""
7from __future__ import annotations
9import logging
10import threading
11import time
12import uuid
13from collections.abc import Callable
14from dataclasses import dataclass
15from enum import StrEnum
17log = logging.getLogger(__name__)
20class TaskStatus(StrEnum):
21 """Lifecycle states for a queued task."""
23 QUEUED = "queued"
24 ACTIVE = "active"
25 DONE = "done"
26 FAILED = "failed"
27 CANCELLED = "cancelled"
30class TaskType(StrEnum):
31 """Canonical task types. Replaces raw string literals at call sites."""
33 DOWNLOAD = "download"
34 SYNC = "sync"
35 CRAWL = "crawl"
36 WIKI = "wiki"
37 ADD = "add"
38 REMOVE = "remove"
39 SETUP = "setup"
42STATUS_ICONS: dict[TaskStatus, str] = {
43 TaskStatus.QUEUED: "⏳",
44 TaskStatus.ACTIVE: "▶",
45 TaskStatus.DONE: "✓",
46 TaskStatus.FAILED: "✗",
47 TaskStatus.CANCELLED: "⊘",
48}
51@dataclass
52class Task:
53 """A single unit of work in the queue."""
55 task_id: str
56 name: str
57 task_type: str
58 fn: Callable[[], None]
59 status: TaskStatus = TaskStatus.QUEUED
60 progress: float = 0.0
61 detail: str = ""
62 indeterminate: bool = False
63 # Monotonic timestamp at which the task transitioned to ACTIVE. None
64 # while QUEUED. Used by the Task Center row to render elapsed time.
65 started_at: float | None = None
66 # Monotonic timestamp at which the task reached a terminal state
67 # (DONE / FAILED / CANCELLED). None while still running. Used to
68 # freeze the elapsed-time display so it doesn't keep ticking during
69 # the 2-second post-finish flash.
70 completed_at: float | None = None
73class TaskQueue:
74 """Per-type concurrent task queue.
75 Thread-safe. Each task type (download, sync, crawl, etc.) has its own
76 independent FIFO queue. One task per type can be active simultaneously,
77 so a download does not block a sync.
79 Callers receive a *task_id* they can use to update progress, cancel, or
80 query status.
81 """
83 def __init__(
84 self,
85 *,
86 on_change: Callable[[], None] | None = None,
87 capacity: dict[str, int] | None = None,
88 ) -> None:
89 self._lock = threading.Lock()
90 self._tasks: dict[str, Task] = {}
91 self._queues: dict[str, list[str]] = {}
92 # Per-type set of currently-active task ids. A "type" here means
93 # sync/crawl/download/wiki; each has its own FIFO and own active slots.
94 self._active_ids: dict[str, set[str]] = {}
95 # Max concurrent active tasks per type. Defaults to 1 (single-active).
96 # Callers override per type (e.g. "download": 2 to allow two concurrent
97 # model downloads). Types absent from the map implicitly cap at 1.
98 self._capacity: dict[str, int] = dict(capacity or {})
99 self._on_change: list[Callable[[], None]] = []
100 if on_change:
101 self._on_change.append(on_change)
102 self._history: list[Task] = []
104 def _capacity_for(self, task_type: str) -> int:
105 return self._capacity.get(task_type, 1)
107 def subscribe(self, callback: Callable[[], None]) -> None:
108 """Subscribe to task queue changes. Callback is called on any queue update."""
109 with self._lock:
110 if callback not in self._on_change:
111 self._on_change.append(callback)
113 def unsubscribe(self, callback: Callable[[], None]) -> None:
114 """Unsubscribe from task queue changes."""
115 with self._lock:
116 if callback in self._on_change:
117 self._on_change.remove(callback)
119 @property
120 def active_task(self) -> Task | None:
121 """Return any one active task (for backward compat). Prefer active_tasks."""
122 with self._lock:
123 for ids in self._active_ids.values():
124 for tid in ids:
125 task = self._tasks.get(tid)
126 if task:
127 return task
128 return None
130 @property
131 def active_tasks(self) -> list[Task]:
132 """Return all currently active tasks across all types."""
133 with self._lock:
134 tasks: list[Task] = []
135 for ids in self._active_ids.values():
136 for tid in ids:
137 task = self._tasks.get(tid)
138 if task:
139 tasks.append(task)
140 return tasks
142 @property
143 def queued_tasks(self) -> list[Task]:
144 with self._lock:
145 result: list[Task] = []
146 for tids in self._queues.values():
147 for tid in tids:
148 task = self._tasks.get(tid)
149 if task:
150 result.append(task)
151 return result
153 @property
154 def history(self) -> list[Task]:
155 with self._lock:
156 return list(self._history)
158 @property
159 def is_empty(self) -> bool:
160 with self._lock:
161 has_active = any(ids for ids in self._active_ids.values())
162 has_queued = any(len(q) > 0 for q in self._queues.values())
163 return not has_active and not has_queued
165 def get_task(self, task_id: str) -> Task | None:
166 """Look up a task by ID. Returns None if not found."""
167 with self._lock:
168 return self._tasks.get(task_id)
170 def enqueue(
171 self,
172 fn: Callable[[], None],
173 name: str,
174 task_type: str,
175 *,
176 indeterminate: bool = False,
177 ) -> str:
178 """Add a task to the per-type queue. Returns a task_id."""
179 task_id = uuid.uuid4().hex[:8]
180 task = Task(
181 task_id=task_id, name=name, task_type=task_type, fn=fn, indeterminate=indeterminate
182 )
183 with self._lock:
184 self._tasks[task_id] = task
185 self._queues.setdefault(task_type, []).append(task_id)
186 self._notify()
187 return task_id
189 def update_task(
190 self,
191 task_id: str,
192 progress: float,
193 detail: str = "",
194 *,
195 indeterminate: bool | None = None,
196 ) -> None:
197 """Update progress and detail text for a task.
198 When *indeterminate* is True the task's progress bar renders as a
199 pulsing indeterminate bar instead of a percentage. When explicitly
200 False it returns to determinate mode. ``None`` leaves the flag as-is
201 so incremental progress updates don't clobber the caller's intent.
202 """
203 with self._lock:
204 task = self._tasks.get(task_id)
205 if task:
206 task.progress = progress
207 task.detail = detail
208 if indeterminate is not None:
209 task.indeterminate = indeterminate
210 self._notify()
212 def complete_task(self, task_id: str) -> None:
213 """Mark a task as done and append it to history.
215 The task record stays in ``_tasks`` so callers can still look it
216 up by id. Bulk removal happens in ``clear_history`` or targeted
217 removal via ``remove_task``.
218 """
219 with self._lock:
220 task = self._tasks.get(task_id)
221 if task:
222 task.status = TaskStatus.DONE
223 task.progress = 100
224 task.indeterminate = False
225 task.completed_at = time.monotonic()
226 self._history.append(task)
227 self._remove_from_active_locked(task_id, task.task_type)
228 self._remove_from_queue_locked(task_id, task.task_type)
229 self._notify()
231 def fail_task(self, task_id: str, detail: str = "") -> None:
232 """Mark a task as failed and append it to history.
234 The task record stays in ``_tasks`` so callers can still inspect
235 its detail. Bulk removal happens in ``clear_history`` or
236 targeted removal via ``remove_task``.
237 """
238 with self._lock:
239 task = self._tasks.get(task_id)
240 if task:
241 task.status = TaskStatus.FAILED
242 task.detail = detail
243 task.completed_at = time.monotonic()
244 self._history.append(task)
245 self._remove_from_active_locked(task_id, task.task_type)
246 self._remove_from_queue_locked(task_id, task.task_type)
247 self._notify()
249 def cancel(self, task_id: str) -> bool:
250 """Cancel a queued or active task. Returns True if cancelled.
252 Terminal rows (DONE / FAILED / CANCELLED) are immutable: a cancel
253 call against an already-finished task is a no-op and returns
254 False so callers that key off the return value (e.g. UI actions)
255 don't treat it as success.
257 Appends the cancelled task to ``_history`` so the Task Center
258 renders it as a lingering ``cancelled`` row, matching the
259 post-flash contract for DONE and FAILED.
260 """
261 with self._lock:
262 task = self._tasks.get(task_id)
263 if not task:
264 return False
265 if task.status in (TaskStatus.DONE, TaskStatus.FAILED, TaskStatus.CANCELLED):
266 return False
267 task.status = TaskStatus.CANCELLED
268 task.completed_at = time.monotonic()
269 self._history.append(task)
270 self._remove_from_active_locked(task_id, task.task_type)
271 self._remove_from_queue_locked(task_id, task.task_type)
272 self._notify()
273 return True
275 def advance(self, task_type: str | None = None) -> Task | None:
276 """Pop the next queued task of this type and mark it active.
277 If *task_type* is given, only advance that type's queue.
278 If omitted, advance any type that still has a free slot.
279 Respects the per-type capacity: returns None once all slots are full.
280 """
281 advanced: Task | None = None
282 with self._lock:
283 types = [task_type] if task_type else list(self._queues.keys())
284 for tt in types:
285 active = self._active_ids.setdefault(tt, set())
286 if len(active) >= self._capacity_for(tt):
287 continue
288 queue = self._queues.get(tt, [])
289 if not queue:
290 continue
291 tid = queue.pop(0)
292 task = self._tasks.get(tid)
293 if task:
294 task.status = TaskStatus.ACTIVE
295 task.started_at = time.monotonic()
296 active.add(tid)
297 advanced = task
298 break
299 if advanced is not None:
300 self._notify()
301 return advanced
303 def remove_task(self, task_id: str) -> None:
304 """Remove a task from both live tracking and history.
306 Most callers shouldn't need this in normal flow. Completed rows
307 linger in the Task Center on purpose so users can review recent
308 work, and ``clear_history()`` handles bulk pruning. This stays
309 for tests and administrative paths that need to drop a specific
310 id.
311 """
312 with self._lock:
313 task = self._tasks.pop(task_id, None)
314 if task:
315 self._remove_from_active_locked(task_id, task.task_type)
316 self._remove_from_queue_locked(task_id, task.task_type)
317 self._history = [t for t in self._history if t.task_id != task_id]
318 self._notify()
320 def clear_history(self) -> int:
321 """Drop all DONE/FAILED/CANCELLED entries from history.
323 Returns the number of rows cleared. The Task Center binds this
324 to ``C`` so the user can tidy up once they're done inspecting.
325 """
326 with self._lock:
327 cleared = len(self._history)
328 self._history = []
329 # Also drop the backing task records so memory is actually freed.
330 self._tasks = {
331 tid: t
332 for tid, t in self._tasks.items()
333 if t.status in (TaskStatus.QUEUED, TaskStatus.ACTIVE)
334 }
335 if cleared:
336 self._notify()
337 return cleared
339 def _remove_from_active_locked(self, task_id: str, task_type: str) -> None:
340 """Remove a task from active tracking. Caller must hold _lock."""
341 active = self._active_ids.get(task_type)
342 if active is not None:
343 active.discard(task_id)
345 def _remove_from_queue_locked(self, task_id: str, task_type: str) -> None:
346 """Remove a task from its type queue. Caller must hold _lock."""
347 queue = self._queues.get(task_type)
348 if queue:
349 self._queues[task_type] = [tid for tid in queue if tid != task_id]
351 def _notify(self) -> None:
352 # Snapshot under the lock so subscribe/unsubscribe from another thread
353 # (or from inside a callback) cannot mutate the list mid-iteration.
354 # Callbacks run outside the lock so synchronous subscribers that
355 # re-enter the queue (e.g. TaskBar refreshing from active_tasks)
356 # do not deadlock on the non-reentrant lock.
357 with self._lock:
358 callbacks = list(self._on_change)
359 for callback in callbacks:
360 callback()