Coverage for src/lilbee/server/handlers.py: 100%

242 statements  

coverage.py v7.13.4, created at 2026-03-16 08:27 +0000

1"""Framework-agnostic route handlers for the lilbee HTTP server. 

2 

3Every public function is a plain async callable — no framework imports. 

4Return types are dicts (JSON responses), lists, or async generators of SSE strings. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import json 

11import logging 

12import threading 

13from collections.abc import AsyncGenerator 

14from pathlib import Path 

15from typing import TYPE_CHECKING, Any 

16 

17from pydantic import BaseModel 

18 

19from lilbee.progress import DetailedProgressCallback, EventType 

20 

21if TYPE_CHECKING: 

22 from lilbee.query import ChatMessage 

23 

24log = logging.getLogger(__name__) 

25 

26MAX_ADD_FILES = 100 

27 

28 

class ModelCatalogEntry(BaseModel):
    """A single model in the catalog."""

    name: str
    size_gb: float
    min_ram_gb: float
    description: str
    installed: bool


class ModelCatalogSection(BaseModel):
    """Chat or vision model catalog with active model and installed list."""

    active: str
    catalog: list[ModelCatalogEntry]
    installed: list[str]


class ModelsResponse(BaseModel):
    """Response for the list-models endpoint."""

    chat: ModelCatalogSection
    vision: ModelCatalogSection



def sse_event(event: str, data: Any) -> str:
    """Format a single Server-Sent Event string."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


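# Editor's illustration of the wire format (not part of the original module):
# each frame is an event-name line, a JSON data line, and a blank line.
#
#   >>> sse_event("token", {"token": "hi"})
#   'event: token\ndata: {"token": "hi"}\n\n'

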

def _make_sse_callback(queue: asyncio.Queue[str | None]) -> DetailedProgressCallback:
    """Return a progress callback that serializes events into an asyncio queue.

    Safe to call from both the event loop thread (async code) and worker
    threads (``asyncio.to_thread`` / ``run_in_executor``).
    """
    loop = asyncio.get_running_loop()

    def _callback(event_type: EventType, data: dict[str, Any]) -> None:
        payload = sse_event(event_type, data)
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None
        if running is loop:
            queue.put_nowait(payload)
        else:
            # asyncio.Queue is not thread-safe; hand off via the loop.
            loop.call_soon_threadsafe(queue.put_nowait, payload)

    return _callback



async def _sse_generator(queue: asyncio.Queue[str | None]) -> AsyncGenerator[bytes, None]:
    """Yield SSE-formatted bytes from a queue until sentinel (None) is received."""
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item.encode()


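# Editor's sketch of how the queue, callback, and generator fit together; the
# worker body and the "progress" event name are placeholder assumptions, not
# documented EventType values.
async def _demo_stream() -> AsyncGenerator[bytes, None]:
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    callback = _make_sse_callback(queue)
    loop = asyncio.get_running_loop()

    def worker() -> None:
        # Runs in an executor thread; the callback routes thread-safely.
        callback("progress", {"step": 1})
        loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel

    loop.run_in_executor(None, worker)
    async for chunk in _sse_generator(queue):
        yield chunk

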

async def health() -> dict[str, str]:
    """Return service health and version."""
    from lilbee.cli.helpers import get_version

    return {"status": "ok", "version": get_version()}



async def status() -> dict[str, Any]:
    """Return config, sources, and chunk counts."""
    from lilbee.cli.helpers import gather_status

    return gather_status().model_dump(exclude_none=True)



async def search(q: str, top_k: int = 5) -> list[dict[str, Any]]:
    """Search and return grouped DocumentResults as dicts."""
    from lilbee.query import search_context
    from lilbee.results import group, to_dicts

    results = search_context(q, top_k=top_k)
    grouped = group(results)
    return to_dicts(grouped)



async def ask(
    question: str, top_k: int = 0, options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """One-shot RAG answer. Returns {answer, sources[]}."""
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import ask_raw

    opts = cfg.generation_options(**options) if options else None
    result = ask_raw(question, top_k=top_k, options=opts)
    return {
        "answer": result.answer,
        "sources": [clean_result(s) for s in result.sources],
    }


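# Editor's sketch of calling the one-shot handler directly; the "temperature"
# key is an assumed generation option, not one confirmed by this module.
async def _demo_ask() -> None:
    reply = await ask("What does lilbee index?", top_k=3, options={"temperature": 0.2})
    print(reply["answer"])
    for src in reply["sources"]:
        print("-", src)

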

async def ask_stream(
    question: str, top_k: int = 0, options: dict[str, Any] | None = None
) -> AsyncGenerator[str, None]:
    """Yield SSE events: token, sources, done."""
    yield ""  # prime the generator so failures below surface as SSE events
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import (
        _CONTEXT_TEMPLATE,
        build_context,
        search_context,
        sort_by_relevance,
    )

    results = search_context(question, top_k=top_k)
    if not results:
        yield sse_event("error", {"message": "No relevant documents found."})
        return

    results = sort_by_relevance(results)
    context = build_context(results)
    prompt = _CONTEXT_TEMPLATE.format(context=context, question=question)
    messages: list[ChatMessage] = [{"role": "system", "content": cfg.system_prompt}]
    messages.append({"role": "user", "content": prompt})
    opts = cfg.generation_options(**options) if options else cfg.generation_options()

    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    cancel = threading.Event()
    error_holder: list[str] = []

    def _generate() -> None:
        try:
            import ollama as ollama_client

            stream = ollama_client.chat(
                model=cfg.chat_model, messages=messages, stream=True, options=opts or None
            )
            for chunk in stream:
                if cancel.is_set():
                    break
                token = chunk.message.content
                if token:
                    # Runs in an executor thread; asyncio.Queue is not
                    # thread-safe, so hand the payload to the loop.
                    loop.call_soon_threadsafe(
                        queue.put_nowait, sse_event("token", {"token": token})
                    )
        except Exception as exc:
            error_holder.append(str(exc))
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, None)

    loop.run_in_executor(None, _generate)
    try:
        while True:
            event = await queue.get()
            if event is None:
                break
            yield event
    except (asyncio.CancelledError, GeneratorExit):
        log.info("Stream cancelled by client")
        cancel.set()
        return

    if error_holder:
        yield sse_event("error", {"message": error_holder[0]})
        return

    yield sse_event("sources", [clean_result(s) for s in results])
    yield sse_event("done", {})


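# Editor's sketch: consuming the stream outside a web framework. Each yielded
# item is already a complete SSE frame, so printing frames reproduces the wire
# format a browser's EventSource would see.
async def _demo_ask_stream() -> None:
    async for frame in ask_stream("Summarize the project README."):
        print(frame, end="")

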

async def chat(
    question: str,
    history: list[ChatMessage],
    top_k: int = 0,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Chat with history. Returns {answer, sources[]}."""
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import ask_raw

    opts = cfg.generation_options(**options) if options else None
    result = ask_raw(question, top_k=top_k, history=history, options=opts)
    return {
        "answer": result.answer,
        "sources": [clean_result(s) for s in result.sources],
    }


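# Editor's sketch of the history format the handlers build internally:
# plain role/content dicts, oldest first. The message contents here are
# placeholders, not real lilbee answers.
async def _demo_chat() -> None:
    history: list[ChatMessage] = [
        {"role": "user", "content": "What formats can lilbee ingest?"},
        {"role": "assistant", "content": "(placeholder answer)"},
    ]
    reply = await chat("Which one indexes fastest?", history=history)
    print(reply["answer"])

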

async def chat_stream(
    question: str,
    history: list[ChatMessage],
    top_k: int = 0,
    options: dict[str, Any] | None = None,
) -> AsyncGenerator[str, None]:
    """Yield SSE events with chat history support."""
    yield ""  # prime the generator so failures below surface as SSE events
    from lilbee.cli.helpers import clean_result
    from lilbee.config import cfg
    from lilbee.query import (
        _CONTEXT_TEMPLATE,
        build_context,
        search_context,
        sort_by_relevance,
    )

    results = search_context(question, top_k=top_k)
    if not results:
        yield sse_event("error", {"message": "No relevant documents found."})
        return

    results = sort_by_relevance(results)
    context = build_context(results)
    prompt = _CONTEXT_TEMPLATE.format(context=context, question=question)
    messages: list[ChatMessage] = [{"role": "system", "content": cfg.system_prompt}]
    messages.extend(history)
    messages.append({"role": "user", "content": prompt})
    opts = cfg.generation_options(**options) if options else cfg.generation_options()

    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    cancel = threading.Event()
    error_holder: list[str] = []

    def _generate() -> None:
        try:
            import ollama as ollama_client

            stream = ollama_client.chat(
                model=cfg.chat_model, messages=messages, stream=True, options=opts or None
            )
            for chunk in stream:
                if cancel.is_set():
                    break
                token = chunk.message.content
                if token:
                    # Runs in an executor thread; asyncio.Queue is not
                    # thread-safe, so hand the payload to the loop.
                    loop.call_soon_threadsafe(
                        queue.put_nowait, sse_event("token", {"token": token})
                    )
        except Exception as exc:
            error_holder.append(str(exc))
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, None)

    loop.run_in_executor(None, _generate)
    try:
        while True:
            event = await queue.get()
            if event is None:
                break
            yield event
    except (asyncio.CancelledError, GeneratorExit):
        log.info("Stream cancelled by client")
        cancel.set()
        return

    if error_holder:
        yield sse_event("error", {"message": error_holder[0]})
        return

    yield sse_event("sources", [clean_result(s) for s in results])
    yield sse_event("done", {})



async def sync_stream(*, force_vision: bool = False) -> AsyncGenerator[str, None]:
    """Trigger a sync, yield SSE progress events, then a final done event."""
    from lilbee.ingest import SyncResult, sync

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    callback = _make_sse_callback(queue)

    async def run_sync() -> SyncResult:
        return await sync(quiet=True, on_progress=callback, force_vision=force_vision)

    task = asyncio.create_task(run_sync())
    while not task.done() or not queue.empty():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:  # spelled explicitly for pre-3.11 Pythons
            continue
        if item is not None:
            yield item
    yield sse_event("done", task.result().model_dump())


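# Editor's sketch: progress frames arrive while sync() runs; the final "done"
# frame carries the SyncResult dump. Run with e.g. asyncio.run(_demo_sync()).
async def _demo_sync() -> None:
    async for frame in sync_stream(force_vision=False):
        print(frame, end="")

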

async def _run_add(
    paths: list[str],
    force: bool,
    vision_model: str,
    queue: asyncio.Queue[str | None],
) -> None:
    """Copy files and sync, pushing SSE events to the queue."""
    from lilbee.cli.helpers import copy_files
    from lilbee.config import cfg
    from lilbee.ingest import sync

    callback = _make_sse_callback(queue)

    try:
        errors: list[str] = []
        valid: list[Path] = []
        for p_str in paths:
            p = Path(p_str)
            if not p.exists():
                errors.append(p_str)
            else:
                valid.append(p)

        copy_result = copy_files(valid, force=force)

        old_vision = cfg.vision_model
        if vision_model:
            cfg.vision_model = vision_model
        try:
            sync_result = await sync(
                quiet=True, force_vision=bool(vision_model), on_progress=callback
            )
        finally:
            if vision_model:
                cfg.vision_model = old_vision

        summary = {
            "copied": copy_result.copied,
            "skipped": copy_result.skipped,
            "errors": errors,
            "sync": sync_result.model_dump(),
        }
        queue.put_nowait(sse_event("summary", summary))
    finally:
        # Always push the sentinel so _sse_generator terminates even if
        # copy_files or sync raises.
        queue.put_nowait(None)



AddResult = tuple[list[str], asyncio.Queue[str | None], asyncio.Task[None]]



async def add_files(data: dict[str, Any]) -> AddResult:
    """Validate and start the add-files operation.

    Returns (paths, queue, task) for the Litestar adapter to stream.
    Raises ValueError on validation failure.
    """
    paths = data.get("paths")
    if not isinstance(paths, list) or not paths:
        raise ValueError("'paths' must be a non-empty list of strings")
    if len(paths) > MAX_ADD_FILES:
        raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}")

    force = bool(data.get("force", False))
    vision_model = str(data.get("vision_model", "") or "")

    queue: asyncio.Queue[str | None] = asyncio.Queue()
    task = asyncio.create_task(_run_add(paths, force, vision_model, queue))
    return paths, queue, task


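# Editor's sketch of what the streaming adapter mentioned in the docstring
# might do with the returned triple; the endpoint shape is an assumption.
async def _demo_add_files(data: dict[str, Any]) -> AsyncGenerator[bytes, None]:
    paths, queue, task = await add_files(data)
    log.info("Adding %d file(s)", len(paths))
    async for chunk in _sse_generator(queue):
        yield chunk
    await task  # surface any exception raised inside _run_add

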

async def list_models() -> dict[str, Any]:
    """Return chat and vision model catalogs with installed status."""
    from lilbee.cli.chat import list_ollama_models
    from lilbee.config import cfg
    from lilbee.models import MODEL_CATALOG, VISION_CATALOG

    installed = set(list_ollama_models())
    chat_installed = set(list_ollama_models(exclude_vision=True))
    vision_names = {v.name for v in VISION_CATALOG}

    response = ModelsResponse(
        chat=ModelCatalogSection(
            active=cfg.chat_model,
            catalog=[
                ModelCatalogEntry(
                    name=m.name,
                    size_gb=m.size_gb,
                    min_ram_gb=m.min_ram_gb,
                    description=m.description,
                    installed=m.name in installed,
                )
                for m in MODEL_CATALOG
            ],
            installed=sorted(chat_installed),
        ),
        vision=ModelCatalogSection(
            active=cfg.vision_model,
            catalog=[
                ModelCatalogEntry(
                    name=m.name,
                    size_gb=m.size_gb,
                    min_ram_gb=m.min_ram_gb,
                    description=m.description,
                    installed=m.name in installed,
                )
                for m in VISION_CATALOG
            ],
            installed=sorted(m for m in installed if m in vision_names),
        ),
    )
    return response.model_dump()


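# Editor's note on the payload shape, derived from the Pydantic models above;
# the model name shown is an illustrative placeholder.
#
# {
#   "chat": {
#     "active": "llama3.2:3b",
#     "catalog": [{"name": ..., "size_gb": ..., "min_ram_gb": ...,
#                  "description": ..., "installed": ...}, ...],
#     "installed": ["llama3.2:3b", ...],
#   },
#   "vision": { ...same shape... },
# }

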

async def set_chat_model(model: str) -> dict[str, str]:
    """Switch active chat model. Returns {model}."""
    from lilbee import settings
    from lilbee.config import cfg
    from lilbee.models import ensure_tag

    tagged = ensure_tag(model)
    cfg.chat_model = tagged
    settings.set_value(cfg.data_root, "chat_model", tagged)
    return {"model": tagged}



async def set_vision_model(model: str) -> dict[str, str]:
    """Switch active vision model. Pass empty string to disable. Returns {model}."""
    from lilbee import settings
    from lilbee.config import cfg

    cfg.vision_model = model
    settings.set_value(cfg.data_root, "vision_model", model)
    return {"model": model}