Coverage for src/lilbee/server/handlers.py: 100% (242 statements)
coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""Framework-agnostic route handlers for the lilbee HTTP server.
3Every public function is a plain async callable — no framework imports.
4Return types are dicts (JSON responses), lists, or async generators of SSE strings.
5"""
7from __future__ import annotations
9import asyncio
10import json
11import logging
12import threading
13from collections.abc import AsyncGenerator
14from pathlib import Path
15from typing import TYPE_CHECKING, Any
17from pydantic import BaseModel
19from lilbee.progress import DetailedProgressCallback, EventType
21if TYPE_CHECKING:
22 from lilbee.query import ChatMessage
24log = logging.getLogger(__name__)
26MAX_ADD_FILES = 100


class ModelCatalogEntry(BaseModel):
    """A single model in the catalog."""

    name: str
    size_gb: float
    min_ram_gb: float
    description: str
    installed: bool


class ModelCatalogSection(BaseModel):
    """Chat or vision model catalog with active model and installed list."""

    active: str
    catalog: list[ModelCatalogEntry]
    installed: list[str]


class ModelsResponse(BaseModel):
    """Response for the list-models endpoint."""

    chat: ModelCatalogSection
    vision: ModelCatalogSection


def sse_event(event: str, data: Any) -> str:
    """Format a single Server-Sent Event string."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def _make_sse_callback(queue: asyncio.Queue[str | None]) -> DetailedProgressCallback:
    """Return a progress callback that serializes events into an asyncio queue.

    Safe to call from both the event loop thread (async code) and worker
    threads (``asyncio.to_thread`` / ``run_in_executor``).
    """
    # Capture the loop now; callers invoke this factory from a running loop.
    loop = asyncio.get_event_loop()

    def _callback(event_type: EventType, data: dict[str, Any]) -> None:
        payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None
        if running is loop:
            # Already on the loop thread: enqueue directly.
            queue.put_nowait(payload)
        else:
            # Worker thread: hand off to the loop thread safely.
            loop.call_soon_threadsafe(queue.put_nowait, payload)

    return _callback


async def _sse_generator(queue: asyncio.Queue[str | None]) -> AsyncGenerator[bytes, None]:
    """Yield SSE-formatted bytes from a queue until the sentinel (None) is received."""
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item.encode()
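

# A minimal pairing sketch (hypothetical names, for illustration only): a
# blocking producer pushes through the thread-safe callback while the
# adapter streams the queue.
#
#   queue: asyncio.Queue[str | None] = asyncio.Queue()
#   on_progress = _make_sse_callback(queue)
#   asyncio.get_running_loop().run_in_executor(None, blocking_job, on_progress)
#   response_body = _sse_generator(queue)
#
# Some caller must enqueue None when the work finishes (as _run_add does
# below), or the generator never terminates.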


async def health() -> dict[str, str]:
    """Return service health and version."""
    from lilbee.cli.helpers import get_version

    return {"status": "ok", "version": get_version()}


async def status() -> dict[str, Any]:
    """Return config, sources, and chunk counts."""
    from lilbee.cli.helpers import gather_status

    return gather_status().model_dump(exclude_none=True)


async def search(q: str, top_k: int = 5) -> list[dict[str, Any]]:
    """Search and return grouped DocumentResults as dicts."""
    from lilbee.query import search_context
    from lilbee.results import group, to_dicts

    results = search_context(q, top_k=top_k)
    grouped = group(results)
    return to_dicts(grouped)


async def ask(
    question: str, top_k: int = 0, options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """One-shot RAG answer. Returns {answer, sources[]}."""
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import ask_raw

    opts = cfg.generation_options(**options) if options else None
    result = ask_raw(question, top_k=top_k, options=opts)
    return {
        "answer": result.answer,
        "sources": [clean_result(s) for s in result.sources],
    }


async def ask_stream(
    question: str, top_k: int = 0, options: dict[str, Any] | None = None
) -> AsyncGenerator[str, None]:
    """Yield SSE events: token, sources, done."""
    yield ""  # force generator
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import (
        _CONTEXT_TEMPLATE,
        build_context,
        search_context,
        sort_by_relevance,
    )

    results = search_context(question, top_k=top_k)
    if not results:
        yield sse_event("error", {"message": "No relevant documents found."})
        return

    results = sort_by_relevance(results)
    context = build_context(results)
    prompt = _CONTEXT_TEMPLATE.format(context=context, question=question)
    messages: list[ChatMessage] = [{"role": "system", "content": cfg.system_prompt}]
    messages.append({"role": "user", "content": prompt})
    opts = cfg.generation_options(**options) if options else cfg.generation_options()

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    cancel = threading.Event()
    error_holder: list[str] = []

    def _generate() -> None:
        """Run the blocking Ollama stream in a worker thread."""
        try:
            import ollama as ollama_client

            stream = ollama_client.chat(
                model=cfg.chat_model, messages=messages, stream=True, options=opts or None
            )
            for chunk in stream:
                if cancel.is_set():
                    break
                token = chunk.message.content
                if token:
                    # asyncio.Queue is not thread-safe; mirror the
                    # _make_sse_callback pattern and hop to the loop thread.
                    loop.call_soon_threadsafe(
                        queue.put_nowait, sse_event("token", {"token": token})
                    )
        except Exception as exc:
            error_holder.append(str(exc))
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel

    loop = asyncio.get_event_loop()
    # Fire-and-forget: _generate signals completion through the None sentinel.
    loop.run_in_executor(None, _generate)
    try:
        while True:
            event = await queue.get()
            if event is None:
                break
            yield event
    except (asyncio.CancelledError, GeneratorExit):
        log.info("Stream cancelled by client")
        cancel.set()  # tell the worker thread to stop pulling tokens
        return

    if error_holder:
        yield sse_event("error", {"message": error_holder[0]})
        return

    yield sse_event("sources", [clean_result(s) for s in results])
    yield sse_event("done", {})
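

# Event sequence on the wire (both streaming endpoints follow it; chat_stream
# below is identical apart from history): zero or more "token" events, then
# either one "error" event, or a "sources" event followed by "done".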


async def chat(
    question: str,
    history: list[ChatMessage],
    top_k: int = 0,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Chat with history. Returns {answer, sources[]}."""
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import ask_raw

    opts = cfg.generation_options(**options) if options else None
    result = ask_raw(question, top_k=top_k, history=history, options=opts)
    return {
        "answer": result.answer,
        "sources": [clean_result(s) for s in result.sources],
    }


async def chat_stream(
    question: str,
    history: list[ChatMessage],
    top_k: int = 0,
    options: dict[str, Any] | None = None,
) -> AsyncGenerator[str, None]:
    """Yield SSE events with chat history support."""
    yield ""  # force generator
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import (
        _CONTEXT_TEMPLATE,
        build_context,
        search_context,
        sort_by_relevance,
    )

    results = search_context(question, top_k=top_k)
    if not results:
        yield sse_event("error", {"message": "No relevant documents found."})
        return

    results = sort_by_relevance(results)
    context = build_context(results)
    prompt = _CONTEXT_TEMPLATE.format(context=context, question=question)
    messages: list[ChatMessage] = [{"role": "system", "content": cfg.system_prompt}]
    messages.extend(history)
    messages.append({"role": "user", "content": prompt})
    opts = cfg.generation_options(**options) if options else cfg.generation_options()

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    cancel = threading.Event()
    error_holder: list[str] = []

    def _generate() -> None:
        """Run the blocking Ollama stream in a worker thread."""
        try:
            import ollama as ollama_client

            stream = ollama_client.chat(
                model=cfg.chat_model, messages=messages, stream=True, options=opts or None
            )
            for chunk in stream:
                if cancel.is_set():
                    break
                token = chunk.message.content
                if token:
                    # asyncio.Queue is not thread-safe; hop to the loop thread.
                    loop.call_soon_threadsafe(
                        queue.put_nowait, sse_event("token", {"token": token})
                    )
        except Exception as exc:
            error_holder.append(str(exc))
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel

    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, _generate)
    try:
        while True:
            event = await queue.get()
            if event is None:
                break
            yield event
    except (asyncio.CancelledError, GeneratorExit):
        log.info("Stream cancelled by client")
        cancel.set()  # tell the worker thread to stop pulling tokens
        return

    if error_holder:
        yield sse_event("error", {"message": error_holder[0]})
        return

    yield sse_event("sources", [clean_result(s) for s in results])
    yield sse_event("done", {})


async def sync_stream(*, force_vision: bool = False) -> AsyncGenerator[str, None]:
    """Trigger sync, yield SSE progress events, then a done event."""
    from lilbee.ingest import SyncResult, sync

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    callback = _make_sse_callback(queue)

    async def run_sync() -> SyncResult:
        return await sync(quiet=True, on_progress=callback, force_vision=force_vision)

    task = asyncio.create_task(run_sync())
    # Poll with a short timeout so the loop re-checks task.done() and drains
    # any events still queued after the task finishes.
    while not task.done() or not queue.empty():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=0.1)
        except TimeoutError:
            continue
        if item is not None:
            yield item
    yield sse_event("done", task.result().model_dump())  # re-raises if sync failed
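

# Illustrative wire traffic: zero or more progress events (typed by EventType,
# produced via _make_sse_callback), then a final "done" event whose data is
# the SyncResult serialized with model_dump().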


async def _run_add(
    paths: list[str],
    force: bool,
    vision_model: str,
    queue: asyncio.Queue[str | None],
) -> None:
    """Copy files and sync, pushing SSE events to the queue."""
    from lilbee.cli.helpers import copy_files
    from lilbee.config import cfg
    from lilbee.ingest import sync

    callback = _make_sse_callback(queue)

    # Partition the requested paths into existing files and errors.
    errors: list[str] = []
    valid: list[Path] = []
    for p_str in paths:
        p = Path(p_str)
        if not p.exists():
            errors.append(p_str)
        else:
            valid.append(p)

    copy_result = copy_files(valid, force=force)

    # Temporarily override the vision model for this sync, then restore it.
    old_vision = cfg.vision_model
    if vision_model:
        cfg.vision_model = vision_model
    try:
        sync_result = await sync(quiet=True, force_vision=bool(vision_model), on_progress=callback)
    finally:
        if vision_model:
            cfg.vision_model = old_vision

    summary = {
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "errors": errors,
        "sync": sync_result.model_dump(),
    }
    payload = f"event: summary\ndata: {json.dumps(summary)}\n\n"
    queue.put_nowait(payload)
    queue.put_nowait(None)  # sentinel: ends the SSE stream


# (paths, SSE queue, background task) handed to the framework adapter.
AddResult = tuple[list[str], asyncio.Queue[str | None], asyncio.Task[None]]


async def add_files(data: dict[str, Any]) -> AddResult:
    """Validate and start the add-files operation.

    Returns (paths, queue, task) for the Litestar adapter to stream.
    Raises ValueError on validation failure.
    """
    paths = data.get("paths")
    if not isinstance(paths, list) or not paths:
        raise ValueError("'paths' must be a non-empty list of strings")
    if len(paths) > MAX_ADD_FILES:
        raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}")

    force = bool(data.get("force", False))
    vision_model = str(data.get("vision_model", "") or "")

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    task = asyncio.create_task(_run_add(paths, force, vision_model, queue))
    return paths, queue, task
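

# A hedged adapter-side sketch (path value is hypothetical; the real Litestar
# adapter lives elsewhere in the codebase):
#
#   paths, queue, task = await add_files({"paths": ["/tmp/report.pdf"]})
#   async for chunk in _sse_generator(queue):
#       ...  # forward the bytes to the HTTP response
#   await task  # surface any exception raised inside _run_add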


async def list_models() -> dict[str, Any]:
    """Return chat and vision model catalogs with installed status."""
    from lilbee.cli.chat import list_ollama_models
    from lilbee.config import cfg
    from lilbee.models import MODEL_CATALOG, VISION_CATALOG

    installed = set(list_ollama_models())
    chat_installed = set(list_ollama_models(exclude_vision=True))
    vision_names = {v.name for v in VISION_CATALOG}

    response = ModelsResponse(
        chat=ModelCatalogSection(
            active=cfg.chat_model,
            catalog=[
                ModelCatalogEntry(
                    name=m.name,
                    size_gb=m.size_gb,
                    min_ram_gb=m.min_ram_gb,
                    description=m.description,
                    installed=m.name in installed,
                )
                for m in MODEL_CATALOG
            ],
            installed=sorted(chat_installed),
        ),
        vision=ModelCatalogSection(
            active=cfg.vision_model,
            catalog=[
                ModelCatalogEntry(
                    name=m.name,
                    size_gb=m.size_gb,
                    min_ram_gb=m.min_ram_gb,
                    description=m.description,
                    installed=m.name in installed,
                )
                for m in VISION_CATALOG
            ],
            installed=sorted(m for m in installed if m in vision_names),
        ),
    )
    return response.model_dump()
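

# Serialized response shape (abridged; values illustrative):
#
#   {"chat": {"active": "...",
#             "catalog": [{"name": "...", "size_gb": 4.7, "min_ram_gb": 8.0,
#                          "description": "...", "installed": true}],
#             "installed": ["..."]},
#    "vision": {"active": "...", "catalog": [...], "installed": [...]}}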


async def set_chat_model(model: str) -> dict[str, str]:
    """Switch active chat model. Returns {model}."""
    from lilbee import settings
    from lilbee.config import cfg
    from lilbee.models import ensure_tag

    tagged = ensure_tag(model)
    cfg.chat_model = tagged
    settings.set_value(cfg.data_root, "chat_model", tagged)
    return {"model": tagged}


async def set_vision_model(model: str) -> dict[str, str]:
    """Switch active vision model. Pass empty string to disable. Returns {model}."""
    from lilbee import settings
    from lilbee.config import cfg

    cfg.vision_model = model
    settings.set_value(cfg.data_root, "vision_model", model)
    return {"model": model}
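

# Illustrative calls (model names hypothetical):
#
#   await set_chat_model("llama3")   # ensure_tag() normalizes the name first
#   await set_vision_model("")       # empty string disables vision processing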