Coverage for src / lilbee / server / litestar_app.py: 100%
71 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""Litestar adapter — wires framework-agnostic handlers to HTTP routes."""
3from __future__ import annotations
5from collections.abc import AsyncGenerator
6from typing import Any
8from litestar import Litestar, get, post, put
9from litestar.config.cors import CORSConfig
10from litestar.exceptions import ValidationException
11from litestar.openapi import OpenAPIConfig
12from litestar.params import Parameter
13from litestar.response import Stream
15from lilbee.cli.helpers import get_version
16from lilbee.query import ChatMessage as ChatMessageDict
17from lilbee.server import handlers
18from lilbee.server.handlers import _sse_generator
19from lilbee.server.models import (
20 AddRequest,
21 AskRequest,
22 AskResponse,
23 ChatRequest,
24 CleanedChunk,
25 HealthResponse,
26 SetModelRequest,
27 SetModelResponse,
28 SyncRequest,
29)
def _clean_to_model(raw: dict[str, Any]) -> CleanedChunk:
    """Convert a raw cleaned-chunk dict into a CleanedChunk model.

    Assumes *raw*'s keys match ``CleanedChunk``'s field names exactly —
    the handlers layer produces these dicts; TODO confirm against it.
    """
    return CleanedChunk(**raw)
@get("/api/health")
async def health_route() -> HealthResponse:
    """Liveness probe: report server version and uptime status."""
    return HealthResponse(**await handlers.health())
@get("/api/status")
async def status_route() -> dict[str, Any]:
    """Current configuration, indexed document sources, and chunk counts.

    Passes the handler's dict through unchanged; the payload shape is
    defined by ``handlers.status`` (not visible in this module).
    """
    return await handlers.status()
@get("/api/search")
async def search_route(
    q: str = Parameter(query="q"),
    top_k: int = Parameter(query="top_k", default=5),
) -> list[dict[str, Any]]:
    """Search indexed documents by semantic similarity. No LLM call required.

    Query params: ``q`` is the search text (required); ``top_k`` caps the
    number of results (default 5). Returns raw chunk dicts as produced by
    ``handlers.search``.
    """
    return await handlers.search(q, top_k=top_k)
@post("/api/ask")
async def ask_route(data: AskRequest) -> AskResponse:
    """One-shot RAG question returning an answer with its source chunks."""
    result = await handlers.ask(
        question=data.question,
        top_k=data.top_k,
        options=data.options,
    )
    sources = [_clean_to_model(chunk) for chunk in result["sources"]]
    return AskResponse(answer=result["answer"], sources=sources)
@post("/api/ask/stream")
async def ask_stream_route(data: AskRequest) -> Stream:
    """Stream the answer to a one-shot RAG question as SSE token events."""
    generator = handlers.ask_stream(
        question=data.question,
        top_k=data.top_k,
        options=data.options,
    )
    return Stream(generator, media_type="text/event-stream")
@post("/api/chat")
async def chat_route(data: ChatRequest) -> AskResponse:
    """RAG chat with conversation history, returning an answer with sources."""
    # Re-key the request's message models into the query layer's dict type.
    messages: list[ChatMessageDict] = []
    for msg in data.history:
        messages.append(ChatMessageDict(role=msg.role, content=msg.content))
    result = await handlers.chat(
        question=data.question,
        history=messages,
        top_k=data.top_k,
        options=data.options,
    )
    return AskResponse(
        answer=result["answer"],
        sources=[_clean_to_model(chunk) for chunk in result["sources"]],
    )
@post("/api/chat/stream")
async def chat_stream_route(data: ChatRequest) -> Stream:
    """Streaming SSE version of chat with conversation history."""
    # Re-key the request's message models into the query layer's dict type.
    messages: list[ChatMessageDict] = [
        ChatMessageDict(role=msg.role, content=msg.content)
        for msg in data.history
    ]
    generator = handlers.chat_stream(
        question=data.question,
        history=messages,
        top_k=data.top_k,
        options=data.options,
    )
    return Stream(generator, media_type="text/event-stream")
@post("/api/sync")
async def sync_route(data: SyncRequest | None = None) -> Stream:
    """Re-index changed documents, streaming progress as SSE events."""
    # Body is optional; omitting it means a normal (non-forced) sync.
    force = data.force_vision if data is not None else False
    return Stream(
        handlers.sync_stream(force_vision=force),
        media_type="text/event-stream",
    )
@post("/api/add")
async def add_route(data: AddRequest) -> Stream:
    """Add files to the knowledge base with streaming SSE progress."""
    try:
        _paths, queue, task = await handlers.add_files(data.model_dump())
    except ValueError as exc:
        raise ValidationException(str(exc)) from exc

    async def _relay() -> AsyncGenerator[bytes, None]:
        # Forward every SSE chunk first, then await the background task so
        # any exception it raised surfaces after the stream drains.
        async for event in _sse_generator(queue):
            yield event
        await task

    return Stream(_relay(), media_type="text/event-stream", status_code=201)
@get("/api/models")
async def models_list_route() -> dict[str, Any]:
    """Available chat and vision models from Ollama.

    Passes the handler's dict through unchanged; the payload shape is
    defined by ``handlers.list_models`` (not visible in this module).
    """
    return await handlers.list_models()
@put("/api/models/chat")
async def models_set_chat_route(data: SetModelRequest) -> SetModelResponse:
    """Switch the active chat model used for RAG answers."""
    return SetModelResponse(**await handlers.set_chat_model(model=data.model))
@put("/api/models/vision")
async def models_set_vision_route(data: SetModelRequest) -> SetModelResponse:
    """Switch the active vision model used for image and PDF OCR."""
    return SetModelResponse(**await handlers.set_vision_model(model=data.model))
def create_app() -> Litestar:
    """Build and configure the Litestar application instance."""
    routes = [
        health_route,
        status_route,
        search_route,
        ask_route,
        ask_stream_route,
        chat_route,
        chat_stream_route,
        sync_route,
        add_route,
        models_list_route,
        models_set_chat_route,
        models_set_vision_route,
    ]
    # OpenAPI schema served at /schema; version mirrors the installed package.
    openapi = OpenAPIConfig(
        title="lilbee",
        description="Local knowledge base REST API",
        version=get_version(),
        path="/schema",
    )
    # Allow the Obsidian desktop app plus any localhost port (dev clients).
    cors = CORSConfig(
        allow_origins=["app://obsidian.md"],
        allow_origin_regex=r"^http://localhost(:\d+)?$",
        allow_methods=["GET", "POST", "PUT"],
        allow_headers=["Content-Type"],
    )
    return Litestar(
        route_handlers=routes,
        cors_config=cors,
        openapi_config=openapi,
    )