Coverage for src / lilbee / cli / tui / screens / catalog.py: 100%
586 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Catalog screen -- browse and install models via grid or list view."""
3from __future__ import annotations
5import contextlib
6import logging
7from dataclasses import dataclass
8from typing import ClassVar
10from textual import on, work
11from textual.app import ComposeResult
12from textual.binding import Binding, BindingType
13from textual.containers import VerticalScroll
14from textual.events import Click
15from textual.screen import Screen
16from textual.widgets import Input, Static
17from textual.worker import Worker, WorkerState
19from lilbee.catalog import (
20 CatalogModel,
21 ModelFamily,
22 ModelVariant,
23 get_catalog,
24 get_families,
25)
26from lilbee.cli.tui import messages as msg
27from lilbee.cli.tui.app import apply_active_model
28from lilbee.cli.tui.screens.catalog_utils import (
29 SORT_KEYS,
30 TableRow,
31 catalog_to_row,
32 matches_search,
33 remote_to_row,
34 variant_to_row,
35)
36from lilbee.cli.tui.widgets.grid_select import GridSelect
37from lilbee.cli.tui.widgets.model_card import ModelCard
38from lilbee.cli.tui.widgets.model_list_item import ModelListItem
39from lilbee.cli.tui.widgets.nav_aware_input import NavAwareInput
40from lilbee.cli.tui.widgets.search_hf_cta_item import SearchHFCtaItem
41from lilbee.config import cfg
42from lilbee.model_manager import RemoteModel, get_model_manager
43from lilbee.models import ModelTask
44from lilbee.providers.model_ref import OLLAMA_PREFIX
45from lilbee.providers.sdk_backend import OLLAMA_BACKEND_NAME
47log = logging.getLogger(__name__)
49_HF_PAGE_SIZE = 25
50# When the highlighted row is within this many rows of the end we
51# auto-fetch the next page. Small enough that the request is already
52# in flight by the time the user reaches the bottom.
53_HF_LOAD_MORE_TRIGGER = 5
54# Long enough to register; short enough to clear before a warm-cache fetch.
55_NOTIFY_SEARCHING_TIMEOUT_SECONDS = 4
56_ALL_TASKS = tuple(ModelTask)
58_WORKER_FETCH_HF = "fetch_hf_models"
59_WORKER_FETCH_MORE_HF = "fetch_more_hf"
60_WORKER_FETCH_REMOTE = "fetch_remote_models"
61_WORKER_FETCH_SEARCH = "fetch_hf_search"
63_GRID_PAGE_ROWS = 3
64_LIST_PAGE_ROWS = 10
66# Sort columns cycled by the `s` keybinding in list view.
67_SORT_CYCLE: tuple[str, ...] = ("Name", "Downloads", "Size", "Params")
70class CatalogScreen(Screen[None]):
71 """Model catalog with grid (default) and list views."""
73 CSS_PATH = "catalog.tcss"
74 AUTO_FOCUS = "" # GridSelect is mounted dynamically; focused in on_mount
76 HELP = (
77 "# Catalog\n"
78 "Browse and install models.\n\n"
79 "Use arrows to navigate the grid, Enter to install."
80 )
82 _ACTION_GROUP = Binding.Group("Actions", compact=True)
83 _SCROLL_GROUP = Binding.Group("Scroll", compact=True)
85 BINDINGS: ClassVar[list[BindingType]] = [
86 Binding("q", "go_back", "Back", show=True, group=_ACTION_GROUP),
87 Binding("escape", "go_back", "Back", show=True),
88 Binding("v", "toggle_view", "View", show=True, group=_ACTION_GROUP),
89 Binding("slash", "focus_search", "Search", show=True, group=_ACTION_GROUP),
90 Binding("d", "delete_model", "Delete", show=True, group=_ACTION_GROUP),
91 Binding("x", "delete_model", "Delete", show=False),
92 Binding("j", "cursor_down", "Nav", show=False, group=_SCROLL_GROUP),
93 Binding("k", "cursor_up", "Nav", show=False, group=_SCROLL_GROUP),
94 Binding("g", "jump_top", "Top", show=False, group=_SCROLL_GROUP),
95 Binding("G", "jump_bottom", "End", show=False, group=_SCROLL_GROUP),
96 Binding("space", "page_down", "PgDn", show=False, group=_SCROLL_GROUP),
97 Binding("ctrl+d", "page_down", "PgDn", show=False, group=_SCROLL_GROUP),
98 Binding("ctrl+u", "page_up", "PgUp", show=False, group=_SCROLL_GROUP),
99 # Hidden from the footer so catalog still has <=5 visible bindings;
100 # the sort-label surfaces "press n for more" and "press s to sort"
101 # to the user instead.
102 Binding("n", "load_more", "More", show=False, group=_ACTION_GROUP),
103 Binding("s", "cycle_sort", "Sort", show=False, group=_ACTION_GROUP),
104 ]
106 def __init__(self) -> None:
107 super().__init__()
108 self._families: list[ModelFamily] = get_families()
109 self._hf_models: list[CatalogModel] = []
110 self._remote_models: list[RemoteModel] = []
111 self._hf_offset = 0
112 self._hf_has_more = True
113 self._rows: list[TableRow] = []
114 self._sort_column: str = "Name"
115 self._sort_ascending: bool = True
116 self._pending_delete: str | None = None
117 self._installed_names: set[str] = set()
118 self._grid_view: bool = True
119 self._hf_fetched: bool = False
120 self._loading_more: bool = False
121 self._grid_cache_key: tuple[tuple[tuple[str, bool], ...], str] | tuple = ()
122 self._search_in_flight: bool = False
124 def compose(self) -> ComposeResult:
125 from textual.widgets import Footer
127 from lilbee.cli.tui.widgets.bottom_bars import BottomBars
128 from lilbee.cli.tui.widgets.status_bar import ViewTabs
129 from lilbee.cli.tui.widgets.task_bar import TaskBar
130 from lilbee.cli.tui.widgets.top_bars import TopBars
132 with TopBars():
133 yield ViewTabs()
134 yield NavAwareInput(placeholder=msg.CATALOG_FILTER_PLACEHOLDER, id="catalog-search")
135 yield Static("", id="sort-label", shrink=True)
136 yield VerticalScroll(id="catalog-grid")
137 yield VerticalScroll(id="catalog-list")
138 yield Static("", id="model-detail")
139 with BottomBars():
140 yield TaskBar()
141 yield Footer()
143 def on_mount(self) -> None:
144 self._fetch_installed_names()
145 self.add_class("-grid-view")
146 self._refresh_grid()
147 self._focus_first_grid()
148 self._fetch_remote_models()
150 def _focus_first_grid(self) -> None:
151 """Focus the first GridSelect widget if available."""
152 with contextlib.suppress(Exception):
153 self.query_one(GridSelect).focus()
155 def _fetch_installed_names(self) -> None:
156 """Populate installed identities from registry manifests.
158 Stored set contains both the canonical ref (``hf_repo/filename``)
159 and the bare ``hf_repo`` so catalog rows whose ref is the repo
160 alone still light up as installed when at least one quant of
161 that repo has a manifest.
162 """
163 with contextlib.suppress(Exception):
164 from lilbee.registry import ModelRegistry
166 registry = ModelRegistry(cfg.models_dir)
167 self._installed_names = set()
168 for m in registry.list_installed():
169 self._installed_names.add(m.ref)
170 self._installed_names.add(m.hf_repo)
172 def action_toggle_view(self) -> None:
173 """Toggle between grid and list view."""
174 if self._grid_view:
175 self._grid_view = False
176 self.remove_class("-grid-view")
177 self.add_class("-list-view")
178 if not self._hf_fetched:
179 self._hf_fetched = True
180 self._fetch_all_hf_models()
181 self._refresh_list()
182 self._focus_list_item(0)
183 else:
184 self._grid_view = True
185 self.remove_class("-list-view")
186 self.add_class("-grid-view")
187 self._refresh_grid()
188 with contextlib.suppress(Exception):
189 self.query_one("#catalog-grid GridSelect", GridSelect).focus()
191 def action_focus_search(self) -> None:
192 """Focus the filter input -- bound to / key."""
193 self.query_one("#catalog-search", Input).focus()
195 @on(Input.Changed, "#catalog-search")
196 def _on_search_changed(self, event: Input.Changed) -> None:
197 """Filter models when search input changes."""
198 if self._grid_view:
199 self._filter_grid()
200 self._sync_grid_search_cta()
201 else:
202 self._filter_list()
204 @on(Input.Submitted, "#catalog-search")
205 def _on_search_submitted(self, event: Input.Submitted) -> None:
206 """Enter installs the first visible match; falls through to the HF CTA
207 when nothing matches locally so the obvious intent ('search for this')
208 doesn't require the user to Tab over to the CTA row first."""
209 if self._grid_view:
210 if any(card.display for card in self.query(ModelCard)):
211 self._select_first_visible_grid_card()
212 return
213 elif any(item.display for item in self.query(ModelListItem)):
214 self._select_first_visible_list_item()
215 return
216 self._trigger_remote_search(self._get_search_text())
218 def _trigger_remote_search(self, query: str) -> None:
219 """Fire the HF search worker, unless one is already in flight."""
220 if self._search_in_flight or not query:
221 return
222 self._search_in_flight = True
223 self._update_sort_label()
224 # Sort label is hidden in grid view, so the toast is the only feedback there.
225 self.notify(msg.CATALOG_SEARCHING_HF, timeout=_NOTIFY_SEARCHING_TIMEOUT_SECONDS)
226 self._fetch_hf_search(query)
228 @on(SearchHFCtaItem.Selected)
229 def _on_search_hf_cta_selected(self, event: SearchHFCtaItem.Selected) -> None:
230 self._trigger_remote_search(event.term)
232 @on(Click, ".search-hf-cta")
233 def _on_search_hf_cta_clicked(self) -> None:
234 self._trigger_remote_search(self._get_search_text())
236 def _select_first_visible_grid_card(self) -> None:
237 """Focus the first grid with a visible match and trigger its install.
239 Without the "first visible" walk, focusing any grid with
240 ``highlighted = 0`` could land on a card the filter just hid,
241 and Enter would install the wrong model. Setting
242 ``highlighted`` to the first visible index guarantees the
243 install fires on what the user can actually see.
244 """
245 with contextlib.suppress(Exception):
246 for grid in self.query(GridSelect):
247 visible = [i for i, card in enumerate(grid.children) if card.display]
248 if visible:
249 grid.focus()
250 grid.highlighted = visible[0]
251 grid.action_select()
252 return
254 def _select_first_visible_list_item(self) -> None:
255 """List-view counterpart: focus + install the first visible row."""
256 with contextlib.suppress(Exception):
257 for item in self.query(ModelListItem):
258 if item.display:
259 item.focus()
260 item.action_select()
261 return
263 def _fetch_hf_page(self) -> list[CatalogModel]:
264 """Fetch one page of HF models for all task types (runs in worker thread)."""
265 all_models: list[CatalogModel] = []
266 seen_repos: set[str] = set()
267 any_has_more = False
268 for task in _ALL_TASKS:
269 result = get_catalog(
270 task=task,
271 featured=False,
272 limit=_HF_PAGE_SIZE,
273 offset=self._hf_offset,
274 )
275 if result.has_more:
276 any_has_more = True
277 for m in result.models:
278 if not m.featured and m.hf_repo not in seen_repos:
279 seen_repos.add(m.hf_repo)
280 all_models.append(m)
281 self._hf_has_more = any_has_more
282 return all_models
284 @work(thread=True, name=_WORKER_FETCH_HF)
285 def _fetch_all_hf_models(self) -> list[CatalogModel]:
286 """Fetch HF models for all task types (replaces current list)."""
287 return self._fetch_hf_page()
289 @work(thread=True, name=_WORKER_FETCH_REMOTE)
290 def _fetch_remote_models(self) -> list[RemoteModel]:
291 from lilbee.model_manager import classify_remote_models
293 return classify_remote_models(cfg.remote_base_url)
295 @work(thread=True, name=_WORKER_FETCH_MORE_HF)
296 def _fetch_more_hf(self) -> list[CatalogModel]:
297 """Fetch next page of HF models for all task types (extends current list)."""
298 return self._fetch_hf_page()
300 @work(thread=True, name=_WORKER_FETCH_SEARCH, exit_on_error=False)
301 def _fetch_hf_search(self, query: str) -> list[CatalogModel]:
302 """Fetch HF models matching the user's search term (runs in worker thread)."""
303 existing_repos = {m.hf_repo for m in self._hf_models}
304 new_models: list[CatalogModel] = []
305 for task in _ALL_TASKS:
306 result = get_catalog(
307 task=task,
308 featured=False,
309 search=query,
310 limit=_HF_PAGE_SIZE,
311 offset=0,
312 )
313 for m in result.models:
314 if not m.featured and m.hf_repo not in existing_repos:
315 existing_repos.add(m.hf_repo)
316 new_models.append(m)
317 return new_models
319 def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
320 # PENDING/RUNNING fire here too; only ERROR/CANCELLED should release latches.
321 if event.state in (WorkerState.ERROR, WorkerState.CANCELLED):
322 if event.worker.name == _WORKER_FETCH_MORE_HF:
323 self._loading_more = False
324 if event.worker.name == _WORKER_FETCH_SEARCH:
325 self._search_in_flight = False
326 self._update_sort_label()
327 return
328 if event.state != WorkerState.SUCCESS:
329 return
330 result = event.worker.result
331 if not isinstance(result, list):
332 return
333 name = event.worker.name
334 if name == _WORKER_FETCH_HF:
335 self._hf_models = result
336 elif name == _WORKER_FETCH_MORE_HF:
337 self._hf_models.extend(result)
338 self._loading_more = False
339 elif name == _WORKER_FETCH_SEARCH:
340 self._hf_fetched = True
341 self._hf_models.extend(result)
342 self._search_in_flight = False
343 self._update_sort_label()
344 elif name == _WORKER_FETCH_REMOTE:
345 self._remote_models = result
346 else:
347 return
348 self._refresh_view()
350 def _get_search_text(self) -> str:
351 # Preserve the user's casing for display (e.g. the CTA label); matching
352 # callers normalize via _normalize_for_search.
353 return self.query_one("#catalog-search", Input).value.strip()
355 def _build_rows(self) -> list[TableRow]:
356 """Build all table rows from current data sources."""
357 search = self._get_search_text()
358 rows: list[TableRow] = []
359 rows.extend(self._build_family_rows(search))
360 rows.extend(self._build_hf_rows(search))
361 rows.extend(self._build_remote_rows(search))
362 return rows
364 def _build_family_rows(self, search: str) -> list[TableRow]:
365 """Build rows from featured model families."""
366 rows: list[TableRow] = []
367 for fam in self._families:
368 for v in fam.variants:
369 installed = self._is_installed(v.hf_repo, repo=v.hf_repo, filename=v.filename)
370 row = variant_to_row(v, fam, installed)
371 if matches_search(row, search):
372 rows.append(row)
373 return rows
375 def _build_hf_rows(self, search: str) -> list[TableRow]:
376 """Build rows from HuggingFace models."""
377 rows: list[TableRow] = []
378 for m in self._hf_models:
379 installed = self._is_installed(m.ref, repo=m.hf_repo, filename=m.gguf_filename)
380 row = catalog_to_row(m, installed)
381 if matches_search(row, search):
382 rows.append(row)
383 return rows
385 def _build_remote_rows(self, search: str) -> list[TableRow]:
386 """Build rows from remote (inference-only) models."""
387 rows: list[TableRow] = []
388 for rm in self._remote_models:
389 row = remote_to_row(rm)
390 if matches_search(row, search):
391 rows.append(row)
392 return rows
394 def _is_installed(self, name: str, repo: str = "", filename: str = "") -> bool:
395 """Check if a model is installed by name or source repo/filename."""
396 if name in self._installed_names:
397 return True
398 if repo and filename:
399 return f"{repo}/{filename}" in self._installed_names
400 return False
402 def _sort_rows(self, rows: list[TableRow]) -> list[TableRow]:
403 """Sort rows: featured first, then by current sort column."""
404 key_fn = SORT_KEYS.get(self._sort_column, SORT_KEYS["Name"])
405 # Stable sort: featured always first, then by column
406 return sorted(
407 rows,
408 key=lambda r: (not r.featured, key_fn(r)),
409 reverse=not self._sort_ascending,
410 )
412 def _refresh_view(self) -> None:
413 """Refresh the active view (grid or list)."""
414 if self._grid_view:
415 self._refresh_grid()
416 else:
417 self._refresh_list()
419 def _refresh_grid(self) -> None:
420 """Rebuild the grid view with all cards (called when data changes)."""
421 family_rows = self._build_family_rows("")
422 remote_rows = self._build_remote_rows("")
423 hf_rows = self._build_hf_rows("") if self._hf_fetched else []
424 all_rows = family_rows + remote_rows + hf_rows
425 # Include the full search text so toggle-back + value-change combinations
426 # rebuild the grid (and therefore the CTA) with the current query.
427 row_key = (
428 tuple((r.name, r.installed) for r in all_rows),
429 self._get_search_text(),
430 )
431 if self._grid_cache_key == row_key:
432 return
433 self._grid_cache_key = row_key
434 container = self.query_one("#catalog-grid", VerticalScroll)
435 container.remove_children()
436 widgets_to_mount: list[Static | GridSelect] = []
437 for section in _group_rows_for_grid(all_rows):
438 if not section.rows:
439 continue
440 widgets_to_mount.append(Static(section.heading, classes="section-heading"))
441 cards = [ModelCard(row) for row in section.rows]
442 grid = GridSelect(*cards, min_column_width=30, max_column_width=50)
443 widgets_to_mount.append(grid)
444 if not self._hf_fetched:
445 widgets_to_mount.append(
446 Static(
447 msg.CATALOG_BROWSE_MORE,
448 classes="grid-cta browse-more-hf",
449 )
450 )
451 search = self._get_search_text()
452 if search:
453 widgets_to_mount.append(
454 Static(
455 msg.CATALOG_SEARCH_HF_CTA.format(query=search),
456 classes="grid-cta search-hf-cta",
457 )
458 )
459 widgets_to_mount.append(
460 Static(
461 msg.CATALOG_VIEW_TOGGLE_GRID,
462 classes="grid-cta",
463 )
464 )
465 container.mount_all(widgets_to_mount)
467 def _sync_grid_search_cta(self) -> None:
468 """Mount/remove/update the grid-view search-HF CTA in response to typing."""
469 search = self._get_search_text()
470 existing = self.query("#catalog-grid > .search-hf-cta")
471 if not search:
472 for w in existing:
473 w.remove()
474 return
475 cta_text = msg.CATALOG_SEARCH_HF_CTA.format(query=search)
476 if existing:
477 for w in existing:
478 if isinstance(w, Static):
479 w.update(cta_text)
480 return
481 container = self.query_one("#catalog-grid", VerticalScroll)
482 container.mount(Static(cta_text, classes="grid-cta search-hf-cta"))
484 def _filter_grid(self) -> None:
485 """Filter visible cards by search text without recreating widgets."""
486 search = self._get_search_text()
487 for card in self.query(ModelCard):
488 card.display = matches_search(card.row, search)
489 container = self.query_one("#catalog-grid", VerticalScroll)
490 children = list(container.children)
491 for i, child in enumerate(children):
492 if not child.has_class("section-heading"):
493 continue
494 grid = children[i + 1] if i + 1 < len(children) else None
495 if isinstance(grid, GridSelect):
496 has_visible = any(c.display for c in grid.children)
497 child.display = has_visible
498 grid.display = has_visible
500 @on(Click, ".browse-more-hf")
501 def _on_browse_more_clicked(self) -> None:
502 """Fetch all models when the browse-more card is clicked."""
503 if not self._hf_fetched:
504 self._hf_fetched = True
505 self._fetch_all_hf_models()
507 @on(GridSelect.LeaveDown)
508 def _on_grid_leave_down(self, event: GridSelect.LeaveDown) -> None:
509 """Move focus to the next GridSelect or focusable widget."""
510 self.focus_next()
512 @on(GridSelect.LeaveUp)
513 def _on_grid_leave_up(self, event: GridSelect.LeaveUp) -> None:
514 """Move focus to the previous GridSelect or focusable widget."""
515 self.focus_previous()
517 @on(GridSelect.Selected)
518 def _on_grid_selected(self, event: GridSelect.Selected) -> None:
519 """Handle model selection from the grid view."""
520 if isinstance(event.widget, ModelCard):
521 self._select_row(event.widget.row)
523 @on(ModelListItem.Selected)
524 def _on_list_item_selected(self, event: ModelListItem.Selected) -> None:
525 """Handle model selection from the list view."""
526 self._select_row(event.item.row)
528 def _refresh_list(self) -> None:
529 """Rebuild the list view from current data; append HF search CTA when filtering."""
530 self._rows = self._sort_rows(self._build_rows())
531 container = self.query_one("#catalog-list", VerticalScroll)
532 container.remove_children()
533 widgets_to_mount: list[ModelListItem | SearchHFCtaItem] = [
534 ModelListItem(row) for row in self._rows
535 ]
536 search = self._get_search_text()
537 if search:
538 widgets_to_mount.append(SearchHFCtaItem(search))
539 if widgets_to_mount:
540 container.mount_all(widgets_to_mount)
541 self._update_sort_label()
543 def _filter_list(self) -> None:
544 """Filter visible list items by search without rebuilding the list.
546 Per-keystroke path: toggles .display on existing ModelListItems
547 and mounts/removes the HF CTA row as needed. Only _refresh_list
548 (data change, sort change) remounts.
549 """
550 search = self._get_search_text()
551 for item in self.query(ModelListItem):
552 item.display = matches_search(item.row, search)
553 self._sync_list_search_cta(search)
554 self._update_sort_label()
556 def _sync_list_search_cta(self, search: str) -> None:
557 """Ensure the search-HF CTA row exists iff a search term is active."""
558 container = self.query_one("#catalog-list", VerticalScroll)
559 existing = list(container.query(SearchHFCtaItem))
560 for widget in existing:
561 widget.remove()
562 if search:
563 container.mount(SearchHFCtaItem(search))
565 def _update_sort_label(self) -> None:
566 """Update the sort indicator label."""
567 direction = "asc" if self._sort_ascending else "desc"
568 n_total = len(self._rows)
569 if self._loading_more:
570 count = f"{n_total} models · loading more…"
571 elif self._hf_has_more:
572 count = f"{n_total} models · press [b]n[/b] for more"
573 else:
574 count = f"{n_total} models"
575 hint = msg.CATALOG_SEARCHING_HF if self._search_in_flight else msg.CATALOG_VIEW_TOGGLE_LIST
576 self.query_one("#sort-label", Static).update(
577 f"Sort: {self._sort_column} ({direction}) | {count} | {hint}"
578 )
580 def action_cycle_sort(self) -> None:
581 """Cycle the list-view sort column ascending: Name, Downloads, Size, Params."""
582 if isinstance(self.focused, Input):
583 return
584 if self._grid_view:
585 self.notify(msg.CATALOG_SORT_LIST_ONLY)
586 return
587 try:
588 idx = _SORT_CYCLE.index(self._sort_column)
589 except ValueError:
590 idx = -1
591 self._sort_column = _SORT_CYCLE[(idx + 1) % len(_SORT_CYCLE)]
592 self._sort_ascending = True
593 self._refresh_list()
594 # _refresh_list replaces the list children asynchronously via
595 # mount_all; focusing before the new widgets settle can leave focus
596 # on the filter Input, which swallows the next `s` press as text
597 # . Defer the focus move until after Textual's next refresh
598 # so _list_items() actually returns the new rows.
599 self.call_after_refresh(self._focus_list_item, 0)
601 def _select_row(self, row: TableRow) -> None:
602 """Handle row selection: install or use the model."""
603 if row.variant and row.family:
604 self._install_variant(row.variant, row.family)
605 elif row.catalog_model:
606 self._install_model(row.catalog_model)
607 elif row.remote_model:
608 ref = (
609 f"{OLLAMA_PREFIX}{row.remote_model.name}"
610 if row.remote_model.provider == OLLAMA_BACKEND_NAME
611 else row.remote_model.name
612 )
613 apply_active_model(self.app, "chat_model", ref)
614 self.notify(msg.CATALOG_USING_REMOTE.format(name=row.remote_model.name))
616 def _load_more(self) -> None:
617 """Load next page of HF models, if any remain and no fetch is in flight."""
618 if self._loading_more or not self._hf_has_more:
619 return
620 self._loading_more = True
621 self._hf_offset += _HF_PAGE_SIZE
622 self._fetch_more_hf()
624 def action_load_more(self) -> None:
625 """Keyboard trigger (``n``) so users can page without scrolling."""
626 self._load_more()
628 def _install_variant(self, variant: ModelVariant, family: ModelFamily) -> None:
629 """Convert a variant back to a CatalogModel and trigger install."""
630 entry = CatalogModel(
631 hf_repo=variant.hf_repo,
632 gguf_filename=variant.filename,
633 size_gb=variant.size_mb / 1024,
634 min_ram_gb=max(2.0, (variant.size_mb / 1024) * 1.5),
635 description=family.description,
636 featured=True,
637 downloads=0,
638 task=family.task,
639 recommended=variant.recommended,
640 )
641 self._install_model(entry)
643 def _install_model(self, model: CatalogModel) -> None:
644 from lilbee.catalog import resolve_filename
646 try:
647 filename = resolve_filename(model)
648 dest = cfg.models_dir / filename
649 if dest.exists():
650 self.notify(msg.CATALOG_ALREADY_INSTALLED.format(name=model.display_name))
651 return
652 except Exception:
653 log.debug("Could not resolve filename", exc_info=True)
655 self._enqueue_download(model)
657 def _enqueue_download(self, model: CatalogModel) -> None:
658 """Submit the download to the app-level TaskBarController.
660 The controller owns the worker thread; this screen just fires the
661 request and returns. Progress is visible from every screen and
662 survives navigation.
663 """
664 from lilbee.cli.tui.app import LilbeeApp
666 if not isinstance(self.app, LilbeeApp): # test apps aren't LilbeeApp
667 self.notify(msg.CATALOG_NO_TASK_BAR, severity="error")
668 return
669 self.app.task_bar.start_download(model)
670 self.notify(msg.CATALOG_QUEUED_DOWNLOAD.format(name=model.display_name))
672 def action_go_back(self) -> None:
673 # First Escape press unfocuses the filter input; without this
674 # the screen-level `s` / `v` keys get typed into the input and the only
675 # way to regain screen focus is to leave the screen entirely.
676 if isinstance(self.focused, Input):
677 self._focus_list_or_grid()
678 return
679 from lilbee.cli.tui.app import LilbeeApp
681 if isinstance(self.app, LilbeeApp): # test apps aren't LilbeeApp
682 self.app.switch_view("Chat")
683 else:
684 self.app.pop_screen()
686 def _focus_list_or_grid(self) -> None:
687 """Move focus from the filter input to the active view's list/grid."""
688 if self._grid_view:
689 self._focus_first_grid()
690 else:
691 self._focus_list_item(0)
693 def action_delete_model(self) -> None:
694 """Delete an installed model. First press asks confirmation, second confirms."""
695 if isinstance(self.focused, Input):
696 return
697 model_name = self._get_highlighted_model_name()
698 if model_name is None:
699 self.notify(msg.CATALOG_SELECT_TO_DELETE, severity="warning")
700 return
702 mgr = get_model_manager()
703 if not mgr.is_installed(model_name):
704 self.notify(msg.CATALOG_NOT_INSTALLED.format(name=model_name), severity="warning")
705 return
707 if self._pending_delete == model_name:
708 self._pending_delete = None
709 self._run_delete(model_name)
710 else:
711 self._pending_delete = model_name
712 self.notify(msg.CATALOG_CONFIRM_DELETE.format(name=model_name))
714 def _get_highlighted_model_name(self) -> str | None:
715 """Return the registry-compatible model ref for the focused/highlighted row."""
716 if isinstance(self.focused, ModelListItem):
717 return self.focused.row.ref or None
718 focused_grid = self._focused_grid()
719 if focused_grid is None or focused_grid.highlighted is None:
720 return None
721 child = focused_grid.children[focused_grid.highlighted]
722 if isinstance(child, ModelCard):
723 return child.row.ref or None
724 return None
726 @work(thread=True)
727 def _run_delete(self, model_name: str) -> None:
728 """Remove a model in a background thread."""
729 from lilbee.cli.tui.thread_safe import call_from_thread
731 try:
732 removed = get_model_manager().remove(model_name)
733 if removed:
734 call_from_thread(self, self.notify, msg.CATALOG_DELETED.format(name=model_name))
735 call_from_thread(self, self._refresh_after_delete)
736 else:
737 call_from_thread(
738 self,
739 self.notify,
740 msg.CATALOG_DELETE_FAILED.format(error=model_name),
741 severity="error",
742 )
743 except Exception as exc:
744 log.warning("Delete failed for %s", model_name, exc_info=True)
745 call_from_thread(
746 self,
747 self.notify,
748 msg.CATALOG_DELETE_FAILED.format(error=exc),
749 severity="error",
750 )
752 def _refresh_after_delete(self) -> None:
753 """Re-fetch remote models and refresh after deletion."""
754 self._fetch_installed_names()
755 self._refresh_view()
756 self._fetch_remote_models()
758 def _focused_grid(self) -> GridSelect | None:
759 """Return the focused GridSelect (grid view), else None."""
760 if self._grid_view and isinstance(self.focused, GridSelect):
761 return self.focused
762 return None
764 def _list_items(self) -> list[ModelListItem]:
765 """Return all visible list items in the list view."""
766 return [item for item in self.query(ModelListItem) if item.display]
768 def _focus_list_item(self, index: int) -> None:
769 """Focus the list item at *index*, clamped to the visible range."""
770 items = self._list_items()
771 if not items:
772 return
773 clamped = max(0, min(index, len(items) - 1))
774 items[clamped].focus()
776 def _focused_list_index(self) -> int | None:
777 """Index of the focused ModelListItem among visible list items."""
778 if not isinstance(self.focused, ModelListItem):
779 return None
780 items = self._list_items()
781 try:
782 return items.index(self.focused)
783 except ValueError:
784 return None
786 def _nudge_list(self, delta: int) -> None:
787 idx = self._focused_list_index()
788 if idx is None:
789 self._focus_list_item(0)
790 return
791 self._focus_list_item(idx + delta)
792 self._maybe_prefetch_on_nav()
794 def _maybe_prefetch_on_nav(self) -> None:
795 if self._grid_view or not self._hf_has_more or self._loading_more:
796 return
797 idx = self._focused_list_index()
798 if idx is None:
799 return
800 if idx >= len(self._list_items()) - _HF_LOAD_MORE_TRIGGER:
801 self._load_more()
803 def _page_rows(self) -> int:
804 """How many cursor steps make up one 'page' in the active view."""
805 return _GRID_PAGE_ROWS if self._grid_view else _LIST_PAGE_ROWS
807 def action_page_down(self) -> None:
808 if isinstance(self.focused, Input):
809 return
810 if self._grid_view:
811 if (grid := self._focused_grid()) is not None:
812 for _ in range(self._page_rows()):
813 grid.action_cursor_down()
814 else:
815 self._nudge_list(self._page_rows())
817 def action_page_up(self) -> None:
818 if isinstance(self.focused, Input):
819 return
820 if self._grid_view:
821 if (grid := self._focused_grid()) is not None:
822 for _ in range(self._page_rows()):
823 grid.action_cursor_up()
824 else:
825 self._nudge_list(-self._page_rows())
827 def action_cursor_down(self) -> None:
828 if isinstance(self.focused, Input):
829 return
830 if self._grid_view:
831 if (grid := self._focused_grid()) is not None:
832 grid.action_cursor_down()
833 else:
834 self._nudge_list(1)
836 def action_cursor_up(self) -> None:
837 if isinstance(self.focused, Input):
838 return
839 if self._grid_view:
840 if (grid := self._focused_grid()) is not None:
841 grid.action_cursor_up()
842 else:
843 self._nudge_list(-1)
845 def action_jump_top(self) -> None:
846 if isinstance(self.focused, Input):
847 return
848 if self._grid_view:
849 if (grid := self._focused_grid()) is not None:
850 grid.highlight_first()
851 else:
852 self._focus_list_item(0)
854 def action_jump_bottom(self) -> None:
855 if isinstance(self.focused, Input):
856 return
857 if self._grid_view:
858 if (grid := self._focused_grid()) is not None:
859 grid.highlight_last()
860 else:
861 items = self._list_items()
862 if items:
863 self._focus_list_item(len(items) - 1)
866@dataclass
867class GridSection:
868 """A named group of rows for the grid view."""
870 heading: str
871 rows: list[TableRow]
874_TASK_BUCKET_ORDER = (ModelTask.CHAT, ModelTask.EMBEDDING, ModelTask.VISION, ModelTask.RERANK)
877def _group_rows_for_grid(rows: list[TableRow]) -> list[GridSection]:
878 """Group rows into sections for the grid view."""
879 recommended: list[TableRow] = []
880 installed: list[TableRow] = []
881 by_task: dict[str, list[TableRow]] = {task: [] for task in _TASK_BUCKET_ORDER}
882 # Display order is fixed by _TASK_BUCKET_ORDER, but any ModelTask value
883 # not in that tuple still renders in its own section after the known
884 # ones, so adding a new task variant never silently drops rows.
885 extras: dict[str, list[TableRow]] = {}
886 for row in rows:
887 if row.featured:
888 recommended.append(row)
889 continue
890 if row.installed:
891 installed.append(row)
892 continue
893 bucket = by_task.get(row.task)
894 if bucket is not None:
895 bucket.append(row)
896 else:
897 extras.setdefault(row.task, []).append(row)
898 return [
899 GridSection(msg.HEADING_OUR_PICKS, recommended),
900 GridSection(msg.HEADING_INSTALLED, installed),
901 *[GridSection(task.capitalize(), by_task[task]) for task in _TASK_BUCKET_ORDER],
902 *[GridSection(task.capitalize(), extras[task]) for task in extras],
903 ]