# src/lilbee/query.py
1"""RAG query pipeline -- embed question, search, generate answer with citations."""
3from __future__ import annotations
5import logging
6import math
7import re
8from collections.abc import Generator, Iterator
9from datetime import datetime
10from pathlib import Path
11from typing import TYPE_CHECKING, Any, cast
13if TYPE_CHECKING:
14 from lilbee.concepts import ConceptGraph
15 from lilbee.reasoning import StreamToken
16 from lilbee.reranker import Reranker
18from pydantic import BaseModel
19from typing_extensions import TypedDict
21from lilbee.config import Config, cfg
22from lilbee.embedder import Embedder
23from lilbee.providers.base import ClosableIterator, LLMProvider
24from lilbee.reasoning import strip_reasoning
25from lilbee.store import (
26 CHUNK_TYPE_RAW,
27 CHUNK_TYPE_WIKI,
28 CitationRecord,
29 SearchChunk,
30 Store,
31 cosine_sim,
32)
34log = logging.getLogger(__name__)
36_MIN_TOKEN_LEN = 2
37_TOKEN_SPLIT_RE = re.compile(r"\W+")
40def _tokenize(text: str) -> list[str]:
41 """Lowercase alphanumeric tokens, split on any non-alnum run."""
42 return [word for word in _TOKEN_SPLIT_RE.split(text.lower()) if len(word) >= _MIN_TOKEN_LEN]
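
# Illustrative sketch of the tokenizer (not executed; the question text is made up):
# punctuation is dropped and single-character tokens fall below _MIN_TOKEN_LEN.
#
#   _tokenize("How does the C reranker work?")
#   -> ["how", "does", "the", "reranker", "work"]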


def _idf_weights(
    question_terms: set[str],
    chunk_tokens: list[set[str]],
) -> dict[str, float]:
    """Inverse Document Frequency weight per query term over the candidate chunks.

    Classical IDF per Spärck Jones (1972), "A Statistical Interpretation
    of Term Specificity and Its Application in Retrieval", Journal of
    Documentation 28:11-21. Terms that appear in every chunk collapse to
    zero weight, so corpus-specific stopwords are filtered automatically.
    """
    n = len(chunk_tokens)
    df: dict[str, int] = {}
    for tokens in chunk_tokens:
        for term in tokens & question_terms:
            df[term] = df.get(term, 0) + 1
    return {t: max(0.0, math.log(n / (1 + df.get(t, 0)))) for t in question_terms}
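
# Worked example (values are illustrative): with three candidate chunks, a term
# that appears in every chunk collapses to zero weight, while a term found in
# only one chunk keeps log(3 / 2) ~ 0.405.
#
#   _idf_weights({"vector", "index"},
#                [{"vector", "index"}, {"vector", "recall"}, {"vector", "store"}])
#   -> {"vector": 0.0, "index": 0.405...}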


_DEFAULT_RELEVANCE_WEIGHT = 0.5


def _relevance_weight(result: SearchChunk) -> float:
    """Return a [0, 1] relevance weight for distance-aware selection.

    Hybrid results (relevance_score set): use directly.
    Vector results (distance set): invert cosine distance.
    Neither: neutral default.
    """
    if result.relevance_score is not None:
        return min(1.0, max(0.0, result.relevance_score))
    if result.distance is not None:
        return max(0.0, 1.0 - result.distance)
    return _DEFAULT_RELEVANCE_WEIGHT
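
# Illustrative mapping (field values are made up):
#   relevance_score=0.8           -> 0.8   (hybrid score used directly)
#   distance=0.3, no score        -> 0.7   (1 - cosine distance)
#   neither field set             -> 0.5   (_DEFAULT_RELEVANCE_WEIGHT)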


def _greedy_cover(
    chunk_tokens: list[set[str]],
    question_terms: set[str],
    term_weights: dict[str, float],
    budget: int,
    relevance_weights: list[float] | None = None,
) -> list[int]:
    """Greedy weighted set cover: pick chunks that add the most uncovered weight.

    Standard greedy (1 - 1/e) approximation for budgeted maximum coverage.
    The budget is always filled, falling back to retrieval order once no
    chunk can contribute any new weight. When *relevance_weights* is
    provided, each chunk's IDF gain is scaled by its relevance so that
    far-away chunks are penalised even when they share query terms.
    """
    selected: list[int] = []
    covered: set[str] = set()
    remaining = list(range(len(chunk_tokens)))
    while remaining and len(selected) < budget:
        best_pos = -1
        best_gain = 0.0
        for pos, idx in enumerate(remaining):
            new_terms = (chunk_tokens[idx] & question_terms) - covered
            gain = sum(term_weights[t] for t in new_terms)
            if relevance_weights is not None:
                gain *= relevance_weights[idx]
            if gain > best_gain:
                best_gain = gain
                best_pos = pos
        if best_pos < 0:
            break
        chosen = remaining.pop(best_pos)
        selected.append(chosen)
        covered |= chunk_tokens[chosen] & question_terms

    for idx in remaining:
        if len(selected) >= budget:
            break
        selected.append(idx)
    return selected
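
# Worked example (hand-picked weights, no relevance scaling): with budget=2 the
# greedy pass first takes chunk 0 (gain 1.0 + 0.7), which covers "hnsw" and
# "recall", then chunk 2 for the remaining "tuning" weight; chunk 1 adds nothing new.
#
#   _greedy_cover(
#       chunk_tokens=[{"hnsw", "recall"}, {"hnsw"}, {"tuning", "recall"}],
#       question_terms={"hnsw", "recall", "tuning"},
#       term_weights={"hnsw": 1.0, "recall": 0.7, "tuning": 0.7},
#       budget=2,
#   )
#   -> [0, 2]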


class ChatMessage(TypedDict):
    """A single chat message with role and content."""

    role: str
    content: str


_CITE_REF_RE = re.compile(r"\[(\d+)\]")

# Matches trailing LLM-generated citation blocks like "Key sources:", "Sources:",
# "References:", "Bibliography:", "Citations:" (with optional markdown heading).
_LLM_CITATION_BLOCK_RE = re.compile(
    r"\n{1,3}(?:#+\s*)?(?:(?:Key\s+)?Sources|References|Bibliography|Citations)\s*:?\s*\n.*",
    re.IGNORECASE | re.DOTALL,
)


def _extract_cited_indices(text: str) -> set[int]:
    """Extract [N] citation references from LLM answer text."""
    return {int(m.group(1)) for m in _CITE_REF_RE.finditer(text)}


def strip_llm_citations(text: str) -> str:
    """Remove LLM-generated trailing citation blocks from answer text."""
    return _LLM_CITATION_BLOCK_RE.sub("", text).rstrip()
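
# Minimal sketch of the two helpers on a model answer (the string is illustrative):
#
#   answer = "RAG grounds answers in retrieved text [1][3].\n\nSources:\n- doc.pdf"
#   _extract_cited_indices(answer)  -> {1, 3}
#   strip_llm_citations(answer)     -> "RAG grounds answers in retrieved text [1][3]."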


def filter_results(
    results: list[SearchChunk],
    max_distance: float,
    min_relevance_score: float = 0.0,
) -> list[SearchChunk]:
    """Drop results above max_distance or below min_relevance_score.

    Hybrid results (relevance_score set) are checked against min_relevance_score.
    Vector results (distance set) are checked against max_distance.
    Results with neither score pass through. When both scores are present,
    relevance_score takes priority (hybrid results use RRF scoring, not
    cosine distance). Pass max_distance=0 to disable distance filtering.
    """
    if max_distance <= 0 and min_relevance_score <= 0:
        return results
    filtered: list[SearchChunk] = []
    for r in results:
        # Hybrid results: check relevance_score (takes priority over distance)
        if r.relevance_score is not None:
            if min_relevance_score > 0 and r.relevance_score < min_relevance_score:
                continue
        elif r.distance is not None and max_distance > 0 and r.distance > max_distance:
            continue
        filtered.append(r)
    return filtered
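
# Behaviour sketch with max_distance=0.75 and min_relevance_score=0.05
# (SearchChunk field values are illustrative):
#   relevance_score=0.02                -> dropped (hybrid score below floor)
#   relevance_score=0.09, distance=0.9  -> kept (relevance_score takes priority)
#   distance=0.9, no relevance_score    -> dropped (above max_distance)
#   neither field set                   -> kept (passes through)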


CONTEXT_TEMPLATE = """Context:
{context}

Question: {question}"""


def display_source_path(source: str) -> str:
    """Render a chunk's source as an absolute path with ``~`` expansion.

    Source values are stored relative to ``documents_dir`` so the database is
    portable across machines. For display we resolve back to the user's
    filesystem and substitute ``~`` for the home directory so the path is
    unambiguous without being noisy.

    Falls back to the raw source string if the path cannot be resolved
    (e.g. the user moved the documents directory since ingestion).
    """
    candidate = cfg.documents_dir / source
    try:
        resolved = candidate.resolve(strict=False)
    except OSError:
        return source
    home = Path.home()
    try:
        return f"~/{resolved.relative_to(home)}"
    except ValueError:
        return str(resolved)
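
# Illustrative sketch (actual output depends on cfg.documents_dir and the home
# directory): a stored source of "notes/arch.md" under ~/Documents/lilbee renders
# as "~/Documents/lilbee/notes/arch.md"; paths outside the home directory come
# back absolute, and unresolvable paths fall back to the raw source string.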


def _format_citation(citation: CitationRecord) -> str:
    """Format a single citation record as an indented attribution line."""
    source_display = display_source_path(citation["source_filename"])
    if citation["page_start"] or citation["page_end"]:
        ps, pe = citation["page_start"], citation["page_end"]
        pages = f"page {ps}" if ps == pe else f"pages {ps}-{pe}"
        return f" → {source_display}, {pages}"
    if citation["line_start"] or citation["line_end"]:
        ls, le = citation["line_start"], citation["line_end"]
        lines = f"line {ls}" if ls == le else f"lines {ls}-{le}"
        return f" → {source_display}, {lines}"
    return f" → {source_display}"


def format_source(result: SearchChunk, citations: list[CitationRecord] | None = None) -> str:
    """Format a search result as a source citation line.
    For wiki chunks, shows the wiki page path followed by indented transitive citations.
    """
    source_display = display_source_path(result.source)
    if result.chunk_type == CHUNK_TYPE_WIKI and citations:
        parts = [f" → {source_display}"]
        for cit in citations:
            parts.append(_format_citation(cit))
        return "\n".join(parts)

    if result.content_type == "pdf":
        ps, pe = result.page_start, result.page_end
        pages = f"page {ps}" if ps == pe else f"pages {ps}-{pe}"
        return f" → {source_display}, {pages}"

    if result.content_type == "code":
        ls, le = result.line_start, result.line_end
        lines = f"line {ls}" if ls == le else f"lines {ls}-{le}"
        return f" → {source_display}, {lines}"

    return f" → {source_display}"


def deduplicate_sources(
    results: list[SearchChunk],
    max_citations: int = 5,
    citations_map: dict[str, list[CitationRecord]] | None = None,
) -> list[str]:
    """Merge results from the same source into deduplicated citation lines."""
    seen: set[str] = set()
    citation_lines: list[str] = []
    for r in results:
        cits = (citations_map or {}).get(r.source)
        line = format_source(r, citations=cits)
        if line not in seen:
            seen.add(line)
            citation_lines.append(line)
        if len(citation_lines) >= max_citations:
            break
    return citation_lines
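
# Minimal sketch (chunk_p3, chunk_p3_dup and chunk_code are hypothetical SearchChunk
# instances): two chunks from the same PDF pages format to the identical line and
# collapse to a single citation, and the output is capped at max_citations lines.
#
#   deduplicate_sources([chunk_p3, chunk_p3_dup, chunk_code], max_citations=5)
#   -> [" → ~/docs/paper.pdf, page 3", " → ~/src/store.py, lines 10-42"]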


def _sort_key(r: SearchChunk) -> float:
    """Sort key: lower = more relevant."""
    if r.relevance_score is not None:
        return -r.relevance_score
    if r.distance is not None:
        return r.distance
    return float("inf")


def sort_by_relevance(results: list[SearchChunk]) -> list[SearchChunk]:
    """Sort search results by relevance (works for both hybrid and vector results)."""
    return sorted(results, key=_sort_key)


def diversify_sources(
    results: list[SearchChunk], max_per_source: int | None = None
) -> list[SearchChunk]:
    """Cap results per source document to ensure diversity.
    Source diversity filtering: Zhai 2008, "Statistical Language Models for
    Information Retrieval" -- caps per-source representation to prevent
    any single document from dominating results.
    """
    if max_per_source is None:
        max_per_source = cfg.diversity_max_per_source
    counts: dict[str, int] = {}
    diverse: list[SearchChunk] = []
    for r in results:
        count = counts.get(r.source, 0)
        if count < max_per_source:
            diverse.append(r)
            counts[r.source] = count + 1
    return diverse
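
# Illustrative sketch with max_per_source=2: given results from sources
# [a, a, a, b] in relevance order, the third "a" chunk is dropped and the
# output keeps [a, a, b], so no single document monopolises the context.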


def prepare_results(results: list[SearchChunk]) -> list[SearchChunk]:
    """Sort by relevance and apply source diversity cap."""
    return diversify_sources(sort_by_relevance(results))


def build_context(results: list[SearchChunk]) -> str:
    """Build context block from search results."""
    return "\n\n".join(f"[{i}] {r.chunk}" for i, r in enumerate(results, 1))


_EXPANSION_PROMPT = (
    "Generate {count} alternative search queries for the following question. "
    "Return ONLY the queries, one per line, no numbering or explanation.\n\n"
    "Question: {question}"
)

_EXPANSION_MAX_TOKENS = 200


class AskResult(BaseModel):
    """Structured result from ask_raw -- answer text + raw search results."""

    answer: str
    sources: list[SearchChunk]


class Searcher:
    """RAG search pipeline -- embed, search, expand, rerank, generate.
    All search and answer operations go through this class.
    Constructed with injected dependencies via the Services container.
    """

    def __init__(
        self,
        config: Config,
        provider: LLMProvider,
        store: Store,
        embedder: Embedder,
        reranker: Reranker,
        concepts: ConceptGraph,
    ) -> None:
        self._config = config
        self._provider = provider
        self._store = store
        self._embedder = embedder
        self._reranker = reranker
        self._concepts = concepts

    def _apply_temporal_filter(
        self, results: list[SearchChunk], question: str
    ) -> list[SearchChunk]:
        """Filter results by source ingestion date when the question is temporal."""
        if not self._config.temporal_filtering:
            return results
        from lilbee.temporal import detect_temporal, resolve_date_range

        keyword = detect_temporal(question)
        if keyword is None:
            return results
        date_range = resolve_date_range(keyword)
        source_dates = self._store.source_ingested_at_map()
        filtered: list[SearchChunk] = []
        for r in results:
            ingested_at = source_dates.get(r.source, "")
            if not ingested_at:
                filtered.append(r)
                continue
            try:
                doc_date = datetime.fromisoformat(ingested_at)
                if date_range.start <= doc_date <= date_range.end:
                    filtered.append(r)
            except (ValueError, TypeError):
                filtered.append(r)
        return filtered if filtered else results

    def _apply_guardrails(
        self,
        variants: list[tuple[str, list[float]]],
        question_vec: list[float],
    ) -> list[tuple[str, list[float]]]:
        """Drop expansion variants whose embedding drifts too far from the question."""
        if not self._config.expansion_guardrails:
            return variants
        threshold = self._config.expansion_similarity_threshold
        return [(text, vec) for text, vec in variants if cosine_sim(question_vec, vec) >= threshold]

    def _concept_query_expansion(self, question: str) -> list[str]:
        """Expand the query with related concepts from the concept graph, if any."""
        if not self._config.concept_graph:
            return []
        try:
            if not self._concepts.get_graph():
                return []
            return self._concepts.expand_query(question)
        except Exception:
            log.debug("Concept query expansion failed", exc_info=True)
            return []

    def _llm_expand(self, question: str, count: int) -> list[str]:
        """Call the LLM to produce ``count`` alternative phrasings."""
        prompt = _EXPANSION_PROMPT.format(count=count, question=question)
        messages = [{"role": "user", "content": prompt}]
        response = self._provider.chat(
            messages, stream=False, options={"num_predict": _EXPANSION_MAX_TOKENS}
        )
        if not isinstance(response, str):
            return []
        variants = [line.strip() for line in response.strip().split("\n") if line.strip()]
        return variants[:count]

    def _expand_query(
        self, question: str, question_vec: list[float]
    ) -> list[tuple[str, list[float]]]:
        """Return ``(variant, variant_vec)`` pairs for downstream search.

        LLM variants run through ``_apply_guardrails``; concept-graph
        variants bypass it since they come from deterministic traversal.
        Embeddings are batched per source, so each source costs one
        provider round-trip.
        """
        count = self._config.query_expansion_count
        if count <= 0 and not self._config.concept_graph:
            return []
        # Short queries skip LLM expansion: BM25/vector signal is already strong
        # and the LLM round-trip dominates latency on small local models.
        # Concept-graph expansion still runs.
        short_threshold = self._config.expansion_short_query_tokens
        skip_llm = short_threshold > 0 and len(_tokenize(question)) <= short_threshold
        try:
            llm_variants: list[tuple[str, list[float]]] = []
            if count > 0 and not skip_llm:
                llm_texts = list(self._llm_expand(question, count))
                if llm_texts:
                    llm_vectors = self._embedder.embed_batch(llm_texts)
                    llm_variants = list(zip(llm_texts, llm_vectors, strict=True))
                    llm_variants = self._apply_guardrails(llm_variants, question_vec)

            concept_texts = list(self._concept_query_expansion(question))
            if concept_texts:
                concept_vectors = self._embedder.embed_batch(concept_texts)
                llm_variants.extend(zip(concept_texts, concept_vectors, strict=True))

            return llm_variants
        except Exception as exc:
            log.warning("Query expansion disabled for this call: %s", exc)
            log.debug("Query expansion exception", exc_info=True)
            return []

    def _should_skip_expansion(self, question: str) -> bool:
        """Skip expansion when a cheap BM25 probe already shows one clear winner."""
        if self._config.expansion_skip_threshold <= 0:
            return False
        results = self._store.bm25_probe(question, top_k=2)
        if not results:
            return False
        top_score = results[0].relevance_score or 0
        if top_score < self._config.expansion_skip_threshold:
            return False
        if len(results) < 2:
            return True
        second_score = results[1].relevance_score or 0
        return (top_score - second_score) >= self._config.expansion_skip_gap
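
    # Worked example (threshold and gap values are illustrative config settings):
    # with expansion_skip_threshold=6.0 and expansion_skip_gap=3.0, probe scores
    # of 9.1 and 4.0 skip expansion (clear winner), while 9.1 and 7.5 do not
    # (ambiguous, so the expanded queries still run).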

    def _apply_concept_boost(self, results: list[SearchChunk], question: str) -> list[SearchChunk]:
        if not self._config.concept_graph or not results:
            return results
        try:
            if not self._concepts.get_graph():
                return results
            query_concepts = self._concepts.extract_concepts(question)
            if not query_concepts:
                return results
            return self._concepts.boost_results(results, query_concepts)
        except Exception:
            log.debug("Concept boost failed", exc_info=True)
            return results

    def _hyde_search(self, question: str, top_k: int) -> list[SearchChunk]:
        """Hypothetical Document Embedding search.
        Gao et al. 2022, "Precise Zero-Shot Dense Retrieval without
        Relevance Labels" -- generates a hypothetical answer passage,
        embeds it, and uses the embedding to search for real documents.
        """
        try:
            response = self._provider.chat(
                [{"role": "user", "content": self._config.hyde_prompt.format(question=question)}],
                stream=False,
                options={"num_predict": _EXPANSION_MAX_TOKENS},
            )
            if not isinstance(response, str) or not response.strip():
                return []
            hyde_vec = self._embedder.embed(response.strip())
            return self._store.search(hyde_vec, top_k=top_k, query_text=None)
        except Exception:
            log.debug("HyDE search failed", exc_info=True)
            return []

    def _normalize_chunk_type(self, chunk_type: str | None) -> str | None:
        """Drop ``chunk_type="wiki"`` when wiki generation is disabled.

        With wiki off the chunks table contains only raw rows, so honouring
        the filter would silently return nothing. Logging a warning surfaces
        the misuse without getting in the user's way.
        """
        if chunk_type == CHUNK_TYPE_WIKI and not self._config.wiki:
            log.warning(
                "wiki scope requested but wiki is disabled; searching the full pool instead"
            )
            return None
        return chunk_type

    def _parse_structured_query(self, question: str) -> tuple[str | None, str]:
        """Split a leading mode prefix (term:, vec:, hyde:, wiki:, raw:) off the query."""
        for prefix in ("term:", "vec:", "hyde:", "wiki:", "raw:"):
            if question.strip().lower().startswith(prefix):
                return prefix[:-1], question.strip()[len(prefix) :].strip()
        return None, question
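
    # Illustrative sketch of the prefix routing (query strings are made up):
    #
    #   _parse_structured_query("term: hnsw recall")  -> ("term", "hnsw recall")
    #   _parse_structured_query("what is hnsw?")      -> (None, "what is hnsw?")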

    def _search_structured(
        self,
        mode: str,
        query: str,
        top_k: int,
        chunk_type: str | None = None,
    ) -> list[SearchChunk]:
        """Route a prefixed query to its dedicated search backend."""
        if mode == "term":
            return self._store.bm25_probe(query, top_k=top_k)
        if mode == "vec":
            query_vec = self._embedder.embed(query)
            return self._store.search(query_vec, top_k=top_k, query_text=None)
        if mode == "hyde":
            return self._hyde_search(query, top_k)
        if mode in (CHUNK_TYPE_WIKI, CHUNK_TYPE_RAW):
            # Explicit ``chunk_type`` arg beats the prefix shortcut.
            effective = chunk_type if chunk_type is not None else mode
            query_vec = self._embedder.embed(query)
            return self._store.search(
                query_vec, top_k=top_k, query_text=query, chunk_type=effective
            )
        return []

    def select_context(
        self, results: list[SearchChunk], question: str, max_sources: int | None = None
    ) -> list[SearchChunk]:
        """Pick ``max_sources`` chunks by greedy IDF-weighted set cover."""
        if max_sources is None:
            max_sources = self._config.max_context_sources
        if len(results) <= max_sources:
            return results

        question_terms = set(_tokenize(question))
        if not question_terms:
            return results[:max_sources]

        chunk_tokens = [set(_tokenize(r.chunk)) for r in results]
        term_weights = _idf_weights(question_terms, chunk_tokens)
        if not any(term_weights.values()):
            return results[:max_sources]

        weights = [_relevance_weight(r) for r in results]
        selected = _greedy_cover(chunk_tokens, question_terms, term_weights, max_sources, weights)
        selected.sort()
        return [results[i] for i in selected]

    def search(
        self,
        question: str,
        top_k: int = 0,
        *,
        chunk_type: str | None = None,
    ) -> list[SearchChunk]:
        """Embed question and search with expansion, HyDE, and concept boost.
        Returns up to top_k*2 candidates for downstream filtering.

        When *chunk_type* is set (``"raw"`` or ``"wiki"``), only chunks of
        that type are returned. An explicit ``chunk_type`` always wins
        over the ``wiki:``/``raw:`` prefix shortcut in *question* so the
        user-facing scope choice has the final say.

        When ``chunk_type="wiki"`` but wiki generation is disabled on the
        config, the filter is normalized to ``None`` (mixed pool) and a
        warning is logged -- with wiki off the chunks table has no wiki
        rows, so honouring the filter would silently return zero results.
        """
        if top_k == 0:
            top_k = self._config.top_k
        chunk_type = self._normalize_chunk_type(chunk_type)
        mode, clean_query = self._parse_structured_query(question)
        if mode is not None:
            return self._search_structured(mode, clean_query, top_k, chunk_type=chunk_type)
        query_vec = self._embedder.embed(question)
        results = self._store.search(
            query_vec,
            top_k=top_k,
            query_text=question,
            chunk_type=chunk_type,
        )
        if self._should_skip_expansion(question):
            return results[: top_k * 2]
        seen = {(r.source, r.chunk_index) for r in results}
        for variant, variant_vec in self._expand_query(question, query_vec):
            variant_results = self._store.search(
                variant_vec,
                top_k=top_k,
                query_text=variant,
                chunk_type=chunk_type,
            )
            for r in variant_results:
                key = (r.source, r.chunk_index)
                if key not in seen:
                    results.append(r)
                    seen.add(key)
        if self._config.hyde:
            hyde_results = self._hyde_search(question, top_k)
            for r in hyde_results:
                key = (r.source, r.chunk_index)
                if key not in seen:
                    if r.distance is not None and self._config.hyde_weight > 0:
                        r = r.model_copy(update={"distance": r.distance / self._config.hyde_weight})
                    results.append(r)
                    seen.add(key)
        results = self._apply_concept_boost(results, question)
        return results[: top_k * 2]

    def build_rag_context(
        self,
        question: str,
        top_k: int = 0,
        history: list[ChatMessage] | None = None,
        *,
        chunk_type: str | None = None,
    ) -> tuple[list[SearchChunk], list[ChatMessage]] | None:
        """Build RAG context from search results.

        ``chunk_type`` restricts the pool to ``"raw"`` or ``"wiki"`` rows;
        ``None`` (default) searches the mixed pool.
        """
        results = self.search(question, top_k=top_k, chunk_type=chunk_type)
        results = filter_results(
            results, self._config.max_distance, self._config.min_relevance_score
        )
        if not results:
            return None
        results = prepare_results(results)
        if self._config.reranker_model:
            results = self._reranker.rerank(question, results)
        results = self._apply_temporal_filter(results, question)
        results = self.select_context(results, question)
        context = build_context(results)
        prompt = CONTEXT_TEMPLATE.format(context=context, question=question)
        messages: list[ChatMessage] = [{"role": "system", "content": self._config.system_prompt}]
        if history:
            messages.extend(history)
        messages.append({"role": "user", "content": prompt})
        return results, messages

    _NO_EMBED_WARNING = (
        "Chat only — no document search configured. "
        "Install an embedding model: lilbee models install nomic-embed-text\n\n"
    )
    _NO_RESULTS_MESSAGE = "No relevant documents found for this query."

    def _direct_messages(
        self, question: str, history: list[ChatMessage] | None = None
    ) -> list[ChatMessage]:
        """Build messages for direct LLM chat (no RAG context)."""
        messages: list[ChatMessage] = [{"role": "system", "content": self._config.system_prompt}]
        if history:
            messages.extend(history)
        messages.append({"role": "user", "content": question})
        return messages

    def _messages_for_provider(self, messages: list[ChatMessage]) -> list[dict[str, str]]:
        """Convert ChatMessage list to provider-expected format."""
        return [{"role": m["role"], "content": m["content"]} for m in messages]

    def ask_raw(
        self,
        question: str,
        top_k: int = 0,
        history: list[ChatMessage] | None = None,
        options: dict[str, Any] | None = None,
        *,
        chunk_type: str | None = None,
    ) -> AskResult:
        """Ask a question and get a structured result."""
        if not self._embedder.embedding_available():
            messages = self._direct_messages(question, history)
            provider_messages = self._messages_for_provider(messages)
            opts = options if options is not None else self._config.generation_options()
            raw = str(self._provider.chat(provider_messages, options=opts or None) or "")
            clean = raw if self._config.show_reasoning else strip_reasoning(raw)
            return AskResult(answer=self._NO_EMBED_WARNING + clean, sources=[])
        rag = self.build_rag_context(question, top_k=top_k, history=history, chunk_type=chunk_type)
        if rag is None:
            return AskResult(
                answer=self._NO_RESULTS_MESSAGE,
                sources=[],
            )
        results, messages = rag
        provider_messages = self._messages_for_provider(messages)
        opts = options if options is not None else self._config.generation_options()
        raw = str(self._provider.chat(provider_messages, options=opts or None) or "")
        clean = raw if self._config.show_reasoning else strip_reasoning(raw)
        return AskResult(answer=clean, sources=results)

    def ask(
        self,
        question: str,
        top_k: int = 0,
        history: list[ChatMessage] | None = None,
        options: dict[str, Any] | None = None,
        *,
        chunk_type: str | None = None,
    ) -> str:
        """Ask a question and get a formatted answer with citations."""
        result = self.ask_raw(
            question, top_k=top_k, history=history, options=options, chunk_type=chunk_type
        )
        if not result.sources:
            return result.answer
        cited = _extract_cited_indices(result.answer)
        used = [result.sources[i - 1] for i in sorted(cited) if 1 <= i <= len(result.sources)]
        answer = strip_llm_citations(result.answer)
        source_list = used if used else result.sources
        citations = deduplicate_sources(source_list)
        return f"{answer}\n\nSources:\n" + "\n".join(citations)

    def ask_stream(
        self,
        question: str,
        top_k: int = 0,
        history: list[ChatMessage] | None = None,
        options: dict[str, Any] | None = None,
        *,
        chunk_type: str | None = None,
    ) -> Generator[StreamToken, None, None]:
        """Stream answer tokens with citations appended at the end."""
        from lilbee.reasoning import StreamToken, filter_reasoning

        if not self._embedder.embedding_available():
            yield StreamToken(content=self._NO_EMBED_WARNING, is_reasoning=False)
            messages = self._direct_messages(question, history)
            provider_messages = self._messages_for_provider(messages)
            opts = options if options is not None else self._config.generation_options()
            raw = self._provider.chat(provider_messages, stream=True, options=opts or None)
            try:
                for st in filter_reasoning(
                    cast(Iterator[str], raw), show=self._config.show_reasoning
                ):
                    if st.content:
                        yield st
            except (ConnectionError, OSError) as exc:
                yield StreamToken(content=f"\n\n[Connection lost: {exc}]", is_reasoning=False)
            finally:
                if isinstance(raw, ClosableIterator):
                    raw.close()
            return

        rag = self.build_rag_context(question, top_k=top_k, history=history, chunk_type=chunk_type)
        if rag is None:
            yield StreamToken(
                content=self._NO_RESULTS_MESSAGE,
                is_reasoning=False,
            )
            return
        results, messages = rag
        provider_messages = self._messages_for_provider(messages)
        opts = options if options is not None else self._config.generation_options()
        raw_stream = self._provider.chat(provider_messages, stream=True, options=opts or None)
        answer_parts: list[str] = []
        try:
            for st in filter_reasoning(
                cast(Iterator[str], raw_stream), show=self._config.show_reasoning
            ):
                if st.content:
                    answer_parts.append(st.content)
                    yield st
        except (ConnectionError, OSError) as exc:
            yield StreamToken(content=f"\n\n[Connection lost: {exc}]", is_reasoning=False)
        finally:
            if isinstance(raw_stream, ClosableIterator):
                raw_stream.close()
        # Note: LLM-generated citation blocks in streamed tokens cannot be
        # retroactively stripped. The system prompt discourages them; this
        # only filters the code-appended Sources block to cited chunks.
        full_answer = "".join(answer_parts)
        cited = _extract_cited_indices(full_answer)
        used = [results[i - 1] for i in sorted(cited) if 1 <= i <= len(results)]
        source_list = used if used else results
        citations = deduplicate_sources(source_list)
        yield StreamToken(content="\n\nSources:\n" + "\n".join(citations), is_reasoning=False)
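

# Usage sketch (assumes a Services container wires the dependencies, as the
# Searcher docstring describes; `services.searcher` is a hypothetical accessor):
#
#   searcher = services.searcher
#   print(searcher.ask("vec: how are wiki chunks cited?"))
#   for token in searcher.ask_stream("what does the greedy cover select?"):
#       print(token.content, end="", flush=True)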