Coverage for src/lilbee/models.py: 100% (168 statements)
coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""RAM detection, model selection, interactive picker, and auto-install for chat models.""" 

2 

3import functools 

4import logging 

5import os 

6import shutil 

7import sys 

8from dataclasses import dataclass 

9from enum import StrEnum 

10from pathlib import Path 

11 

12from rich.console import Console 

13from rich.progress import BarColumn, DownloadColumn, Progress, SpinnerColumn, TextColumn 

14from rich.table import Table 

15 

16from lilbee import settings 

17 

18# circular: config -> models via ModelTask. cfg is imported lazily. 

19 

20 

21class ModelTask(StrEnum): 

22 """Task classification for models.""" 

23 

24 CHAT = "chat" 

25 EMBEDDING = "embedding" 

26 VISION = "vision" 

27 RERANK = "rerank" 

28 

29 
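
# StrEnum members double as plain strings, e.g. ModelTask.CHAT == "chat" and
# ModelTask("rerank") is ModelTask.RERANK, so string-typed manifest data can
# be compared against these members directly (see list_installed_models).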


log = logging.getLogger(__name__)

FEATURED_STAR = "★"

# Extra headroom required beyond model size (GB)
_DISK_HEADROOM_GB = 2

MODELS_BROWSE_URL = "https://huggingface.co/models?library=gguf&sort=trending"


@dataclass(frozen=True)
class ModelInfo:
    """A curated chat model with metadata for the picker UI."""

    ref: str  # canonical HF ref (e.g. "Qwen/Qwen3-0.6B-GGUF")
    display_name: str  # UI label (e.g. "Qwen3 0.6B")
    size_gb: float
    min_ram_gb: float
    description: str
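
# A hypothetical catalog entry, for illustration only (the size, RAM, and
# description values here are made up, not taken from catalog.py):
#
#     ModelInfo(
#         ref="Qwen/Qwen3-0.6B-GGUF",
#         display_name="Qwen3 0.6B",
#         size_gb=0.6,
#         min_ram_gb=4.0,
#         description="Small general-purpose chat model",
#     )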


def _catalog_from_featured(featured: tuple) -> tuple[ModelInfo, ...]:
    """Build a ModelInfo tuple from catalog.py's CatalogModel entries."""
    return tuple(
        ModelInfo(m.ref, m.display_name, m.size_gb, m.min_ram_gb, m.description) for m in featured
    )


# Lazy singletons — resolved on first access to break the circular import
# between models.py (imports ModelTask) and catalog.py (imports from models).


@functools.cache
def _get_model_catalog() -> tuple[ModelInfo, ...]:
    from lilbee.catalog import FEATURED_CHAT

    return _catalog_from_featured(FEATURED_CHAT)


def __getattr__(name: str) -> tuple[ModelInfo, ...]:
    if name == "MODEL_CATALOG":
        return _get_model_catalog()
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
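
# This module-level __getattr__ is the PEP 562 hook: `models.MODEL_CATALOG`
# (and `from lilbee.models import MODEL_CATALOG`) resolves through it on
# first access, so catalog.py is only imported when the catalog is read.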


def get_system_ram_gb() -> float:
    """Return total system RAM in GB. Falls back to 8.0 if detection fails."""
    try:
        if sys.platform == "win32":
            import ctypes

            class _MEMORYSTATUSEX(ctypes.Structure):
                _fields_ = [
                    ("dwLength", ctypes.c_ulong),
                    ("dwMemoryLoad", ctypes.c_ulong),
                    ("ullTotalPhys", ctypes.c_ulonglong),
                    ("ullAvailPhys", ctypes.c_ulonglong),
                    ("ullTotalPageFile", ctypes.c_ulonglong),
                    ("ullAvailPageFile", ctypes.c_ulonglong),
                    ("ullTotalVirtual", ctypes.c_ulonglong),
                    ("ullAvailVirtual", ctypes.c_ulonglong),
                    ("ullAvailExtendedVirtual", ctypes.c_ulonglong),
                ]

            stat = _MEMORYSTATUSEX()
            stat.dwLength = ctypes.sizeof(stat)
            ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat))  # type: ignore[attr-defined]
            return stat.ullTotalPhys / (1024**3)
        pages = os.sysconf("SC_PHYS_PAGES")
        page_size = os.sysconf("SC_PAGE_SIZE")
        return (pages * page_size) / (1024**3)
    except (OSError, AttributeError, ValueError):
        log.debug("RAM detection failed, falling back to 8.0 GB")
        return 8.0
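
# Sanity check for the POSIX branch, with assumed example values: 4_194_304
# pages at a 4096-byte page size gives 4_194_304 * 4096 / 1024**3 == 16.0 GB.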


def get_free_disk_gb(path: Path) -> float:
    """Return free disk space in GB for the filesystem containing *path*."""
    check_path = path if path.exists() else path.parent
    while not check_path.exists():
        check_path = check_path.parent
    usage = shutil.disk_usage(check_path)
    return usage.free / (1024**3)
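
# shutil.disk_usage returns (total, used, free) in bytes; a free count of
# 53_687_091_200 bytes, say, reports as exactly 50.0 GB here. The parent
# walk handles paths that don't exist yet (e.g. a data dir created later).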


def pick_default_model(ram_gb: float) -> ModelInfo:
    """Choose the largest catalog model that fits in *ram_gb*."""
    best = _get_model_catalog()[0]
    for model in _get_model_catalog():
        if model.min_ram_gb <= ram_gb:
            best = model
    return best
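
# Worked example (hypothetical entries; assumes the catalog is ordered
# smallest to largest, which this scan relies on): with models requiring
# 4, 8, and 16 GB of RAM, ram_gb=12.0 returns the 8 GB entry, and
# ram_gb=2.0 falls back to the first (smallest) model.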


def _model_download_size_gb(model: str) -> float:
    """Estimated download size in GB for an HF model ref."""
    catalog_sizes = {m.ref: m.size_gb for m in _get_model_catalog()}
    fallback = 5.0  # reasonable default for unknown models
    return catalog_sizes.get(model, fallback)


def display_model_picker(
    ram_gb: float, free_disk_gb: float, *, console: Console | None = None
) -> ModelInfo:
    """Show a Rich table of catalog models and return the recommended model."""
    console = console or Console(stderr=True)
    recommended = pick_default_model(ram_gb)

    table = Table(title="Available Models", show_lines=False)
    table.add_column("#", justify="right", style="bold")
    table.add_column("Model", style="cyan")
    table.add_column("Size", justify="right")
    table.add_column("Description")

    for idx, model in enumerate(_get_model_catalog(), 1):
        num_str = str(idx)
        label = model.display_name
        size_str = f"{model.size_gb:.1f} GB"
        desc = model.description

        is_recommended = model == recommended
        disk_too_small = free_disk_gb < model.size_gb + _DISK_HEADROOM_GB

        if is_recommended:
            label = f"[bold]{label} {FEATURED_STAR}[/bold]"
            desc = f"[bold]{desc}[/bold]"
            num_str = f"[bold]{num_str}[/bold]"

        if disk_too_small:
            size_str = f"[red]{model.size_gb:.1f} GB[/red]"

        table.add_row(num_str, label, size_str, desc)

    console.print()
    console.print("[bold]No chat model found.[/bold] Pick one to download:\n")
    console.print(table)
    console.print(f"\n System: {ram_gb:.0f} GB RAM, {free_disk_gb:.1f} GB free disk")
    console.print(f" {FEATURED_STAR} = recommended for your system")
    console.print(f" Browse more models at {MODELS_BROWSE_URL}\n")

    return recommended
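
# Sketch of a typical call: display_model_picker(ram_gb=16.0, free_disk_gb=40.0)
# renders the table to stderr and returns the starred recommendation without
# blocking; the actual input() loop lives in prompt_model_choice below.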


def prompt_model_choice(ram_gb: float) -> ModelInfo:
    """Prompt the user to pick a model by number. Returns the chosen ModelInfo."""
    from lilbee.config import cfg

    free_disk_gb = get_free_disk_gb(cfg.data_dir)
    recommended = display_model_picker(ram_gb, free_disk_gb)
    default_idx = list(_get_model_catalog()).index(recommended) + 1

    while True:
        try:
            raw = input(f"Choice [{default_idx}]: ").strip()
        except (EOFError, KeyboardInterrupt):
            return recommended

        if not raw:
            return recommended

        try:
            choice = int(raw)
        except ValueError:
            sys.stderr.write(f"Enter a number 1-{len(_get_model_catalog())}.\n")
            continue

        if 1 <= choice <= len(_get_model_catalog()):
            return _get_model_catalog()[choice - 1]

        sys.stderr.write(f"Enter a number 1-{len(_get_model_catalog())}.\n")


def validate_disk_and_pull(
    model_info: ModelInfo, free_gb: float, *, console: Console | None = None
) -> None:
    """Check disk space, pull the model, and persist the choice."""
    from lilbee.config import cfg

    required_gb = model_info.size_gb + _DISK_HEADROOM_GB
    if free_gb < required_gb:
        raise RuntimeError(
            f"Not enough disk space to download '{model_info.display_name}': "
            f"need {required_gb:.1f} GB, have {free_gb:.1f} GB free. "
            f"Free up space or choose a smaller model."
        )

    pull_with_progress(model_info.ref, console=console)
    cfg.chat_model = model_info.ref
    settings.set_value(cfg.data_root, "chat_model", model_info.ref)
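
# Worked example with assumed numbers: a 4.2 GB model plus the 2 GB headroom
# needs 6.2 GB; with free_gb=5.0 the RuntimeError fires before any bytes are
# downloaded.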


def pull_with_progress(model: str, *, console: Console | None = None) -> None:
    """Pull a model via model_manager, showing a Rich progress bar."""
    from lilbee.model_manager import ModelSource, get_model_manager

    if console is None:
        console = Console(file=sys.__stderr__ or sys.stderr)
    manager = get_model_manager()
    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        DownloadColumn(),
        TextColumn("{task.percentage:>3.0f}%"),
        transient=True,
        console=console,
    ) as progress:
        desc = f"Downloading model '{model}'..."
        ptask = progress.add_task(desc, total=None)

        def _on_bytes(downloaded: int, total: int) -> None:
            if total > 0:
                progress.update(ptask, total=total, completed=downloaded)

        manager.pull(model, ModelSource.NATIVE, on_bytes=_on_bytes)
    console.print(f"Model '{model}' ready.")
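
# Assumed callback contract (inferred from the code above, not verified
# against model_manager): pull() reports cumulative (downloaded, total) byte
# counts, and total may be unknown early on, hence the task starting with
# total=None and only updating once total > 0.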


def ensure_chat_model() -> None:
    """If no chat models are installed, pick and pull one.

    Interactive (TTY): show catalog picker with descriptions and sizes.
    Non-interactive (CI/pipes): auto-pick recommended model silently.
    Persists the chosen model in config.toml so it becomes the default.
    """
    from lilbee.config import cfg
    from lilbee.model_manager import get_model_manager

    manager = get_model_manager()
    try:
        installed = manager.list_installed()
    except RuntimeError as exc:
        raise RuntimeError(f"Cannot list models: {exc}") from exc

    # Filter out the configured embedding model so we only check for chat
    # candidates. The embedding ref points at one specific manifest; we
    # match it exactly rather than by family stem.
    embed_ref = cfg.embedding_model
    chat_models = [m for m in installed if m != embed_ref]
    if chat_models:
        return

    ram_gb = get_system_ram_gb()
    free_gb = get_free_disk_gb(cfg.data_dir)

    if sys.stdin.isatty():
        model_info = prompt_model_choice(ram_gb)
    else:
        model_info = pick_default_model(ram_gb)
        sys.stderr.write(
            f"No chat model found. Auto-installing '{model_info.display_name}' "
            f"(detected {ram_gb:.0f} GB RAM)...\n"
        )

    validate_disk_and_pull(model_info, free_gb)
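
# Note on the branch above: sys.stdin.isatty() is False when stdin is a pipe
# or a CI-provided stream, so non-interactive runs never block on input().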


def list_installed_models() -> list[str]:
    """Return installed chat-task model names.

    Sources both the native registry (manifest ``task`` field) and the
    SDK backend catalog (classified by name/family). Non-chat roles
    (embedding, vision, rerank) are excluded so TUI pickers don't offer
    refs that fail pydantic task validation at assignment time.
    """
    from lilbee.config import cfg
    from lilbee.model_manager import classify_remote_models
    from lilbee.registry import ModelRegistry

    try:
        names: list[str] = []
        registry = ModelRegistry(cfg.models_dir)
        for manifest in registry.list_installed():
            if manifest.task == ModelTask.CHAT:
                names.append(manifest.ref)
        for remote in classify_remote_models(cfg.remote_base_url):
            if remote.task == ModelTask.CHAT:
                names.append(remote.name)
        return sorted(set(names))
    except Exception:
        log.debug("Failed to list installed models", exc_info=True)
        return []
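
# Minimal usage sketch (assumes a configured lilbee environment):
#
#     from lilbee import models
#
#     models.ensure_chat_model()             # picks/pulls a model if none installed
#     print(models.list_installed_models())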