Coverage for src / lilbee / cli / tui / screens / catalog.py: 100%

586 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Catalog screen -- browse and install models via grid or list view.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import logging 

7from dataclasses import dataclass 

8from typing import ClassVar 

9 

10from textual import on, work 

11from textual.app import ComposeResult 

12from textual.binding import Binding, BindingType 

13from textual.containers import VerticalScroll 

14from textual.events import Click 

15from textual.screen import Screen 

16from textual.widgets import Input, Static 

17from textual.worker import Worker, WorkerState 

18 

19from lilbee.catalog import ( 

20 CatalogModel, 

21 ModelFamily, 

22 ModelVariant, 

23 get_catalog, 

24 get_families, 

25) 

26from lilbee.cli.tui import messages as msg 

27from lilbee.cli.tui.app import apply_active_model 

28from lilbee.cli.tui.screens.catalog_utils import ( 

29 SORT_KEYS, 

30 TableRow, 

31 catalog_to_row, 

32 matches_search, 

33 remote_to_row, 

34 variant_to_row, 

35) 

36from lilbee.cli.tui.widgets.grid_select import GridSelect 

37from lilbee.cli.tui.widgets.model_card import ModelCard 

38from lilbee.cli.tui.widgets.model_list_item import ModelListItem 

39from lilbee.cli.tui.widgets.nav_aware_input import NavAwareInput 

40from lilbee.cli.tui.widgets.search_hf_cta_item import SearchHFCtaItem 

41from lilbee.config import cfg 

42from lilbee.model_manager import RemoteModel, get_model_manager 

43from lilbee.models import ModelTask 

44from lilbee.providers.model_ref import OLLAMA_PREFIX 

45from lilbee.providers.sdk_backend import OLLAMA_BACKEND_NAME 

46 

47log = logging.getLogger(__name__) 

48 

49_HF_PAGE_SIZE = 25 

50# When the highlighted row is within this many rows of the end we 

51# auto-fetch the next page. Small enough that the request is already 

52# in flight by the time the user reaches the bottom. 

53_HF_LOAD_MORE_TRIGGER = 5 

54# Long enough to register; short enough to clear before a warm-cache fetch. 

55_NOTIFY_SEARCHING_TIMEOUT_SECONDS = 4 

56_ALL_TASKS = tuple(ModelTask) 

57 

58_WORKER_FETCH_HF = "fetch_hf_models" 

59_WORKER_FETCH_MORE_HF = "fetch_more_hf" 

60_WORKER_FETCH_REMOTE = "fetch_remote_models" 

61_WORKER_FETCH_SEARCH = "fetch_hf_search" 

62 

63_GRID_PAGE_ROWS = 3 

64_LIST_PAGE_ROWS = 10 

65 

66# Sort columns cycled by the `s` keybinding in list view. 

67_SORT_CYCLE: tuple[str, ...] = ("Name", "Downloads", "Size", "Params") 

68 

69 

70class CatalogScreen(Screen[None]): 

71 """Model catalog with grid (default) and list views.""" 

72 

73 CSS_PATH = "catalog.tcss" 

74 AUTO_FOCUS = "" # GridSelect is mounted dynamically; focused in on_mount 

75 

76 HELP = ( 

77 "# Catalog\n" 

78 "Browse and install models.\n\n" 

79 "Use arrows to navigate the grid, Enter to install." 

80 ) 

81 

82 _ACTION_GROUP = Binding.Group("Actions", compact=True) 

83 _SCROLL_GROUP = Binding.Group("Scroll", compact=True) 

84 

85 BINDINGS: ClassVar[list[BindingType]] = [ 

86 Binding("q", "go_back", "Back", show=True, group=_ACTION_GROUP), 

87 Binding("escape", "go_back", "Back", show=True), 

88 Binding("v", "toggle_view", "View", show=True, group=_ACTION_GROUP), 

89 Binding("slash", "focus_search", "Search", show=True, group=_ACTION_GROUP), 

90 Binding("d", "delete_model", "Delete", show=True, group=_ACTION_GROUP), 

91 Binding("x", "delete_model", "Delete", show=False), 

92 Binding("j", "cursor_down", "Nav", show=False, group=_SCROLL_GROUP), 

93 Binding("k", "cursor_up", "Nav", show=False, group=_SCROLL_GROUP), 

94 Binding("g", "jump_top", "Top", show=False, group=_SCROLL_GROUP), 

95 Binding("G", "jump_bottom", "End", show=False, group=_SCROLL_GROUP), 

96 Binding("space", "page_down", "PgDn", show=False, group=_SCROLL_GROUP), 

97 Binding("ctrl+d", "page_down", "PgDn", show=False, group=_SCROLL_GROUP), 

98 Binding("ctrl+u", "page_up", "PgUp", show=False, group=_SCROLL_GROUP), 

99 # Hidden from the footer so catalog still has <=5 visible bindings; 

100 # the sort-label surfaces "press n for more" and "press s to sort" 

101 # to the user instead. 

102 Binding("n", "load_more", "More", show=False, group=_ACTION_GROUP), 

103 Binding("s", "cycle_sort", "Sort", show=False, group=_ACTION_GROUP), 

104 ] 

105 

106 def __init__(self) -> None: 

107 super().__init__() 

108 self._families: list[ModelFamily] = get_families() 

109 self._hf_models: list[CatalogModel] = [] 

110 self._remote_models: list[RemoteModel] = [] 

111 self._hf_offset = 0 

112 self._hf_has_more = True 

113 self._rows: list[TableRow] = [] 

114 self._sort_column: str = "Name" 

115 self._sort_ascending: bool = True 

116 self._pending_delete: str | None = None 

117 self._installed_names: set[str] = set() 

118 self._grid_view: bool = True 

119 self._hf_fetched: bool = False 

120 self._loading_more: bool = False 

121 self._grid_cache_key: tuple[tuple[tuple[str, bool], ...], str] | tuple = () 

122 self._search_in_flight: bool = False 

123 

124 def compose(self) -> ComposeResult: 

125 from textual.widgets import Footer 

126 

127 from lilbee.cli.tui.widgets.bottom_bars import BottomBars 

128 from lilbee.cli.tui.widgets.status_bar import ViewTabs 

129 from lilbee.cli.tui.widgets.task_bar import TaskBar 

130 from lilbee.cli.tui.widgets.top_bars import TopBars 

131 

132 with TopBars(): 

133 yield ViewTabs() 

134 yield NavAwareInput(placeholder=msg.CATALOG_FILTER_PLACEHOLDER, id="catalog-search") 

135 yield Static("", id="sort-label", shrink=True) 

136 yield VerticalScroll(id="catalog-grid") 

137 yield VerticalScroll(id="catalog-list") 

138 yield Static("", id="model-detail") 

139 with BottomBars(): 

140 yield TaskBar() 

141 yield Footer() 

142 

143 def on_mount(self) -> None: 

144 self._fetch_installed_names() 

145 self.add_class("-grid-view") 

146 self._refresh_grid() 

147 self._focus_first_grid() 

148 self._fetch_remote_models() 

149 

150 def _focus_first_grid(self) -> None: 

151 """Focus the first GridSelect widget if available.""" 

152 with contextlib.suppress(Exception): 

153 self.query_one(GridSelect).focus() 

154 

155 def _fetch_installed_names(self) -> None: 

156 """Populate installed identities from registry manifests. 

157 

158 Stored set contains both the canonical ref (``hf_repo/filename``) 

159 and the bare ``hf_repo`` so catalog rows whose ref is the repo 

160 alone still light up as installed when at least one quant of 

161 that repo has a manifest. 

162 """ 

163 with contextlib.suppress(Exception): 

164 from lilbee.registry import ModelRegistry 

165 

166 registry = ModelRegistry(cfg.models_dir) 

167 self._installed_names = set() 

168 for m in registry.list_installed(): 

169 self._installed_names.add(m.ref) 

170 self._installed_names.add(m.hf_repo) 

171 

172 def action_toggle_view(self) -> None: 

173 """Toggle between grid and list view.""" 

174 if self._grid_view: 

175 self._grid_view = False 

176 self.remove_class("-grid-view") 

177 self.add_class("-list-view") 

178 if not self._hf_fetched: 

179 self._hf_fetched = True 

180 self._fetch_all_hf_models() 

181 self._refresh_list() 

182 self._focus_list_item(0) 

183 else: 

184 self._grid_view = True 

185 self.remove_class("-list-view") 

186 self.add_class("-grid-view") 

187 self._refresh_grid() 

188 with contextlib.suppress(Exception): 

189 self.query_one("#catalog-grid GridSelect", GridSelect).focus() 

190 

191 def action_focus_search(self) -> None: 

192 """Focus the filter input -- bound to / key.""" 

193 self.query_one("#catalog-search", Input).focus() 

194 

195 @on(Input.Changed, "#catalog-search") 

196 def _on_search_changed(self, event: Input.Changed) -> None: 

197 """Filter models when search input changes.""" 

198 if self._grid_view: 

199 self._filter_grid() 

200 self._sync_grid_search_cta() 

201 else: 

202 self._filter_list() 

203 

204 @on(Input.Submitted, "#catalog-search") 

205 def _on_search_submitted(self, event: Input.Submitted) -> None: 

206 """Enter installs the first visible match; falls through to the HF CTA 

207 when nothing matches locally so the obvious intent ('search for this') 

208 doesn't require the user to Tab over to the CTA row first.""" 

209 if self._grid_view: 

210 if any(card.display for card in self.query(ModelCard)): 

211 self._select_first_visible_grid_card() 

212 return 

213 elif any(item.display for item in self.query(ModelListItem)): 

214 self._select_first_visible_list_item() 

215 return 

216 self._trigger_remote_search(self._get_search_text()) 

217 

218 def _trigger_remote_search(self, query: str) -> None: 

219 """Fire the HF search worker, unless one is already in flight.""" 

220 if self._search_in_flight or not query: 

221 return 

222 self._search_in_flight = True 

223 self._update_sort_label() 

224 # Sort label is hidden in grid view, so the toast is the only feedback there. 

225 self.notify(msg.CATALOG_SEARCHING_HF, timeout=_NOTIFY_SEARCHING_TIMEOUT_SECONDS) 

226 self._fetch_hf_search(query) 

227 

228 @on(SearchHFCtaItem.Selected) 

229 def _on_search_hf_cta_selected(self, event: SearchHFCtaItem.Selected) -> None: 

230 self._trigger_remote_search(event.term) 

231 

232 @on(Click, ".search-hf-cta") 

233 def _on_search_hf_cta_clicked(self) -> None: 

234 self._trigger_remote_search(self._get_search_text()) 

235 

236 def _select_first_visible_grid_card(self) -> None: 

237 """Focus the first grid with a visible match and trigger its install. 

238 

239 Without the "first visible" walk, focusing any grid with 

240 ``highlighted = 0`` could land on a card the filter just hid, 

241 and Enter would install the wrong model. Setting 

242 ``highlighted`` to the first visible index guarantees the 

243 install fires on what the user can actually see. 

244 """ 

245 with contextlib.suppress(Exception): 

246 for grid in self.query(GridSelect): 

247 visible = [i for i, card in enumerate(grid.children) if card.display] 

248 if visible: 

249 grid.focus() 

250 grid.highlighted = visible[0] 

251 grid.action_select() 

252 return 

253 

254 def _select_first_visible_list_item(self) -> None: 

255 """List-view counterpart: focus + install the first visible row.""" 

256 with contextlib.suppress(Exception): 

257 for item in self.query(ModelListItem): 

258 if item.display: 

259 item.focus() 

260 item.action_select() 

261 return 

262 

263 def _fetch_hf_page(self) -> list[CatalogModel]: 

264 """Fetch one page of HF models for all task types (runs in worker thread).""" 

265 all_models: list[CatalogModel] = [] 

266 seen_repos: set[str] = set() 

267 any_has_more = False 

268 for task in _ALL_TASKS: 

269 result = get_catalog( 

270 task=task, 

271 featured=False, 

272 limit=_HF_PAGE_SIZE, 

273 offset=self._hf_offset, 

274 ) 

275 if result.has_more: 

276 any_has_more = True 

277 for m in result.models: 

278 if not m.featured and m.hf_repo not in seen_repos: 

279 seen_repos.add(m.hf_repo) 

280 all_models.append(m) 

281 self._hf_has_more = any_has_more 

282 return all_models 

283 

284 @work(thread=True, name=_WORKER_FETCH_HF) 

285 def _fetch_all_hf_models(self) -> list[CatalogModel]: 

286 """Fetch HF models for all task types (replaces current list).""" 

287 return self._fetch_hf_page() 

288 

289 @work(thread=True, name=_WORKER_FETCH_REMOTE) 

290 def _fetch_remote_models(self) -> list[RemoteModel]: 

291 from lilbee.model_manager import classify_remote_models 

292 

293 return classify_remote_models(cfg.remote_base_url) 

294 

295 @work(thread=True, name=_WORKER_FETCH_MORE_HF) 

296 def _fetch_more_hf(self) -> list[CatalogModel]: 

297 """Fetch next page of HF models for all task types (extends current list).""" 

298 return self._fetch_hf_page() 

299 

300 @work(thread=True, name=_WORKER_FETCH_SEARCH, exit_on_error=False) 

301 def _fetch_hf_search(self, query: str) -> list[CatalogModel]: 

302 """Fetch HF models matching the user's search term (runs in worker thread).""" 

303 existing_repos = {m.hf_repo for m in self._hf_models} 

304 new_models: list[CatalogModel] = [] 

305 for task in _ALL_TASKS: 

306 result = get_catalog( 

307 task=task, 

308 featured=False, 

309 search=query, 

310 limit=_HF_PAGE_SIZE, 

311 offset=0, 

312 ) 

313 for m in result.models: 

314 if not m.featured and m.hf_repo not in existing_repos: 

315 existing_repos.add(m.hf_repo) 

316 new_models.append(m) 

317 return new_models 

318 

319 def on_worker_state_changed(self, event: Worker.StateChanged) -> None: 

320 # PENDING/RUNNING fire here too; only ERROR/CANCELLED should release latches. 

321 if event.state in (WorkerState.ERROR, WorkerState.CANCELLED): 

322 if event.worker.name == _WORKER_FETCH_MORE_HF: 

323 self._loading_more = False 

324 if event.worker.name == _WORKER_FETCH_SEARCH: 

325 self._search_in_flight = False 

326 self._update_sort_label() 

327 return 

328 if event.state != WorkerState.SUCCESS: 

329 return 

330 result = event.worker.result 

331 if not isinstance(result, list): 

332 return 

333 name = event.worker.name 

334 if name == _WORKER_FETCH_HF: 

335 self._hf_models = result 

336 elif name == _WORKER_FETCH_MORE_HF: 

337 self._hf_models.extend(result) 

338 self._loading_more = False 

339 elif name == _WORKER_FETCH_SEARCH: 

340 self._hf_fetched = True 

341 self._hf_models.extend(result) 

342 self._search_in_flight = False 

343 self._update_sort_label() 

344 elif name == _WORKER_FETCH_REMOTE: 

345 self._remote_models = result 

346 else: 

347 return 

348 self._refresh_view() 

349 

350 def _get_search_text(self) -> str: 

351 # Preserve the user's casing for display (e.g. the CTA label); matching 

352 # callers normalize via _normalize_for_search. 

353 return self.query_one("#catalog-search", Input).value.strip() 

354 

355 def _build_rows(self) -> list[TableRow]: 

356 """Build all table rows from current data sources.""" 

357 search = self._get_search_text() 

358 rows: list[TableRow] = [] 

359 rows.extend(self._build_family_rows(search)) 

360 rows.extend(self._build_hf_rows(search)) 

361 rows.extend(self._build_remote_rows(search)) 

362 return rows 

363 

364 def _build_family_rows(self, search: str) -> list[TableRow]: 

365 """Build rows from featured model families.""" 

366 rows: list[TableRow] = [] 

367 for fam in self._families: 

368 for v in fam.variants: 

369 installed = self._is_installed(v.hf_repo, repo=v.hf_repo, filename=v.filename) 

370 row = variant_to_row(v, fam, installed) 

371 if matches_search(row, search): 

372 rows.append(row) 

373 return rows 

374 

375 def _build_hf_rows(self, search: str) -> list[TableRow]: 

376 """Build rows from HuggingFace models.""" 

377 rows: list[TableRow] = [] 

378 for m in self._hf_models: 

379 installed = self._is_installed(m.ref, repo=m.hf_repo, filename=m.gguf_filename) 

380 row = catalog_to_row(m, installed) 

381 if matches_search(row, search): 

382 rows.append(row) 

383 return rows 

384 

385 def _build_remote_rows(self, search: str) -> list[TableRow]: 

386 """Build rows from remote (inference-only) models.""" 

387 rows: list[TableRow] = [] 

388 for rm in self._remote_models: 

389 row = remote_to_row(rm) 

390 if matches_search(row, search): 

391 rows.append(row) 

392 return rows 

393 

394 def _is_installed(self, name: str, repo: str = "", filename: str = "") -> bool: 

395 """Check if a model is installed by name or source repo/filename.""" 

396 if name in self._installed_names: 

397 return True 

398 if repo and filename: 

399 return f"{repo}/{filename}" in self._installed_names 

400 return False 

401 

402 def _sort_rows(self, rows: list[TableRow]) -> list[TableRow]: 

403 """Sort rows: featured first, then by current sort column.""" 

404 key_fn = SORT_KEYS.get(self._sort_column, SORT_KEYS["Name"]) 

405 # Stable sort: featured always first, then by column 

406 return sorted( 

407 rows, 

408 key=lambda r: (not r.featured, key_fn(r)), 

409 reverse=not self._sort_ascending, 

410 ) 

411 

412 def _refresh_view(self) -> None: 

413 """Refresh the active view (grid or list).""" 

414 if self._grid_view: 

415 self._refresh_grid() 

416 else: 

417 self._refresh_list() 

418 

419 def _refresh_grid(self) -> None: 

420 """Rebuild the grid view with all cards (called when data changes).""" 

421 family_rows = self._build_family_rows("") 

422 remote_rows = self._build_remote_rows("") 

423 hf_rows = self._build_hf_rows("") if self._hf_fetched else [] 

424 all_rows = family_rows + remote_rows + hf_rows 

425 # Include the full search text so toggle-back + value-change combinations 

426 # rebuild the grid (and therefore the CTA) with the current query. 

427 row_key = ( 

428 tuple((r.name, r.installed) for r in all_rows), 

429 self._get_search_text(), 

430 ) 

431 if self._grid_cache_key == row_key: 

432 return 

433 self._grid_cache_key = row_key 

434 container = self.query_one("#catalog-grid", VerticalScroll) 

435 container.remove_children() 

436 widgets_to_mount: list[Static | GridSelect] = [] 

437 for section in _group_rows_for_grid(all_rows): 

438 if not section.rows: 

439 continue 

440 widgets_to_mount.append(Static(section.heading, classes="section-heading")) 

441 cards = [ModelCard(row) for row in section.rows] 

442 grid = GridSelect(*cards, min_column_width=30, max_column_width=50) 

443 widgets_to_mount.append(grid) 

444 if not self._hf_fetched: 

445 widgets_to_mount.append( 

446 Static( 

447 msg.CATALOG_BROWSE_MORE, 

448 classes="grid-cta browse-more-hf", 

449 ) 

450 ) 

451 search = self._get_search_text() 

452 if search: 

453 widgets_to_mount.append( 

454 Static( 

455 msg.CATALOG_SEARCH_HF_CTA.format(query=search), 

456 classes="grid-cta search-hf-cta", 

457 ) 

458 ) 

459 widgets_to_mount.append( 

460 Static( 

461 msg.CATALOG_VIEW_TOGGLE_GRID, 

462 classes="grid-cta", 

463 ) 

464 ) 

465 container.mount_all(widgets_to_mount) 

466 

467 def _sync_grid_search_cta(self) -> None: 

468 """Mount/remove/update the grid-view search-HF CTA in response to typing.""" 

469 search = self._get_search_text() 

470 existing = self.query("#catalog-grid > .search-hf-cta") 

471 if not search: 

472 for w in existing: 

473 w.remove() 

474 return 

475 cta_text = msg.CATALOG_SEARCH_HF_CTA.format(query=search) 

476 if existing: 

477 for w in existing: 

478 if isinstance(w, Static): 

479 w.update(cta_text) 

480 return 

481 container = self.query_one("#catalog-grid", VerticalScroll) 

482 container.mount(Static(cta_text, classes="grid-cta search-hf-cta")) 

483 

484 def _filter_grid(self) -> None: 

485 """Filter visible cards by search text without recreating widgets.""" 

486 search = self._get_search_text() 

487 for card in self.query(ModelCard): 

488 card.display = matches_search(card.row, search) 

489 container = self.query_one("#catalog-grid", VerticalScroll) 

490 children = list(container.children) 

491 for i, child in enumerate(children): 

492 if not child.has_class("section-heading"): 

493 continue 

494 grid = children[i + 1] if i + 1 < len(children) else None 

495 if isinstance(grid, GridSelect): 

496 has_visible = any(c.display for c in grid.children) 

497 child.display = has_visible 

498 grid.display = has_visible 

499 

500 @on(Click, ".browse-more-hf") 

501 def _on_browse_more_clicked(self) -> None: 

502 """Fetch all models when the browse-more card is clicked.""" 

503 if not self._hf_fetched: 

504 self._hf_fetched = True 

505 self._fetch_all_hf_models() 

506 

507 @on(GridSelect.LeaveDown) 

508 def _on_grid_leave_down(self, event: GridSelect.LeaveDown) -> None: 

509 """Move focus to the next GridSelect or focusable widget.""" 

510 self.focus_next() 

511 

512 @on(GridSelect.LeaveUp) 

513 def _on_grid_leave_up(self, event: GridSelect.LeaveUp) -> None: 

514 """Move focus to the previous GridSelect or focusable widget.""" 

515 self.focus_previous() 

516 

517 @on(GridSelect.Selected) 

518 def _on_grid_selected(self, event: GridSelect.Selected) -> None: 

519 """Handle model selection from the grid view.""" 

520 if isinstance(event.widget, ModelCard): 

521 self._select_row(event.widget.row) 

522 

523 @on(ModelListItem.Selected) 

524 def _on_list_item_selected(self, event: ModelListItem.Selected) -> None: 

525 """Handle model selection from the list view.""" 

526 self._select_row(event.item.row) 

527 

528 def _refresh_list(self) -> None: 

529 """Rebuild the list view from current data; append HF search CTA when filtering.""" 

530 self._rows = self._sort_rows(self._build_rows()) 

531 container = self.query_one("#catalog-list", VerticalScroll) 

532 container.remove_children() 

533 widgets_to_mount: list[ModelListItem | SearchHFCtaItem] = [ 

534 ModelListItem(row) for row in self._rows 

535 ] 

536 search = self._get_search_text() 

537 if search: 

538 widgets_to_mount.append(SearchHFCtaItem(search)) 

539 if widgets_to_mount: 

540 container.mount_all(widgets_to_mount) 

541 self._update_sort_label() 

542 

543 def _filter_list(self) -> None: 

544 """Filter visible list items by search without rebuilding the list. 

545 

546 Per-keystroke path: toggles .display on existing ModelListItems 

547 and mounts/removes the HF CTA row as needed. Only _refresh_list 

548 (data change, sort change) remounts. 

549 """ 

550 search = self._get_search_text() 

551 for item in self.query(ModelListItem): 

552 item.display = matches_search(item.row, search) 

553 self._sync_list_search_cta(search) 

554 self._update_sort_label() 

555 

556 def _sync_list_search_cta(self, search: str) -> None: 

557 """Ensure the search-HF CTA row exists iff a search term is active.""" 

558 container = self.query_one("#catalog-list", VerticalScroll) 

559 existing = list(container.query(SearchHFCtaItem)) 

560 for widget in existing: 

561 widget.remove() 

562 if search: 

563 container.mount(SearchHFCtaItem(search)) 

564 

565 def _update_sort_label(self) -> None: 

566 """Update the sort indicator label.""" 

567 direction = "asc" if self._sort_ascending else "desc" 

568 n_total = len(self._rows) 

569 if self._loading_more: 

570 count = f"{n_total} models · loading more…" 

571 elif self._hf_has_more: 

572 count = f"{n_total} models · press [b]n[/b] for more" 

573 else: 

574 count = f"{n_total} models" 

575 hint = msg.CATALOG_SEARCHING_HF if self._search_in_flight else msg.CATALOG_VIEW_TOGGLE_LIST 

576 self.query_one("#sort-label", Static).update( 

577 f"Sort: {self._sort_column} ({direction}) | {count} | {hint}" 

578 ) 

579 

580 def action_cycle_sort(self) -> None: 

581 """Cycle the list-view sort column ascending: Name, Downloads, Size, Params.""" 

582 if isinstance(self.focused, Input): 

583 return 

584 if self._grid_view: 

585 self.notify(msg.CATALOG_SORT_LIST_ONLY) 

586 return 

587 try: 

588 idx = _SORT_CYCLE.index(self._sort_column) 

589 except ValueError: 

590 idx = -1 

591 self._sort_column = _SORT_CYCLE[(idx + 1) % len(_SORT_CYCLE)] 

592 self._sort_ascending = True 

593 self._refresh_list() 

594 # _refresh_list replaces the list children asynchronously via 

595 # mount_all; focusing before the new widgets settle can leave focus 

596 # on the filter Input, which swallows the next `s` press as text 

597 # . Defer the focus move until after Textual's next refresh 

598 # so _list_items() actually returns the new rows. 

599 self.call_after_refresh(self._focus_list_item, 0) 

600 

601 def _select_row(self, row: TableRow) -> None: 

602 """Handle row selection: install or use the model.""" 

603 if row.variant and row.family: 

604 self._install_variant(row.variant, row.family) 

605 elif row.catalog_model: 

606 self._install_model(row.catalog_model) 

607 elif row.remote_model: 

608 ref = ( 

609 f"{OLLAMA_PREFIX}{row.remote_model.name}" 

610 if row.remote_model.provider == OLLAMA_BACKEND_NAME 

611 else row.remote_model.name 

612 ) 

613 apply_active_model(self.app, "chat_model", ref) 

614 self.notify(msg.CATALOG_USING_REMOTE.format(name=row.remote_model.name)) 

615 

616 def _load_more(self) -> None: 

617 """Load next page of HF models, if any remain and no fetch is in flight.""" 

618 if self._loading_more or not self._hf_has_more: 

619 return 

620 self._loading_more = True 

621 self._hf_offset += _HF_PAGE_SIZE 

622 self._fetch_more_hf() 

623 

624 def action_load_more(self) -> None: 

625 """Keyboard trigger (``n``) so users can page without scrolling.""" 

626 self._load_more() 

627 

628 def _install_variant(self, variant: ModelVariant, family: ModelFamily) -> None: 

629 """Convert a variant back to a CatalogModel and trigger install.""" 

630 entry = CatalogModel( 

631 hf_repo=variant.hf_repo, 

632 gguf_filename=variant.filename, 

633 size_gb=variant.size_mb / 1024, 

634 min_ram_gb=max(2.0, (variant.size_mb / 1024) * 1.5), 

635 description=family.description, 

636 featured=True, 

637 downloads=0, 

638 task=family.task, 

639 recommended=variant.recommended, 

640 ) 

641 self._install_model(entry) 

642 

643 def _install_model(self, model: CatalogModel) -> None: 

644 from lilbee.catalog import resolve_filename 

645 

646 try: 

647 filename = resolve_filename(model) 

648 dest = cfg.models_dir / filename 

649 if dest.exists(): 

650 self.notify(msg.CATALOG_ALREADY_INSTALLED.format(name=model.display_name)) 

651 return 

652 except Exception: 

653 log.debug("Could not resolve filename", exc_info=True) 

654 

655 self._enqueue_download(model) 

656 

657 def _enqueue_download(self, model: CatalogModel) -> None: 

658 """Submit the download to the app-level TaskBarController. 

659 

660 The controller owns the worker thread; this screen just fires the 

661 request and returns. Progress is visible from every screen and 

662 survives navigation. 

663 """ 

664 from lilbee.cli.tui.app import LilbeeApp 

665 

666 if not isinstance(self.app, LilbeeApp): # test apps aren't LilbeeApp 

667 self.notify(msg.CATALOG_NO_TASK_BAR, severity="error") 

668 return 

669 self.app.task_bar.start_download(model) 

670 self.notify(msg.CATALOG_QUEUED_DOWNLOAD.format(name=model.display_name)) 

671 

672 def action_go_back(self) -> None: 

673 # First Escape press unfocuses the filter input; without this 

674 # the screen-level `s` / `v` keys get typed into the input and the only 

675 # way to regain screen focus is to leave the screen entirely. 

676 if isinstance(self.focused, Input): 

677 self._focus_list_or_grid() 

678 return 

679 from lilbee.cli.tui.app import LilbeeApp 

680 

681 if isinstance(self.app, LilbeeApp): # test apps aren't LilbeeApp 

682 self.app.switch_view("Chat") 

683 else: 

684 self.app.pop_screen() 

685 

686 def _focus_list_or_grid(self) -> None: 

687 """Move focus from the filter input to the active view's list/grid.""" 

688 if self._grid_view: 

689 self._focus_first_grid() 

690 else: 

691 self._focus_list_item(0) 

692 

693 def action_delete_model(self) -> None: 

694 """Delete an installed model. First press asks confirmation, second confirms.""" 

695 if isinstance(self.focused, Input): 

696 return 

697 model_name = self._get_highlighted_model_name() 

698 if model_name is None: 

699 self.notify(msg.CATALOG_SELECT_TO_DELETE, severity="warning") 

700 return 

701 

702 mgr = get_model_manager() 

703 if not mgr.is_installed(model_name): 

704 self.notify(msg.CATALOG_NOT_INSTALLED.format(name=model_name), severity="warning") 

705 return 

706 

707 if self._pending_delete == model_name: 

708 self._pending_delete = None 

709 self._run_delete(model_name) 

710 else: 

711 self._pending_delete = model_name 

712 self.notify(msg.CATALOG_CONFIRM_DELETE.format(name=model_name)) 

713 

714 def _get_highlighted_model_name(self) -> str | None: 

715 """Return the registry-compatible model ref for the focused/highlighted row.""" 

716 if isinstance(self.focused, ModelListItem): 

717 return self.focused.row.ref or None 

718 focused_grid = self._focused_grid() 

719 if focused_grid is None or focused_grid.highlighted is None: 

720 return None 

721 child = focused_grid.children[focused_grid.highlighted] 

722 if isinstance(child, ModelCard): 

723 return child.row.ref or None 

724 return None 

725 

726 @work(thread=True) 

727 def _run_delete(self, model_name: str) -> None: 

728 """Remove a model in a background thread.""" 

729 from lilbee.cli.tui.thread_safe import call_from_thread 

730 

731 try: 

732 removed = get_model_manager().remove(model_name) 

733 if removed: 

734 call_from_thread(self, self.notify, msg.CATALOG_DELETED.format(name=model_name)) 

735 call_from_thread(self, self._refresh_after_delete) 

736 else: 

737 call_from_thread( 

738 self, 

739 self.notify, 

740 msg.CATALOG_DELETE_FAILED.format(error=model_name), 

741 severity="error", 

742 ) 

743 except Exception as exc: 

744 log.warning("Delete failed for %s", model_name, exc_info=True) 

745 call_from_thread( 

746 self, 

747 self.notify, 

748 msg.CATALOG_DELETE_FAILED.format(error=exc), 

749 severity="error", 

750 ) 

751 

752 def _refresh_after_delete(self) -> None: 

753 """Re-fetch remote models and refresh after deletion.""" 

754 self._fetch_installed_names() 

755 self._refresh_view() 

756 self._fetch_remote_models() 

757 

758 def _focused_grid(self) -> GridSelect | None: 

759 """Return the focused GridSelect (grid view), else None.""" 

760 if self._grid_view and isinstance(self.focused, GridSelect): 

761 return self.focused 

762 return None 

763 

764 def _list_items(self) -> list[ModelListItem]: 

765 """Return all visible list items in the list view.""" 

766 return [item for item in self.query(ModelListItem) if item.display] 

767 

768 def _focus_list_item(self, index: int) -> None: 

769 """Focus the list item at *index*, clamped to the visible range.""" 

770 items = self._list_items() 

771 if not items: 

772 return 

773 clamped = max(0, min(index, len(items) - 1)) 

774 items[clamped].focus() 

775 

776 def _focused_list_index(self) -> int | None: 

777 """Index of the focused ModelListItem among visible list items.""" 

778 if not isinstance(self.focused, ModelListItem): 

779 return None 

780 items = self._list_items() 

781 try: 

782 return items.index(self.focused) 

783 except ValueError: 

784 return None 

785 

786 def _nudge_list(self, delta: int) -> None: 

787 idx = self._focused_list_index() 

788 if idx is None: 

789 self._focus_list_item(0) 

790 return 

791 self._focus_list_item(idx + delta) 

792 self._maybe_prefetch_on_nav() 

793 

794 def _maybe_prefetch_on_nav(self) -> None: 

795 if self._grid_view or not self._hf_has_more or self._loading_more: 

796 return 

797 idx = self._focused_list_index() 

798 if idx is None: 

799 return 

800 if idx >= len(self._list_items()) - _HF_LOAD_MORE_TRIGGER: 

801 self._load_more() 

802 

803 def _page_rows(self) -> int: 

804 """How many cursor steps make up one 'page' in the active view.""" 

805 return _GRID_PAGE_ROWS if self._grid_view else _LIST_PAGE_ROWS 

806 

807 def action_page_down(self) -> None: 

808 if isinstance(self.focused, Input): 

809 return 

810 if self._grid_view: 

811 if (grid := self._focused_grid()) is not None: 

812 for _ in range(self._page_rows()): 

813 grid.action_cursor_down() 

814 else: 

815 self._nudge_list(self._page_rows()) 

816 

817 def action_page_up(self) -> None: 

818 if isinstance(self.focused, Input): 

819 return 

820 if self._grid_view: 

821 if (grid := self._focused_grid()) is not None: 

822 for _ in range(self._page_rows()): 

823 grid.action_cursor_up() 

824 else: 

825 self._nudge_list(-self._page_rows()) 

826 

827 def action_cursor_down(self) -> None: 

828 if isinstance(self.focused, Input): 

829 return 

830 if self._grid_view: 

831 if (grid := self._focused_grid()) is not None: 

832 grid.action_cursor_down() 

833 else: 

834 self._nudge_list(1) 

835 

836 def action_cursor_up(self) -> None: 

837 if isinstance(self.focused, Input): 

838 return 

839 if self._grid_view: 

840 if (grid := self._focused_grid()) is not None: 

841 grid.action_cursor_up() 

842 else: 

843 self._nudge_list(-1) 

844 

845 def action_jump_top(self) -> None: 

846 if isinstance(self.focused, Input): 

847 return 

848 if self._grid_view: 

849 if (grid := self._focused_grid()) is not None: 

850 grid.highlight_first() 

851 else: 

852 self._focus_list_item(0) 

853 

854 def action_jump_bottom(self) -> None: 

855 if isinstance(self.focused, Input): 

856 return 

857 if self._grid_view: 

858 if (grid := self._focused_grid()) is not None: 

859 grid.highlight_last() 

860 else: 

861 items = self._list_items() 

862 if items: 

863 self._focus_list_item(len(items) - 1) 

864 

865 

866@dataclass 

867class GridSection: 

868 """A named group of rows for the grid view.""" 

869 

870 heading: str 

871 rows: list[TableRow] 

872 

873 

874_TASK_BUCKET_ORDER = (ModelTask.CHAT, ModelTask.EMBEDDING, ModelTask.VISION, ModelTask.RERANK) 

875 

876 

877def _group_rows_for_grid(rows: list[TableRow]) -> list[GridSection]: 

878 """Group rows into sections for the grid view.""" 

879 recommended: list[TableRow] = [] 

880 installed: list[TableRow] = [] 

881 by_task: dict[str, list[TableRow]] = {task: [] for task in _TASK_BUCKET_ORDER} 

882 # Display order is fixed by _TASK_BUCKET_ORDER, but any ModelTask value 

883 # not in that tuple still renders in its own section after the known 

884 # ones, so adding a new task variant never silently drops rows. 

885 extras: dict[str, list[TableRow]] = {} 

886 for row in rows: 

887 if row.featured: 

888 recommended.append(row) 

889 continue 

890 if row.installed: 

891 installed.append(row) 

892 continue 

893 bucket = by_task.get(row.task) 

894 if bucket is not None: 

895 bucket.append(row) 

896 else: 

897 extras.setdefault(row.task, []).append(row) 

898 return [ 

899 GridSection(msg.HEADING_OUR_PICKS, recommended), 

900 GridSection(msg.HEADING_INSTALLED, installed), 

901 *[GridSection(task.capitalize(), by_task[task]) for task in _TASK_BUCKET_ORDER], 

902 *[GridSection(task.capitalize(), extras[task]) for task in extras], 

903 ]