Coverage for src / lilbee / cli / commands.py: 100%

328 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 08:27 +0000

1"""CLI command definitions registered on the app.""" 

2 

3import asyncio 

4from pathlib import Path 

5 

6import typer 

7from rich.table import Table 

8 

9from lilbee import settings 

10from lilbee.cli.app import ( 

11 _global_option, 

12 app, 

13 apply_overrides, 

14 console, 

15 data_dir_option, 

16 model_option, 

17 num_ctx_option, 

18 repeat_penalty_option, 

19 seed_option, 

20 temperature_option, 

21 top_k_sampling_option, 

22 top_p_option, 

23) 

24from lilbee.cli.helpers import ( 

25 add_paths, 

26 auto_sync, 

27 clean_result, 

28 copy_paths, 

29 gather_status, 

30 get_version, 

31 json_output, 

32 perform_reset, 

33 render_status, 

34 sync_result_to_json, 

35) 

36from lilbee.config import cfg 

37 

# Maximum number of chunk characters shown in human-readable output
# (used by `search` and `chunks`) before the preview is cut and "..." appended.
CHUNK_PREVIEW_LEN = 80

# Shared "--vision" flag for the commands that ingest documents.
_vision_option = typer.Option(
    False,
    "--vision",
    help="Enable vision OCR for scanned PDFs.",
)

# Shared "--vision-timeout" option. The default of None means "flag not given";
# commands only override cfg.vision_timeout when a value was supplied.
_vision_timeout_option = typer.Option(
    None,
    "--vision-timeout",
    help="Per-page timeout in seconds for vision OCR (default: 120, 0 = no limit).",
)

46 

47 

def _ensure_vision_model() -> None:
    """Resolve which vision model this run should use.

    Resolution order:

    1. A model already present on ``cfg.vision_model`` is validated and kept.
    2. Otherwise the persisted ``vision_model`` setting is restored onto
       ``cfg`` and validated.
    3. Otherwise a model is picked: interactively when stdin is a TTY,
       automatically when it is not.

    If listing the installed models fails, a warning is printed and no model
    is selected.
    """
    if cfg.vision_model:
        _validate_configured_vision()
        return

    # --vision is an explicit request, so fall back to the persisted choice
    # even if the in-memory value has been cleared.
    persisted = settings.get(cfg.data_root, "vision_model") or ""
    if persisted:
        cfg.vision_model = persisted
        _validate_configured_vision()
        return

    import sys

    from lilbee.cli.chat import list_ollama_models

    try:
        available = set(list_ollama_models())
    except Exception:
        console.print("[yellow]Warning: Cannot connect to Ollama. Vision OCR disabled.[/yellow]")
        return

    # Only prompt when a human can answer; otherwise choose automatically.
    choose = _pick_vision_interactive if sys.stdin.isatty() else _pick_vision_auto
    choose(available)

75 

76 

def _validate_configured_vision() -> None:
    """Normalise the configured vision model name and pull it if missing.

    ``cfg.vision_model`` is passed through ``ensure_tag`` and the result is
    written back. When the installed-model listing cannot be obtained the
    configuration is kept as-is. When the model is not installed and the pull
    fails, ``cfg.vision_model`` is reset to an empty string.
    """
    from lilbee.cli.chat import list_ollama_models
    from lilbee.models import ensure_tag

    model = ensure_tag(cfg.vision_model)
    cfg.vision_model = model

    try:
        available = set(list_ollama_models())
    except Exception:
        # Listing failed (e.g. Ollama unreachable): keep the config and let
        # later stages surface the problem.
        return

    if model not in available:
        console.print(f"Vision model '{model}' not installed. Pulling...")
        if not _try_pull(model):
            cfg.vision_model = ""

97 

98 

def _pick_vision_interactive(installed: set[str]) -> None:
    """Prompt on the terminal for a vision model and persist the choice.

    The picker is displayed with the system RAM and the free disk space of
    ``cfg.data_dir``; the entry it returns is offered as the default. An empty
    answer accepts that default. EOF / Ctrl-C, a non-numeric answer, or a
    number outside ``1..len(VISION_CATALOG)`` ends the function without
    selecting anything (the latter two after printing a hint).

    Args:
        installed: Names of the models that are already installed.
    """
    from lilbee.models import (
        VISION_CATALOG,
        display_vision_picker,
        get_free_disk_gb,
        get_system_ram_gb,
    )

    suggested = display_vision_picker(get_system_ram_gb(), get_free_disk_gb(cfg.data_dir))
    default_number = list(VISION_CATALOG).index(suggested) + 1

    try:
        answer = input(f"Choice [{default_number}]: ").strip()
    except (EOFError, KeyboardInterrupt):
        return

    if answer:
        try:
            number = int(answer)
        except ValueError:
            # 0 is never a valid menu entry, so the range check below rejects it.
            number = 0
        if number < 1 or number > len(VISION_CATALOG):
            console.print(f"[red]Enter a number 1-{len(VISION_CATALOG)}.[/red]")
            return
        selected = VISION_CATALOG[number - 1]
    else:
        selected = suggested

    _pull_and_save_vision(selected.name, installed)

132 

133 

def _pick_vision_auto(installed: set[str]) -> None:
    """Select the default vision model without prompting.

    A notice naming the chosen model is written to stderr, then the model is
    pulled if necessary and persisted.

    Args:
        installed: Names of the models that are already installed.
    """
    import sys

    from lilbee.models import pick_default_vision_model

    chosen = pick_default_vision_model()
    notice = f"No vision model configured. Auto-selecting '{chosen.name}'...\n"
    sys.stderr.write(notice)
    _pull_and_save_vision(chosen.name, installed)

143 

144 

def _try_pull(model_name: str) -> bool:
    """Pull ``model_name``, reporting failure as a warning instead of raising.

    Args:
        model_name: Name of the model to pull.

    Returns:
        ``True`` when the pull completed, ``False`` when it raised (two
        warning lines are printed in that case).
    """
    from lilbee.models import pull_with_progress

    succeeded = True
    try:
        pull_with_progress(model_name)
    except Exception as err:
        # Best effort: a failed pull must not abort the surrounding command.
        console.print(f"[yellow]Warning: Failed to pull '{model_name}': {err}[/yellow]")
        console.print("[yellow]Continuing without vision OCR.[/yellow]")
        succeeded = False
    return succeeded

156 

157 

def _pull_and_save_vision(model_name: str, installed: set[str]) -> None:
    """Activate and persist ``model_name``, pulling it first when required.

    Nothing is changed when the model is not installed and the pull fails.

    Args:
        model_name: Vision model to use.
        installed: Names of the models that are already installed.
    """
    # The pull is only attempted when the model is not installed yet.
    if model_name in installed or _try_pull(model_name):
        cfg.vision_model = model_name
        settings.set_value(cfg.data_root, "vision_model", model_name)

165 

166 

# Required positional argument of the `add` command; exists=True asks Typer to
# reject paths that are not present on disk.
_paths_argument = typer.Argument(
    ...,
    exists=True,
    help="Files or directories to add to the knowledge base.",
)

172 

173 

@app.command()
def search(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int | None = typer.Option(None, "--top-k", "-k", help="Number of results"),
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Search the knowledge base for relevant chunks."""
    # The docstring above is the command's --help text; details live here.
    #
    # query:      free-text search query.
    # top_k:      number of results; when omitted (or 0) cfg.top_k is used.
    # data_dir /
    # use_global: forwarded to apply_overrides().
    #
    # JSON mode emits {"command", "query", "results"}; otherwise a table with
    # a truncated preview of every chunk is printed.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    from rich.markup import escape

    from lilbee.query import search_context

    results = search_context(query, top_k=top_k or cfg.top_k)
    cleaned = [clean_result(r) for r in results]

    if cfg.json_mode:
        json_output({"command": "search", "query": query, "results": cleaned})
        return

    if not cleaned:
        console.print("No results found.")
        return

    table = Table(title="Search Results")
    table.add_column("Source", style="cyan")
    table.add_column("Chunk", max_width=80)
    table.add_column("Distance", justify="right", style="dim")

    for hit in cleaned:
        text = hit.get("chunk", "")
        preview = text[:CHUNK_PREVIEW_LEN]
        if len(text) > CHUNK_PREVIEW_LEN:
            preview += "..."
        # String cells are parsed as Rich console markup. Source names and
        # chunk text come from user documents, so escape them: otherwise
        # "[...]" sequences are swallowed or raise MarkupError.
        table.add_row(
            escape(hit.get("source", "")),
            escape(preview),
            f"{hit.get('distance', 0):.4f}",
        )
    console.print(table)

211 

212 

@app.command(name="sync")
def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Manually trigger document sync."""
    # The docstring above is the command's --help text; details live here.
    # A RuntimeError from the sync is reported (JSON or coloured text,
    # depending on cfg.json_mode) and the process exits with status 1.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()
    from lilbee.ingest import sync

    try:
        outcome = asyncio.run(sync(quiet=cfg.json_mode, force_vision=vision))
    except RuntimeError as err:
        if cfg.json_mode:
            json_output({"error": str(err)})
        else:
            console.print(f"[red]Error:[/red] {err}")
        # "from None" hides the original traceback from the CLI user.
        raise SystemExit(1) from None

    if cfg.json_mode:
        json_output(sync_result_to_json(outcome))
    else:
        console.print(outcome)

240 

241 

@app.command()
def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    # The docstring above is the command's --help text; details live here.
    # Runs sync() with force_rebuild=True and reports how many documents were
    # ingested. A RuntimeError is reported and turned into exit status 1.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()
    from lilbee.ingest import sync

    try:
        outcome = asyncio.run(
            sync(force_rebuild=True, quiet=cfg.json_mode, force_vision=vision)
        )
    except RuntimeError as err:
        if cfg.json_mode:
            json_output({"error": str(err)})
        else:
            console.print(f"[red]Error:[/red] {err}")
        raise SystemExit(1) from None

    ingested = len(outcome.added)
    if cfg.json_mode:
        json_output({"command": "rebuild", "ingested": ingested})
    else:
        console.print(f"Rebuilt: {ingested} documents ingested")

269 

270 

# "--force" / "-f" flag of the `add` command, passed on as the `force`
# argument of the copy helpers.
_force_option = typer.Option(
    False,
    "--force",
    "-f",
    help="Overwrite existing files.",
)

272 

273 

@app.command()
def add(
    paths: list[Path] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    force: bool = _force_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Copy files into the knowledge base and ingest them."""
    # The docstring above is the command's --help text; details live here.
    # Human mode delegates to add_paths(); JSON mode copies, syncs quietly and
    # emits one JSON document. A RuntimeError from either path is reported and
    # turned into exit status 1.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()

    try:
        if not cfg.json_mode:
            add_paths(paths, console, force=force, force_vision=vision)
            return

        from lilbee.ingest import sync

        copied = copy_paths(paths, console, force=force)
        outcome = asyncio.run(sync(quiet=True, force_vision=vision))
        json_output({"command": "add", "copied": copied, "sync": sync_result_to_json(outcome)})
    except RuntimeError as err:
        if cfg.json_mode:
            json_output({"error": str(err)})
        else:
            console.print(f"[red]Error:[/red] {err}")
        raise SystemExit(1) from None

304 

305 

# Required positional argument of the `chunks` command.
_chunks_source_argument = typer.Argument(
    ...,
    help="Source name to inspect chunks for.",
)

307 

308 

@app.command()
def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    # The docstring above is the command's --help text; details live here.
    # Exits with status 1 when `source` is not among the known source
    # filenames. Chunks are listed ordered by their "chunk_index".
    apply_overrides(data_dir=data_dir, use_global=use_global)

    from rich.markup import escape

    from lilbee.store import get_chunks_by_source, get_sources

    known = {s["filename"] for s in get_sources()}
    if source not in known:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
        else:
            # `source` is user input: escape it so brackets are shown
            # literally instead of being parsed as Rich markup.
            console.print(f"[red]Source not found:[/red] {escape(source)}")
        raise SystemExit(1)

    cleaned = sorted(
        (clean_result(c) for c in get_chunks_by_source(source)),
        key=lambda c: c.get("chunk_index", 0),
    )

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(f"[bold]{len(cleaned)}[/bold] chunks from [cyan]{escape(source)}[/cyan]\n")
    for c in cleaned:
        idx = c.get("chunk_index", "?")
        text = c.get("chunk", "")
        preview = text[:CHUNK_PREVIEW_LEN]
        if len(text) > CHUNK_PREVIEW_LEN:
            preview += "..."
        # Chunk text is arbitrary document content; unescaped, "[...]"
        # sequences would be swallowed as markup tags or raise MarkupError.
        console.print(f" [{idx}] {escape(preview)}")

345 

346 

# Required positional argument(s) of the `remove` command.
_remove_names_argument = typer.Argument(
    ...,
    help="Source name(s) to remove from the knowledge base.",
)

# "--delete" flag of the `remove` command: also remove the file on disk.
_delete_file_option = typer.Option(
    False,
    "--delete",
    help="Also delete the file from the documents directory.",
)

354 

355 

@app.command()
def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    # The docstring above is the command's --help text; details live here.
    # Each requested name is either removed (chunks + source record, and the
    # file itself when --delete is given) or reported as not found.
    # In human mode the command exits with status 1 when nothing was removed
    # and at least one name was unknown.
    # NOTE(review): JSON mode returns before that exit-status check, so it
    # always exits 0 - confirm whether that asymmetry is intended.
    apply_overrides(data_dir=data_dir, use_global=use_global)

    from rich.markup import escape

    from lilbee.store import delete_by_source, delete_source, get_sources

    known = {s["filename"] for s in get_sources()}
    removed: list[str] = []
    not_found: list[str] = []

    # dict.fromkeys() drops repeated names while keeping first-seen order, so
    # a name given twice is deleted and reported only once.
    for name in dict.fromkeys(names):
        if name not in known:
            not_found.append(name)
            continue
        delete_by_source(name)
        delete_source(name)
        removed.append(name)
        if delete_file:
            # missing_ok avoids the exists()/unlink() check-then-act race.
            (cfg.documents_dir / name).unlink(missing_ok=True)

    if cfg.json_mode:
        payload: dict = {"command": "remove", "removed": removed}
        if not_found:
            payload["not_found"] = not_found
        json_output(payload)
        return

    # Names are escaped so brackets in file names are printed literally
    # instead of being parsed as Rich markup.
    for name in removed:
        console.print(f"Removed [cyan]{escape(name)}[/cyan]")
    for name in not_found:
        console.print(f"[red]Not found:[/red] {escape(name)}")
    if not removed and not_found:
        raise SystemExit(1)

397 

398 

@app.command()
def ask(
    question: str = typer.Argument(..., help="Question to ask"),
    data_dir: Path | None = data_dir_option,
    model: str | None = model_option,
    use_global: bool = _global_option,
    temperature: float | None = temperature_option,
    top_p: float | None = top_p_option,
    top_k_sampling: int | None = top_k_sampling_option,
    repeat_penalty: float | None = repeat_penalty_option,
    num_ctx: int | None = num_ctx_option,
    seed: int | None = seed_option,
) -> None:
    """Ask a one-shot question (auto-syncs first)."""
    # The docstring above is the command's --help text; details live here.
    # All options are forwarded to apply_overrides(). Before answering, the
    # chat model and the embedding model are checked and auto_sync() runs.
    # A RuntimeError while answering is reported and becomes exit status 1.
    apply_overrides(
        data_dir=data_dir,
        model=model,
        use_global=use_global,
        temperature=temperature,
        top_p=top_p,
        top_k_sampling=top_k_sampling,
        repeat_penalty=repeat_penalty,
        num_ctx=num_ctx,
        seed=seed,
    )

    from lilbee.embedder import validate_model
    from lilbee.models import ensure_chat_model

    ensure_chat_model()
    validate_model()
    auto_sync(console)

    try:
        if cfg.json_mode:
            from lilbee.query import ask_raw

            reply = ask_raw(question)
            json_output(
                {
                    "command": "ask",
                    "question": question,
                    "answer": reply.answer,
                    "sources": [clean_result(src) for src in reply.sources],
                }
            )
        else:
            from lilbee.query import ask_stream

            # NOTE(review): tokens go through console.print() as-is, so they
            # are presumably subject to Rich markup parsing - confirm that
            # ask_stream() output is meant to be rendered that way.
            for piece in ask_stream(question):
                console.print(piece, end="")
            console.print()
    except RuntimeError as err:
        if cfg.json_mode:
            json_output({"error": str(err)})
        else:
            console.print(f"[red]Error:[/red] {err}")
        raise SystemExit(1) from None

458 

459 

@app.command()
def chat(
    data_dir: Path | None = data_dir_option,
    model: str | None = model_option,
    use_global: bool = _global_option,
    temperature: float | None = temperature_option,
    top_p: float | None = top_p_option,
    top_k_sampling: int | None = top_k_sampling_option,
    repeat_penalty: float | None = repeat_penalty_option,
    num_ctx: int | None = num_ctx_option,
    seed: int | None = seed_option,
) -> None:
    """Interactive chat loop (auto-syncs first)."""
    # The docstring above is the command's --help text; details live here.
    # Every CLI option is forwarded unchanged to apply_overrides().
    overrides = {
        "data_dir": data_dir,
        "model": model,
        "use_global": use_global,
        "temperature": temperature,
        "top_p": top_p,
        "top_k_sampling": top_k_sampling,
        "repeat_penalty": repeat_penalty,
        "num_ctx": num_ctx,
        "seed": seed,
    }
    apply_overrides(**overrides)

    from lilbee.embedder import validate_model
    from lilbee.models import ensure_chat_model

    # Pre-flight: chat model, embedding model, then document sync.
    ensure_chat_model()
    validate_model()
    auto_sync(console)

    from lilbee.cli.chat import chat_loop

    chat_loop(console)

493 

494 

@app.command()
def version() -> None:
    """Show the lilbee version."""
    # Emits {"command", "version"} in JSON mode, "lilbee <version>" otherwise.
    current = get_version()
    if not cfg.json_mode:
        console.print(f"lilbee {current}")
        return
    json_output({"command": "version", "version": current})

503 

504 

@app.command()
def status(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Show indexed documents, paths, and chunk counts."""
    # JSON mode dumps the gathered status (None fields omitted); otherwise the
    # status is rendered on the console.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if not cfg.json_mode:
        render_status(console)
        return
    snapshot = gather_status()
    json_output(snapshot.model_dump(exclude_none=True))

516 

517 

# "--yes" / "-y" flag of the `reset` command.
_yes_option = typer.Option(
    False,
    "--yes",
    "-y",
    help="Skip confirmation prompt.",
)

519 

520 

@app.command()
def reset(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    yes: bool = _yes_option,
) -> None:
    """Delete all documents and data (full factory reset)."""
    # The docstring above is the command's --help text; details live here.
    # Without --yes: JSON mode refuses with exit status 1 (it cannot prompt);
    # human mode shows what will be deleted and asks for confirmation,
    # exiting with status 0 when the user declines.
    apply_overrides(data_dir=data_dir, use_global=use_global)

    needs_confirmation = not yes
    if needs_confirmation and cfg.json_mode:
        json_output({"error": "Use --yes to confirm reset in JSON mode"})
        raise SystemExit(1)

    if needs_confirmation:
        warning = (
            f"[bold red]This will delete ALL documents and data.[/bold red]\n"
            f" Documents: {cfg.documents_dir}\n"
            f" Data: {cfg.data_dir}"
        )
        console.print(warning)
        if not typer.confirm("Are you sure?", default=False):
            console.print("Aborted.")
            raise SystemExit(0)

    outcome = perform_reset()

    if cfg.json_mode:
        json_output(outcome.model_dump())
    else:
        console.print(
            f"Reset complete: {outcome.deleted_docs} document(s), "
            f"{outcome.deleted_data} data item(s) deleted."
        )

553 

554 

@app.command()
def init() -> None:
    """Initialize a local .lilbee/ knowledge base in the current directory."""
    # The docstring above is the command's --help text; details live here.
    # Creates .lilbee/documents, .lilbee/data and a .gitignore that ignores
    # data/. An existing .lilbee directory is reported and left untouched.
    # A non-directory entry named .lilbee is reported as an error with exit
    # status 1 (previously mkdir() failed with an unhandled exception).
    # `Path` comes from the module-level import; the former function-local
    # re-import was redundant.
    from rich.markup import escape

    root = Path.cwd() / ".lilbee"
    if root.is_dir():
        if cfg.json_mode:
            json_output({"command": "init", "path": str(root), "created": False})
        else:
            # Paths are escaped so brackets in directory names are printed
            # literally instead of being parsed as Rich markup.
            console.print(f"Already initialized: {escape(str(root))}")
        return

    if root.exists():
        message = f"Cannot initialize: {root} exists and is not a directory"
        if cfg.json_mode:
            json_output({"error": message})
        else:
            console.print(f"[red]Error:[/red] {escape(message)}")
        raise SystemExit(1)

    for sub in ("documents", "data"):
        (root / sub).mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the file independent of the platform default.
    (root / ".gitignore").write_text("data/\n", encoding="utf-8")

    if cfg.json_mode:
        json_output({"command": "init", "path": str(root), "created": True})
        return
    console.print(f"Initialized local knowledge base at {escape(str(root))}")

578 

579 

@app.command()
def serve(
    host: str | None = typer.Option(
        None, "--host", "-H", help="Bind address (default: 127.0.0.1)"
    ),
    port: int | None = typer.Option(None, "--port", "-p", help="Port (default: 7433)"),
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Start the HTTP API server for Obsidian and other clients."""
    # The docstring above is the command's --help text; details live here.
    # host / port default to None ("not given"), which is why they are
    # annotated as optional; only supplied values override cfg.server_host /
    # cfg.server_port. The defaults quoted in the help texts presumably come
    # from cfg - they are not set in this function.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if host is not None:
        cfg.server_host = host
    if port is not None:
        cfg.server_port = port

    import logging

    import uvicorn

    from lilbee.server import create_app

    # Only ERROR and above from the "asyncio" logger while serving.
    logging.getLogger("asyncio").setLevel(logging.ERROR)

    uvicorn.run(
        create_app(),
        host=cfg.server_host,
        port=cfg.server_port,
    )

607 

608 

@app.command(name="mcp")
def mcp_cmd() -> None:
    """Start the MCP server (stdio transport) for agent integration."""
    # Imported lazily so the MCP module is only loaded for this command.
    from lilbee.mcp import main as run_mcp_server

    run_mcp_server()