Coverage for src / lilbee / mcp.py: 100%
268 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""MCP server exposing lilbee as tools for AI agents."""
3from __future__ import annotations
5import asyncio
6import concurrent.futures
7import logging
8import os
9from pathlib import Path
10from typing import Any
12from mcp.server.fastmcp import Context, FastMCP
14from lilbee.cli.app import overlay_persisted_settings
15from lilbee.cli.helpers import clean_result
16from lilbee.config import cfg
17from lilbee.crawl_task import get_task, start_crawl
18from lilbee.crawler import is_url, require_valid_crawl_url
19from lilbee.services import get_services, reset_services
20from lilbee.store import SearchScope, scope_to_chunk_type
21from lilbee.wiki.shared import (
22 DRAFTS_SUBDIR,
23 SUMMARIES_SUBDIR,
24 WIKI_DISABLED_ERROR,
25)
27log = logging.getLogger(__name__)
29mcp = FastMCP("lilbee", instructions="Local RAG knowledge base. Search indexed documents.")
32@mcp.tool()
33def search(
34 query: str, top_k: int = 5, scope: str = SearchScope.BOTH.value
35) -> list[dict[str, Any]] | dict[str, Any]:
36 """Search the knowledge base for relevant document chunks.
38 ``scope`` picks the pool: ``"raw"`` (source chunks), ``"wiki"`` (wiki
39 page bodies), or ``"both"`` (default, unfiltered). Returns chunks
40 sorted by relevance. No LLM call -- uses pre-computed embeddings.
41 """
42 if not query or not query.strip():
43 return {"error": "query must not be empty"}
44 try:
45 chunk_type = scope_to_chunk_type(scope)
46 except ValueError as exc:
47 return {"error": str(exc)}
48 try:
49 results = get_services().searcher.search(query, top_k=top_k, chunk_type=chunk_type)
50 results = [r for r in results if r.distance is None or r.distance <= cfg.max_distance]
51 return [clean_result(r) for r in results]
52 except Exception as exc:
53 return {"error": str(exc)}
56@mcp.tool()
57def status() -> dict[str, Any]:
58 """Show indexed documents, configuration, and chunk counts."""
59 sources = get_services().store.get_sources()
60 return {
61 "config": {
62 "documents_dir": str(cfg.documents_dir),
63 "data_dir": str(cfg.data_dir),
64 "chat_model": cfg.chat_model,
65 "embedding_model": cfg.embedding_model,
66 "vision_model": cfg.vision_model,
67 "reranker_model": cfg.reranker_model,
68 "enable_ocr": cfg.enable_ocr,
69 "num_ctx": cfg.num_ctx,
70 "num_ctx_max": cfg.num_ctx_max,
71 "flash_attention": cfg.flash_attention,
72 "kv_cache_type": cfg.kv_cache_type.value,
73 "n_gpu_layers": cfg.n_gpu_layers,
74 },
75 "sources": [
76 {"filename": s["filename"], "chunk_count": s["chunk_count"]}
77 for s in sorted(sources, key=lambda x: x["filename"])
78 ],
79 "total_chunks": sum(s["chunk_count"] for s in sources),
80 }
83@mcp.tool()
84async def sync() -> dict[str, Any]:
85 """Sync documents directory with the vector store."""
86 from lilbee.ingest import sync as run_sync
88 return (await run_sync(quiet=True)).model_dump()
91@mcp.tool()
92async def add(
93 paths: list[str],
94 force: bool = False,
95 enable_ocr: bool | None = None,
96 ocr_timeout: float | None = None,
97) -> dict[str, Any]:
98 """Add files, directories, or URLs to the knowledge base and sync.
99 Copies the given paths into the documents directory, then ingests them.
100 URLs (http:// or https://) are fetched as markdown and saved to _web/.
101 Paths must be absolute and accessible from this machine.
103 Args:
104 paths: Absolute file/directory paths or URLs to add.
105 force: Overwrite files that already exist in the knowledge base.
106 enable_ocr: Force vision OCR on (True), off (False), or auto-detect
107 from chat model capabilities (None/omit).
108 ocr_timeout: Per-page timeout in seconds for vision OCR. Overrides
109 the configured default for this invocation only.
110 """
111 from lilbee.cli.helpers import copy_files
112 from lilbee.ingest import sync as run_sync
114 errors: list[str] = []
115 valid: list[Path] = []
116 urls: list[str] = []
117 for p_str in paths:
118 if is_url(p_str):
119 urls.append(p_str)
120 else:
121 p = Path(p_str)
122 if not p.exists():
123 errors.append(p_str)
124 else:
125 valid.append(p)
127 # Crawl URLs
128 crawled_count = 0
129 if urls:
130 from lilbee.crawler import crawler_available
132 if not crawler_available():
133 return {"error": "Web crawling requires: pip install 'lilbee[crawler]'"}
134 from lilbee.crawler import crawl_and_save
136 for url in urls:
137 try:
138 require_valid_crawl_url(url)
139 except ValueError as exc:
140 errors.append(f"{url}: {exc}")
141 continue
142 crawled_paths = await crawl_and_save(url)
143 crawled_count += len(crawled_paths)
145 copy_result = copy_files(valid, force=force)
147 from lilbee.cli.helpers import temporary_ocr_config
149 with temporary_ocr_config(enable_ocr, ocr_timeout):
150 sync_result = (await run_sync(quiet=True)).model_dump()
152 result: dict[str, Any] = {
153 "command": "add",
154 "copied": copy_result.copied,
155 "skipped": copy_result.skipped,
156 "crawled": crawled_count,
157 "errors": errors,
158 "sync": sync_result,
159 }
160 if errors or sync_result.get("failed"):
161 result["warning"] = "some files could not be processed"
162 return result
165@mcp.tool()
166def crawl(
167 url: str,
168 depth: int | None = None,
169 max_pages: int | None = None,
170) -> dict[str, Any]:
171 """Crawl a web page and add it to the knowledge base (non-blocking).
172 Launches the crawl as a background task and returns immediately with a
173 task_id. Use crawl_status(task_id) to poll progress.
175 Args:
176 url: The URL to crawl (must start with http:// or https://).
177 depth: None (default) crawls the whole site; 0 fetches only this URL;
178 positive int caps link-follow depth.
179 max_pages: None (default) means no page limit. Positive int caps total
180 pages fetched.
181 """
182 from lilbee.crawler import crawler_available
184 if not crawler_available():
185 return {"error": "Web crawling requires: pip install 'lilbee[crawler]'"}
186 try:
187 require_valid_crawl_url(url)
188 except ValueError as exc:
189 return {"error": str(exc)}
191 task_id = start_crawl(url, depth=depth, max_pages=max_pages)
192 return {"status": "started", "task_id": task_id, "url": url}
195@mcp.tool()
196def crawl_status(task_id: str) -> dict[str, Any]:
197 """Check the status of a running crawl task.
198 Returns the current state including status, pages crawled, and any error.
199 Use this to poll after crawl returns a task_id.
201 Args:
202 task_id: The task ID returned by crawl.
203 """
204 task = get_task(task_id)
205 if task is None:
206 return {"error": f"No task found with id: {task_id}"}
207 return {
208 "task_id": task.task_id,
209 "url": task.url,
210 "status": task.status.value,
211 "pages_crawled": task.pages_crawled,
212 "pages_total": task.pages_total,
213 "error": task.error,
214 "started_at": task.started_at,
215 "finished_at": task.finished_at,
216 }
219@mcp.tool()
220def init(path: str = "") -> dict[str, Any]:
221 """Initialize a local .lilbee/ knowledge base in a directory.
222 Creates .lilbee/ with documents/, data/, and .gitignore.
223 If path is empty, uses the current working directory.
224 Also switches the MCP session to use this knowledge base for
225 subsequent tool calls.
226 """
227 base = Path(path) if path else Path.cwd()
228 root = base / ".lilbee"
230 created = False
231 if not root.is_dir():
232 (root / "documents").mkdir(parents=True)
233 (root / "data").mkdir(parents=True)
234 (root / ".gitignore").write_text("data/\n")
235 created = True
237 # Switch MCP session to this project's KB. Overlay any persisted
238 # config.toml in the project base so per-vault model / generation
239 # settings take effect, matching the CLI's --data-dir behaviour.
240 cfg.data_root = base
241 cfg.documents_dir = root / "documents"
242 cfg.data_dir = root / "data"
243 cfg.lancedb_dir = root / "data" / "lancedb"
244 overlay_persisted_settings(base)
245 reset_services()
247 return {"command": "init", "path": str(root), "created": created}
250@mcp.tool()
251def remove(names: list[str], delete_files: bool = False) -> dict[str, Any]:
252 """Remove documents from the knowledge base by source name.
253 Args:
254 names: Source filenames to remove (as shown by status).
255 delete_files: Also delete the physical files from the documents directory.
256 """
257 result = get_services().store.remove_documents(
258 names, delete_files=delete_files, documents_dir=cfg.documents_dir
259 )
260 return {"command": "remove", "removed": result.removed, "not_found": result.not_found}
263@mcp.tool()
264def list_documents() -> dict[str, Any]:
265 """List all indexed documents with their chunk counts."""
266 sources = get_services().store.get_sources()
267 return {
268 "documents": [
269 {"filename": s["filename"], "chunk_count": s.get("chunk_count", 0)} for s in sources
270 ],
271 "total": len(sources),
272 }
275@mcp.tool()
276def reset(confirm: bool = False) -> dict[str, Any]:
277 """Delete all documents and data (full factory reset).
278 WARNING: This permanently removes all indexed documents and vector data.
279 Pass confirm=true to proceed.
280 """
281 if not confirm:
282 return {"error": "pass confirm=true to confirm deletion"}
283 from lilbee.cli import perform_reset
285 return perform_reset().model_dump()
288@mcp.tool()
289def wiki_lint(wiki_source: str = "") -> dict[str, Any]:
290 """Lint wiki pages for citation staleness, missing sources, and unmarked claims.
291 If wiki_source is provided, lint only that page. Otherwise, lint all wiki pages.
293 Args:
294 wiki_source: Path like "wiki/summaries/doc.md". Empty = lint all.
295 """
296 from lilbee.wiki.lint import lint_all, lint_wiki_page
298 store = get_services().store
299 if wiki_source:
300 issues = lint_wiki_page(wiki_source, store)
301 else:
302 report = lint_all(store)
303 issues = report.issues
304 return {
305 "command": "wiki_lint",
306 "issues": [i.to_dict() for i in issues],
307 "total": len(issues),
308 }
311@mcp.tool()
312def wiki_citations(wiki_source: str) -> dict[str, Any]:
313 """Get all citations for a wiki page.
314 Args:
315 wiki_source: Wiki page path, e.g. "wiki/summaries/doc.md".
316 """
317 records = get_services().store.get_citations_for_wiki(wiki_source)
318 return {
319 "command": "wiki_citations",
320 "wiki_source": wiki_source,
321 "citations": [dict(r) for r in records],
322 "total": len(records),
323 }
326@mcp.tool()
327def wiki_status() -> dict[str, Any]:
328 """Show wiki layer status: page counts, recent lint issues."""
329 from lilbee.wiki.lint import lint_all
331 wiki_root = cfg.data_root / cfg.wiki_dir
332 if not wiki_root.exists():
333 return {"wiki_enabled": cfg.wiki, "pages": 0, "issues": 0}
335 summaries_dir = wiki_root / SUMMARIES_SUBDIR
336 drafts_dir = wiki_root / DRAFTS_SUBDIR
337 summaries = list(summaries_dir.rglob("*.md")) if summaries_dir.exists() else []
338 drafts = list(drafts_dir.rglob("*.md")) if drafts_dir.exists() else []
340 report = lint_all(get_services().store)
341 return {
342 "wiki_enabled": cfg.wiki,
343 SUMMARIES_SUBDIR: len(summaries),
344 DRAFTS_SUBDIR: len(drafts),
345 "pages": len(summaries) + len(drafts),
346 "lint_errors": report.error_count,
347 "lint_warnings": report.warning_count,
348 }
351@mcp.tool()
352def wiki_list() -> dict[str, Any]:
353 """List all wiki pages (summaries and concepts) with metadata.
354 Returns page slugs, titles, types, source counts, and creation dates.
355 """
356 if not cfg.wiki:
357 return {"error": WIKI_DISABLED_ERROR}
358 from dataclasses import asdict
360 from lilbee.wiki.browse import list_pages
362 wiki_root = cfg.data_root / cfg.wiki_dir
363 pages = list_pages(wiki_root)
364 return {
365 "command": "wiki_list",
366 "pages": [asdict(p) for p in pages],
367 "total": len(pages),
368 }
371@mcp.tool()
372def wiki_read(slug: str) -> dict[str, Any]:
373 """Read a wiki page's content and frontmatter by slug.
374 Args:
375 slug: Page slug like "summaries/my-doc" or "concepts/typing".
376 """
377 if not cfg.wiki:
378 return {"error": WIKI_DISABLED_ERROR}
379 from dataclasses import asdict
381 from lilbee.wiki.browse import read_page
383 wiki_root = cfg.data_root / cfg.wiki_dir
384 result = read_page(wiki_root, slug)
385 if result is None:
386 return {"error": f"wiki page not found: {slug}"}
387 return {"command": "wiki_read", **asdict(result)}
390@mcp.tool()
391def wiki_build() -> dict[str, Any]:
392 """Build the concept and entity wiki across all ingested sources.
394 Returns ``{paths, entities, count}``.
395 """
396 if not cfg.wiki:
397 return {"error": WIKI_DISABLED_ERROR}
398 from lilbee.wiki import run_full_build
400 return {"command": "wiki_build", **run_full_build(cfg)}
403@mcp.tool()
404def wiki_update() -> dict[str, Any]:
405 """Refresh the concept and entity wiki after an ingest. Currently a full rebuild."""
406 if not cfg.wiki:
407 return {"error": WIKI_DISABLED_ERROR}
408 from lilbee.wiki import run_full_build
410 return {"command": "wiki_update", **run_full_build(cfg)}
413@mcp.tool()
414def wiki_synthesize() -> dict[str, Any]:
415 """Generate synthesis pages for concept clusters spanning three or more sources.
417 Returns the list of synthesis page paths written to disk. When no
418 cluster meets the 3+ source threshold, returns an empty list and
419 ``count: 0``.
420 """
421 if not cfg.wiki:
422 return {"error": WIKI_DISABLED_ERROR}
423 from lilbee.wiki import run_full_synthesize
425 return {"command": "wiki_synthesize", **run_full_synthesize(cfg)}
428@mcp.tool()
429def wiki_prune() -> dict[str, Any]:
430 """Prune stale and orphaned wiki pages.
431 Archives pages whose sources are all deleted or whose concept cluster
432 dropped below 3 live sources. Flags pages with >50% stale citations
433 for regeneration.
434 """
435 from lilbee.wiki.prune import prune_wiki
437 report = prune_wiki(get_services().store)
438 return {
439 "command": "wiki_prune",
440 "records": [r.to_dict() for r in report.records],
441 "archived": report.archived_count,
442 "flagged": report.flagged_count,
443 }
446@mcp.tool()
447def model_list(source: str = "", task: str = "") -> dict[str, Any]:
448 """List installed models across native and SDK-backend sources.
450 Args:
451 source: Filter by source: "native", "remote", or "" for all.
452 task: Filter by task: "chat", "embedding", "vision", "rerank", or "" for all.
453 """
454 from lilbee.cli.model import list_models_data
455 from lilbee.model_manager import ModelSource
457 try:
458 src = ModelSource.parse(source)
459 except ValueError as exc:
460 return {"error": str(exc)}
461 return list_models_data(source=src, task=task or None).model_dump()
464@mcp.tool()
465def model_show(model: str) -> dict[str, Any]:
466 """Show catalog and installed metadata for a model ref."""
467 from lilbee.cli.model import show_model_data
468 from lilbee.model_manager import ModelNotFoundError
470 try:
471 return show_model_data(model).model_dump()
472 except ModelNotFoundError as exc:
473 return {"error": str(exc)}
476def _log_progress_failure(future: concurrent.futures.Future[None]) -> None:
477 """Log report_progress failures without raising.
479 Progress notifications are best-effort: a failure should not abort
480 an in-flight pull.
481 """
482 try:
483 future.result()
484 except Exception:
485 log.warning("MCP report_progress failed", exc_info=True)
488@mcp.tool()
489async def model_pull(
490 model: str,
491 source: str = "native",
492 ctx: Context | None = None,
493) -> dict[str, Any]:
494 """Download a model, streaming progress via MCP notifications.
496 Args:
497 model: Model ref to pull (e.g. "Qwen/Qwen3-0.6B-GGUF" or
498 "Qwen/Qwen3-0.6B-GGUF/Qwen3-0.6B-Q4_K_M.gguf").
499 source: "native" (HuggingFace GGUF) or "remote" (SDK-managed).
500 """
501 from lilbee.catalog import DownloadProgress
502 from lilbee.cli.model import pull_model_data
503 from lilbee.model_manager import ModelSource
505 try:
506 src = ModelSource.parse(source) or ModelSource.NATIVE
507 except ValueError as exc:
508 return {"error": str(exc)}
510 loop = asyncio.get_running_loop()
512 def on_update(p: DownloadProgress) -> None:
513 if ctx is None:
514 return
515 future = asyncio.run_coroutine_threadsafe(
516 ctx.report_progress(progress=float(p.percent), total=100.0, message=p.detail),
517 loop,
518 )
519 future.add_done_callback(_log_progress_failure)
521 try:
522 result = await asyncio.to_thread(pull_model_data, model, src, on_update=on_update)
523 except (RuntimeError, PermissionError) as exc:
524 return {"error": str(exc)}
525 return result.model_dump()
528@mcp.tool()
529def model_rm(model: str, source: str = "") -> dict[str, Any]:
530 """Remove an installed model.
532 Args:
533 model: Model ref to remove.
534 source: Restrict to "native" or "remote"; empty = both.
535 """
536 from lilbee.cli.model import remove_model_data
537 from lilbee.model_manager import ModelSource
539 try:
540 src = ModelSource.parse(source)
541 except ValueError as exc:
542 return {"error": str(exc)}
543 return remove_model_data(model, source=src).model_dump()
546@mcp.tool()
547def wiki_drafts_list() -> dict[str, Any]:
548 """List pending wiki drafts with drift, faithfulness, and pairing info.
550 Read-only. Accept and reject are CLI-only (destructive, explicit).
551 """
552 from lilbee.wiki.drafts import list_drafts
554 wiki_root = cfg.data_root / cfg.wiki_dir
555 drafts = list_drafts(wiki_root)
556 return {
557 "command": "wiki_drafts_list",
558 "drafts": [d.to_dict() for d in drafts],
559 "total": len(drafts),
560 }
563@mcp.tool()
564def wiki_drafts_diff(slug: str) -> dict[str, Any]:
565 """Return a unified diff of the draft against its published counterpart.
567 Args:
568 slug: Draft slug (e.g. ``"chevrolet"``).
569 """
570 from lilbee.wiki.drafts import diff_draft
572 wiki_root = cfg.data_root / cfg.wiki_dir
573 try:
574 diff = diff_draft(slug, wiki_root)
575 except FileNotFoundError as exc:
576 return {"error": str(exc)}
577 return {"command": "wiki_drafts_diff", "slug": slug, "diff": diff}
580def main() -> None:
581 """Entry point for the MCP server."""
582 # Preload so the first tool call doesn't pay the cold-start cost
583 # of provider/embedder/store init. Failures (missing model, bad
584 # config) still surface on the first tool call rather than crashing
585 # the server before it attaches to stdio.
586 try:
587 get_services()
588 except Exception:
589 log.debug("MCP pre-warm failed; services will init on first call", exc_info=True)
591 from lilbee.parent_monitor import parse_parent_pid, watch_parent_thread
593 parent_pid = parse_parent_pid()
594 if parent_pid is not None:
595 watch_parent_thread(parent_pid, lambda: os._exit(0))
597 mcp.run()