Coverage for src / lilbee / cli / commands.py: 100%
328 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""CLI command definitions registered on the app."""
3import asyncio
4from pathlib import Path
6import typer
7from rich.table import Table
9from lilbee import settings
10from lilbee.cli.app import (
11 _global_option,
12 app,
13 apply_overrides,
14 console,
15 data_dir_option,
16 model_option,
17 num_ctx_option,
18 repeat_penalty_option,
19 seed_option,
20 temperature_option,
21 top_k_sampling_option,
22 top_p_option,
23)
24from lilbee.cli.helpers import (
25 add_paths,
26 auto_sync,
27 clean_result,
28 copy_paths,
29 gather_status,
30 get_version,
31 json_output,
32 perform_reset,
33 render_status,
34 sync_result_to_json,
35)
36from lilbee.config import cfg
# Truncation length for chunk text shown in the `search` and `chunks` tables.
CHUNK_PREVIEW_LEN = 80  # characters shown in human-readable search output

# Shared Typer options reused by the sync / rebuild / add commands:
# opt into vision OCR for scanned PDFs, and bound per-page OCR time.
_vision_option = typer.Option(False, "--vision", help="Enable vision OCR for scanned PDFs.")
_vision_timeout_option = typer.Option(
    None,
    "--vision-timeout",
    help="Per-page timeout in seconds for vision OCR (default: 120, 0 = no limit).",
)
def _ensure_vision_model() -> None:
    """Make sure a vision model is configured and usable for this invocation.

    Resolution order: in-memory config, then the model persisted in TOML
    settings, then an interactive or automatic pick against Ollama.
    """
    if not cfg.vision_model:
        # Fall back to the persisted TOML value (--vision is explicit even
        # if the in-memory model was cleared).
        persisted = settings.get(cfg.data_root, "vision_model") or ""
        if not persisted:
            # Nothing configured anywhere: probe Ollama and choose a model.
            import sys

            from lilbee.cli.chat import list_ollama_models

            try:
                available = set(list_ollama_models())
            except Exception:
                console.print("[yellow]Warning: Cannot connect to Ollama. Vision OCR disabled.[/yellow]")
                return

            picker = _pick_vision_interactive if sys.stdin.isatty() else _pick_vision_auto
            picker(available)
            return
        cfg.vision_model = persisted
    _validate_configured_vision()
def _validate_configured_vision() -> None:
    """Verify the configured vision model exists locally, pulling it if missing."""
    from lilbee.cli.chat import list_ollama_models
    from lilbee.models import ensure_tag

    name = ensure_tag(cfg.vision_model)
    cfg.vision_model = name

    try:
        local_models = set(list_ollama_models())
    except Exception:
        # Ollama unreachable — keep the config and let downstream handle errors.
        return

    if name not in local_models:
        console.print(f"Vision model '{name}' not installed. Pulling...")
        if not _try_pull(name):
            # Pull failed: clear the model so this run proceeds without vision OCR.
            cfg.vision_model = ""
def _pick_vision_interactive(installed: set[str]) -> None:
    """Let a TTY user pick a vision model from the catalog, then pull and persist it."""
    from lilbee.models import (
        VISION_CATALOG,
        display_vision_picker,
        get_free_disk_gb,
        get_system_ram_gb,
    )

    recommended = display_vision_picker(get_system_ram_gb(), get_free_disk_gb(cfg.data_dir))
    default_idx = list(VISION_CATALOG).index(recommended) + 1

    try:
        answer = input(f"Choice [{default_idx}]: ").strip()
    except (EOFError, KeyboardInterrupt):
        # User aborted the prompt; leave configuration untouched.
        return

    if answer:
        try:
            selection = int(answer)
        except ValueError:
            selection = 0  # non-numeric input fails the range check below
        if not (1 <= selection <= len(VISION_CATALOG)):
            console.print(f"[red]Enter a number 1-{len(VISION_CATALOG)}.[/red]")
            return
        chosen = VISION_CATALOG[selection - 1]
    else:
        # Empty input accepts the recommended default.
        chosen = recommended

    _pull_and_save_vision(chosen.name, installed)
def _pick_vision_auto(installed: set[str]) -> None:
    """Auto-select a default vision model when no TTY is attached."""
    import sys

    from lilbee.models import pick_default_vision_model

    chosen = pick_default_vision_model()
    # Written to stderr — presumably so non-TTY stdout stays machine-readable.
    sys.stderr.write(f"No vision model configured. Auto-selecting '{chosen.name}'...\n")
    _pull_and_save_vision(chosen.name, installed)
def _try_pull(model_name: str) -> bool:
    """Pull *model_name* via Ollama; report success as a bool instead of raising."""
    from lilbee.models import pull_with_progress

    try:
        pull_with_progress(model_name)
        return True
    except Exception as exc:
        console.print(f"[yellow]Warning: Failed to pull '{model_name}': {exc}[/yellow]")
        console.print("[yellow]Continuing without vision OCR.[/yellow]")
        return False
def _pull_and_save_vision(model_name: str, installed: set[str]) -> None:
    """Persist *model_name* as the vision model, pulling it first when missing."""
    missing = model_name not in installed
    if missing and not _try_pull(model_name):
        # Pull failed; do not record an unusable model.
        return

    cfg.vision_model = model_name
    settings.set_value(cfg.data_root, "vision_model", model_name)
# Positional argument for `add`: one or more existing files/directories.
_paths_argument = typer.Argument(
    ...,
    exists=True,
    help="Files or directories to add to the knowledge base.",
)
@app.command()
def search(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(None, "--top-k", "-k", help="Number of results"),
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Search the knowledge base for relevant chunks.

    Emits a JSON payload in JSON mode; otherwise renders a Rich table with
    each chunk truncated to CHUNK_PREVIEW_LEN characters.
    """
    apply_overrides(data_dir=data_dir, use_global=use_global)

    from lilbee.query import search_context

    results = search_context(query, top_k=top_k or cfg.top_k)
    cleaned = [clean_result(r) for r in results]

    if cfg.json_mode:
        json_output({"command": "search", "query": query, "results": cleaned})
        return

    if not cleaned:
        console.print("No results found.")
        return

    table = Table(title="Search Results")
    table.add_column("Source", style="cyan")
    # Keep the rendered column width in sync with the preview truncation length
    # (was a hard-coded 80 duplicating CHUNK_PREVIEW_LEN).
    table.add_column("Chunk", max_width=CHUNK_PREVIEW_LEN)
    table.add_column("Distance", justify="right", style="dim")

    for r in cleaned:
        chunk_text = r.get("chunk", "")
        preview = chunk_text[:CHUNK_PREVIEW_LEN]
        if len(chunk_text) > CHUNK_PREVIEW_LEN:
            preview += "..."
        table.add_row(
            r.get("source", ""),
            preview,
            f"{r.get('distance', 0):.4f}",
        )
    console.print(table)
@app.command(name="sync")
def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Manually trigger document sync."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()

    from lilbee.ingest import sync

    try:
        result = asyncio.run(sync(quiet=cfg.json_mode, force_vision=vision))
    except RuntimeError as exc:
        # Report via the active output channel, then exit nonzero.
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None

    if cfg.json_mode:
        json_output(sync_result_to_json(result))
    else:
        console.print(result)
@app.command()
def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()

    from lilbee.ingest import sync

    try:
        result = asyncio.run(sync(force_rebuild=True, quiet=cfg.json_mode, force_vision=vision))
    except RuntimeError as exc:
        # Report via the active output channel, then exit nonzero.
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None

    if cfg.json_mode:
        json_output({"command": "rebuild", "ingested": len(result.added)})
    else:
        console.print(f"Rebuilt: {len(result.added)} documents ingested")
# --force/-f: overwrite files that already exist in the documents directory.
_force_option = typer.Option(False, "--force", "-f", help="Overwrite existing files.")
@app.command()
def add(
    paths: list[Path] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    force: bool = _force_option,
    vision: bool = _vision_option,
    vision_timeout: float | None = _vision_timeout_option,
) -> None:
    """Copy files into the knowledge base and ingest them."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if vision_timeout is not None:
        cfg.vision_timeout = vision_timeout
    if vision:
        _ensure_vision_model()

    try:
        if not cfg.json_mode:
            # Human mode: the helper copies and syncs with its own output.
            add_paths(paths, console, force=force, force_vision=vision)
        else:
            # JSON mode: copy quietly, sync quietly, emit one payload.
            from lilbee.ingest import sync

            copied = copy_paths(paths, console, force=force)
            result = asyncio.run(sync(quiet=True, force_vision=vision))
            json_output({"command": "add", "copied": copied, "sync": sync_result_to_json(result)})
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None
# Positional argument for `chunks`: the source (document) name to inspect.
_chunks_source_argument = typer.Argument(..., help="Source name to inspect chunks for.")
@app.command()
def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    from lilbee.store import get_chunks_by_source, get_sources

    known = {s["filename"] for s in get_sources()}
    if source not in known:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
        else:
            console.print(f"[red]Source not found:[/red] {source}")
        raise SystemExit(1)

    # Present chunks in document order, not retrieval order.
    cleaned = sorted(
        (clean_result(c) for c in get_chunks_by_source(source)),
        key=lambda c: c.get("chunk_index", 0),
    )

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(f"[bold]{len(cleaned)}[/bold] chunks from [cyan]{source}[/cyan]\n")
    for entry in cleaned:
        idx = entry.get("chunk_index", "?")
        text = entry.get("chunk", "")
        preview = text[:CHUNK_PREVIEW_LEN] + ("..." if len(text) > CHUNK_PREVIEW_LEN else "")
        console.print(f" [{idx}] {preview}")
# Positional argument for `remove`: one or more source names.
_remove_names_argument = typer.Argument(
    ..., help="Source name(s) to remove from the knowledge base."
)

# --delete: also remove the underlying file from the documents directory.
_delete_file_option = typer.Option(
    False, "--delete", help="Also delete the file from the documents directory."
)
@app.command()
def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    from lilbee.store import delete_by_source, delete_source, get_sources

    known = {s["filename"] for s in get_sources()}
    removed: list[str] = []
    not_found: list[str] = []

    for name in names:
        if name not in known:
            not_found.append(name)
            continue
        delete_by_source(name)
        delete_source(name)
        removed.append(name)
        if delete_file:
            doc_path = cfg.documents_dir / name
            if doc_path.exists():
                doc_path.unlink()

    if cfg.json_mode:
        payload: dict = {"command": "remove", "removed": removed}
        if not_found:
            payload["not_found"] = not_found
        json_output(payload)
        return

    for name in removed:
        console.print(f"Removed [cyan]{name}[/cyan]")
    for name in not_found:
        console.print(f"[red]Not found:[/red] {name}")
    # Fail only when nothing at all was removed.
    if not removed and not_found:
        raise SystemExit(1)
@app.command()
def ask(
    question: str = typer.Argument(..., help="Question to ask"),
    data_dir: Path | None = data_dir_option,
    model: str | None = model_option,
    use_global: bool = _global_option,
    temperature: float | None = temperature_option,
    top_p: float | None = top_p_option,
    top_k_sampling: int | None = top_k_sampling_option,
    repeat_penalty: float | None = repeat_penalty_option,
    num_ctx: int | None = num_ctx_option,
    seed: int | None = seed_option,
) -> None:
    """Ask a one-shot question (auto-syncs first)."""
    apply_overrides(
        data_dir=data_dir,
        model=model,
        use_global=use_global,
        temperature=temperature,
        top_p=top_p,
        top_k_sampling=top_k_sampling,
        repeat_penalty=repeat_penalty,
        num_ctx=num_ctx,
        seed=seed,
    )

    from lilbee.embedder import validate_model
    from lilbee.models import ensure_chat_model

    # Validate both chat and embedding models, then bring the index up to
    # date before answering.
    ensure_chat_model()
    validate_model()
    auto_sync(console)

    try:
        if cfg.json_mode:
            from lilbee.query import ask_raw

            result = ask_raw(question)
            json_output(
                {
                    "command": "ask",
                    "question": question,
                    "answer": result.answer,
                    "sources": [clean_result(s) for s in result.sources],
                }
            )
        else:
            from lilbee.query import ask_stream

            # Stream tokens as they arrive, then finish the line.
            for token in ask_stream(question):
                console.print(token, end="")
            console.print()
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None
@app.command()
def chat(
    data_dir: Path | None = data_dir_option,
    model: str | None = model_option,
    use_global: bool = _global_option,
    temperature: float | None = temperature_option,
    top_p: float | None = top_p_option,
    top_k_sampling: int | None = top_k_sampling_option,
    repeat_penalty: float | None = repeat_penalty_option,
    num_ctx: int | None = num_ctx_option,
    seed: int | None = seed_option,
) -> None:
    """Interactive chat loop (auto-syncs first)."""
    overrides = dict(
        data_dir=data_dir,
        model=model,
        use_global=use_global,
        temperature=temperature,
        top_p=top_p,
        top_k_sampling=top_k_sampling,
        repeat_penalty=repeat_penalty,
        num_ctx=num_ctx,
        seed=seed,
    )
    apply_overrides(**overrides)

    from lilbee.embedder import validate_model
    from lilbee.models import ensure_chat_model

    # Same preflight as `ask`: models must be usable and the index current.
    ensure_chat_model()
    validate_model()
    auto_sync(console)

    from lilbee.cli.chat import chat_loop

    chat_loop(console)
@app.command()
def version() -> None:
    """Show the lilbee version."""
    current = get_version()
    if cfg.json_mode:
        json_output({"command": "version", "version": current})
    else:
        console.print(f"lilbee {current}")
@app.command()
def status(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Show indexed documents, paths, and chunk counts."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if not cfg.json_mode:
        render_status(console)
    else:
        json_output(gather_status().model_dump(exclude_none=True))
# --yes/-y: skip the interactive confirmation prompt (required in JSON mode).
_yes_option = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt.")
@app.command()
def reset(
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
    yes: bool = _yes_option,
) -> None:
    """Delete all documents and data (full factory reset)."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    if not yes:
        # JSON mode cannot prompt, so --yes is mandatory there.
        if cfg.json_mode:
            json_output({"error": "Use --yes to confirm reset in JSON mode"})
            raise SystemExit(1)
        console.print(
            f"[bold red]This will delete ALL documents and data.[/bold red]\n"
            f" Documents: {cfg.documents_dir}\n"
            f" Data: {cfg.data_dir}"
        )
        if not typer.confirm("Are you sure?", default=False):
            console.print("Aborted.")
            raise SystemExit(0)

    result = perform_reset()

    if cfg.json_mode:
        json_output(result.model_dump())
    else:
        console.print(
            f"Reset complete: {result.deleted_docs} document(s), "
            f"{result.deleted_data} data item(s) deleted."
        )
@app.command()
def init() -> None:
    """Initialize a local .lilbee/ knowledge base in the current directory.

    Creates .lilbee/documents and .lilbee/data plus a .gitignore that keeps
    the data directory out of version control. Idempotent: reports and exits
    if .lilbee/ already exists.
    """
    # Path is imported at module level; the original redundant
    # function-local `from pathlib import Path` has been removed.
    root = Path.cwd() / ".lilbee"
    if root.is_dir():
        if cfg.json_mode:
            json_output({"command": "init", "path": str(root), "created": False})
            return
        console.print(f"Already initialized: {root}")
        return

    docs = root / "documents"
    data = root / "data"
    docs.mkdir(parents=True)
    data.mkdir(parents=True)
    # Keep runtime data (vector DB, caches) out of version control.
    (root / ".gitignore").write_text("data/\n")

    if cfg.json_mode:
        json_output({"command": "init", "path": str(root), "created": True})
        return
    console.print(f"Initialized local knowledge base at {root}")
@app.command()
def serve(
    host: str = typer.Option(None, "--host", "-H", help="Bind address (default: 127.0.0.1)"),
    port: int = typer.Option(None, "--port", "-p", help="Port (default: 7433)"),
    data_dir: Path | None = data_dir_option,
    use_global: bool = _global_option,
) -> None:
    """Start the HTTP API server for Obsidian and other clients."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    if host is not None:
        cfg.server_host = host
    if port is not None:
        cfg.server_port = port

    import logging

    import uvicorn

    from lilbee.server import create_app

    # Raise the asyncio logger threshold — presumably to suppress noisy
    # warnings from the server event loop; confirm if tuning logging.
    logging.getLogger("asyncio").setLevel(logging.ERROR)

    uvicorn.run(create_app(), host=cfg.server_host, port=cfg.server_port)
@app.command(name="mcp")
def mcp_cmd() -> None:
    """Start the MCP server (stdio transport) for agent integration."""
    from lilbee.mcp import main as mcp_main

    mcp_main()