Coverage for src / lilbee / cli / helpers.py: 100%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 08:27 +0000

1"""Shared helper functions for CLI commands and slash commands.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import json 

7import shutil 

8from collections.abc import Generator 

9from dataclasses import dataclass, field 

10from importlib.metadata import version as _pkg_version 

11from pathlib import Path 

12from typing import TYPE_CHECKING 

13 

14from pydantic import BaseModel 

15from rich.console import Console, RenderableType 

16from rich.table import Table 

17 

18from lilbee.config import cfg 

19from lilbee.platform import is_ignored_dir 

20 

21if TYPE_CHECKING: 

22 from lilbee.query import ChatMessage 

23 from lilbee.store import SearchChunk 

24 

25 

class ResetResult(BaseModel):
    """Summary returned after wiping the knowledge base."""

    # Envelope tag identifying which command produced this payload.
    command: str = "reset"
    # Number of top-level entries removed from the documents directory.
    deleted_docs: int
    # Number of top-level entries removed from the data directory.
    deleted_data: int
    # Documents directory location, rendered as a string.
    documents_dir: str
    # Data directory location, rendered as a string.
    data_dir: str

34 

35 

class StatusConfig(BaseModel):
    """Settings snapshot embedded in a status response."""

    # Documents directory location, rendered as a string.
    documents_dir: str
    # Data directory location, rendered as a string.
    data_dir: str
    # Name of the configured chat model.
    chat_model: str
    # Name of the configured embedding model.
    embedding_model: str
    # Name of the vision model; None when no vision model is configured.
    vision_model: str | None = None

44 

45 

class SourceInfo(BaseModel):
    """One indexed document as listed in a status response."""

    # Name of the indexed file.
    filename: str
    # File hash; gather_status() stores only its first 12 characters.
    file_hash: str
    # Number of chunks stored for this file.
    chunk_count: int
    # Ingestion timestamp text; gather_status() stores its first 19 characters.
    ingested_at: str

53 

54 

class StatusResult(BaseModel):
    """Complete knowledge-base status: configuration plus indexed sources.

    Implements Rich's ``__rich_console__`` hook, so an instance can be handed
    straight to ``Console.print`` to produce the human-readable report.
    """

    # Envelope tag identifying which command produced this payload.
    command: str = "status"
    # Settings in effect when the status was collected.
    config: StatusConfig
    # Indexed documents to list in the report.
    sources: list[SourceInfo]
    # Chunk total shown in the report footer.
    total_chunks: int

    def __rich_console__(
        self, console: Console, options: object
    ) -> Generator[RenderableType, None, None]:
        """Yield the renderables that make up the status report.

        The configuration summary comes first. It is followed either by a
        hint that nothing is indexed yet, or by a table of sources and a
        totals line. ``console`` and ``options`` are part of the hook's
        signature but are not used here.
        """
        conf = self.config
        summary = [
            f"[bold]Documents:[/bold] {conf.documents_dir}",
            f"[bold]Database:[/bold] {conf.data_dir}",
            f"[bold]Chat model:[/bold] {conf.chat_model}",
            f"[bold]Embeddings:[/bold] {conf.embedding_model}",
        ]
        # The vision line is shown only when a vision model is set.
        if conf.vision_model:
            summary.append(f"[bold]Vision OCR:[/bold] {conf.vision_model}")
        summary.append("")
        yield from summary

        if not self.sources:
            yield (
                "No documents indexed. Drop files into the documents directory "
                "and run 'lilbee sync'."
            )
            return

        table = Table(title="Indexed Documents")
        column_specs = (
            ("File", {"style": "cyan"}),
            ("Hash", {"style": "dim", "max_width": 12}),
            ("Chunks", {"justify": "right"}),
            ("Ingested", {"style": "dim"}),
        )
        for header, options_for_column in column_specs:
            table.add_column(header, **options_for_column)
        for source in self.sources:
            table.add_row(
                source.filename,
                source.file_hash,
                str(source.chunk_count),
                source.ingested_at,
            )
        yield table

        doc_count = len(self.sources)
        yield (
            f"\n[bold]{doc_count}[/bold] documents, [bold]{self.total_chunks}[/bold] chunks"
        )

92 

93 

def _copytree_ignore(directory: str, contents: list[str]) -> set[str]:
    """Pick the entries ``shutil.copytree`` should leave out.

    Used as the ``ignore`` callback of ``copytree``: given a directory and
    the names inside it, return the names that are directories and that
    ``is_ignored_dir`` flags against ``cfg.ignore_dirs``. Plain files are
    never returned.
    """
    parent = Path(directory)
    excluded: set[str] = set()
    for name in contents:
        # Only directories are candidates for exclusion.
        if not (parent / name).is_dir():
            continue
        if is_ignored_dir(name, cfg.ignore_dirs):
            excluded.add(name)
    return excluded

101 

102 

def get_version() -> str:
    """Look up the version string of the installed ``lilbee`` distribution."""
    installed = _pkg_version("lilbee")
    return installed

106 

107 

def json_output(data: dict) -> None:
    """Serialize *data* as JSON and write it to stdout on a single line."""
    payload = json.dumps(data)
    print(payload)

111 

112 

def clean_result(result: SearchChunk) -> dict:
    """Prepare a search hit for JSON output.

    Returns a new dict without the ``vector`` entry, and with ``_distance``
    (when present) re-keyed as ``distance``. The input mapping is not
    modified.
    """
    trimmed: dict = {}
    for key, value in result.items():
        if key == "vector":
            continue
        trimmed[key] = value
    if "_distance" in trimmed:
        distance = trimmed.pop("_distance")
        trimmed["distance"] = distance
    return trimmed

119 

120 

def gather_status() -> StatusResult:
    """Build the typed status model used by both human and JSON output.

    Reads the indexed sources from the store, orders them by filename, and
    pairs them with the paths and model names taken from ``cfg``.
    """
    from lilbee.store import get_sources

    sources = get_sources()
    ordered = sorted(sources, key=lambda entry: entry["filename"])
    chunk_total = sum(entry["chunk_count"] for entry in sources)

    config = StatusConfig(
        documents_dir=str(cfg.documents_dir),
        data_dir=str(cfg.data_dir),
        chat_model=cfg.chat_model,
        embedding_model=cfg.embedding_model,
        # Any falsy vision setting is reported as None.
        vision_model=cfg.vision_model or None,
    )

    infos = []
    for entry in ordered:
        infos.append(
            SourceInfo(
                filename=entry["filename"],
                # Keep only the first 12 characters of the hash for display.
                file_hash=entry["file_hash"][:12],
                chunk_count=entry["chunk_count"],
                # Keep the first 19 characters; presumably this trims an ISO
                # timestamp to whole seconds (TODO confirm stored format).
                ingested_at=entry["ingested_at"][:19],
            )
        )

    return StatusResult(config=config, sources=infos, total_chunks=chunk_total)

147 

148 

def render_status(con: Console) -> None:
    """Collect the current status and print it on *con*."""
    status = gather_status()
    con.print(status)

152 

153 

@dataclass
class CopyResult:
    """Outcome of copying paths into the documents directory."""

    # Names (final path components) that were copied.
    copied: list[str] = field(default_factory=list)
    # Names left alone because the destination already existed.
    skipped: list[str] = field(default_factory=list)

160 

161 

def copy_files(paths: list[Path], *, force: bool = False) -> CopyResult:
    """Copy each path into the documents directory without printing anything.

    The documents directory is created if missing. A path whose name already
    exists there is recorded as skipped unless *force* is set. Directories
    are copied recursively, merging into an existing destination and
    filtering entries through ``_copytree_ignore``. Files are copied with
    ``shutil.copy2``.

    Returns a ``CopyResult`` listing the copied and skipped names.
    """
    target_root = cfg.documents_dir
    target_root.mkdir(parents=True, exist_ok=True)

    outcome = CopyResult()
    for source in paths:
        entry_name = source.name
        destination = target_root / entry_name

        already_present = destination.exists()
        if already_present and not force:
            outcome.skipped.append(entry_name)
            continue

        if source.is_dir():
            shutil.copytree(source, destination, dirs_exist_ok=True, ignore=_copytree_ignore)
        else:
            shutil.copy2(source, destination)
        outcome.copied.append(entry_name)

    return outcome

177 

178 

def copy_paths(paths: list[Path], con: Console, *, force: bool = False) -> list[str]:
    """Copy *paths* into the documents directory, warning about skipped names.

    Delegates the copying to ``copy_files`` and prints one warning on *con*
    for every name that was skipped. Returns the names that were copied.
    """
    outcome = copy_files(paths, force=force)
    for name in outcome.skipped:
        warning = (
            f"[yellow]Warning:[/yellow] {name} already exists in knowledge base "
            f"(use --force to overwrite)"
        )
        con.print(warning)
    return outcome.copied

188 

189 

def add_paths(
    paths: list[Path], con: Console, *, force: bool = False, force_vision: bool = False
) -> None:
    """Copy *paths* into the knowledge base, run a sync, and report on *con*.

    *force* is forwarded to the copy step. *force_vision* is forwarded to
    ``sync``. The sync result is printed as-is.
    """
    from lilbee.ingest import sync

    copied_names = copy_paths(paths, con, force=force)
    con.print(f"[dim]Copied {len(copied_names)} path(s) to {cfg.documents_dir}[/dim]")

    # sync() is a coroutine function; drive it to completion from sync code.
    sync_outcome = asyncio.run(sync(force_vision=force_vision))
    con.print(sync_outcome)

201 

202 

def stream_response(
    question: str,
    history: list[ChatMessage],
    con: Console,
) -> None:
    """Stream an LLM answer to *con* and append the exchange to *history*.

    A spinner is shown until the first token arrives, then tokens are printed
    as they are produced. Tokens are printed verbatim: Rich markup parsing is
    disabled for them, so bracketed model output (for example ``[i]`` or
    ``[/x]``) is neither swallowed as a style tag nor able to raise a markup
    error.

    On ``KeyboardInterrupt`` the stream is closed and whatever was received
    so far is still recorded. On ``RuntimeError`` the error is printed and
    nothing is recorded. *history* is extended in place with the user
    question and the assistant answer only when the answer is non-empty.
    """
    from lilbee.query import ask_stream

    stream = ask_stream(question, history=history)
    response_parts: list[str] = []
    cancelled = False

    def emit(token: str) -> None:
        # Model output is plain text, not trusted Rich markup.
        con.print(token, end="", markup=False)
        response_parts.append(token)

    try:
        # Show a spinner while waiting for the first token from the LLM.
        with con.status("Thinking..."):
            first_token = next(stream, None)

        if first_token is not None:
            emit(first_token)

        for token in stream:
            emit(token)
    except KeyboardInterrupt:
        cancelled = True
        stream.close()
        con.print("\n[dim](stopped)[/dim]")
    except RuntimeError as exc:
        con.print(f"\n[red]Error:[/red] {exc}")
        return

    if not cancelled:
        con.print("\n")
    full = "".join(response_parts)
    if full:
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": full})

241 

242 

def _remove_children(directory: Path) -> int:
    """Delete every entry directly inside *directory* and return the count."""
    if not directory.exists():
        return 0

    removed = 0
    # Snapshot the listing first so deletion does not disturb the iteration.
    for item in list(directory.iterdir()):
        # is_dir() follows symlinks, but shutil.rmtree refuses a symlink to a
        # directory. Remove the link itself rather than aborting the reset.
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
        else:
            item.unlink()
        removed += 1
    return removed


def perform_reset() -> ResetResult:
    """Delete all documents and data. Returns summary of what was deleted.

    Empties ``cfg.documents_dir`` and then ``cfg.data_dir``. The directories
    themselves are kept, and a missing directory counts as zero deletions.
    Each top-level entry counts once, whether it is a file, a symlink, or a
    whole directory tree. Symlinks are unlinked, never followed.
    """
    deleted_docs = _remove_children(cfg.documents_dir)
    deleted_data = _remove_children(cfg.data_dir)

    return ResetResult(
        deleted_docs=deleted_docs,
        deleted_data=deleted_data,
        documents_dir=str(cfg.documents_dir),
        data_dir=str(cfg.data_dir),
    )

270 

271 

def sync_result_to_json(result: object) -> dict:
    """Wrap a ``SyncResult`` in the JSON output envelope.

    The returned dict starts with ``"command": "sync"`` followed by the
    fields from ``result.model_dump()``.
    """
    from lilbee.ingest import SyncResult

    # Narrows the loosely typed parameter before model_dump() is called.
    assert isinstance(result, SyncResult)
    envelope: dict = {"command": "sync"}
    envelope.update(result.model_dump())
    return envelope

278 

279 

def auto_sync(con: Console) -> None:
    """Sync documents ahead of a query and report any changes on *con*.

    A ``RuntimeError`` from the sync is printed and turned into
    ``SystemExit(1)``. Nothing is printed when the sync changed nothing and
    nothing failed.
    """
    from lilbee.ingest import sync

    try:
        outcome = asyncio.run(sync())
    except RuntimeError as exc:
        con.print(f"[red]Error:[/red] {exc}")
        raise SystemExit(1) from None

    added = len(outcome.added)
    updated = len(outcome.updated)
    removed = len(outcome.removed)
    failed = len(outcome.failed)

    # Stay quiet when there is nothing to report.
    if added + updated + removed + failed == 0:
        return

    con.print(
        f"[dim]Synced: {added} added, "
        f"{updated} updated, "
        f"{removed} removed, "
        f"{failed} failed[/dim]"
    )
296 )