Coverage for src / lilbee / cli / tui / widgets / task_bar.py: 100%

270 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""TaskBar widget and controller. 

2 

3The TaskBar is a slim 1-line status indicator docked at the bottom of every 

4screen. It shows a count of active/queued tasks and directs users to the 

5Task Center (``t``) for detailed progress. Full progress panels with spinners 

6and progress bars live only in the Task Center screen. 

7 

8State ownership is split so the bar can render on every screen: 

9 

10- ``TaskBarController`` lives on the app (``app.task_bar``) and owns the 

11 single ``TaskQueue``. Every long-running operation in the app should be 

12 submitted to the controller via ``start_task`` (or the typed 

13 ``start_download`` specialization) so it survives any screen navigation. 

14- ``TaskBar`` is a stateless view widget composed by each Screen. It polls the 

15 shared queue at 10 Hz on the main event loop and re-renders in place; no 

16 thread marshaling or subscriber callbacks are involved in the render path. 

17""" 

18 

19from __future__ import annotations 

20 

21import contextlib 

22import logging 

23import threading 

24from collections.abc import Callable 

25from enum import StrEnum 

26from pathlib import Path 

27from typing import TYPE_CHECKING, Any, ClassVar 

28 

29from textual.app import ComposeResult 

30from textual.timer import Timer 

31from textual.widgets import Label, Static 

32 

33from lilbee import asyncio_loop 

34from lilbee.cancellation import TaskCancelled 

35from lilbee.cli.tui import messages as msg 

36from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType 

37from lilbee.cli.tui.thread_safe import call_from_thread 

38from lilbee.crawler import bootstrap_chromium, chromium_installed 

39from lilbee.progress import EventType, SetupProgressEvent 

40 

41if TYPE_CHECKING: 

42 from textual.app import App 

43 

44 from lilbee.catalog import CatalogModel 

45 

# Module-scoped logger; records are namespaced by this module's import path.
log = logging.getLogger(__name__)

# Stylesheet that sits next to this module on disk.
_CSS_FILE = Path(__file__).with_name("task_bar.tcss")

# Seconds between two queue polls of the bar (0.1 s is a 10 Hz refresh).
_POLL_INTERVAL_SECONDS = 0.1
# How long the completion / failure flash stays visible, in seconds.
_DONE_FLASH_SECONDS = 2.0
# Capacity handed to the TaskQueue for the DOWNLOAD task type.
_DOWNLOAD_CONCURRENCY = 2

# Glyph drawn as the leading status dot.
_DOT_GLYPH = "●"
# The dot flips between its two colours every this-many poll ticks:
# 5 ticks x 100 ms = 500 ms per half cycle, i.e. one full pulse per second.
# The original note says this matches the active-row rail pulse in the
# Task Center.
_DOT_PULSE_HALF_TICKS = 5

59 

60 

61class TaskOutcome(StrEnum): 

62 """How a task terminated. Passed from worker thread to finalizer.""" 

63 

64 DONE = "done" 

65 FAILED = "failed" 

66 CANCELLED = "cancelled" 

67 

68 

class ProgressReporter:
    """Handle given to a worker for reporting progress and polling cancellation.

    A worker interacts with the UI exclusively through this object: it does
    not touch ``self.app``, ``call_from_thread``, or any screen. All reads
    and writes go to the controller's shared ``TaskQueue`` (described by the
    original author as lock-protected), so updates are independent of
    whichever screen happens to be showing.
    """

    def __init__(self, controller: TaskBarController, task_id: str) -> None:
        """Bind the reporter to one queue entry.

        Args:
            controller: Owner of the shared queue.
            task_id: Identifier of the task this reporter speaks for.
        """
        self._task_id = task_id
        self._controller = controller

    @property
    def task_id(self) -> str:
        """Identifier of the task this reporter is bound to."""
        return self._task_id

    @property
    def cancelled(self) -> bool:
        """Whether the queue currently records this task as CANCELLED.

        An id the queue does not know about counts as not cancelled.
        """
        entry = self._controller.queue.get_task(self._task_id)
        if entry is None:
            return False
        return entry.status == TaskStatus.CANCELLED

    def check_cancelled(self) -> None:
        """Cooperative cancellation checkpoint.

        Raises:
            TaskCancelled: If the task has been marked cancelled.
        """
        if not self.cancelled:
            return
        raise TaskCancelled

    def update(
        self, progress: float, detail: str = "", *, indeterminate: bool | None = None
    ) -> None:
        """Publish a progress snapshot for this task to the shared queue.

        The cancellation check runs first, so a single ``update`` call serves
        as both a progress write and a cancel checkpoint.

        Args:
            progress: Progress value forwarded unchanged to the queue.
            detail: Optional status text shown alongside the progress.
            indeterminate: Forwarded unchanged to the queue's ``update_task``.

        Raises:
            TaskCancelled: If the task has been marked cancelled.
        """
        self.check_cancelled()
        shared_queue = self._controller.queue
        shared_queue.update_task(self._task_id, progress, detail, indeterminate=indeterminate)


# Signature of a worker body: it receives its reporter and returns nothing.
TaskTarget = Callable[[ProgressReporter], None]

111 

112 

# Number of bytes in one mebibyte; used to render byte counts as whole MB.
_BYTES_PER_MB = 1024**2


def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
    """Run ``bootstrap_chromium`` as a SETUP worker, relaying its progress.

    Lives at module scope so ``TaskBarController.ensure_chromium`` stays
    short and so tests can stub this target in isolation.

    Args:
        reporter: Progress handle for the SETUP task. Each relayed event
            goes through ``reporter.update``, which raises ``TaskCancelled``
            once the task has been cancelled.
    """

    def _relay(event_type: EventType, data: Any) -> None:
        # Only well-formed setup-progress events are interesting here.
        if event_type != EventType.SETUP_PROGRESS or not isinstance(data, SetupProgressEvent):
            return

        expected = data.total_bytes or 0
        size_known = expected > 0
        # Without a known total there is nothing to divide by: report 0 %.
        percent = int(data.downloaded_bytes * 100 / expected) if size_known else 0
        done_mb = data.downloaded_bytes // _BYTES_PER_MB
        if size_known:
            text = msg.SETUP_CHROMIUM_DETAIL.format(done=done_mb, total=expected // _BYTES_PER_MB)
        else:
            text = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=done_mb)
        reporter.update(percent, text)

    asyncio_loop.run(bootstrap_chromium(on_progress=_relay))

138 

139 

class TaskBarController:
    """App-level owner of the shared TaskQueue + all long-running work.

    The controller is attached as ``app.task_bar`` during
    ``LilbeeApp.__init__``. All task lifecycle methods
    (add/update/complete/fail/cancel) go through here so every ``TaskBar``
    widget sees the same state, and every long-running op is spawned by
    this controller — never by a screen that may dismiss mid-flight.
    """

    def __init__(self, app: App[Any]) -> None:
        """Create the shared queue and an empty worker-target registry.

        Args:
            app: The application the controller is attached to; used as the
                marshaling target when workers finalize.
        """
        self.app = app
        self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
        # task_id -> (target, on_success). Worker looks up its target here
        # so we don't capture in a closure that outlives the task.
        self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task without spawning a worker.

        Args:
            name: Display name of the task.
            task_type: Task-type key passed to the queue.
            fn: Optional callable stored with the queue entry; a no-op
                lambda is substituted when omitted.
            indeterminate: Forwarded to the queue's ``enqueue``.

        Returns:
            The new task_id.
        """
        return self.queue.enqueue(
            fn or (lambda: None), name, task_type, indeterminate=indeterminate
        )

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task."""
        self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Mark a task done. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.complete_task(task_id)
        self._after_done_hooks(task_type)
        self._advance_all(task_type)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task failed. Row lingers in history until the user clears it."""
        # Capture the type before mutating the queue, exactly like
        # ``complete_task`` and ``cancel_task`` do, so the lookup does not
        # depend on the queue still resolving a failed id.
        task_type = self._task_type_of(task_id)
        self.queue.fail_task(task_id, detail)
        self._advance_all(task_type)

    def cancel_task(self, task_id: str) -> None:
        """Mark a task cancelled. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.cancel(task_id)
        self._advance_all(task_type)

    def _after_done_hooks(self, task_type: str | None) -> None:
        """Side effects triggered by a DONE completion.

        Callable from both the direct ``complete_task`` convenience and
        the worker-thread ``_finalize_task`` path so every success route
        stays in sync. Does NOT advance the queue; each caller picks the
        advance strategy that fits its context (``_advance_all`` vs
        ``_try_start_next``).
        """
        if task_type == TaskType.DOWNLOAD.value:
            self._notify_model_installed()

    def _task_type_of(self, task_id: str) -> str | None:
        """Return the task's type key, or ``None`` if the queue doesn't know the id."""
        task = self.queue.get_task(task_id)
        return task.task_type if task else None

    def _advance_all(self, task_type: str | None) -> None:
        """Try to advance the freed type first, then any other idle type.

        Every task the queue promotes is handed to ``_spawn_task_worker``.
        That call is a no-op for tasks enqueued through ``add_task`` (they
        have no registered target), but it is required for tasks enqueued
        through ``start_task``: promoting one of those without starting its
        worker would leave it active forever with nothing running it.
        """
        if task_type and (promoted := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(promoted.task_id)
        while (promoted := self.queue.advance()) is not None:
            self._spawn_task_worker(promoted.task_id)

    def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
        """Kick off a Chromium bootstrap if missing, then call ``on_ready``.

        If Chromium is already installed, ``on_ready`` runs immediately on
        the caller's thread. Otherwise a SETUP task is enqueued that runs
        ``bootstrap_chromium``; on success the controller invokes
        ``on_ready`` on the worker thread via the task's ``on_success``
        hook. On failure the SETUP task surfaces as FAILED and ``on_ready``
        is NOT called (the follow-up work shouldn't proceed against a
        missing browser).

        bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
        their real work behind the one-time bootstrap.
        """
        if chromium_installed():
            on_ready()
            return

        self.start_task(
            msg.SETUP_CHROMIUM_NAME,
            TaskType.SETUP,
            _chromium_bootstrap_target,
            indeterminate=False,
            on_success=on_ready,
        )

    def start_task(
        self,
        name: str,
        task_type: TaskType,
        target: TaskTarget,
        *,
        indeterminate: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a task, spawn its worker, return task_id.

        The *target* receives a ``ProgressReporter`` as its only argument.
        It should periodically call ``reporter.update(percent, detail)`` and
        may call ``reporter.check_cancelled()`` to cooperatively abort.

        On success (target returns normally) the queue marks the task DONE
        and ``on_success`` (if provided) runs after on the same worker
        thread. On ``TaskCancelled`` the task is marked CANCELLED. On any
        other exception the task is marked FAILED with ``str(exc)`` as
        detail. Rows linger in the Task Center under their final status
        until the user presses capital ``C`` to clear; the bottom bar
        flashes the outcome once and then hides when idle.

        Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
        controls concurrency: a second sync queues behind the first, but a
        third download waits until one of the two active downloads finishes.
        """
        task_id = self.queue.enqueue(
            lambda: None, name, task_type.value, indeterminate=indeterminate
        )
        self._task_targets[task_id] = (target, on_success)
        self._try_start_next(task_type.value)
        return task_id

    def _try_start_next(self, task_type: str) -> None:
        """Promote queued tasks of this type into any free capacity slots."""
        while (task := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(task.task_id)

    def _spawn_task_worker(self, task_id: str) -> None:
        """Start a daemon thread for the task. Safe to call from any thread.

        Does nothing when no target is registered for ``task_id``.
        """
        if task_id not in self._task_targets:
            return
        thread = threading.Thread(
            target=self._run_task_worker,
            args=(task_id,),
            daemon=True,
            name=f"task-{task_id}",
        )
        thread.start()

    def _run_task_worker(self, task_id: str) -> None:
        """Body of the daemon worker thread.

        Runs the registered target, posts the matching outcome for
        finalization, then (on success only) calls ``on_success``. An
        exception from ``on_success`` is logged and swallowed. The target
        registration is dropped in all cases.
        """
        entry = self._task_targets.get(task_id)
        if entry is None:
            return
        target, on_success = entry
        # Resolve the type up front; it is passed along with the outcome so
        # the finalizer knows which type to advance.
        task = self.queue.get_task(task_id)
        task_type = task.task_type if task is not None else None
        reporter = ProgressReporter(self, task_id)
        try:
            target(reporter)
        except TaskCancelled:
            log.info("Task %s cancelled", task_id)
            self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
        except Exception as exc:
            log.warning("Task %s failed: %s", task_id, exc)
            self._post_finalize(task_id, TaskOutcome.FAILED, str(exc), task_type)
        else:
            self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
            if on_success is not None:
                try:
                    on_success()
                except Exception:
                    log.warning("on_success for %s raised", task_id, exc_info=True)
        finally:
            self._task_targets.pop(task_id, None)

    def _post_finalize(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Marshal finalization back to the main thread.

        Main-thread execution matters because ``set_timer`` (used for the
        flash-then-remove cycle) isn't safe from workers. ``call_from_thread``
        targets ``self.app`` — the App is long-lived; screens are not.
        """
        call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)

    def _finalize_task(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Mark the queue state, refresh dependents, promote next queued task.

        Runs on the main thread. Atomically: free the active slot, notify
        anything downstream that needs a repaint (e.g. model dropdowns
        after a download lands), and advance the queue. Rows stay in
        history; the bottom bar flash expires on its own. Users clear
        finished rows from the Task Center manually.
        """
        if outcome is TaskOutcome.DONE:
            self.queue.complete_task(task_id)
            self._after_done_hooks(task_type)
        elif outcome is TaskOutcome.FAILED:
            self.queue.fail_task(task_id, detail)
        elif outcome is TaskOutcome.CANCELLED:
            self.queue.cancel(task_id)
        if task_type:
            self._try_start_next(task_type)

    def _notify_model_installed(self) -> None:
        """Refresh any ChatScreen's ModelBar so the new model is selectable.

        The dropdowns are built once on mount from the registry; without
        this nudge, a freshly-downloaded model only appears after the
        user reopens the screen. Only the first ``ChatScreen`` found on the
        screen stack is refreshed. A ``QueryError`` from the refresh (e.g.
        the bar is not mounted yet) is logged at debug level and skipped so
        it doesn't crash the finalize path; any other exception propagates
        to the caller.
        """
        from textual.css.query import QueryError

        # Late import to avoid a circular (ChatScreen imports this module).
        from lilbee.cli.tui.screens.chat import ChatScreen

        for screen in self.app.screen_stack:
            # screen_stack is typed Screen[Any]; narrow at runtime to
            # locate the one screen that owns the ModelBar.
            if isinstance(screen, ChatScreen):
                try:
                    screen.refresh_model_bar()
                except QueryError:
                    log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
                break

    def start_download(self, model: CatalogModel) -> str:
        """Enqueue a model download and spawn a background worker.

        Thin specialization of ``start_task`` that runs ``_download_target``
        for ``model`` as a DOWNLOAD task; that target wires the HuggingFace
        ``download_model`` API and translates ``PermissionError`` into a
        friendly "repo requires login" message — gated repos are a common
        failure mode and the raw exception text is opaque.

        Returns:
            The task_id of the enqueued download.
        """
        return self.start_task(
            model.display_name,
            TaskType.DOWNLOAD,
            lambda reporter: _download_target(reporter, model),
        )

396 

397 

def _download_target(reporter: ProgressReporter, model: CatalogModel) -> None:
    """Download ``model`` from HuggingFace as a ``start_task`` target.

    Defined at module scope rather than on the controller so it can be unit
    tested without building one. A ``PermissionError`` from the download is
    re-raised as a ``RuntimeError`` carrying the gated-repo message, giving
    every call site (wizard, catalog, chat) the same error UX.

    Args:
        reporter: Progress handle; each progress callback is relayed through
            ``reporter.update``.
        model: Catalog entry to download; its ``display_name`` prefixes the
            detail text and fills the gated-repo message.

    Raises:
        RuntimeError: If the download raises ``PermissionError``.
    """
    from lilbee.catalog import DownloadProgress, download_model, make_download_callback

    def _relay(snapshot: DownloadProgress) -> None:
        percent = snapshot.percent
        label = f"{model.display_name}: {snapshot.detail}"
        reporter.update(percent, label)

    # Built outside the ``try`` so only the download itself is translated.
    progress_hook = make_download_callback(_relay)
    try:
        download_model(model, on_progress=progress_hook)
    except PermissionError as denied:
        friendly = msg.CATALOG_GATED_REPO.format(name=model.display_name)
        raise RuntimeError(friendly) from denied

416 

417 

class TaskBar(Static):
    """One-line status strip summarising background tasks.

    The bar is visible while tasks are active or queued (and briefly after
    they finish) and hidden otherwise. Rich progress UI (spinners, progress
    bars, task panels) belongs to the Task Center screen, reached via ``t``.
    """

    # NOTE: deliberately no ``dock: bottom``. Per the original author, the
    # bar is always mounted inside a ``BottomBars`` container that owns the
    # dock, because several dock-bottom siblings would overlap on one row.
    DEFAULT_CSS: ClassVar[str] = _CSS_FILE.read_text(encoding="utf-8")

    def __init__(self, **kwargs: object) -> None:
        """Initialise the widget with no flash active and nothing flashed yet."""
        super().__init__(**kwargs)  # type: ignore[arg-type]
        # Number of poll ticks seen so far; drives the pulse and flash timing.
        self._tick_count = 0
        # Tick count at which the current flash ends, or None when no flash
        # is active. The flash keeps the coloured dot + summary on screen
        # for a short while after the queue has drained.
        self._flash_until_tick: int | None = None
        # Status (DONE / FAILED) being flashed, or None.
        self._flash_outcome: TaskStatus | None = None
        # Ids that already triggered a flash. Finished rows stay in history,
        # so without this gate ``history[-1]`` would re-trigger every poll.
        self._flashed_ids: set[str] = set()

    def compose(self) -> ComposeResult:
        """Yield the single label that carries the rendered status line."""
        yield Label("", id="task-status-label")

    def on_mount(self) -> None:
        """Render once, then start the periodic poll."""
        self._refresh_display()
        # Keep the timer handle so ``on_unmount`` can stop it. Otherwise a
        # screen push/pop cycle leaves the old bar's interval firing against
        # a detached widget and racing with the new bar (bb-3uzp).
        self._interval: Timer | None = self.set_interval(_POLL_INTERVAL_SECONDS, self._tick)

    def on_unmount(self) -> None:
        """Stop the poll timer, if one was ever started."""
        timer = getattr(self, "_interval", None)
        if timer is not None:
            timer.stop()
        self._interval = None

    @property
    def _controller(self) -> TaskBarController:
        """The app's ``TaskBarController``, created on demand if missing."""
        existing = getattr(self.app, "task_bar", None)
        if isinstance(existing, TaskBarController):
            return existing
        log.warning(
            "TaskBar mounted on %s without a TaskBarController; creating one lazily",
            type(self.app).__name__,
        )
        created = TaskBarController(self.app)
        self.app.task_bar = created  # type: ignore[attr-defined]
        return created

    @property
    def queue(self) -> TaskQueue:
        """The controller's shared queue, for callers that iterate or advance it."""
        return self._controller.queue

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task through the app's controller and return its task_id."""
        controller = self._controller
        return controller.add_task(name, task_type, fn, indeterminate=indeterminate)

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Forward a progress update to the app's controller."""
        controller = self._controller
        controller.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Forward a completion to the app's controller."""
        self._controller.complete_task(task_id)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Forward a failure to the app's controller."""
        self._controller.fail_task(task_id, detail)

    def cancel_task(self, task_id: str) -> None:
        """Forward a cancellation to the app's controller."""
        self._controller.cancel_task(task_id)

    def _tick(self) -> None:
        """Timer callback: count the tick and redraw from the shared queue."""
        self._tick_count += 1
        self._refresh_display()

    def _refresh_display(self) -> None:
        """Recompute visibility and the one-line label from the shared queue.

        Rendering rules:
        - The leading dot pulses between ``$primary`` and
          ``$primary-lighten-2`` while something is active, is
          ``$text-muted`` when only queued tasks remain, ``$success`` during
          a completion flash and ``$error`` during a failure flash.
        - The summary is the active task's name and percentage, a running
          count, a queued count, or the flash copy.
        - The line ends with a dim italic hint pointing at the Task Center.
        """
        shared = self.queue
        active = shared.active_tasks
        queued = shared.queued_tasks
        history = shared.history

        # Forget flashed ids whose rows were cleared from history; they can
        # never flash again and would otherwise accumulate all session.
        if self._flashed_ids:
            self._flashed_ids &= {row.task_id for row in history}

        idle = not active and not queued
        deadline = self._flash_until_tick
        in_flash = deadline is not None and self._tick_count <= deadline
        if not in_flash:
            self._flash_until_tick = None
            self._flash_outcome = None
            # A new flash starts only once everything has drained, and only
            # for the newest history row if that row has not flashed before.
            if idle and history:
                newest = history[-1]
                if newest.task_id not in self._flashed_ids and newest.status in (
                    TaskStatus.DONE,
                    TaskStatus.FAILED,
                ):
                    self._flashed_ids.add(newest.task_id)
                    flash_ticks = int(_DONE_FLASH_SECONDS / _POLL_INTERVAL_SECONDS)
                    self._flash_until_tick = self._tick_count + flash_ticks
                    self._flash_outcome = newest.status

        if idle and not in_flash and self._flash_outcome is None:
            self.display = False
            return

        self.display = True
        dot_color, summary = self._compose_segments(active, queued)
        hint = f"[i dim]{self._hint_copy()}[/]"
        dot = f"[{dot_color}]{_DOT_GLYPH}[/]"
        label_text = f" {dot} {summary} {hint}"

        # Best effort: any error while locating or updating the label is
        # suppressed rather than raised from the poll.
        with contextlib.suppress(Exception):
            self.query_one("#task-status-label", Label).update(label_text)

    def _hint_copy(self) -> str:
        """Pick the trailing hint text based on what currently has focus.

        Per the original author: a focused ``Input`` consumes the ``t``
        keypress before the app-level binding sees it, so the longer
        input-specific hint is shown there; everywhere else the plain hint
        is accurate. Any error while reading the focus falls back to the
        plain hint.
        """
        from textual.widgets import Input

        try:
            widget = self.app.focused
        except Exception:
            return msg.TASKBAR_HINT
        return msg.TASKBAR_HINT_INPUT if isinstance(widget, Input) else msg.TASKBAR_HINT

    def _compose_segments(self, active: list, queued: list) -> tuple[str, str]:
        """Return ``(dot color, summary text)`` for the current state.

        An active flash wins over everything else; otherwise the summary is
        assembled from the active and queued task lists.
        """
        outcome = self._flash_outcome
        if outcome == TaskStatus.DONE:
            return "$success", msg.TASKBAR_ALL_DONE
        if outcome == TaskStatus.FAILED:
            failures = len([row for row in self.queue.history if row.status == TaskStatus.FAILED])
            template = msg.TASKBAR_FAILED if failures == 1 else msg.TASKBAR_FAILED_PLURAL
            return "$error", template.format(count=failures)

        segments: list[str] = []
        if active:
            running = len(active)
            first = active[0]
            if running == 1 and not queued:
                # Lone task: show its name plus a percentage unless its
                # progress is indeterminate.
                suffix = "" if first.indeterminate else f" [b]{first.progress:.1f}%[/b]"
                segments.append(f"[b]{first.name}[/b]{suffix}")
            else:
                template = msg.TASKBAR_ONE if running == 1 else msg.TASKBAR_MULTIPLE
                segments.extend([template.format(count=running), f"[b]{first.name}[/b]"])
        if queued:
            segments.append(f"[dim]{msg.TASKBAR_QUEUED_COUNT.format(count=len(queued))}[/dim]")

        if not active:
            dot_color = "$text-muted"
        else:
            # Even/odd half cycles alternate the two pulse colours.
            on_beat = (self._tick_count // _DOT_PULSE_HALF_TICKS) % 2 == 0
            dot_color = "$primary" if on_beat else "$primary-lighten-2"
        return dot_color, " · ".join(segments)

620 return dot_color, " · ".join(parts)