Coverage for src / lilbee / models.py: 100%
168 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""RAM detection, model selection, interactive picker, and auto-install for chat models."""
3import functools
4import logging
5import os
6import shutil
7import sys
8from dataclasses import dataclass
9from enum import StrEnum
10from pathlib import Path
12from rich.console import Console
13from rich.progress import BarColumn, DownloadColumn, Progress, SpinnerColumn, TextColumn
14from rich.table import Table
16from lilbee import settings
18# circular: config -> models via ModelTask. cfg is imported lazily.
class ModelTask(StrEnum):
    """Task classification for models.

    Values are the string tags stored in model manifests and compared
    against ``manifest.task`` / ``remote.task`` when filtering models.
    """

    CHAT = "chat"  # conversational LLMs (the only task offered by the picker)
    EMBEDDING = "embedding"  # text-embedding models
    VISION = "vision"  # image-capable models
    RERANK = "rerank"  # cross-encoder rerankers
log = logging.getLogger(__name__)

# Marker rendered in the picker legend next to the recommended model.
FEATURED_STAR = "★"

# Extra headroom required beyond model size (GB)
_DISK_HEADROOM_GB = 2

# Where users can browse additional GGUF models beyond the curated catalog.
MODELS_BROWSE_URL = "https://huggingface.co/models?library=gguf&sort=trending"
@dataclass(frozen=True)
class ModelInfo:
    """A curated chat model with metadata for the picker UI."""

    ref: str  # canonical HF ref (e.g. "Qwen/Qwen3-0.6B-GGUF")
    display_name: str  # UI label (e.g. "Qwen3 0.6B")
    size_gb: float  # estimated download size; also drives the disk-space check
    min_ram_gb: float  # minimum system RAM for this model to be recommended
    description: str  # one-line blurb shown in the picker table
def _catalog_from_featured(featured: tuple) -> tuple[ModelInfo, ...]:
    """Build a ModelInfo tuple from catalog.py's CatalogModel entries."""
    infos: list[ModelInfo] = []
    for entry in featured:
        infos.append(
            ModelInfo(
                entry.ref,
                entry.display_name,
                entry.size_gb,
                entry.min_ram_gb,
                entry.description,
            )
        )
    return tuple(infos)
58# Lazy singletons — resolved on first access to break the circular import
59# between models.py (imports ModelTask) and catalog.py (imports from models).
@functools.cache
def _get_model_catalog() -> tuple[ModelInfo, ...]:
    """Resolve the chat catalog exactly once (cached).

    The import happens inside the function to break the circular import
    between models.py and catalog.py.
    """
    from lilbee.catalog import FEATURED_CHAT

    catalog = _catalog_from_featured(FEATURED_CHAT)
    return catalog
def __getattr__(name: str) -> tuple[ModelInfo, ...]:
    """PEP 562 module hook: expose ``MODEL_CATALOG`` lazily."""
    if name != "MODEL_CATALOG":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return _get_model_catalog()
def get_system_ram_gb() -> float:
    """Return total system RAM in GB. Falls back to 8.0 if detection fails.

    Windows uses ``GlobalMemoryStatusEx`` via ctypes; everything else uses
    POSIX ``sysconf``. Any detection error degrades to the 8.0 GB default.
    """
    try:
        if sys.platform == "win32":
            import ctypes

            class _MEMORYSTATUSEX(ctypes.Structure):
                _fields_ = [
                    ("dwLength", ctypes.c_ulong),
                    ("dwMemoryLoad", ctypes.c_ulong),
                    ("ullTotalPhys", ctypes.c_ulonglong),
                    ("ullAvailPhys", ctypes.c_ulonglong),
                    ("ullTotalPageFile", ctypes.c_ulonglong),
                    ("ullAvailPageFile", ctypes.c_ulonglong),
                    ("ullTotalVirtual", ctypes.c_ulonglong),
                    ("ullAvailVirtual", ctypes.c_ulonglong),
                    ("ullAvailExtendedVirtual", ctypes.c_ulonglong),
                ]

            stat = _MEMORYSTATUSEX()
            stat.dwLength = ctypes.sizeof(stat)
            # GlobalMemoryStatusEx returns 0 (FALSE) on failure; previously the
            # result was ignored, so a failed call could report uninitialized
            # struct contents as RAM. Raise so the fallback path engages.
            if not ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat)):  # type: ignore[attr-defined]
                raise OSError("GlobalMemoryStatusEx failed")
            return stat.ullTotalPhys / (1024**3)
        pages = os.sysconf("SC_PHYS_PAGES")
        page_size = os.sysconf("SC_PAGE_SIZE")
        return (pages * page_size) / (1024**3)
    except (OSError, AttributeError, ValueError):
        log.debug("RAM detection failed, falling back to 8.0 GB")
        return 8.0
def get_free_disk_gb(path: Path) -> float:
    """Return free disk space in GB for the filesystem containing *path*.

    If *path* does not exist yet, walk up to the nearest existing ancestor
    (the walk terminates because the filesystem root always exists).
    """
    existing = path
    while not existing.exists():
        existing = existing.parent
    return shutil.disk_usage(existing).free / (1024**3)
def pick_default_model(ram_gb: float) -> ModelInfo:
    """Choose the largest catalog model that fits in *ram_gb*."""
    catalog = _get_model_catalog()
    # Keep the last catalog entry whose RAM requirement fits; if nothing
    # fits, fall back to the first (smallest) entry.
    fitting = [m for m in catalog if m.min_ram_gb <= ram_gb]
    return fitting[-1] if fitting else catalog[0]
def _model_download_size_gb(model: str) -> float:
    """Estimated download size in GiB for an HF model ref."""
    unknown_model_gb = 5.0  # reasonable default for unknown models
    size_by_ref = {entry.ref: entry.size_gb for entry in _get_model_catalog()}
    return size_by_ref.get(model, unknown_model_gb)
def display_model_picker(
    ram_gb: float, free_disk_gb: float, *, console: Console | None = None
) -> ModelInfo:
    """Show a Rich table of catalog models and return the recommended model."""
    if console is None:
        console = Console(stderr=True)
    recommended = pick_default_model(ram_gb)

    table = Table(title="Available Models", show_lines=False)
    table.add_column("#", justify="right", style="bold")
    table.add_column("Model", style="cyan")
    table.add_column("Size", justify="right")
    table.add_column("Description")

    for position, entry in enumerate(_get_model_catalog(), 1):
        # Highlight the recommendation; flag entries too big for free disk.
        if entry == recommended:
            num_cell = f"[bold]{position}[/bold]"
            name_cell = f"[bold]{entry.display_name} ★[/bold]"
            desc_cell = f"[bold]{entry.description}[/bold]"
        else:
            num_cell = str(position)
            name_cell = entry.display_name
            desc_cell = entry.description

        if free_disk_gb < entry.size_gb + _DISK_HEADROOM_GB:
            size_cell = f"[red]{entry.size_gb:.1f} GB[/red]"
        else:
            size_cell = f"{entry.size_gb:.1f} GB"

        table.add_row(num_cell, name_cell, size_cell, desc_cell)

    console.print()
    console.print("[bold]No chat model found.[/bold] Pick one to download:\n")
    console.print(table)
    console.print(f"\n System: {ram_gb:.0f} GB RAM, {free_disk_gb:.1f} GB free disk")
    console.print(f" {FEATURED_STAR} = recommended for your system")
    console.print(f" Browse more models at {MODELS_BROWSE_URL}\n")

    return recommended
def prompt_model_choice(ram_gb: float) -> ModelInfo:
    """Prompt the user to pick a model by number. Returns the chosen ModelInfo."""
    from lilbee.config import cfg

    catalog = _get_model_catalog()
    free_disk_gb = get_free_disk_gb(cfg.data_dir)
    recommended = display_model_picker(ram_gb, free_disk_gb)
    default_idx = catalog.index(recommended) + 1

    while True:
        try:
            raw = input(f"Choice [{default_idx}]: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat Ctrl-C / closed stdin as accepting the recommendation.
            return recommended

        if not raw:
            return recommended

        try:
            choice = int(raw)
        except ValueError:
            choice = 0  # non-numeric input falls through to the error message

        if 1 <= choice <= len(catalog):
            return catalog[choice - 1]

        sys.stderr.write(f"Enter a number 1-{len(catalog)}.\n")
def validate_disk_and_pull(
    model_info: ModelInfo, free_gb: float, *, console: Console | None = None
) -> None:
    """Check disk space, pull the model, and persist the choice.

    Raises RuntimeError when free disk is below model size plus headroom.
    """
    from lilbee.config import cfg

    needed_gb = model_info.size_gb + _DISK_HEADROOM_GB
    if free_gb < needed_gb:
        message = (
            f"Not enough disk space to download '{model_info.display_name}': "
            f"need {needed_gb:.1f} GB, have {free_gb:.1f} GB free. "
            f"Free up space or choose a smaller model."
        )
        raise RuntimeError(message)

    pull_with_progress(model_info.ref, console=console)
    # Record the choice both in the live config and on disk.
    cfg.chat_model = model_info.ref
    settings.set_value(cfg.data_root, "chat_model", model_info.ref)
def pull_with_progress(model: str, *, console: Console | None = None) -> None:
    """Pull a model via model_manager, showing a Rich progress bar."""
    from lilbee.model_manager import ModelSource, get_model_manager

    out = console if console is not None else Console(file=sys.__stderr__ or sys.stderr)
    manager = get_model_manager()

    columns = (
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        DownloadColumn(),
        TextColumn("{task.percentage:>3.0f}%"),
    )
    with Progress(*columns, transient=True, console=out) as progress:
        task_id = progress.add_task(f"Downloading model '{model}'...", total=None)

        def _report(downloaded: int, total: int) -> None:
            # Total stays None (indeterminate spinner) until a size is known.
            if total > 0:
                progress.update(task_id, total=total, completed=downloaded)

        manager.pull(model, ModelSource.NATIVE, on_bytes=_report)
    out.print(f"Model '{model}' ready.")
def ensure_chat_model() -> None:
    """If no chat models are installed, pick and pull one.
    Interactive (TTY): show catalog picker with descriptions and sizes.
    Non-interactive (CI/pipes): auto-pick recommended model silently.
    Persists the chosen model in config.toml so it becomes the default.
    """
    from lilbee.config import cfg
    from lilbee.model_manager import get_model_manager

    try:
        installed = get_model_manager().list_installed()
    except RuntimeError as exc:
        raise RuntimeError(f"Cannot list models: {exc}") from exc

    # Disregard the configured embedding model so only chat candidates
    # count. The embedding ref points at one specific manifest, so an
    # exact comparison suffices (no family-stem matching).
    if any(name != cfg.embedding_model for name in installed):
        return

    ram_gb = get_system_ram_gb()
    free_gb = get_free_disk_gb(cfg.data_dir)

    if sys.stdin.isatty():
        chosen = prompt_model_choice(ram_gb)
    else:
        chosen = pick_default_model(ram_gb)
        sys.stderr.write(
            f"No chat model found. Auto-installing '{chosen.display_name}' "
            f"(detected {ram_gb:.0f} GB RAM)...\n"
        )

    validate_disk_and_pull(chosen, free_gb)
def list_installed_models() -> list[str]:
    """Return installed chat-task model names.

    Sources both the native registry (manifest ``task`` field) and the
    SDK backend catalog (classified by name/family). Non-chat roles
    (embedding, vision, rerank) are excluded so TUI pickers don't offer
    refs that fail pydantic task validation at assignment time.
    """
    from lilbee.config import cfg
    from lilbee.model_manager import classify_remote_models
    from lilbee.registry import ModelRegistry

    try:
        found: set[str] = set()
        for manifest in ModelRegistry(cfg.models_dir).list_installed():
            if manifest.task == ModelTask.CHAT:
                found.add(manifest.ref)
        for remote in classify_remote_models(cfg.remote_base_url):
            if remote.task == ModelTask.CHAT:
                found.add(remote.name)
        return sorted(found)
    except Exception:
        # Deliberate best-effort: listing failures degrade to an empty
        # picker rather than crashing the UI; details go to debug logs.
        log.debug("Failed to list installed models", exc_info=True)
        return []