Coverage for src / lilbee / mcp.py: 100%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""MCP server exposing lilbee as tools for AI agents.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import concurrent.futures 

7import logging 

8import os 

9from pathlib import Path 

10from typing import Any 

11 

12from mcp.server.fastmcp import Context, FastMCP 

13 

14from lilbee.cli.app import overlay_persisted_settings 

15from lilbee.cli.helpers import clean_result 

16from lilbee.config import cfg 

17from lilbee.crawl_task import get_task, start_crawl 

18from lilbee.crawler import is_url, require_valid_crawl_url 

19from lilbee.services import get_services, reset_services 

20from lilbee.store import SearchScope, scope_to_chunk_type 

21from lilbee.wiki.shared import ( 

22 DRAFTS_SUBDIR, 

23 SUMMARIES_SUBDIR, 

24 WIKI_DISABLED_ERROR, 

25) 

26 

# Module-level logger; emits diagnostics for progress-notification failures
# and pre-warm errors without ever crashing the stdio server.
log = logging.getLogger(__name__)

# FastMCP server instance. Every function below decorated with @mcp.tool()
# registers itself on this server; ``instructions`` is surfaced to clients.
mcp = FastMCP("lilbee", instructions="Local RAG knowledge base. Search indexed documents.")

30 

31 

@mcp.tool()
def search(
    query: str, top_k: int = 5, scope: str = SearchScope.BOTH.value
) -> list[dict[str, Any]] | dict[str, Any]:
    """Search the knowledge base for relevant document chunks.

    ``scope`` picks the pool: ``"raw"`` (source chunks), ``"wiki"`` (wiki
    page bodies), or ``"both"`` (default, unfiltered). Returns chunks
    sorted by relevance. No LLM call -- uses pre-computed embeddings.
    """
    # Validate inputs up front so the caller gets a clear message instead
    # of an opaque backend error surfaced through the broad except below.
    if not query or not query.strip():
        return {"error": "query must not be empty"}
    if top_k < 1:
        return {"error": "top_k must be a positive integer"}
    try:
        chunk_type = scope_to_chunk_type(scope)
    except ValueError as exc:
        return {"error": str(exc)}
    try:
        results = get_services().searcher.search(query, top_k=top_k, chunk_type=chunk_type)
        # Drop weak matches; a missing distance means the backend did not
        # score the chunk, so it is kept rather than discarded.
        results = [r for r in results if r.distance is None or r.distance <= cfg.max_distance]
        return [clean_result(r) for r in results]
    except Exception as exc:
        # Tool boundary: report failures as data, never raise to the client.
        return {"error": str(exc)}

54 

55 

@mcp.tool()
def status() -> dict[str, Any]:
    """Show indexed documents, configuration, and chunk counts."""
    sources = get_services().store.get_sources()
    return {
        "config": {
            "documents_dir": str(cfg.documents_dir),
            "data_dir": str(cfg.data_dir),
            "chat_model": cfg.chat_model,
            "embedding_model": cfg.embedding_model,
            "vision_model": cfg.vision_model,
            "reranker_model": cfg.reranker_model,
            "enable_ocr": cfg.enable_ocr,
            "num_ctx": cfg.num_ctx,
            "num_ctx_max": cfg.num_ctx_max,
            "flash_attention": cfg.flash_attention,
            "kv_cache_type": cfg.kv_cache_type.value,
            "n_gpu_layers": cfg.n_gpu_layers,
        },
        # Use .get with a 0 default, matching list_documents: a source row
        # missing "chunk_count" must not crash the status tool.
        "sources": [
            {"filename": s["filename"], "chunk_count": s.get("chunk_count", 0)}
            for s in sorted(sources, key=lambda x: x["filename"])
        ],
        "total_chunks": sum(s.get("chunk_count", 0) for s in sources),
    }

81 

82 

@mcp.tool()
async def sync() -> dict[str, Any]:
    """Sync documents directory with the vector store."""
    # Imported lazily so server startup stays cheap.
    from lilbee.ingest import sync as run_sync

    report = await run_sync(quiet=True)
    return report.model_dump()

89 

90 

@mcp.tool()
async def add(
    paths: list[str],
    force: bool = False,
    enable_ocr: bool | None = None,
    ocr_timeout: float | None = None,
) -> dict[str, Any]:
    """Add files, directories, or URLs to the knowledge base and sync.
    Copies the given paths into the documents directory, then ingests them.
    URLs (http:// or https://) are fetched as markdown and saved to _web/.
    Paths must be absolute and accessible from this machine.

    Args:
        paths: Absolute file/directory paths or URLs to add.
        force: Overwrite files that already exist in the knowledge base.
        enable_ocr: Force vision OCR on (True), off (False), or auto-detect
            from chat model capabilities (None/omit).
        ocr_timeout: Per-page timeout in seconds for vision OCR. Overrides
            the configured default for this invocation only.
    """
    from lilbee.cli.helpers import copy_files
    from lilbee.ingest import sync as run_sync

    # Partition inputs: URLs are crawled, local paths are copied. Missing
    # local paths are collected as errors rather than aborting the call.
    errors: list[str] = []
    valid: list[Path] = []
    urls: list[str] = []
    for p_str in paths:
        if is_url(p_str):
            urls.append(p_str)
        else:
            p = Path(p_str)
            if not p.exists():
                errors.append(p_str)
            else:
                valid.append(p)

    # Crawl URLs
    crawled_count = 0
    if urls:
        from lilbee.crawler import crawler_available

        if not crawler_available():
            return {"error": "Web crawling requires: pip install 'lilbee[crawler]'"}
        from lilbee.crawler import crawl_and_save

        for url in urls:
            try:
                require_valid_crawl_url(url)
            except ValueError as exc:
                errors.append(f"{url}: {exc}")
                continue
            # Best-effort per URL: a failed crawl is recorded as an error
            # instead of aborting the whole add after earlier URLs already
            # landed in the documents directory.
            try:
                crawled_paths = await crawl_and_save(url)
            except Exception as exc:
                errors.append(f"{url}: {exc}")
                continue
            crawled_count += len(crawled_paths)

    copy_result = copy_files(valid, force=force)

    from lilbee.cli.helpers import temporary_ocr_config

    # OCR overrides apply only for this sync; the context manager restores
    # the configured defaults afterwards.
    with temporary_ocr_config(enable_ocr, ocr_timeout):
        sync_result = (await run_sync(quiet=True)).model_dump()

    result: dict[str, Any] = {
        "command": "add",
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "crawled": crawled_count,
        "errors": errors,
        "sync": sync_result,
    }
    if errors or sync_result.get("failed"):
        result["warning"] = "some files could not be processed"
    return result

163 

164 

@mcp.tool()
def crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> dict[str, Any]:
    """Crawl a web page and add it to the knowledge base (non-blocking).
    Launches the crawl as a background task and returns immediately with a
    task_id. Use crawl_status(task_id) to poll progress.

    Args:
        url: The URL to crawl (must start with http:// or https://).
        depth: None (default) crawls the whole site; 0 fetches only this URL;
            positive int caps link-follow depth.
        max_pages: None (default) means no page limit. Positive int caps total
            pages fetched.
    """
    from lilbee.crawler import crawler_available

    # Guard clauses: optional dependency present, then URL shape valid.
    if not crawler_available():
        return {"error": "Web crawling requires: pip install 'lilbee[crawler]'"}
    try:
        require_valid_crawl_url(url)
    except ValueError as exc:
        return {"error": str(exc)}

    # Fire-and-forget: the task runs in the background, caller polls by id.
    new_task_id = start_crawl(url, depth=depth, max_pages=max_pages)
    return {"status": "started", "task_id": new_task_id, "url": url}

193 

194 

@mcp.tool()
def crawl_status(task_id: str) -> dict[str, Any]:
    """Check the status of a running crawl task.
    Returns the current state including status, pages crawled, and any error.
    Use this to poll after crawl returns a task_id.

    Args:
        task_id: The task ID returned by crawl.
    """
    task = get_task(task_id)
    if task is None:
        return {"error": f"No task found with id: {task_id}"}
    # Status is an enum; serialize its value. The remaining fields are
    # plain attributes copied straight off the task record.
    plain_fields = ("pages_crawled", "pages_total", "error", "started_at", "finished_at")
    return {
        "task_id": task.task_id,
        "url": task.url,
        "status": task.status.value,
        **{name: getattr(task, name) for name in plain_fields},
    }

217 

218 

@mcp.tool()
def init(path: str = "") -> dict[str, Any]:
    """Initialize a local .lilbee/ knowledge base in a directory.
    Creates .lilbee/ with documents/, data/, and .gitignore.
    If path is empty, uses the current working directory.
    Also switches the MCP session to use this knowledge base for
    subsequent tool calls.
    """
    base = Path(path) if path else Path.cwd()
    root = base / ".lilbee"

    # A stray file named .lilbee would make mkdir() raise FileExistsError;
    # report it as a tool error instead of an unhandled exception.
    if root.exists() and not root.is_dir():
        return {"error": f"not a directory: {root}"}

    created = False
    if not root.is_dir():
        # exist_ok guards against a partially created root (e.g. an earlier
        # interrupted init left only one of the subdirectories behind).
        (root / "documents").mkdir(parents=True, exist_ok=True)
        (root / "data").mkdir(parents=True, exist_ok=True)
        (root / ".gitignore").write_text("data/\n")
        created = True

    # Switch MCP session to this project's KB. Overlay any persisted
    # config.toml in the project base so per-vault model / generation
    # settings take effect, matching the CLI's --data-dir behaviour.
    cfg.data_root = base
    cfg.documents_dir = root / "documents"
    cfg.data_dir = root / "data"
    cfg.lancedb_dir = root / "data" / "lancedb"
    overlay_persisted_settings(base)
    # Drop cached services so the next tool call binds to the new KB.
    reset_services()

    return {"command": "init", "path": str(root), "created": created}

248 

249 

@mcp.tool()
def remove(names: list[str], delete_files: bool = False) -> dict[str, Any]:
    """Remove documents from the knowledge base by source name.
    Args:
        names: Source filenames to remove (as shown by status).
        delete_files: Also delete the physical files from the documents directory.
    """
    store = get_services().store
    outcome = store.remove_documents(
        names, delete_files=delete_files, documents_dir=cfg.documents_dir
    )
    return {"command": "remove", "removed": outcome.removed, "not_found": outcome.not_found}

261 

262 

@mcp.tool()
def list_documents() -> dict[str, Any]:
    """List all indexed documents with their chunk counts."""
    sources = get_services().store.get_sources()
    entries: list[dict[str, Any]] = []
    for src in sources:
        entries.append({"filename": src["filename"], "chunk_count": src.get("chunk_count", 0)})
    return {"documents": entries, "total": len(entries)}

273 

274 

@mcp.tool()
def reset(confirm: bool = False) -> dict[str, Any]:
    """Delete all documents and data (full factory reset).
    WARNING: This permanently removes all indexed documents and vector data.
    Pass confirm=true to proceed.
    """
    if confirm:
        # Imported lazily: reset is rare and the CLI module is heavy.
        from lilbee.cli import perform_reset

        return perform_reset().model_dump()
    # Destructive operation requires an explicit opt-in flag.
    return {"error": "pass confirm=true to confirm deletion"}

286 

287 

@mcp.tool()
def wiki_lint(wiki_source: str = "") -> dict[str, Any]:
    """Lint wiki pages for citation staleness, missing sources, and unmarked claims.
    If wiki_source is provided, lint only that page. Otherwise, lint all wiki pages.

    Args:
        wiki_source: Path like "wiki/summaries/doc.md". Empty = lint all.
    """
    from lilbee.wiki.lint import lint_all, lint_wiki_page

    store = get_services().store
    # Empty wiki_source means a full-corpus lint pass.
    issues = lint_wiki_page(wiki_source, store) if wiki_source else lint_all(store).issues
    serialized = [issue.to_dict() for issue in issues]
    return {"command": "wiki_lint", "issues": serialized, "total": len(serialized)}

309 

310 

@mcp.tool()
def wiki_citations(wiki_source: str) -> dict[str, Any]:
    """Get all citations for a wiki page.
    Args:
        wiki_source: Wiki page path, e.g. "wiki/summaries/doc.md".
    """
    rows = get_services().store.get_citations_for_wiki(wiki_source)
    citations = [dict(row) for row in rows]
    return {
        "command": "wiki_citations",
        "wiki_source": wiki_source,
        "citations": citations,
        "total": len(citations),
    }

324 

325 

@mcp.tool()
def wiki_status() -> dict[str, Any]:
    """Show wiki layer status: page counts, recent lint issues."""
    from lilbee.wiki.lint import lint_all

    wiki_root = cfg.data_root / cfg.wiki_dir
    # No wiki directory yet: report an empty wiki without running lint.
    if not wiki_root.exists():
        return {"wiki_enabled": cfg.wiki, "pages": 0, "issues": 0}

    def _page_count(subdir: str) -> int:
        # Recursively count markdown pages under one wiki subdirectory.
        folder = wiki_root / subdir
        return len(list(folder.rglob("*.md"))) if folder.exists() else 0

    n_summaries = _page_count(SUMMARIES_SUBDIR)
    n_drafts = _page_count(DRAFTS_SUBDIR)

    report = lint_all(get_services().store)
    return {
        "wiki_enabled": cfg.wiki,
        SUMMARIES_SUBDIR: n_summaries,
        DRAFTS_SUBDIR: n_drafts,
        "pages": n_summaries + n_drafts,
        "lint_errors": report.error_count,
        "lint_warnings": report.warning_count,
    }

349 

350 

@mcp.tool()
def wiki_list() -> dict[str, Any]:
    """List all wiki pages (summaries and concepts) with metadata.
    Returns page slugs, titles, types, source counts, and creation dates.
    """
    if not cfg.wiki:
        return {"error": WIKI_DISABLED_ERROR}
    from dataclasses import asdict

    from lilbee.wiki.browse import list_pages

    pages = list_pages(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_list",
        "pages": [asdict(page) for page in pages],
        "total": len(pages),
    }

369 

370 

@mcp.tool()
def wiki_read(slug: str) -> dict[str, Any]:
    """Read a wiki page's content and frontmatter by slug.
    Args:
        slug: Page slug like "summaries/my-doc" or "concepts/typing".
    """
    if not cfg.wiki:
        return {"error": WIKI_DISABLED_ERROR}
    from dataclasses import asdict

    from lilbee.wiki.browse import read_page

    page = read_page(cfg.data_root / cfg.wiki_dir, slug)
    if page is None:
        return {"error": f"wiki page not found: {slug}"}
    # Flatten the dataclass fields into the response alongside the command.
    return {"command": "wiki_read", **asdict(page)}

388 

389 

@mcp.tool()
def wiki_build() -> dict[str, Any]:
    """Build the concept and entity wiki across all ingested sources.

    Returns ``{paths, entities, count}``.
    """
    if not cfg.wiki:
        return {"error": WIKI_DISABLED_ERROR}
    from lilbee.wiki import run_full_build

    build_report = run_full_build(cfg)
    return {"command": "wiki_build", **build_report}

401 

402 

@mcp.tool()
def wiki_update() -> dict[str, Any]:
    """Refresh the concept and entity wiki after an ingest. Currently a full rebuild."""
    if not cfg.wiki:
        return {"error": WIKI_DISABLED_ERROR}
    from lilbee.wiki import run_full_build

    # "Update" is presently an alias for a complete rebuild.
    rebuild_report = run_full_build(cfg)
    return {"command": "wiki_update", **rebuild_report}

411 

412 

@mcp.tool()
def wiki_synthesize() -> dict[str, Any]:
    """Generate synthesis pages for concept clusters spanning three or more sources.

    Returns the list of synthesis page paths written to disk. When no
    cluster meets the 3+ source threshold, returns an empty list and
    ``count: 0``.
    """
    if not cfg.wiki:
        return {"error": WIKI_DISABLED_ERROR}
    from lilbee.wiki import run_full_synthesize

    synth_report = run_full_synthesize(cfg)
    return {"command": "wiki_synthesize", **synth_report}

426 

427 

@mcp.tool()
def wiki_prune() -> dict[str, Any]:
    """Prune stale and orphaned wiki pages.
    Archives pages whose sources are all deleted or whose concept cluster
    dropped below 3 live sources. Flags pages with >50% stale citations
    for regeneration.
    """
    from lilbee.wiki.prune import prune_wiki

    report = prune_wiki(get_services().store)
    serialized_records = [record.to_dict() for record in report.records]
    return {
        "command": "wiki_prune",
        "records": serialized_records,
        "archived": report.archived_count,
        "flagged": report.flagged_count,
    }

444 

445 

@mcp.tool()
def model_list(source: str = "", task: str = "") -> dict[str, Any]:
    """List installed models across native and SDK-backend sources.

    Args:
        source: Filter by source: "native", "remote", or "" for all.
        task: Filter by task: "chat", "embedding", "vision", "rerank", or "" for all.
    """
    from lilbee.cli.model import list_models_data
    from lilbee.model_manager import ModelSource

    try:
        parsed_source = ModelSource.parse(source)
    except ValueError as exc:
        return {"error": str(exc)}
    # An empty task string means "no task filter".
    listing = list_models_data(source=parsed_source, task=task or None)
    return listing.model_dump()

462 

463 

@mcp.tool()
def model_show(model: str) -> dict[str, Any]:
    """Show catalog and installed metadata for a model ref."""
    from lilbee.cli.model import show_model_data
    from lilbee.model_manager import ModelNotFoundError

    try:
        details = show_model_data(model)
    except ModelNotFoundError as exc:
        return {"error": str(exc)}
    return details.model_dump()

474 

475 

def _log_progress_failure(future: concurrent.futures.Future[None]) -> None:
    """Log report_progress failures without raising.

    Progress notifications are best-effort: a failure should not abort
    an in-flight pull.
    """
    # Attached as a done-callback on futures returned by
    # asyncio.run_coroutine_threadsafe in model_pull's on_update hook.
    try:
        # result() re-raises whatever the progress coroutine raised.
        future.result()
    except Exception:
        # Swallow and log; a lost progress notification must not kill the
        # download. NOTE: CancelledError is BaseException and deliberately
        # not caught here — cancellation propagates.
        log.warning("MCP report_progress failed", exc_info=True)

486 

487 

@mcp.tool()
async def model_pull(
    model: str,
    source: str = "native",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Download a model, streaming progress via MCP notifications.

    Args:
        model: Model ref to pull (e.g. "Qwen/Qwen3-0.6B-GGUF" or
            "Qwen/Qwen3-0.6B-GGUF/Qwen3-0.6B-Q4_K_M.gguf").
        source: "native" (HuggingFace GGUF) or "remote" (SDK-managed).
        ctx: FastMCP request context used for progress notifications;
            when None, progress updates are silently skipped.

    Returns:
        The pull result serialized via ``model_dump()``, or an
        ``{"error": ...}`` dict on parse/pull failure.
    """
    from lilbee.catalog import DownloadProgress
    from lilbee.cli.model import pull_model_data
    from lilbee.model_manager import ModelSource

    try:
        # parse may return a falsy value for an empty/unspecified source;
        # fall back to NATIVE in that case.
        src = ModelSource.parse(source) or ModelSource.NATIVE
    except ValueError as exc:
        return {"error": str(exc)}

    # Capture the loop here, on the event-loop thread: on_update runs on the
    # worker thread spawned by asyncio.to_thread, where get_running_loop()
    # would fail.
    loop = asyncio.get_running_loop()

    def on_update(p: DownloadProgress) -> None:
        # Called from the download worker thread for each progress tick.
        if ctx is None:
            return
        # Schedule the async notification back onto the event loop; the
        # callback logs (but never raises on) delivery failures.
        future = asyncio.run_coroutine_threadsafe(
            ctx.report_progress(progress=float(p.percent), total=100.0, message=p.detail),
            loop,
        )
        future.add_done_callback(_log_progress_failure)

    try:
        # Run the blocking download off the event loop so the server stays
        # responsive to other requests during the pull.
        result = await asyncio.to_thread(pull_model_data, model, src, on_update=on_update)
    except (RuntimeError, PermissionError) as exc:
        return {"error": str(exc)}
    return result.model_dump()

526 

527 

@mcp.tool()
def model_rm(model: str, source: str = "") -> dict[str, Any]:
    """Remove an installed model.

    Args:
        model: Model ref to remove.
        source: Restrict to "native" or "remote"; empty = both.
    """
    from lilbee.cli.model import remove_model_data
    from lilbee.model_manager import ModelSource

    try:
        parsed_source = ModelSource.parse(source)
    except ValueError as exc:
        return {"error": str(exc)}
    removal = remove_model_data(model, source=parsed_source)
    return removal.model_dump()

544 

545 

@mcp.tool()
def wiki_drafts_list() -> dict[str, Any]:
    """List pending wiki drafts with drift, faithfulness, and pairing info.

    Read-only. Accept and reject are CLI-only (destructive, explicit).
    """
    from lilbee.wiki.drafts import list_drafts

    pending = list_drafts(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_drafts_list",
        "drafts": [draft.to_dict() for draft in pending],
        "total": len(pending),
    }

561 

562 

@mcp.tool()
def wiki_drafts_diff(slug: str) -> dict[str, Any]:
    """Return a unified diff of the draft against its published counterpart.

    Args:
        slug: Draft slug (e.g. ``"chevrolet"``).
    """
    from lilbee.wiki.drafts import diff_draft

    try:
        diff_text = diff_draft(slug, cfg.data_root / cfg.wiki_dir)
    except FileNotFoundError as exc:
        # Unknown slug or missing counterpart file.
        return {"error": str(exc)}
    return {"command": "wiki_drafts_diff", "slug": slug, "diff": diff_text}

578 

579 

def main() -> None:
    """Entry point for the MCP server."""
    # Preload so the first tool call doesn't pay the cold-start cost
    # of provider/embedder/store init. Failures (missing model, bad
    # config) still surface on the first tool call rather than crashing
    # the server before it attaches to stdio.
    try:
        get_services()
    except Exception:
        log.debug("MCP pre-warm failed; services will init on first call", exc_info=True)

    from lilbee.parent_monitor import parse_parent_pid, watch_parent_thread

    # If launched with a parent PID to watch, terminate immediately when the
    # parent dies. os._exit avoids running atexit/cleanup handlers -- the
    # process must not linger once its host (e.g. an IDE client) is gone.
    parent_pid = parse_parent_pid()
    if parent_pid is not None:
        watch_parent_thread(parent_pid, lambda: os._exit(0))

    # Blocks serving MCP requests (stdio by default) until shutdown.
    mcp.run()

597 mcp.run()