Coverage for src / lilbee / cli / tui / widgets / task_bar.py: 100%
270 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""TaskBar widget and controller.
3The TaskBar is a slim 1-line status indicator docked at the bottom of every
4screen. It shows a count of active/queued tasks and directs users to the
5Task Center (``t``) for detailed progress. Full progress panels with spinners
6and progress bars live only in the Task Center screen.
8State ownership is split so the bar can render on every screen:
10- ``TaskBarController`` lives on the app (``app.task_bar``) and owns the
11 single ``TaskQueue``. Every long-running operation in the app should be
12 submitted to the controller via ``start_task`` (or the typed
13 ``start_download`` specialization) so it survives any screen navigation.
14- ``TaskBar`` is a stateless view widget composed by each Screen. It polls the
15 shared queue at 10 Hz on the main event loop and re-renders in place; no
16 thread marshaling or subscriber callbacks are involved in the render path.
17"""
19from __future__ import annotations
21import contextlib
22import logging
23import threading
24from collections.abc import Callable
25from enum import StrEnum
26from pathlib import Path
27from typing import TYPE_CHECKING, Any, ClassVar
29from textual.app import ComposeResult
30from textual.timer import Timer
31from textual.widgets import Label, Static
33from lilbee import asyncio_loop
34from lilbee.cancellation import TaskCancelled
35from lilbee.cli.tui import messages as msg
36from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType
37from lilbee.cli.tui.thread_safe import call_from_thread
38from lilbee.crawler import bootstrap_chromium, chromium_installed
39from lilbee.progress import EventType, SetupProgressEvent
41if TYPE_CHECKING:
42 from textual.app import App
44 from lilbee.catalog import CatalogModel
46log = logging.getLogger(__name__)
48_CSS_FILE = Path(__file__).parent / "task_bar.tcss"
50_DONE_FLASH_SECONDS = 2.0
51_POLL_INTERVAL_SECONDS = 0.1
52_DOWNLOAD_CONCURRENCY = 2
54# Pulsing-dot cadence: on/off flip at half of this tick count.
55# 10 Hz poll x 5 = 500 ms per half cycle, which is a 1 Hz dot pulse,
56# matching the active-row rail pulse in the Task Center.
57_DOT_PULSE_HALF_TICKS = 5
58_DOT_GLYPH = "●"
61class TaskOutcome(StrEnum):
62 """How a task terminated. Passed from worker thread to finalizer."""
64 DONE = "done"
65 FAILED = "failed"
66 CANCELLED = "cancelled"
69class ProgressReporter:
70 """Thread-safe handle a worker uses to report progress and check cancellation.
72 The worker only sees this object; it never touches ``self.app``,
73 ``call_from_thread``, or any screen. Writes to the lock-protected
74 ``TaskQueue`` so updates survive any UI navigation.
75 """
77 def __init__(self, controller: TaskBarController, task_id: str) -> None:
78 self._controller = controller
79 self._task_id = task_id
81 @property
82 def task_id(self) -> str:
83 return self._task_id
85 @property
86 def cancelled(self) -> bool:
87 task = self._controller.queue.get_task(self._task_id)
88 return task is not None and task.status == TaskStatus.CANCELLED
90 def check_cancelled(self) -> None:
91 """Raise ``TaskCancelled`` if the task was cancelled from the UI."""
92 if self.cancelled:
93 raise TaskCancelled
95 def update(
96 self, progress: float, detail: str = "", *, indeterminate: bool | None = None
97 ) -> None:
98 """Write a progress snapshot to the shared queue.
100 Raises ``TaskCancelled`` first if the UI cancelled the task, so
101 callers can use ``update`` as both a progress write and a cancel
102 checkpoint.
103 """
104 self.check_cancelled()
105 self._controller.queue.update_task(
106 self._task_id, progress, detail, indeterminate=indeterminate
107 )
110TaskTarget = Callable[[ProgressReporter], None]
113_BYTES_PER_MB = 1024 * 1024
116def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
117 """Worker target for the SETUP task: run bootstrap_chromium with progress forwarding.
119 Module-level so ``TaskBarController.ensure_chromium`` stays short and
120 tests can stub the target in isolation.
121 """
123 def _forward(event_type: EventType, data: Any) -> None:
124 if event_type != EventType.SETUP_PROGRESS:
125 return
126 if not isinstance(data, SetupProgressEvent):
127 return
128 total = data.total_bytes or 0
129 pct = int(data.downloaded_bytes * 100 / total) if total > 0 else 0
130 mb = data.downloaded_bytes // _BYTES_PER_MB
131 if total > 0:
132 detail = msg.SETUP_CHROMIUM_DETAIL.format(done=mb, total=total // _BYTES_PER_MB)
133 else:
134 detail = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=mb)
135 reporter.update(pct, detail)
137 asyncio_loop.run(bootstrap_chromium(on_progress=_forward))
140class TaskBarController:
141 """App-level owner of the shared TaskQueue + all long-running work.
143 The controller is attached as ``app.task_bar`` during
144 ``LilbeeApp.__init__``. All task lifecycle methods
145 (add/update/complete/fail/cancel) go through here so every ``TaskBar``
146 widget sees the same state, and every long-running op is spawned by
147 this controller — never by a screen that may dismiss mid-flight.
148 """
150 def __init__(self, app: App[Any]) -> None:
151 self.app = app
152 self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
153 # task_id -> (target, on_success). Worker looks up its target here
154 # so we don't capture in a closure that outlives the task.
155 self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}
157 def add_task(
158 self,
159 name: str,
160 task_type: str,
161 fn: Callable[[], None] | None = None,
162 *,
163 indeterminate: bool = False,
164 ) -> str:
165 """Enqueue a task. Returns the new task_id."""
166 return self.queue.enqueue(
167 fn or (lambda: None), name, task_type, indeterminate=indeterminate
168 )
170 def update_task(
171 self,
172 task_id: str,
173 progress: float,
174 detail: str = "",
175 *,
176 indeterminate: bool | None = None,
177 ) -> None:
178 """Update progress and detail text for a task."""
179 self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)
181 def complete_task(self, task_id: str) -> None:
182 """Mark a task done. Row lingers in history until the user clears it."""
183 task_type = self._task_type_of(task_id)
184 self.queue.complete_task(task_id)
185 self._after_done_hooks(task_type)
186 self._advance_all(task_type)
188 def fail_task(self, task_id: str, detail: str = "") -> None:
189 """Mark a task failed. Row lingers in history until the user clears it."""
190 self.queue.fail_task(task_id, detail)
191 self._advance_all(self._task_type_of(task_id))
193 def cancel_task(self, task_id: str) -> None:
194 """Mark a task cancelled. Row lingers in history until the user clears it."""
195 task_type = self._task_type_of(task_id)
196 self.queue.cancel(task_id)
197 self._advance_all(task_type)
199 def _after_done_hooks(self, task_type: str | None) -> None:
200 """Side effects triggered by a DONE completion.
202 Callable from both the direct ``complete_task`` convenience and
203 the worker-thread ``_finalize_task`` path so every success route
204 stays in sync. Does NOT advance the queue; each caller picks the
205 advance strategy that fits its context (``_advance_all`` vs
206 ``_try_start_next``).
207 """
208 if task_type == TaskType.DOWNLOAD.value:
209 self._notify_model_installed()
211 def _task_type_of(self, task_id: str) -> str | None:
212 task = self.queue.get_task(task_id)
213 return task.task_type if task else None
215 def _advance_all(self, task_type: str | None) -> None:
216 """Try to advance the freed type first, then any other idle type."""
217 if task_type:
218 self.queue.advance(task_type)
219 while self.queue.advance() is not None:
220 pass
222 def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
223 """Kick off a Chromium bootstrap if missing, then call ``on_ready``.
225 If Chromium is already installed, ``on_ready`` runs immediately on
226 the caller's thread. Otherwise a single SETUP task is enqueued
227 that runs ``bootstrap_chromium``; on success the controller
228 invokes ``on_ready`` on the worker thread via the task's
229 ``on_success`` hook. On failure the SETUP task surfaces as FAILED
230 and ``on_ready`` is NOT called (the follow-up work shouldn't
231 proceed against a missing browser).
233 bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
234 their real work behind the one-time bootstrap.
235 """
236 if chromium_installed():
237 on_ready()
238 return
240 self.start_task(
241 msg.SETUP_CHROMIUM_NAME,
242 TaskType.SETUP,
243 _chromium_bootstrap_target,
244 indeterminate=False,
245 on_success=on_ready,
246 )
248 def start_task(
249 self,
250 name: str,
251 task_type: TaskType,
252 target: TaskTarget,
253 *,
254 indeterminate: bool = False,
255 on_success: Callable[[], None] | None = None,
256 ) -> str:
257 """Enqueue a task, spawn its worker, return task_id.
259 The *target* receives a ``ProgressReporter`` as its only argument.
260 It should periodically call ``reporter.update(percent, detail)`` and
261 may call ``reporter.check_cancelled()`` to cooperatively abort.
263 On success (target returns normally) the queue marks the task DONE
264 and ``on_success`` (if provided) runs after on the same worker
265 thread. On ``TaskCancelled`` the task is marked CANCELLED. On any
266 other exception the task is marked FAILED with ``str(exc)`` as
267 detail. Rows linger in the Task Center under their final status
268 until the user presses capital ``C`` to clear; the bottom bar
269 flashes the outcome once and then hides when idle.
271 Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
272 controls concurrency: a second sync queues behind the first, but a
273 third download waits until one of the two active downloads finishes.
274 """
275 task_id = self.queue.enqueue(
276 lambda: None, name, task_type.value, indeterminate=indeterminate
277 )
278 self._task_targets[task_id] = (target, on_success)
279 self._try_start_next(task_type.value)
280 return task_id
282 def _try_start_next(self, task_type: str) -> None:
283 """Promote queued tasks of this type into any free capacity slots."""
284 while (task := self.queue.advance(task_type)) is not None:
285 self._spawn_task_worker(task.task_id)
287 def _spawn_task_worker(self, task_id: str) -> None:
288 """Start a daemon thread for the task. Safe to call from any thread."""
289 if task_id not in self._task_targets:
290 return
291 thread = threading.Thread(
292 target=self._run_task_worker,
293 args=(task_id,),
294 daemon=True,
295 name=f"task-{task_id}",
296 )
297 thread.start()
299 def _run_task_worker(self, task_id: str) -> None:
300 """Body of the daemon worker thread."""
301 entry = self._task_targets.get(task_id)
302 if entry is None:
303 return
304 target, on_success = entry
305 task = self.queue.get_task(task_id)
306 task_type = task.task_type if task is not None else None
307 reporter = ProgressReporter(self, task_id)
308 try:
309 target(reporter)
310 except TaskCancelled:
311 log.info("Task %s cancelled", task_id)
312 self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
313 except Exception as exc:
314 log.warning("Task %s failed: %s", task_id, exc)
315 self._post_finalize(task_id, TaskOutcome.FAILED, str(exc), task_type)
316 else:
317 self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
318 if on_success is not None:
319 try:
320 on_success()
321 except Exception:
322 log.warning("on_success for %s raised", task_id, exc_info=True)
323 finally:
324 self._task_targets.pop(task_id, None)
326 def _post_finalize(
327 self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
328 ) -> None:
329 """Marshal finalization back to the main thread.
331 Main-thread execution matters because ``set_timer`` (used for the
332 flash-then-remove cycle) isn't safe from workers. ``call_from_thread``
333 targets ``self.app`` — the App is long-lived; screens are not.
334 """
335 call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)
337 def _finalize_task(
338 self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
339 ) -> None:
340 """Mark the queue state, refresh dependents, promote next queued task.
342 Runs on the main thread. Atomically: free the active slot, notify
343 anything downstream that needs a repaint (e.g. model dropdowns
344 after a download lands), and advance the queue. Rows stay in
345 history; the bottom bar flash expires on its own. Users clear
346 finished rows from the Task Center manually.
347 """
348 if outcome is TaskOutcome.DONE:
349 self.queue.complete_task(task_id)
350 self._after_done_hooks(task_type)
351 elif outcome is TaskOutcome.FAILED:
352 self.queue.fail_task(task_id, detail)
353 elif outcome is TaskOutcome.CANCELLED:
354 self.queue.cancel(task_id)
355 if task_type:
356 self._try_start_next(task_type)
358 def _notify_model_installed(self) -> None:
359 """Refresh any ChatScreen's ModelBar so the new model is selectable.
361 The dropdowns are built once on mount from the registry; without
362 this nudge, a freshly-downloaded model only appears after the
363 user reopens the screen. NoMatches and similar query errors are
364 silently skipped so a transient "bar not mounted yet" doesn't
365 crash the finalize path; anything else is logged so a real
366 failure surfaces in debug output.
367 """
368 # Late import to avoid a circular (ChatScreen imports this module).
369 from textual.css.query import QueryError
371 from lilbee.cli.tui.screens.chat import ChatScreen
373 for screen in self.app.screen_stack:
374 # screen_stack is typed Screen[Any]; narrow at runtime to
375 # locate the one screen that owns the ModelBar.
376 if isinstance(screen, ChatScreen):
377 try:
378 screen.refresh_model_bar()
379 except QueryError:
380 log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
381 break
383 def start_download(self, model: CatalogModel) -> str:
384 """Enqueue a model download and spawn a background worker.
386 Thin specialization of ``start_task`` that wires the HuggingFace
387 ``download_model`` API and translates ``PermissionError`` into a
388 friendly "repo requires login" message — gated repos are a common
389 failure mode and the raw exception text is opaque.
390 """
391 return self.start_task(
392 model.display_name,
393 TaskType.DOWNLOAD,
394 lambda reporter: _download_target(reporter, model),
395 )
398def _download_target(reporter: ProgressReporter, model: CatalogModel) -> None:
399 """``start_task`` target for a HuggingFace model download.
401 Kept at module scope (not as a controller method) so it can be unit-
402 tested without spinning up a controller. Translates
403 ``PermissionError`` into the gated-repo friendly message so every call
404 site (wizard, catalog, chat) gets consistent error UX.
405 """
406 from lilbee.catalog import DownloadProgress, download_model, make_download_callback
408 def _on_progress(p: DownloadProgress) -> None:
409 reporter.update(p.percent, f"{model.display_name}: {p.detail}")
411 callback = make_download_callback(_on_progress)
412 try:
413 download_model(model, on_progress=callback)
414 except PermissionError as exc:
415 raise RuntimeError(msg.CATALOG_GATED_REPO.format(name=model.display_name)) from exc
418class TaskBar(Static):
419 """Slim 1-line status indicator for background tasks.
421 Shows a compact summary when tasks are active and hides when idle.
422 Detailed progress (spinners, progress bars, task panels) lives in
423 the Task Center screen, accessible via ``t``.
424 """
426 # NOTE: no ``dock: bottom`` here. TaskBar is always mounted inside a
427 # ``BottomBars`` container that owns the dock; multiple dock-bottom
428 # siblings overlap at the same row in Textual (see BottomBars docstring).
429 DEFAULT_CSS: ClassVar[str] = _CSS_FILE.read_text(encoding="utf-8")
431 def __init__(self, **kwargs: object) -> None:
432 super().__init__(**kwargs) # type: ignore[arg-type]
433 self._tick_count = 0
434 # Timestamp (tick count) at which the current flash started.
435 # None when no flash is active. The 2 s completion/failure
436 # flash holds the coloured dot + summary past queue drain.
437 self._flash_until_tick: int | None = None
438 self._flash_outcome: TaskStatus | None = None
439 # Task ids we've already flashed on. Task Center rows linger in
440 # history after DONE/FAILED/CANCELLED so the user can review
441 # recent work; without this gate the bar would re-flash the same
442 # task every poll because ``history[-1]`` keeps matching.
443 self._flashed_ids: set[str] = set()
445 def compose(self) -> ComposeResult:
446 yield Label("", id="task-status-label")
448 def on_mount(self) -> None:
449 self._refresh_display()
450 # Capture the handle so we can cancel the poll on unmount. Without
451 # this, a screen push/pop cycle leaves the previous TaskBar's
452 # interval firing against a detached widget, racing with the new
453 # TaskBar and occasionally setting ``display=False`` on the live
454 # instance (bb-3uzp).
455 self._interval: Timer | None = self.set_interval(_POLL_INTERVAL_SECONDS, self._tick)
457 def on_unmount(self) -> None:
458 interval = getattr(self, "_interval", None)
459 if interval is not None:
460 interval.stop()
461 self._interval = None
463 @property
464 def _controller(self) -> TaskBarController:
465 controller = getattr(self.app, "task_bar", None)
466 if not isinstance(controller, TaskBarController):
467 log.warning(
468 "TaskBar mounted on %s without a TaskBarController; creating one lazily",
469 type(self.app).__name__,
470 )
471 controller = TaskBarController(self.app)
472 self.app.task_bar = controller # type: ignore[attr-defined]
473 return controller
475 @property
476 def queue(self) -> TaskQueue:
477 """Expose the shared queue for callers that iterate or advance it."""
478 return self._controller.queue
480 def add_task(
481 self,
482 name: str,
483 task_type: str,
484 fn: Callable[[], None] | None = None,
485 *,
486 indeterminate: bool = False,
487 ) -> str:
488 """Enqueue a task via the app's controller. Returns the task_id."""
489 return self._controller.add_task(name, task_type, fn, indeterminate=indeterminate)
491 def update_task(
492 self,
493 task_id: str,
494 progress: float,
495 detail: str = "",
496 *,
497 indeterminate: bool | None = None,
498 ) -> None:
499 self._controller.update_task(task_id, progress, detail, indeterminate=indeterminate)
501 def complete_task(self, task_id: str) -> None:
502 self._controller.complete_task(task_id)
504 def fail_task(self, task_id: str, detail: str = "") -> None:
505 self._controller.fail_task(task_id, detail)
507 def cancel_task(self, task_id: str) -> None:
508 self._controller.cancel_task(task_id)
510 def _tick(self) -> None:
511 """Poll the shared queue at 10 Hz and re-render."""
512 self._tick_count += 1
513 self._refresh_display()
515 def _refresh_display(self) -> None:
516 """Rebuild the 1-line status label from the shared queue.
518 Visual language:
519 - Leading ``●`` pulses ``$primary`` <-> ``$primary-lighten-2`` at 1 Hz
520 when anything is active. Dim ``$text-muted`` when only queued tasks
521 remain, ``$success`` during a completion flash, ``$error`` during
522 a failure flash.
523 - The text either reads ``{name} {pct}`` (one active, zero queued),
524 ``{N} tasks running`` (plural), ``{N} queued`` (throttle mode),
525 or the flash copy.
526 - Right-aligned muted-italic ``Press t for Tasks`` hint.
527 """
528 queue = self.queue
529 active = queue.active_tasks
530 queued = queue.queued_tasks
531 history = queue.history
533 # Drop flashed-id entries for tasks the user has cleared from
534 # history. Without this prune, the set grows unbounded over a
535 # long session even though any id not in history can't re-flash.
536 if self._flashed_ids:
537 live_ids = {t.task_id for t in history}
538 self._flashed_ids &= live_ids
540 in_flash = self._flash_until_tick is not None and self._tick_count <= self._flash_until_tick
541 if not in_flash:
542 self._flash_until_tick = None
543 self._flash_outcome = None
544 # Flash on the freshest completion that hasn't been flashed
545 # yet. History now persists (rows show as DONE in Task
546 # Center until cleared), so we must gate by task_id instead
547 # of "history is non-empty".
548 if not active and not queued and history:
549 last = history[-1]
550 if last.task_id not in self._flashed_ids and last.status in (
551 TaskStatus.DONE,
552 TaskStatus.FAILED,
553 ):
554 self._flashed_ids.add(last.task_id)
555 self._flash_until_tick = self._tick_count + int(
556 _DONE_FLASH_SECONDS / _POLL_INTERVAL_SECONDS
557 )
558 self._flash_outcome = last.status
560 if not active and not queued and not in_flash and self._flash_outcome is None:
561 self.display = False
562 return
564 self.display = True
565 dot_color, summary = self._compose_segments(active, queued)
566 hint = f"[i dim]{self._hint_copy()}[/]"
567 dot = f"[{dot_color}]{_DOT_GLYPH}[/]"
568 label_text = f" {dot} {summary} {hint}"
570 with contextlib.suppress(Exception):
571 label = self.query_one("#task-status-label", Label)
572 label.update(label_text)
574 def _hint_copy(self) -> str:
575 """Return the right-aligned hint, context-aware.
577 When a chat ``Input`` (or similar) is focused the ``t`` keypress is
578 eaten before the app-level binding fires, so the user needs
579 ``Esc then t``. Every other screen (wizard grid, catalog,
580 settings, task center) lets ``t`` bubble, so a shorter ``Press t
581 for Tasks`` is accurate and easier to scan.
582 """
583 from textual.widgets import Input
585 try:
586 focused = self.app.focused
587 except Exception:
588 return msg.TASKBAR_HINT
589 if isinstance(focused, Input):
590 return msg.TASKBAR_HINT_INPUT
591 return msg.TASKBAR_HINT
593 def _compose_segments(self, active: list, queued: list) -> tuple[str, str]:
594 """Return (dot color, text summary) for the current state."""
595 # Pulsing even/odd cadence, shared with TaskRow's rail pulse.
596 on_beat = (self._tick_count // _DOT_PULSE_HALF_TICKS) % 2 == 0
598 if self._flash_outcome == TaskStatus.DONE:
599 return "$success", msg.TASKBAR_ALL_DONE
600 if self._flash_outcome == TaskStatus.FAILED:
601 count = sum(1 for t in self.queue.history if t.status == TaskStatus.FAILED)
602 key = msg.TASKBAR_FAILED if count == 1 else msg.TASKBAR_FAILED_PLURAL
603 return "$error", key.format(count=count)
605 parts: list[str] = []
606 if active:
607 count = len(active)
608 task = active[0]
609 if count == 1 and not queued:
610 pct = "" if task.indeterminate else f" [b]{task.progress:.1f}%[/b]"
611 parts.append(f"[b]{task.name}[/b]{pct}")
612 else:
613 key = msg.TASKBAR_ONE if count == 1 else msg.TASKBAR_MULTIPLE
614 parts.append(key.format(count=count))
615 parts.append(f"[b]{task.name}[/b]")
616 if queued:
617 parts.append(f"[dim]{msg.TASKBAR_QUEUED_COUNT.format(count=len(queued))}[/dim]")
619 dot_color = ("$primary" if on_beat else "$primary-lighten-2") if active else "$text-muted"
620 return dot_color, " · ".join(parts)