Coverage for src/lilbee/cli/helpers.py: 100%
160 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""Shared helper functions for CLI commands and slash commands."""
3from __future__ import annotations
5import asyncio
6import json
7import shutil
8from collections.abc import Generator
9from dataclasses import dataclass, field
10from importlib.metadata import version as _pkg_version
11from pathlib import Path
12from typing import TYPE_CHECKING
14from pydantic import BaseModel
15from rich.console import Console, RenderableType
16from rich.table import Table
18from lilbee.config import cfg
19from lilbee.platform import is_ignored_dir
21if TYPE_CHECKING:
22 from lilbee.query import ChatMessage
23 from lilbee.store import SearchChunk
class ResetResult(BaseModel):
    """Result of a full knowledge base reset.

    Produced by :func:`perform_reset`; the ``command`` field tags the
    JSON envelope (mirrors the ``command`` tag used by other results).
    """

    command: str = "reset"  # fixed envelope tag for JSON output
    deleted_docs: int  # top-level entries removed from the documents dir
    deleted_data: int  # top-level entries removed from the data dir
    documents_dir: str  # path that was purged, as a string
    data_dir: str  # path that was purged, as a string
class StatusConfig(BaseModel):
    """Configuration section of a status response.

    Snapshot of the active ``cfg`` values, stringified for display/JSON.
    """

    documents_dir: str
    data_dir: str
    chat_model: str
    embedding_model: str
    vision_model: str | None = None  # None when vision OCR is not configured
class SourceInfo(BaseModel):
    """A single indexed source in a status response."""

    filename: str
    file_hash: str  # truncated to 12 chars by gather_status for display
    chunk_count: int
    ingested_at: str  # ISO timestamp, truncated to seconds by gather_status
class StatusResult(BaseModel):
    """Full status response for the knowledge base."""

    command: str = "status"
    config: StatusConfig
    sources: list[SourceInfo]
    total_chunks: int

    def __rich_console__(
        self, console: Console, options: object
    ) -> Generator[RenderableType, None, None]:
        """Render the status: config summary, then a table of sources."""
        conf = self.config
        header_rows = [
            ("Documents", conf.documents_dir),
            ("Database", conf.data_dir),
            ("Chat model", conf.chat_model),
            ("Embeddings", conf.embedding_model),
        ]
        if conf.vision_model:
            header_rows.append(("Vision OCR", conf.vision_model))
        for label, value in header_rows:
            yield f"[bold]{label}:[/bold] {value}"
        yield ""

        if not self.sources:
            yield (
                "No documents indexed. Drop files into the documents directory "
                "and run 'lilbee sync'."
            )
            return

        table = Table(title="Indexed Documents")
        for header, opts in (
            ("File", {"style": "cyan"}),
            ("Hash", {"style": "dim", "max_width": 12}),
            ("Chunks", {"justify": "right"}),
            ("Ingested", {"style": "dim"}),
        ):
            table.add_column(header, **opts)
        for src in self.sources:
            table.add_row(src.filename, src.file_hash, str(src.chunk_count), src.ingested_at)
        yield table
        yield (
            f"\n[bold]{len(self.sources)}[/bold] documents, [bold]{self.total_chunks}[/bold] chunks"
        )
def _copytree_ignore(directory: str, contents: list[str]) -> set[str]:
    """Ignore callback for shutil.copytree — filters ignored directories.

    Returns the subset of *contents* that are directories matching the
    configured ``cfg.ignore_dirs``; plain files are never filtered.
    """
    base = Path(directory)
    ignored: set[str] = set()
    for name in contents:
        if (base / name).is_dir() and is_ignored_dir(name, cfg.ignore_dirs):
            ignored.add(name)
    return ignored
def get_version() -> str:
    """Return the version string of the installed ``lilbee`` distribution."""
    installed = _pkg_version("lilbee")
    return installed
def json_output(data: dict) -> None:
    """Serialize *data* as a single-line JSON object and print it to stdout."""
    serialized = json.dumps(data)
    print(serialized)
def clean_result(result: SearchChunk) -> dict:
    """Strip the vector field and rename ``_distance`` for JSON output.

    Returns a new dict; the input mapping is never mutated. A renamed
    ``distance`` key ends up last, matching the original pop/re-insert order.
    """
    cleaned = dict(result)
    cleaned.pop("vector", None)
    try:
        cleaned["distance"] = cleaned.pop("_distance")
    except KeyError:
        pass
    return cleaned
def gather_status() -> StatusResult:
    """Collect status data as a typed model (shared by human + JSON output).

    Hashes are truncated to 12 chars and timestamps to seconds for display;
    ``total_chunks`` sums over all sources regardless of sort order.
    """
    from lilbee.store import get_sources

    raw = get_sources()
    infos = [
        SourceInfo(
            filename=entry["filename"],
            file_hash=entry["file_hash"][:12],
            chunk_count=entry["chunk_count"],
            ingested_at=entry["ingested_at"][:19],
        )
        for entry in sorted(raw, key=lambda e: e["filename"])
    ]
    return StatusResult(
        config=StatusConfig(
            documents_dir=str(cfg.documents_dir),
            data_dir=str(cfg.data_dir),
            chat_model=cfg.chat_model,
            embedding_model=cfg.embedding_model,
            vision_model=cfg.vision_model or None,
        ),
        sources=infos,
        total_chunks=sum(entry["chunk_count"] for entry in raw),
    )
def render_status(con: Console) -> None:
    """Print status info (documents, paths, chunk counts) to *con*."""
    status = gather_status()
    con.print(status)
@dataclass
class CopyResult:
    """Result of copying files into the documents directory."""

    # Basenames actually copied in (files and directories alike).
    copied: list[str] = field(default_factory=list)
    # Basenames skipped because the destination already existed
    # and force was not requested.
    skipped: list[str] = field(default_factory=list)
def copy_files(paths: list[Path], *, force: bool = False) -> CopyResult:
    """Copy paths into documents dir. Returns structured result (no console output).

    Existing destinations are skipped unless *force* is true. Directories
    are copied recursively (configured ignore-dirs filtered out); files are
    copied with metadata via ``shutil.copy2``.
    """
    cfg.documents_dir.mkdir(parents=True, exist_ok=True)
    outcome = CopyResult()
    for source in paths:
        target = cfg.documents_dir / source.name
        if target.exists() and not force:
            outcome.skipped.append(source.name)
            continue
        if source.is_dir():
            shutil.copytree(source, target, dirs_exist_ok=True, ignore=_copytree_ignore)
        else:
            shutil.copy2(source, target)
        outcome.copied.append(source.name)
    return outcome
def copy_paths(paths: list[Path], con: Console, *, force: bool = False) -> list[str]:
    """Copy *paths* into the documents directory. Returns list of copied names.

    A warning is printed to *con* for every skipped (pre-existing) name.
    """
    outcome = copy_files(paths, force=force)
    if outcome.skipped:
        for name in outcome.skipped:
            con.print(
                f"[yellow]Warning:[/yellow] {name} already exists in knowledge base "
                f"(use --force to overwrite)"
            )
    return outcome.copied
def add_paths(
    paths: list[Path], con: Console, *, force: bool = False, force_vision: bool = False
) -> None:
    """Copy *paths* into the knowledge base, then sync (human output)."""
    from lilbee.ingest import sync

    names = copy_paths(paths, con, force=force)
    con.print(f"[dim]Copied {len(names)} path(s) to {cfg.documents_dir}[/dim]")

    outcome = asyncio.run(sync(force_vision=force_vision))
    con.print(outcome)
def stream_response(
    question: str,
    history: list[ChatMessage],
    con: Console,
) -> None:
    """Stream an LLM answer and append the exchange to *history*.

    Tokens are printed to *con* as they arrive. On Ctrl-C the stream is
    closed and any partial answer is still recorded in *history*; on a
    RuntimeError from the backend nothing is appended.
    """
    from lilbee.query import ask_stream

    stream = ask_stream(question, history=history)
    response_parts: list[str] = []
    cancelled = False

    try:
        # Show a spinner while waiting for the first token from the LLM.
        with con.status("Thinking..."):
            first_token = next(stream, None)
        if first_token is not None:
            con.print(first_token, end="")
            response_parts.append(first_token)
        for token in stream:
            con.print(token, end="")
            response_parts.append(token)
    except KeyboardInterrupt:
        cancelled = True
        # Close the generator so the backend can release its resources.
        stream.close()
        con.print("\n[dim](stopped)[/dim]")
    except RuntimeError as exc:
        # Backend failure: report and bail without touching history.
        con.print(f"\n[red]Error:[/red] {exc}")
        return

    if not cancelled:
        con.print("\n")
    # Record the exchange only if at least one token came back.
    full = "".join(response_parts)
    if full:
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": full})
def _purge_directory(directory: Path) -> int:
    """Delete every top-level entry under *directory*; return the count.

    A missing directory is treated as already empty (count 0).
    Subdirectories are removed recursively but count once each.
    """
    deleted = 0
    if directory.exists():
        # Materialize the listing before deleting so we never iterate
        # a directory while mutating it.
        for item in list(directory.iterdir()):
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()
            deleted += 1
    return deleted


def perform_reset() -> ResetResult:
    """Delete all documents and data. Returns summary of what was deleted.

    Purges the documents directory first, then the data directory,
    mirroring the original deletion order.
    """
    return ResetResult(
        deleted_docs=_purge_directory(cfg.documents_dir),
        deleted_data=_purge_directory(cfg.data_dir),
        documents_dir=str(cfg.documents_dir),
        data_dir=str(cfg.data_dir),
    )
def sync_result_to_json(result: object) -> dict:
    """Convert a SyncResult to the JSON output envelope.

    Raises:
        TypeError: if *result* is not a ``SyncResult``. Checked explicitly
            (rather than with ``assert``) so the guard survives ``python -O``.
    """
    from lilbee.ingest import SyncResult

    if not isinstance(result, SyncResult):
        raise TypeError(f"expected SyncResult, got {type(result).__name__}")
    return {"command": "sync", **result.model_dump()}
def auto_sync(con: Console) -> None:
    """Run document sync before queries.

    Prints a one-line summary when anything changed; exits with status 1
    if the sync raises a RuntimeError.
    """
    from lilbee.ingest import sync

    try:
        outcome = asyncio.run(sync())
    except RuntimeError as exc:
        con.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None
    changed = sum(
        len(bucket)
        for bucket in (outcome.added, outcome.updated, outcome.removed, outcome.failed)
    )
    if changed:
        con.print(
            f"[dim]Synced: {len(outcome.added)} added, "
            f"{len(outcome.updated)} updated, "
            f"{len(outcome.removed)} removed, "
            f"{len(outcome.failed)} failed[/dim]"
        )