Coverage for src/lilbee/query.py: 100%

435 statements  

coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""RAG query pipeline -- embed question, search, generate answer with citations.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import math 

7import re 

8from collections.abc import Generator, Iterator 

9from datetime import datetime 

10from pathlib import Path 

11from typing import TYPE_CHECKING, Any, cast 

12 

13if TYPE_CHECKING: 

14 from lilbee.concepts import ConceptGraph 

15 from lilbee.reasoning import StreamToken 

16 from lilbee.reranker import Reranker 

17 

18from pydantic import BaseModel 

19from typing_extensions import TypedDict 

20 

21from lilbee.config import Config, cfg 

22from lilbee.embedder import Embedder 

23from lilbee.providers.base import ClosableIterator, LLMProvider 

24from lilbee.reasoning import strip_reasoning 

25from lilbee.store import ( 

26 CHUNK_TYPE_RAW, 

27 CHUNK_TYPE_WIKI, 

28 CitationRecord, 

29 SearchChunk, 

30 Store, 

31 cosine_sim, 

32) 

33 

34log = logging.getLogger(__name__) 

35 

36_MIN_TOKEN_LEN = 2 

37_TOKEN_SPLIT_RE = re.compile(r"\W+") 

38 

39 

40def _tokenize(text: str) -> list[str]: 

41 """Lowercase tokens split on runs of non-word characters; tokens shorter than _MIN_TOKEN_LEN are dropped."""

42 return [word for word in _TOKEN_SPLIT_RE.split(text.lower()) if len(word) >= _MIN_TOKEN_LEN] 

43 

44 

45def _idf_weights( 

46 question_terms: set[str], 

47 chunk_tokens: list[set[str]], 

48) -> dict[str, float]: 

49 """Inverse Document Frequency weight per query term over the candidate chunks. 

50 

51 Classical IDF per Spärck Jones (1972), "A Statistical Interpretation 

52 of Term Specificity and Its Application in Retrieval", Journal of 

53 Documentation 28:11-21. Terms that appear in every chunk collapse to 

54 zero weight, so corpus-specific stopwords are filtered automatically. 

55 """ 

56 n = len(chunk_tokens) 

57 df: dict[str, int] = {} 

58 for tokens in chunk_tokens: 

59 for term in tokens & question_terms: 

60 df[term] = df.get(term, 0) + 1 

61 return {t: max(0.0, math.log(n / (1 + df.get(t, 0)))) for t in question_terms} 
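
# Worked example (editorial sketch; terms and counts are hypothetical): with
# three candidate chunks where "llm" appears in all three and "quantization"
# in only one, and question_terms = {"llm", "quantization"}:
#   n = 3, df = {"llm": 3, "quantization": 1}
#   weight("llm")          = max(0.0, log(3 / 4)) = 0.0    (corpus-wide term drops out)
#   weight("quantization") = log(3 / 2) ≈ 0.405            (rarer term drives selection)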

62 

63 

64_DEFAULT_RELEVANCE_WEIGHT = 0.5 

65 

66 

67def _relevance_weight(result: SearchChunk) -> float: 

68 """Return a [0, 1] relevance weight for distance-aware selection. 

69 

70 Hybrid results (relevance_score set): use directly. 

71 Vector results (distance set): invert cosine distance. 

72 Neither: neutral default. 

73 """ 

74 if result.relevance_score is not None: 

75 return min(1.0, max(0.0, result.relevance_score)) 

76 if result.distance is not None: 

77 return max(0.0, 1.0 - result.distance) 

78 return _DEFAULT_RELEVANCE_WEIGHT 
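
# Editorial sketch of the mapping (hypothetical scores):
#   relevance_score=0.8, distance=None  -> 0.8   (hybrid score used directly)
#   relevance_score=None, distance=0.3  -> 0.7   (1.0 - cosine distance)
#   neither score set                   -> 0.5   (_DEFAULT_RELEVANCE_WEIGHT)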

79 

80 

81def _greedy_cover( 

82 chunk_tokens: list[set[str]], 

83 question_terms: set[str], 

84 term_weights: dict[str, float], 

85 budget: int, 

86 relevance_weights: list[float] | None = None, 

87) -> list[int]: 

88 """Greedy weighted set cover: pick chunks that add the most uncovered weight. 

89 

90 The usual greedy (1 - 1/e) approximation for budgeted maximum coverage. Budget is 

91 always filled, falling back to retrieval order once no chunk can 

92 contribute any new weight. When *relevance_weights* is provided, 

93 each chunk's IDF gain is scaled by its relevance so that far-away 

94 chunks are penalised even when they share query terms. 

95 """ 

96 selected: list[int] = [] 

97 covered: set[str] = set() 

98 remaining = list(range(len(chunk_tokens))) 

99 while remaining and len(selected) < budget: 

100 best_pos = -1 

101 best_gain = 0.0 

102 for pos, idx in enumerate(remaining): 

103 new_terms = (chunk_tokens[idx] & question_terms) - covered 

104 gain = sum(term_weights[t] for t in new_terms) 

105 if relevance_weights is not None: 

106 gain *= relevance_weights[idx] 

107 if gain > best_gain: 

108 best_gain = gain 

109 best_pos = pos 

110 if best_pos < 0: 

111 break 

112 chosen = remaining.pop(best_pos) 

113 selected.append(chosen) 

114 covered |= chunk_tokens[chosen] & question_terms 

115 

116 for idx in remaining: 

117 if len(selected) >= budget: 

118 break 

119 selected.append(idx) 

120 return selected 
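
# Worked example (editorial sketch; all values hypothetical): three chunks,
# question_terms = {"a", "b", "c"}, term_weights = {"a": 1.0, "b": 1.0, "c": 0.5},
# budget = 2, relevance_weights = None, where chunk 0 covers {"a"}, chunk 1
# covers {"b", "c"}, and chunk 2 covers {"a", "b"}:
#   round 1: gains 1.0 / 1.5 / 2.0  -> pick chunk 2, covered = {"a", "b"}
#   round 2: gains 0.0 / 0.5        -> pick chunk 1, covered = {"a", "b", "c"}
#   result: [2, 1]; select_context() re-sorts the indices into retrieval order.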

121 

122 

123class ChatMessage(TypedDict): 

124 """A single chat message with role and content.""" 

125 

126 role: str 

127 content: str 

128 

129 

130_CITE_REF_RE = re.compile(r"\[(\d+)\]") 

131 

132# Matches trailing LLM-generated citation blocks like "Key sources:", "Sources:", 

133# "References:", "Bibliography:", "Citations:" (with optional markdown heading). 

134_LLM_CITATION_BLOCK_RE = re.compile( 

135 r"\n{1,3}(?:#+\s*)?(?:(?:Key\s+)?Sources|References|Bibliography|Citations)\s*:?\s*\n.*", 

136 re.IGNORECASE | re.DOTALL, 

137) 

138 

139 

140def _extract_cited_indices(text: str) -> set[int]: 

141 """Extract [N] citation references from LLM answer text.""" 

142 return {int(m.group(1)) for m in _CITE_REF_RE.finditer(text)} 

143 

144 

145def strip_llm_citations(text: str) -> str: 

146 """Remove LLM-generated trailing citation blocks from answer text.""" 

147 return _LLM_CITATION_BLOCK_RE.sub("", text).rstrip() 
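
# Editorial sketch with a hypothetical answer string:
#   text = "Attention is weighted averaging [1][3].\n\nSources:\n[1] paper.pdf"
#   _extract_cited_indices(text) -> {1, 3}
#   strip_llm_citations(text)    -> "Attention is weighted averaging [1][3]."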

148 

149 

150def filter_results( 

151 results: list[SearchChunk], 

152 max_distance: float, 

153 min_relevance_score: float = 0.0, 

154) -> list[SearchChunk]: 

155 """Drop results above max_distance or below min_relevance_score. 

156 

157 Hybrid results (relevance_score set) are checked against min_relevance_score. 

158 Vector results (distance set) are checked against max_distance. 

159 Results with neither score pass through. When both scores are present, 

160 relevance_score takes priority (hybrid results use RRF scoring, not 

161 cosine distance). Pass max_distance=0 to disable distance filtering. 

162 """ 

163 if max_distance <= 0 and min_relevance_score <= 0: 

164 return results 

165 filtered: list[SearchChunk] = [] 

166 for r in results: 

167 # Hybrid results: check relevance_score (takes priority over distance) 

168 if r.relevance_score is not None: 

169 if min_relevance_score > 0 and r.relevance_score < min_relevance_score: 

170 continue 

171 elif r.distance is not None and max_distance > 0 and r.distance > max_distance: 

172 continue 

173 filtered.append(r) 

174 return filtered 
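
# Editorial sketch (hypothetical scores), with max_distance=0.6 and
# min_relevance_score=0.2:
#   relevance_score=0.15                -> dropped (below the relevance floor)
#   relevance_score=0.35, distance=0.9  -> kept    (relevance_score takes priority)
#   distance=0.8, no relevance_score    -> dropped (beyond max_distance)
#   neither score set                   -> kept    (passes through)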

175 

176 

177CONTEXT_TEMPLATE = """Context: 

178{context} 

179 

180Question: {question}""" 

181 

182 

183def display_source_path(source: str) -> str: 

184 """Render a chunk's source as an absolute path with ``~`` expansion. 

185 

186 Source values in the store are stored relative to ``documents_dir`` so the 

187 database is portable across machines. For display we resolve back to the 

188 user's filesystem and substitute ``~`` for the home directory so the path 

189 is unambiguous without being noisy. 

190 

191 Falls back to the raw source string if the file no longer exists on disk 

192 (e.g. the user moved the documents directory since ingestion). 

193 """ 

194 candidate = cfg.documents_dir / source 

195 try: 

196 resolved = candidate.resolve(strict=False) 

197 except OSError: 

198 return source 

199 home = Path.home() 

200 try: 

201 return f"~/{resolved.relative_to(home)}" 

202 except ValueError: 

203 return str(resolved) 
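
# Editorial sketch, assuming documents_dir is ~/docs and the stored source is
# "papers/attention.pdf" (both hypothetical):
#   documents_dir under the home dir                     -> "~/docs/papers/attention.pdf"
#   documents_dir outside the home dir (e.g. /srv/docs)  -> "/srv/docs/papers/attention.pdf"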

204 

205 

206def _format_citation(citation: CitationRecord) -> str: 

207 """Format a single citation record as an indented attribution line.""" 

208 source_display = display_source_path(citation["source_filename"]) 

209 if citation["page_start"] or citation["page_end"]: 

210 ps, pe = citation["page_start"], citation["page_end"] 

211 pages = f"page {ps}" if ps == pe else f"pages {ps}-{pe}" 

212 return f"{source_display}, {pages}" 

213 if citation["line_start"] or citation["line_end"]: 

214 ls, le = citation["line_start"], citation["line_end"] 

215 lines = f"line {ls}" if ls == le else f"lines {ls}-{le}" 

216 return f"{source_display}, {lines}" 

217 return f"{source_display}" 

218 

219 

220def format_source(result: SearchChunk, citations: list[CitationRecord] | None = None) -> str: 

221 """Format a search result as a source citation line. 

222 For wiki chunks, shows the wiki page path followed by indented transitive citations. 

223 """ 

224 source_display = display_source_path(result.source) 

225 if result.chunk_type == CHUNK_TYPE_WIKI and citations: 

226 parts = [f"{source_display}"] 

227 for cit in citations: 

228 parts.append(_format_citation(cit)) 

229 return "\n".join(parts) 

230 

231 if result.content_type == "pdf": 

232 ps, pe = result.page_start, result.page_end 

233 pages = f"page {ps}" if ps == pe else f"pages {ps}-{pe}" 

234 return f"{source_display}, {pages}" 

235 

236 if result.content_type == "code": 

237 ls, le = result.line_start, result.line_end 

238 lines = f"line {ls}" if ls == le else f"lines {ls}-{le}" 

239 return f"{source_display}, {lines}" 

240 

241 return f"{source_display}" 

242 

243 

244def deduplicate_sources( 

245 results: list[SearchChunk], 

246 max_citations: int = 5, 

247 citations_map: dict[str, list[CitationRecord]] | None = None, 

248) -> list[str]: 

249 """Merge results from same source into deduplicated citation lines.""" 

250 seen: set[str] = set() 

251 citation_lines: list[str] = [] 

252 for r in results: 

253 cits = (citations_map or {}).get(r.source) 

254 line = format_source(r, citations=cits) 

255 if line not in seen: 

256 seen.add(line) 

257 citation_lines.append(line) 

258 if len(citation_lines) >= max_citations: 

259 break 

260 return citation_lines 
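
# Editorial sketch (hypothetical sources): two PDF chunks that both span pages
# 3-4 of report.pdf plus one chunk from notes.md collapse to two citation lines,
# because identical formatted lines are deduplicated:
#   ["~/docs/report.pdf, pages 3-4", "~/docs/notes.md"]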

261 

262 

263def _sort_key(r: SearchChunk) -> float: 

264 """Sort key: lower = more relevant.""" 

265 if r.relevance_score is not None: 

266 return -r.relevance_score 

267 if r.distance is not None: 

268 return r.distance 

269 return float("inf") 

270 

271 

272def sort_by_relevance(results: list[SearchChunk]) -> list[SearchChunk]: 

273 """Sort search results by relevance (works for both hybrid and vector results).""" 

274 return sorted(results, key=_sort_key) 
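
# Editorial sketch of the sort keys (hypothetical scores): a hybrid hit with
# relevance_score=0.9 keys as -0.9, a vector hit with distance=0.2 keys as 0.2,
# and an unscored hit keys as inf, so it sorts last.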

275 

276 

277def diversify_sources( 

278 results: list[SearchChunk], max_per_source: int | None = None 

279) -> list[SearchChunk]: 

280 """Cap results per source document to ensure diversity. 

281 Source diversity filtering: Zhai 2008, "Statistical Language Models for 

282 Information Retrieval" -- caps per-source representation to prevent 

283 any single document from dominating results. 

284 """ 

285 if max_per_source is None: 

286 max_per_source = cfg.diversity_max_per_source 

287 counts: dict[str, int] = {} 

288 diverse: list[SearchChunk] = [] 

289 for r in results: 

290 count = counts.get(r.source, 0) 

291 if count < max_per_source: 

292 diverse.append(r) 

293 counts[r.source] = count + 1 

294 return diverse 
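
# Editorial sketch: with max_per_source=2 and relevance-ordered results drawn
# from sources [A, A, A, B, A, C], the cap keeps [A, A, B, C]; the third and
# fifth A are dropped so a single document cannot fill the whole context.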

295 

296 

297def prepare_results(results: list[SearchChunk]) -> list[SearchChunk]: 

298 """Sort by relevance and apply source diversity cap.""" 

299 return diversify_sources(sort_by_relevance(results)) 

300 

301 

302def build_context(results: list[SearchChunk]) -> str: 

303 """Build context block from search results.""" 

304 return "\n\n".join(f"[{i}] {r.chunk}" for i, r in enumerate(results, 1)) 
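
# Editorial sketch: two results with chunks "First passage" and "Second passage"
# render as "[1] First passage\n\n[2] Second passage". The 1-based [N] markers
# are what _extract_cited_indices() later matches in the generated answer.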

305 

306 

307_EXPANSION_PROMPT = ( 

308 "Generate {count} alternative search queries for the following question. " 

309 "Return ONLY the queries, one per line, no numbering or explanation.\n\n" 

310 "Question: {question}" 

311) 

312 

313_EXPANSION_MAX_TOKENS = 200 

314 

315 

316class AskResult(BaseModel): 

317 """Structured result from ask_raw -- answer text + raw search results.""" 

318 

319 answer: str 

320 sources: list[SearchChunk] 

321 

322 

323class Searcher: 

324 """RAG search pipeline -- embed, search, expand, rerank, generate. 

325 All search and answer operations go through this class. 

326 Constructed with injected dependencies via the Services container. 

327 """ 

328 

329 def __init__( 

330 self, 

331 config: Config, 

332 provider: LLMProvider, 

333 store: Store, 

334 embedder: Embedder, 

335 reranker: Reranker, 

336 concepts: ConceptGraph, 

337 ) -> None: 

338 self._config = config 

339 self._provider = provider 

340 self._store = store 

341 self._embedder = embedder 

342 self._reranker = reranker 

343 self._concepts = concepts 

344 

345 def _apply_temporal_filter( 

346 self, results: list[SearchChunk], question: str 

347 ) -> list[SearchChunk]: 

348 if not self._config.temporal_filtering: 

349 return results 

350 from lilbee.temporal import detect_temporal, resolve_date_range 

351 

352 keyword = detect_temporal(question) 

353 if keyword is None: 

354 return results 

355 date_range = resolve_date_range(keyword) 

356 source_dates = self._store.source_ingested_at_map() 

357 filtered: list[SearchChunk] = [] 

358 for r in results: 

359 ingested_at = source_dates.get(r.source, "") 

360 if not ingested_at: 

361 filtered.append(r) 

362 continue 

363 try: 

364 doc_date = datetime.fromisoformat(ingested_at) 

365 if date_range.start <= doc_date <= date_range.end: 

366 filtered.append(r) 

367 except (ValueError, TypeError): 

368 filtered.append(r) 

369 return filtered if filtered else results 

370 

371 def _apply_guardrails( 

372 self, 

373 variants: list[tuple[str, list[float]]], 

374 question_vec: list[float], 

375 ) -> list[tuple[str, list[float]]]: 

376 """Drop expansion variants whose embedding drifts too far from the question.""" 

377 if not self._config.expansion_guardrails: 

378 return variants 

379 threshold = self._config.expansion_similarity_threshold 

380 return [(text, vec) for text, vec in variants if cosine_sim(question_vec, vec) >= threshold] 

381 

382 def _concept_query_expansion(self, question: str) -> list[str]: 

383 if not self._config.concept_graph: 

384 return [] 

385 try: 

386 if not self._concepts.get_graph(): 

387 return [] 

388 return self._concepts.expand_query(question) 

389 except Exception: 

390 log.debug("Concept query expansion failed", exc_info=True) 

391 return [] 

392 

393 def _llm_expand(self, question: str, count: int) -> list[str]: 

394 """Call the LLM to produce ``count`` alternative phrasings.""" 

395 prompt = _EXPANSION_PROMPT.format(count=count, question=question) 

396 messages = [{"role": "user", "content": prompt}] 

397 response = self._provider.chat( 

398 messages, stream=False, options={"num_predict": _EXPANSION_MAX_TOKENS} 

399 ) 

400 if not isinstance(response, str): 

401 return [] 

402 variants = [line.strip() for line in response.strip().split("\n") if line.strip()] 

403 return variants[:count] 

404 

405 def _expand_query( 

406 self, question: str, question_vec: list[float] 

407 ) -> list[tuple[str, list[float]]]: 

408 """Return ``(variant, variant_vec)`` pairs for downstream search. 

409 

410 LLM variants run through ``_apply_guardrails``; concept-graph 

411 variants bypass it since they come from deterministic traversal. 

412 Embeddings are batched per variant source: one embed_batch round-trip for LLM variants and one for concept-graph variants. 

413 """ 

414 count = self._config.query_expansion_count 

415 if count <= 0 and not self._config.concept_graph: 

416 return [] 

417 # Short queries skip LLM expansion: BM25/vector signal is already strong 

418 # and the LLM round-trip dominates latency on small local models. 

419 # Concept-graph expansion still runs. 

420 short_threshold = self._config.expansion_short_query_tokens 

421 skip_llm = short_threshold > 0 and len(_tokenize(question)) <= short_threshold 

422 try: 

423 llm_variants: list[tuple[str, list[float]]] = [] 

424 if count > 0 and not skip_llm: 

425 llm_texts = list(self._llm_expand(question, count)) 

426 if llm_texts: 

427 llm_vectors = self._embedder.embed_batch(llm_texts) 

428 llm_variants = list(zip(llm_texts, llm_vectors, strict=True)) 

429 llm_variants = self._apply_guardrails(llm_variants, question_vec) 

430 

431 concept_texts = list(self._concept_query_expansion(question)) 

432 if concept_texts: 

433 concept_vectors = self._embedder.embed_batch(concept_texts) 

434 llm_variants.extend(zip(concept_texts, concept_vectors, strict=True)) 

435 

436 return llm_variants 

437 except Exception as exc: 

438 log.warning("Query expansion disabled for this call: %s", exc) 

439 log.debug("Query expansion exception", exc_info=True) 

440 return [] 

441 

442 def _should_skip_expansion(self, question: str) -> bool: 

443 if self._config.expansion_skip_threshold <= 0: 

444 return False 

445 results = self._store.bm25_probe(question, top_k=2) 

446 if not results: 

447 return False 

448 top_score = results[0].relevance_score or 0 

449 if top_score < self._config.expansion_skip_threshold: 

450 return False 

451 if len(results) < 2: 

452 return True 

453 second_score = results[1].relevance_score or 0 

454 return (top_score - second_score) >= self._config.expansion_skip_gap 
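
# Editorial sketch, assuming expansion_skip_threshold=5.0 and
# expansion_skip_gap=2.0 (hypothetical config values):
#   BM25 probe scores [9.1, 4.0] -> top above threshold, gap 5.1 >= 2.0 -> skip expansion
#   BM25 probe scores [9.1, 8.6] -> gap 0.5 < 2.0                       -> expand anyway
#   BM25 probe scores [3.2, ...] -> top score below threshold           -> expand anyway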

455 

456 def _apply_concept_boost(self, results: list[SearchChunk], question: str) -> list[SearchChunk]: 

457 if not self._config.concept_graph or not results: 

458 return results 

459 try: 

460 if not self._concepts.get_graph(): 

461 return results 

462 query_concepts = self._concepts.extract_concepts(question) 

463 if not query_concepts: 

464 return results 

465 return self._concepts.boost_results(results, query_concepts) 

466 except Exception: 

467 log.debug("Concept boost failed", exc_info=True) 

468 return results 

469 

470 def _hyde_search(self, question: str, top_k: int) -> list[SearchChunk]: 

471 """Hypothetical Document Embedding search. 

472 Gao et al. 2022, "Precise Zero-Shot Dense Retrieval without 

473 Relevance Labels" -- generates a hypothetical answer passage, 

474 embeds it, and uses the embedding to search for real documents. 

475 """ 

476 try: 

477 response = self._provider.chat( 

478 [{"role": "user", "content": self._config.hyde_prompt.format(question=question)}], 

479 stream=False, 

480 options={"num_predict": _EXPANSION_MAX_TOKENS}, 

481 ) 

482 if not isinstance(response, str) or not response.strip(): 

483 return [] 

484 hyde_vec = self._embedder.embed(response.strip()) 

485 return self._store.search(hyde_vec, top_k=top_k, query_text=None) 

486 except Exception: 

487 log.debug("HyDE search failed", exc_info=True) 

488 return [] 

489 

490 def _normalize_chunk_type(self, chunk_type: str | None) -> str | None: 

491 """Drop ``chunk_type="wiki"`` when wiki generation is disabled. 

492 

493 With wiki off the chunks table contains only raw rows, so the 

494 filter would return empty. Falling back avoids surprising the user 

495 with zero results, while the warning surfaces the misuse in logs. 

496 """ 

497 if chunk_type == CHUNK_TYPE_WIKI and not self._config.wiki: 

498 log.warning( 

499 "wiki scope requested but wiki is disabled; searching the full pool instead" 

500 ) 

501 return None 

502 return chunk_type 

503 

504 def _parse_structured_query(self, question: str) -> tuple[str | None, str]: 

505 for prefix in ("term:", "vec:", "hyde:", "wiki:", "raw:"): 

506 if question.strip().lower().startswith(prefix): 

507 return prefix[:-1], question.strip()[len(prefix) :].strip() 

508 return None, question 
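
# Editorial sketch (hypothetical queries):
#   "term: retention policy" -> ("term", "retention policy")   BM25-only probe
#   "vec: retention policy"  -> ("vec", "retention policy")    vector-only search
#   "what is the policy?"    -> (None, "what is the policy?")  normal pipeline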

509 

510 def _search_structured( 

511 self, 

512 mode: str, 

513 query: str, 

514 top_k: int, 

515 chunk_type: str | None = None, 

516 ) -> list[SearchChunk]: 

517 if mode == "term": 

518 return self._store.bm25_probe(query, top_k=top_k) 

519 if mode == "vec": 

520 query_vec = self._embedder.embed(query) 

521 return self._store.search(query_vec, top_k=top_k, query_text=None) 

522 if mode == "hyde": 

523 return self._hyde_search(query, top_k) 

524 if mode in (CHUNK_TYPE_WIKI, CHUNK_TYPE_RAW): 

525 # Explicit ``chunk_type`` arg beats the prefix shortcut. 

526 effective = chunk_type if chunk_type is not None else mode 

527 query_vec = self._embedder.embed(query) 

528 return self._store.search( 

529 query_vec, top_k=top_k, query_text=query, chunk_type=effective 

530 ) 

531 return [] 

532 

533 def select_context( 

534 self, results: list[SearchChunk], question: str, max_sources: int | None = None 

535 ) -> list[SearchChunk]: 

536 """Pick ``max_sources`` chunks by greedy IDF-weighted set cover.""" 

537 if max_sources is None: 

538 max_sources = self._config.max_context_sources 

539 if len(results) <= max_sources: 

540 return results 

541 

542 question_terms = set(_tokenize(question)) 

543 if not question_terms: 

544 return results[:max_sources] 

545 

546 chunk_tokens = [set(_tokenize(r.chunk)) for r in results] 

547 term_weights = _idf_weights(question_terms, chunk_tokens) 

548 if not any(term_weights.values()): 

549 return results[:max_sources] 

550 

551 weights = [_relevance_weight(r) for r in results] 

552 selected = _greedy_cover(chunk_tokens, question_terms, term_weights, max_sources, weights) 

553 selected.sort() 

554 return [results[i] for i in selected] 

555 

556 def search( 

557 self, 

558 question: str, 

559 top_k: int = 0, 

560 *, 

561 chunk_type: str | None = None, 

562 ) -> list[SearchChunk]: 

563 """Embed question and search with expansion, HyDE, and concept boost. 

564 Returns up to top_k*2 candidates for downstream filtering. 

565 

566 When *chunk_type* is set (``"raw"`` or ``"wiki"``), only chunks of 

567 that type are returned. An explicit ``chunk_type`` always wins 

568 over the ``wiki:``/``raw:`` prefix shortcut in *question* so the 

569 user-facing scope choice has the final say. 

570 

571 When ``chunk_type="wiki"`` but wiki generation is disabled on the 

572 config, the filter is normalized to ``None`` (mixed pool) and a 

573 warning is logged — with wiki off the chunks table has no wiki 

574 rows, so honouring the filter would silently return zero results. 

575 """ 

576 if top_k == 0: 

577 top_k = self._config.top_k 

578 chunk_type = self._normalize_chunk_type(chunk_type) 

579 mode, clean_query = self._parse_structured_query(question) 

580 if mode is not None: 

581 return self._search_structured(mode, clean_query, top_k, chunk_type=chunk_type) 

582 query_vec = self._embedder.embed(question) 

583 results = self._store.search( 

584 query_vec, 

585 top_k=top_k, 

586 query_text=question, 

587 chunk_type=chunk_type, 

588 ) 

589 if self._should_skip_expansion(question): 

590 return results[: top_k * 2] 

591 seen = {(r.source, r.chunk_index) for r in results} 

592 for variant, variant_vec in self._expand_query(question, query_vec): 

593 variant_results = self._store.search( 

594 variant_vec, 

595 top_k=top_k, 

596 query_text=variant, 

597 chunk_type=chunk_type, 

598 ) 

599 for r in variant_results: 

600 key = (r.source, r.chunk_index) 

601 if key not in seen: 

602 results.append(r) 

603 seen.add(key) 

604 if self._config.hyde: 

605 hyde_results = self._hyde_search(question, top_k) 

606 for r in hyde_results: 

607 key = (r.source, r.chunk_index) 

608 if key not in seen: 

609 if r.distance is not None and self._config.hyde_weight > 0: 

610 r = r.model_copy(update={"distance": r.distance / self._config.hyde_weight}) 

611 results.append(r) 

612 seen.add(key) 

613 results = self._apply_concept_boost(results, question) 

614 return results[: top_k * 2] 

615 

616 def build_rag_context( 

617 self, 

618 question: str, 

619 top_k: int = 0, 

620 history: list[ChatMessage] | None = None, 

621 *, 

622 chunk_type: str | None = None, 

623 ) -> tuple[list[SearchChunk], list[ChatMessage]] | None: 

624 """Build RAG context from search results. 

625 

626 ``chunk_type`` restricts the pool to ``"raw"`` or ``"wiki"`` rows; 

627 ``None`` (default) searches the mixed pool. 

628 """ 

629 results = self.search(question, top_k=top_k, chunk_type=chunk_type) 

630 results = filter_results( 

631 results, self._config.max_distance, self._config.min_relevance_score 

632 ) 

633 if not results: 

634 return None 

635 results = prepare_results(results) 

636 if self._config.reranker_model: 

637 results = self._reranker.rerank(question, results) 

638 results = self._apply_temporal_filter(results, question) 

639 results = self.select_context(results, question) 

640 context = build_context(results) 

641 prompt = CONTEXT_TEMPLATE.format(context=context, question=question) 

642 messages: list[ChatMessage] = [{"role": "system", "content": self._config.system_prompt}] 

643 if history: 

644 messages.extend(history) 

645 messages.append({"role": "user", "content": prompt}) 

646 return results, messages 

647 

648 _NO_EMBED_WARNING = ( 

649 "Chat only — no document search configured. " 

650 "Install an embedding model: lilbee models install nomic-embed-text\n\n" 

651 ) 

652 _NO_RESULTS_MESSAGE = "No relevant documents found for this query." 

653 

654 def _direct_messages( 

655 self, question: str, history: list[ChatMessage] | None = None 

656 ) -> list[ChatMessage]: 

657 """Build messages for direct LLM chat (no RAG context).""" 

658 messages: list[ChatMessage] = [{"role": "system", "content": self._config.system_prompt}] 

659 if history: 

660 messages.extend(history) 

661 messages.append({"role": "user", "content": question}) 

662 return messages 

663 

664 def _messages_for_provider(self, messages: list[ChatMessage]) -> list[dict[str, str]]: 

665 """Convert ChatMessage list to provider-expected format.""" 

666 return [{"role": m["role"], "content": m["content"]} for m in messages] 

667 

668 def ask_raw( 

669 self, 

670 question: str, 

671 top_k: int = 0, 

672 history: list[ChatMessage] | None = None, 

673 options: dict[str, Any] | None = None, 

674 *, 

675 chunk_type: str | None = None, 

676 ) -> AskResult: 

677 """Ask a question and get a structured result.""" 

678 if not self._embedder.embedding_available(): 

679 messages = self._direct_messages(question, history) 

680 provider_messages = self._messages_for_provider(messages) 

681 opts = options if options is not None else self._config.generation_options() 

682 raw = str(self._provider.chat(provider_messages, options=opts or None) or "") 

683 clean = raw if self._config.show_reasoning else strip_reasoning(raw) 

684 return AskResult(answer=self._NO_EMBED_WARNING + clean, sources=[]) 

685 rag = self.build_rag_context(question, top_k=top_k, history=history, chunk_type=chunk_type) 

686 if rag is None: 

687 return AskResult( 

688 answer=self._NO_RESULTS_MESSAGE, 

689 sources=[], 

690 ) 

691 results, messages = rag 

692 provider_messages = self._messages_for_provider(messages) 

693 opts = options if options is not None else self._config.generation_options() 

694 raw = str(self._provider.chat(provider_messages, options=opts or None) or "") 

695 clean = raw if self._config.show_reasoning else strip_reasoning(raw) 

696 return AskResult(answer=clean, sources=results) 

697 

698 def ask( 

699 self, 

700 question: str, 

701 top_k: int = 0, 

702 history: list[ChatMessage] | None = None, 

703 options: dict[str, Any] | None = None, 

704 *, 

705 chunk_type: str | None = None, 

706 ) -> str: 

707 """Ask a question and get a formatted answer with citations.""" 

708 result = self.ask_raw( 

709 question, top_k=top_k, history=history, options=options, chunk_type=chunk_type 

710 ) 

711 if not result.sources: 

712 return result.answer 

713 cited = _extract_cited_indices(result.answer) 

714 used = [result.sources[i - 1] for i in sorted(cited) if 1 <= i <= len(result.sources)] 

715 answer = strip_llm_citations(result.answer) 

716 source_list = used if used else result.sources 

717 citations = deduplicate_sources(source_list) 

718 return f"{answer}\n\nSources:\n" + "\n".join(citations) 

719 

720 def ask_stream( 

721 self, 

722 question: str, 

723 top_k: int = 0, 

724 history: list[ChatMessage] | None = None, 

725 options: dict[str, Any] | None = None, 

726 *, 

727 chunk_type: str | None = None, 

728 ) -> Generator[StreamToken, None, None]: 

729 """Stream answer tokens with citations appended at the end.""" 

730 from lilbee.reasoning import StreamToken, filter_reasoning 

731 

732 if not self._embedder.embedding_available(): 

733 yield StreamToken(content=self._NO_EMBED_WARNING, is_reasoning=False) 

734 messages = self._direct_messages(question, history) 

735 provider_messages = self._messages_for_provider(messages) 

736 opts = options if options is not None else self._config.generation_options() 

737 raw = self._provider.chat(provider_messages, stream=True, options=opts or None) 

738 try: 

739 for st in filter_reasoning( 

740 cast(Iterator[str], raw), show=self._config.show_reasoning 

741 ): 

742 if st.content: 

743 yield st 

744 except (ConnectionError, OSError) as exc: 

745 yield StreamToken(content=f"\n\n[Connection lost: {exc}]", is_reasoning=False) 

746 finally: 

747 if isinstance(raw, ClosableIterator): 

748 raw.close() 

749 return 

750 

751 rag = self.build_rag_context(question, top_k=top_k, history=history, chunk_type=chunk_type) 

752 if rag is None: 

753 yield StreamToken( 

754 content=self._NO_RESULTS_MESSAGE, 

755 is_reasoning=False, 

756 ) 

757 return 

758 results, messages = rag 

759 provider_messages = self._messages_for_provider(messages) 

760 opts = options if options is not None else self._config.generation_options() 

761 raw_stream = self._provider.chat(provider_messages, stream=True, options=opts or None) 

762 answer_parts: list[str] = [] 

763 try: 

764 for st in filter_reasoning( 

765 cast(Iterator[str], raw_stream), show=self._config.show_reasoning 

766 ): 

767 if st.content: 

768 answer_parts.append(st.content) 

769 yield st 

770 except (ConnectionError, OSError) as exc: 

771 yield StreamToken(content=f"\n\n[Connection lost: {exc}]", is_reasoning=False) 

772 finally: 

773 if isinstance(raw_stream, ClosableIterator): 

774 raw_stream.close() 

775 # Note: LLM-generated citation blocks in streamed tokens cannot be 

776 # retroactively stripped. The system prompt discourages them; this 

777 # only filters the code-appended Sources block to cited chunks. 

778 full_answer = "".join(answer_parts) 

779 cited = _extract_cited_indices(full_answer) 

780 used = [results[i - 1] for i in sorted(cited) if 1 <= i <= len(results)] 

781 source_list = used if used else results 

782 citations = deduplicate_sources(source_list) 

783 yield StreamToken(content="\n\nSources:\n" + "\n".join(citations), is_reasoning=False)