Coverage for src / lilbee / mcp.py: 100%
61 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""MCP server exposing lilbee as tools for AI agents."""
3from __future__ import annotations
5from pathlib import Path
6from typing import TYPE_CHECKING
8from mcp.server.fastmcp import FastMCP
10if TYPE_CHECKING:
11 from lilbee.store import SearchChunk
13mcp = FastMCP("lilbee", instructions="Local RAG knowledge base. Search indexed documents.")
16@mcp.tool()
17def lilbee_search(query: str, top_k: int = 5) -> list[dict]:
18 """Search the knowledge base for relevant document chunks.
20 Returns chunks sorted by relevance. No LLM call — uses pre-computed embeddings.
21 """
22 from lilbee.query import search_context
24 results = search_context(query, top_k=top_k)
25 return [clean(r) for r in results]
28@mcp.tool()
29def lilbee_status() -> dict:
30 """Show indexed documents, configuration, and chunk counts."""
31 from lilbee.config import cfg
32 from lilbee.store import get_sources
34 sources = get_sources()
35 return {
36 "config": {
37 "documents_dir": str(cfg.documents_dir),
38 "data_dir": str(cfg.data_dir),
39 "chat_model": cfg.chat_model,
40 "embedding_model": cfg.embedding_model,
41 **({"vision_model": cfg.vision_model} if cfg.vision_model else {}),
42 },
43 "sources": [
44 {"filename": s["filename"], "chunk_count": s["chunk_count"]}
45 for s in sorted(sources, key=lambda x: x["filename"])
46 ],
47 "total_chunks": sum(s["chunk_count"] for s in sources),
48 }
51@mcp.tool()
52async def lilbee_sync() -> dict:
53 """Sync documents directory with the vector store."""
54 from lilbee.ingest import sync
56 return (await sync(quiet=True)).model_dump()
59@mcp.tool()
60async def lilbee_add(
61 paths: list[str],
62 force: bool = False,
63 vision_model: str = "",
64) -> dict:
65 """Add files or directories to the knowledge base and sync.
67 Copies the given paths into the documents directory, then ingests them.
68 Paths must be absolute and accessible from this machine.
70 Args:
71 paths: Absolute file or directory paths to add.
72 force: Overwrite files that already exist in the knowledge base.
73 vision_model: Ollama vision model for scanned PDF OCR
74 (e.g. "maternion/LightOnOCR-2:latest"). If empty, uses
75 the configured default. If no model is configured,
76 scanned PDFs are skipped.
77 """
78 from lilbee.cli.helpers import copy_files
79 from lilbee.config import cfg
80 from lilbee.ingest import sync
82 errors: list[str] = []
83 valid: list[Path] = []
84 for p_str in paths:
85 p = Path(p_str)
86 if not p.exists():
87 errors.append(p_str)
88 else:
89 valid.append(p)
91 copy_result = copy_files(valid, force=force)
93 old_vision = cfg.vision_model
94 if vision_model:
95 cfg.vision_model = vision_model
96 try:
97 sync_result = (await sync(quiet=True, force_vision=bool(vision_model))).model_dump()
98 finally:
99 if vision_model:
100 cfg.vision_model = old_vision
102 return {
103 "command": "add",
104 "copied": copy_result.copied,
105 "skipped": copy_result.skipped,
106 "errors": errors,
107 "sync": sync_result,
108 }
111@mcp.tool()
112def lilbee_init(path: str = "") -> dict:
113 """Initialize a local .lilbee/ knowledge base in a directory.
115 Creates .lilbee/ with documents/, data/, and .gitignore.
116 If path is empty, uses the current working directory.
117 """
118 root = Path(path) / ".lilbee" if path else Path.cwd() / ".lilbee"
119 if root.is_dir():
120 return {"command": "init", "path": str(root), "created": False}
122 (root / "documents").mkdir(parents=True)
123 (root / "data").mkdir(parents=True)
124 (root / ".gitignore").write_text("data/\n")
125 return {"command": "init", "path": str(root), "created": True}
128@mcp.tool()
129def lilbee_reset() -> dict:
130 """Delete all documents and data (full factory reset).
132 WARNING: This permanently removes all indexed documents and vector data.
133 """
134 from lilbee.cli import perform_reset
136 return perform_reset().model_dump()
139def clean(result: SearchChunk) -> dict[str, object]:
140 """Strip vector field and rename _distance for output."""
141 cleaned = {k: v for k, v in result.items() if k != "vector"}
142 if "_distance" in cleaned:
143 cleaned["distance"] = cleaned.pop("_distance")
144 return cleaned
147def main() -> None:
148 """Entry point for the MCP server."""
149 mcp.run()