Coverage for src / lilbee / ingest.py: 100%

385 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Document sync engine — keeps documents/ dir in sync with LanceDB.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import hashlib 

8import logging 

9import os 

10import threading 

11from collections.abc import Callable, Generator 

12from dataclasses import dataclass 

13from enum import StrEnum 

14from pathlib import Path 

15from typing import TYPE_CHECKING, Any, NamedTuple, TypedDict, cast 

16 

17if TYPE_CHECKING: 

18 from kreuzberg import ExtractionConfig, ExtractionResult 

19 

20from pydantic import BaseModel 

21from rich.progress import ( 

22 BarColumn, 

23 MofNCompleteColumn, 

24 Progress, 

25 SpinnerColumn, 

26 TextColumn, 

27 TimeElapsedColumn, 

28) 

29 

30from lilbee.chunk import build_chunking_config, chunk_text 

31from lilbee.code_chunker import CodeChunk, chunk_code, is_code_file 

32from lilbee.config import cfg 

33from lilbee.platform import is_ignored_dir 

34from lilbee.progress import ( 

35 BatchProgressEvent, 

36 DetailedProgressCallback, 

37 EventType, 

38 FileDoneEvent, 

39 FileStartEvent, 

40 SyncDoneEvent, 

41 noop_callback, 

42 shared_progress, 

43) 

44from lilbee.security import validate_path_within 

45from lilbee.services import get_services 

46from lilbee.store import CHUNK_TYPE_RAW 

47from lilbee.vision import extract_pdf_vision 

48 

49log = logging.getLogger(__name__) 

50 

51 

class FileToProcess(NamedTuple):
    """A file queued for ingestion with its metadata."""

    # Path relative to cfg.documents_dir, as a forward-slash string
    # (built via _relative_name in discover_files/sync).
    name: str
    # Absolute path on disk.
    path: Path
    # Classification from classify_file(), e.g. "pdf", "code", "text".
    content_type: str
    # SHA-256 hex digest computed at discovery time; recorded on success.
    file_hash: str
    # When True, existing chunks for this source are deleted before re-ingest.
    needs_cleanup: bool

60 

61 

# Minimum total chars for extracted text to be considered meaningful.
# 50 chars ≈ 12 words — if a PDF yields less, it's almost certainly a scanned
# document with no embedded text layer. Text PDFs with even just a title page
# easily exceed this threshold; blank/scan-only PDFs yield 0 chars.
_MIN_MEANINGFUL_CHARS = 50

# content_type value that routes a file down the paginated PDF path.
_PDF_CONTENT_TYPE = "pdf"
# kreuzberg output_format used for non-paginated (markdown) extraction.
_MARKDOWN_OUTPUT = "markdown"
# kreuzberg OCR backend name used by the Tesseract fallback.
_TESSERACT_BACKEND = "tesseract"

71 

72 

class ExtractMode(StrEnum):
    """Extraction topology: pagination / OCR / output format."""

    # Plain markdown output — used for every non-PDF content_type.
    MARKDOWN = "markdown"
    # Page-aware extraction without OCR — the default for PDFs.
    PAGINATED = "paginated"
    # Page-aware extraction with the Tesseract OCR backend enabled.
    PAGINATED_OCR = "paginated_ocr"

79 

80 

def _has_meaningful_text(result: Any) -> bool:
    """Return True when the extraction result carries usable text.

    "Usable" means the combined whitespace-stripped length of all chunks
    exceeds ``_MIN_MEANINGFUL_CHARS``; a missing/empty ``chunks``
    attribute counts as no text at all.
    """
    chunks = getattr(result, "chunks", None)
    if not chunks:
        return False
    stripped_lengths = (len(c.content.strip()) for c in chunks)
    return sum(stripped_lengths) > _MIN_MEANINGFUL_CHARS

88 

89 

class ChunkRecord(TypedDict):
    """A single store-ready chunk record matching store.CHUNKS_SCHEMA."""

    # Source document name (path relative to the documents dir).
    source: str
    # Classification from classify_file(), e.g. "text", "pdf", "code".
    content_type: str
    # Chunk category; always CHUNK_TYPE_RAW for records built in this module.
    chunk_type: str
    # Page span for paginated documents; 0/0 when pages don't apply.
    page_start: int
    page_end: int
    # Line span for code chunks; 0/0 for non-code content.
    line_start: int
    line_end: int
    # The chunk text itself.
    chunk: str
    # Position of this chunk within its source document.
    chunk_index: int
    # Embedding vector for the chunk text.
    vector: list[float]

103 

104 

class SyncResult(BaseModel):
    """Summary of a sync operation."""

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged: int = 0
    failed: list[str] = []

    def __str__(self) -> str:
        """Multi-line human-readable summary; failed files listed in red."""
        summary = [
            f"Added: {len(self.added)}",
            f"Updated: {len(self.updated)}",
            f"Removed: {len(self.removed)}",
            f"Unchanged: {self.unchanged}",
            f"Failed: {len(self.failed)}",
        ]
        summary.extend(f" [red]{f}[/red]" for f in self.failed)
        return "\n".join(summary)

    def __repr__(self) -> str:
        """Compact one-line counts-only representation."""
        return (
            f"SyncResult(added={len(self.added)}, updated={len(self.updated)}, "
            f"removed={len(self.removed)}, unchanged={self.unchanged}, "
            f"failed={len(self.failed)})"
        )

    def __rich__(self) -> str:
        """Rich renders the same text as str()."""
        return str(self)

135 

136 

@dataclass
class _IngestResult:
    """Outcome of a single file ingestion attempt."""

    # Source name (path relative to the documents dir).
    name: str
    # Absolute path that was ingested.
    path: Path
    # Number of chunks written to the store; 0 means nothing was produced.
    chunk_count: int
    # The exception that aborted ingestion, or None on success.
    error: Exception | None
    # SHA-256 digest captured at discovery; "" makes _apply_result recompute it.
    file_hash: str = ""

146 

147 

# Extension → content_type string for document formats handled by kreuzberg.
# Code files are handled separately via is_code_file() in classify_file().
_DOCUMENT_EXTENSION_MAP: dict[str, str] = {
    **{ext: "text" for ext in (".md", ".txt", ".html", ".rst", ".yaml", ".yml")},
    ".pdf": _PDF_CONTENT_TYPE,
    # Office formats keep their bare extension as the content_type ("docx", ...).
    **{ext: ext.lstrip(".") for ext in (".docx", ".xlsx", ".pptx")},
    ".epub": "epub",
    **{ext: "image" for ext in (".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp")},
    **{ext: "data" for ext in (".csv", ".tsv")},
    ".xml": "xml",
    **{ext: "json" for ext in (".json", ".jsonl")},
}

159 

160 

161def file_hash(path: Path) -> str: 

162 """Compute SHA-256 hex digest of a file.""" 

163 h = hashlib.sha256() 

164 with open(path, "rb") as f: 

165 for block in iter(lambda: f.read(8192), b""): 

166 h.update(block) 

167 return h.hexdigest() 

168 

169 

def _relative_name(path: Path) -> str:
    """Get path relative to documents dir as a forward-slash string (portable across OS).

    Raises ValueError (from ``Path.relative_to``) if *path* does not live
    under ``cfg.documents_dir``.
    """
    return path.relative_to(cfg.documents_dir).as_posix()

173 

174 

def discover_files() -> dict[str, Path]:
    """Scan documents/ recursively, return {relative_name: absolute_path}.

    Skips ignored directories, dotfiles, paths that escape the documents
    dir via symlinks, and files ``classify_file`` does not recognize.
    """
    if not cfg.documents_dir.exists():
        return {}
    docs_resolved = cfg.documents_dir.resolve()
    files: dict[str, Path] = {}
    for root, dirs, filenames in os.walk(cfg.documents_dir, topdown=True):
        # In-place prune (requires topdown=True) so os.walk never descends
        # into ignored directories at all.
        dirs[:] = [d for d in dirs if not is_ignored_dir(d, cfg.ignore_dirs)]
        for fname in filenames:
            if fname.startswith("."):
                # Hidden files are never ingested.
                continue
            path = Path(root) / fname
            try:
                validate_path_within(path, docs_resolved)
            except ValueError:
                log.warning("Symlink escapes documents dir, skipping: %s", path)
                continue
            if classify_file(path) is not None:
                files[_relative_name(path)] = path
    return files

195 

196 

def classify_file(path: Path) -> str | None:
    """Classify file by extension. Returns content_type or None if unsupported."""
    suffix = path.suffix.lower()
    # Document formats first; code detection is the fallback.
    if (mapped := _DOCUMENT_EXTENSION_MAP.get(suffix)) is not None:
        return mapped
    return "code" if is_code_file(path) else None

205 

206 

def content_type_to_mode(content_type: str) -> ExtractMode:
    """Map a content_type to the extraction mode."""
    # Only PDFs take the paginated path; everything else is plain markdown.
    if content_type == _PDF_CONTENT_TYPE:
        return ExtractMode.PAGINATED
    return ExtractMode.MARKDOWN

210 

211 

212def extraction_config(mode: ExtractMode) -> ExtractionConfig: 

213 """Build ExtractionConfig for the given extraction mode.""" 

214 from kreuzberg import ExtractionConfig, OcrConfig, PageConfig 

215 

216 chunking = build_chunking_config() 

217 pages = PageConfig(extract_pages=True, insert_page_markers=False) 

218 ocr = OcrConfig(backend=_TESSERACT_BACKEND) 

219 builders: dict[ExtractMode, Callable[[], ExtractionConfig]] = { 

220 ExtractMode.MARKDOWN: lambda: ExtractionConfig( 

221 chunking=chunking, 

222 output_format=_MARKDOWN_OUTPUT, 

223 ), 

224 ExtractMode.PAGINATED: lambda: ExtractionConfig( 

225 chunking=chunking, 

226 pages=pages, 

227 ), 

228 ExtractMode.PAGINATED_OCR: lambda: ExtractionConfig( 

229 chunking=chunking, 

230 pages=pages, 

231 ocr=ocr, 

232 ), 

233 } 

234 return builders[mode]() 

235 

236 

@contextlib.contextmanager
def suppress_fd_stderr() -> Generator[None, None, None]:
    """Suppress stderr at the file-descriptor level.

    Catches subprocess output (e.g. Tesseract's "Detected N diacritics")
    that ``contextlib.redirect_stderr`` cannot intercept. The original
    fd 2 is always restored, even if the body raises.
    """
    saved_fd = os.dup(2)
    try:
        sink_fd = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(sink_fd, 2)
            yield
        finally:
            # Restore stderr before closing the devnull handle.
            os.dup2(saved_fd, 2)
            os.close(sink_fd)
    finally:
        os.close(saved_fd)

254 

255 

async def _try_tesseract_ocr(
    path: Path, source_name: str, fallback: ExtractionResult
) -> ExtractionResult:
    """Attempt Tesseract OCR on a scanned PDF. Returns the OCR result or *fallback* on failure.

    Wraps extraction in ``asyncio.wait_for(cfg.tesseract_timeout)`` so a
    huge scanned document can't monopolize an ingest worker for many
    minutes (which the caller perceives as a UI lockup). The timeout is
    configurable via ``LILBEE_TESSERACT_TIMEOUT``; 0 disables the cap.
    """
    try:
        from kreuzberg import extract_file

        log.info("PDF text extraction empty, trying Tesseract OCR: %s", source_name)
        # fd-level suppression: Tesseract runs as a subprocess, so its
        # stderr chatter bypasses Python-level redirection.
        with suppress_fd_stderr():
            coro = extract_file(str(path), config=extraction_config(ExtractMode.PAGINATED_OCR))
            if cfg.tesseract_timeout > 0:
                return await asyncio.wait_for(coro, timeout=cfg.tesseract_timeout)
            return await coro
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return fallback
    except Exception:
        # Best-effort: missing binary, unsupported input, etc. — the caller
        # proceeds with the original (empty) extraction result.
        log.debug("Tesseract OCR unavailable or failed for %s, skipping", source_name)
        return fallback

285 

286 

def _should_run_ocr() -> bool:
    """Decide whether to attempt vision-based OCR on scanned PDFs.

    Uses ``cfg.enable_ocr`` and ``cfg.vision_model``:
        True  = force on (requires ``cfg.vision_model`` to be set for a real
                vision run; otherwise the caller falls back to Tesseract).
        False = force off.
        None  = auto-detect: run vision OCR when ``cfg.vision_model`` is set.
    """
    forced = cfg.enable_ocr
    # Identity checks on purpose: only genuine booleans force the decision.
    if forced is True or forced is False:
        return forced
    return bool(cfg.vision_model)

301 

302 

async def _vision_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback = noop_callback,
    *,
    quiet: bool = False,
) -> list[ChunkRecord]:
    """OCR a scanned PDF via the configured vision model, chunk, and embed.

    Uses ``cfg.vision_model`` unconditionally. The chat model is never
    loaded as a vision backend. If ``cfg.vision_model`` is empty this
    returns an empty list; callers should fall back to Tesseract via
    ``_handle_scanned_pdf_fallback``.
    """
    if not cfg.vision_model:
        return []
    try:
        # extract_pdf_vision is blocking; run it off the event loop.
        page_texts = await asyncio.to_thread(
            extract_pdf_vision,
            path,
            cfg.vision_model,
            quiet=quiet,
            timeout=cfg.ocr_timeout,
            on_progress=on_progress,
        )
    except Exception:
        log.warning(
            "Vision OCR failed for %s using vision model %s.",
            source_name,
            cfg.vision_model,
            exc_info=True,
        )
        return []
    if not page_texts:
        return []

    # Single OCR page rarely spans multiple topics; skip the semantic round-trip.
    all_chunks = [
        (page_num, chunk)
        for page_num, text in page_texts
        for chunk in chunk_text(text, use_semantic=False)
    ]
    if not all_chunks:
        return []

    texts = [c for _, c in all_chunks]

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )
    # zip(strict=True): one vector per chunk, or fail loudly.
    return [
        ChunkRecord(
            source=source_name,
            content_type=content_type,
            chunk_type=CHUNK_TYPE_RAW,
            page_start=page_num,
            page_end=page_num,
            line_start=0,
            line_end=0,
            chunk=text,
            chunk_index=i,
            vector=vec,
        )
        for i, ((page_num, text), vec) in enumerate(zip(all_chunks, vectors, strict=True))
    ]

369 

370 

async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord] | ExtractionResult:
    """Handle scanned PDF fallback chain: Tesseract OCR then vision model.

    Returns chunk records if a fallback produced final results, or an
    updated ExtractionResult when Tesseract OCR succeeded (so the
    caller can proceed with normal chunking/embedding).

    When vision OCR is available (``_should_run_ocr()`` True) we go
    straight to it. Tesseract is only attempted when vision isn't an
    option at all. Running a huge scanned PDF through vision *and then*
    through Tesseract would double the wall-clock cost for no reason,
    and Tesseract on a 50+ MB document otherwise feels like a TUI
    lockup to the user.
    """
    use_ocr = _should_run_ocr()

    if use_ocr and cfg.vision_model:
        log.info(
            "Scanned PDF: using vision OCR for %s (model=%s)",
            source_name,
            cfg.vision_model,
        )
        # _vision_fallback returns [] on failure; that empty list is final —
        # we deliberately do NOT cascade into Tesseract afterwards.
        return await _vision_fallback(path, source_name, content_type, on_progress, quiet=quiet)

    result = await _try_tesseract_ocr(path, source_name, result)

    if not _has_meaningful_text(result):
        log.warning(
            "Skipped %s: text extraction produced no usable text. "
            "For better results on scanned PDFs, configure a vision model "
            "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
            source_name,
        )
        return []

    log.info(
        "Scanned PDF detected — extracted with Tesseract OCR: %s. "
        "For structured markdown output (tables, headings), "
        "configure a vision model via PUT /api/models/vision.",
        source_name,
    )
    return result

421 

422 

async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Extract and chunk a document, embed, return records.

    Vision OCR is controlled by ``cfg.enable_ocr`` (see ``_should_run_ocr``).
    """
    from kreuzberg import extract_file

    result = await extract_file(
        str(path), config=extraction_config(content_type_to_mode(content_type))
    )

    if content_type == _PDF_CONTENT_TYPE and not _has_meaningful_text(result):
        fallback = await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
        )
        if isinstance(fallback, list):
            # A fallback produced final records (possibly empty) — done.
            return fallback
        # Tesseract OCR succeeded — continue with the refreshed extraction.
        result = fallback

    if not result.chunks:
        return []

    texts = [chunk.content for chunk in result.chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for idx, (chunk, text, vec) in enumerate(zip(result.chunks, texts, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=CHUNK_TYPE_RAW,
                page_start=chunk.metadata.get("first_page") or 0,
                page_end=chunk.metadata.get("last_page") or 0,
                line_start=0,
                line_end=0,
                chunk=text,
                chunk_index=chunk.metadata.get("chunk_index", idx),
                vector=vec,
            )
        )
    return records

477 

478 

def ingest_code_sync(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Parse code with tree-sitter, chunk, embed, and return store-ready records."""
    code_chunks: list[CodeChunk] = chunk_code(path)
    if not code_chunks:
        return []

    snippets = [cc.chunk for cc in code_chunks]
    vectors = get_services().embedder.embed_batch(
        snippets, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for cc, vec in zip(code_chunks, vectors, strict=True):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type="code",
                chunk_type=CHUNK_TYPE_RAW,
                page_start=0,
                page_end=0,
                line_start=cc.line_start,
                line_end=cc.line_end,
                chunk=cc.chunk,
                chunk_index=cc.chunk_index,
                vector=vec,
            )
        )
    return records

508 

509 

async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Chunk a markdown file with heading context prepended to each chunk.

    Each chunk gets the heading hierarchy path (e.g. "# Setup > ## Install")
    prepended for better retrieval context.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    texts = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not texts:
        return []

    embed = get_services().embedder.embed_batch
    vectors = await asyncio.to_thread(embed, texts, source=source_name, on_progress=on_progress)
    return [
        ChunkRecord(
            source=source_name,
            content_type="text",
            chunk_type=CHUNK_TYPE_RAW,
            page_start=0,
            page_end=0,
            line_start=0,
            line_end=0,
            chunk=body,
            chunk_index=pos,
            vector=vec,
        )
        for pos, (body, vec) in enumerate(zip(texts, vectors, strict=True))
    ]

545 

546 

async def _rebuild_concept_clusters() -> None:
    """Re-run Leiden clustering after sync. No-op if disabled."""
    if not cfg.concept_graph:
        return
    from lilbee.concepts import concepts_available

    if not concepts_available():
        return
    try:
        graph_svc = get_services().concepts
        # Nothing to cluster when no graph has been built yet.
        if graph_svc.get_graph():
            await asyncio.to_thread(graph_svc.rebuild_clusters)
    except Exception:
        log.warning("Concept cluster rebuild failed", exc_info=True)

562 

563 

async def _incremental_wiki_update(changed_sources: set[str]) -> None:
    """Regenerate only the wiki pages touched by *changed_sources*.

    Runs after a successful sync. Builds a fresh ``ExtractedEntity``
    set from the current corpus, keeps the records that either have no
    page on disk yet or whose chunk trail includes one of the changed
    sources, and regenerates just those. Above
    ``cfg.wiki_ingest_update_cap`` touched pages the auto-update
    bails out and logs a manual-update hint instead.
    """
    if not cfg.wiki or not changed_sources:
        return
    # circular: the wiki layer imports lilbee.ingest.file_hash, so these
    # stay function-local to break the cycle at the hook-entry boundary.
    from lilbee.store import SearchChunk
    from lilbee.wiki import append_wiki_log, build_wiki, update_wiki_index
    from lilbee.wiki.entity_extractor import EntityKind, get_entity_extractor
    from lilbee.wiki.shared import (
        CONCEPTS_SUBDIR,
        ENTITIES_SUBDIR,
        WIKI_LOG_ACTION_INGEST,
    )

    svc = get_services()
    extractor = get_entity_extractor(cfg.wiki_entity_mode, svc.provider, cfg)

    # Gather every chunk in the corpus; extraction runs over the whole set.
    chunks: list[SearchChunk] = []
    for record in svc.store.get_sources():
        chunks.extend(svc.store.get_chunks_by_source(record["filename"]))
    entities = await asyncio.to_thread(extractor.extract, chunks)

    wiki_root = cfg.data_root / cfg.wiki_dir
    touched = []
    for entity in entities:
        # Phase D: the extractor emits only ENTITY kind; CONCEPT is
        # reserved for LLM-curated pages produced inside the batched
        # call and is intentionally not considered here. Keeping the
        # dispatch neutral guards against a future extractor that
        # re-introduces CONCEPT.
        subdir = CONCEPTS_SUBDIR if entity.kind is EntityKind.CONCEPT else ENTITIES_SUBDIR
        page_path = wiki_root / subdir / f"{entity.slug}.md"
        if not page_path.exists():
            # Entity with no page on disk yet — always generate.
            touched.append(entity)
            continue
        if any(ref.source in changed_sources for ref in entity.chunk_refs):
            touched.append(entity)

    if not touched:
        return

    if len(touched) > cfg.wiki_ingest_update_cap:
        # warning, not info: the default LILBEE_LOG_LEVEL is WARNING, so
        # log.info would silently drop the manual-update hint and the user
        # would see no signal at all during `lilbee sync` when the cap trips.
        log.warning(
            "Wiki auto-update skipped: %d pages touched (cap %d). "
            "Run 'lilbee wiki update' to refresh.",
            len(touched),
            cfg.wiki_ingest_update_cap,
        )
        append_wiki_log(
            WIKI_LOG_ACTION_INGEST,
            f"skipped: {len(touched)} pages exceeds cap {cfg.wiki_ingest_update_cap}",
        )
        return

    # extract_concepts=False so an incremental sync does not churn
    # concept slugs. Concept curation is a deliberate, user-invoked
    # refresh (full `lilbee wiki build`).
    pages = await asyncio.to_thread(
        build_wiki, touched, svc.provider, svc.store, cfg, extract_concepts=False
    )
    update_wiki_index()
    append_wiki_log(
        WIKI_LOG_ACTION_INGEST,
        f"{len(pages)} pages regenerated for {', '.join(sorted(changed_sources))}",
    )

641 

642 

async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
    """Extract and index concepts for ingested chunks. No-op if disabled."""
    if not cfg.concept_graph or not records:
        return
    from lilbee.concepts import concepts_available

    if not concepts_available():
        return
    try:
        graph = get_services().concepts
        chunk_texts = [rec["chunk"] for rec in records]
        concept_lists = await asyncio.to_thread(graph.extract_concepts_batch, chunk_texts)
        chunk_ids = [(source_name, rec["chunk_index"]) for rec in records]
        await asyncio.to_thread(graph.build_from_chunks, chunk_ids, concept_lists)
    except Exception:
        # Concept graph is an optional enrichment; never fail the ingest.
        log.warning("Concept indexing failed for %s", source_name, exc_info=True)

659 

660 

async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Ingest a single file. Returns chunk count."""
    records: list[ChunkRecord]
    if content_type == "code":
        # tree-sitter parsing is synchronous; run it off the event loop.
        records = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() == ".md":
        records = await ingest_markdown(path, source_name, on_progress)
    else:
        records = await ingest_document(
            path, source_name, content_type, quiet=quiet, on_progress=on_progress
        )

    written = await asyncio.to_thread(
        get_services().store.add_chunks, cast(list[dict], records)
    )
    await _index_concepts(records, source_name)
    return written

688 

689 

async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> SyncResult:
    """Sync documents/ with the vector store.

    Returns a :class:`SyncResult` summarizing added, updated, removed,
    unchanged, and failed files.
    When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
    When *cancel* is set, processing stops between files without data loss.
    """
    _store = get_services().store

    if force_rebuild:
        # Drop everything; every file on disk will re-ingest as "added".
        _store.drop_all()

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    existing_sources = {s["filename"]: s["file_hash"] for s in _store.get_sources()}

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged = 0
    failed: list[str] = []

    # Find files to remove (in DB but not on disk)
    for name in existing_sources:
        if name not in disk_files:
            _store.delete_by_source(name)
            _store.delete_source(name)
            removed.append(name)

    files_to_process: list[FileToProcess] = []

    for name, path in sorted(disk_files.items()):
        if cancel and cancel.is_set():
            break

        content_type = classify_file(path)
        if content_type is None:
            raise ValueError(f"Unsupported file slipped through discovery: {name}")

        old_hash = existing_sources.get(name)

        current_hash = file_hash(path)

        # Hash match means the file is untouched since the last sync.
        if old_hash == current_hash:
            unchanged += 1
            continue

        # needs_cleanup=True unconditionally: delete_by_source is idempotent,
        # and this closes the race where a prior ingest wrote chunks but died
        # before upsert_source, leaving orphaned chunks that would duplicate.
        files_to_process.append(
            FileToProcess(name, path, content_type, current_hash, needs_cleanup=True)
        )
        if old_hash is not None:
            updated.append(name)
        else:
            added.append(name)

    # Ingest files (with optional progress bar)
    if files_to_process:
        # Validate the embedding model up front so a misconfiguration
        # surfaces before any file is ingested.
        get_services().embedder.validate_model()
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

    if files_to_process or removed:
        # Post-sync maintenance: FTS index, concept clusters, wiki pages.
        _store.ensure_fts_index()
        await _rebuild_concept_clusters()
        await _incremental_wiki_update(set(added) | set(updated) | set(removed))

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
    )
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
        ),
    )
    return result

789 

790 

791# Limit concurrent ingestion to avoid overwhelming I/O 

792_MAX_CONCURRENT = os.cpu_count() or 4 

793 

794# Concurrent.futures raises this exact RuntimeError message when submitting to 

795# a shutdown executor (Python 3.11+). There is no dedicated exception class to 

796# catch, so callers have to string-match the message. 

797_EXECUTOR_SHUTDOWN_MSG = "cannot schedule new futures after shutdown" 

798 

799 

800def _is_executor_shutdown(exc: BaseException) -> bool: 

801 """True if ``exc`` is the concurrent.futures shutdown-race RuntimeError.""" 

802 return isinstance(exc, RuntimeError) and _EXECUTOR_SHUTDOWN_MSG in str(exc) 

803 

804 

async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
    """Ingest a batch of files, optionally showing a Rich progress bar.

    When *needs_cleanup* is True, old chunks are deleted immediately before
    ingesting new ones so the two operations are atomic per file.
    When *cancel* is set, pending files raise CancelledError before starting.
    """
    # Bound in-flight files to _MAX_CONCURRENT (cpu_count-based).
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        async with semaphore:
            # Re-check cancellation after acquiring the semaphore so queued
            # files bail out before doing any work.
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            on_progress(
                EventType.FILE_START,
                FileStartEvent(file=name, total_files=total_files, current_file=file_index),
            )
            try:
                if needs_cleanup:
                    # Idempotent delete; pairs with the fresh ingest below.
                    get_services().store.delete_by_source(name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or _is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="error", chunks=0),
                )
                return _IngestResult(name, path, 0, error=exc)

    if quiet:
        tasks = [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]
        await _collect_results(tasks, added, updated, failed, on_progress=on_progress)
    else:
        with Progress(
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            ptask = progress.add_task("Ingesting documents...", total=total_files)
            # Publish the bar through shared_progress (set/reset token API);
            # always reset in the finally so the handle never leaks.
            token = shared_progress.set((progress, ptask))
            try:
                tasks = [
                    asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
                    for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
                ]
                await _collect_results(
                    tasks,
                    added,
                    updated,
                    failed,
                    on_progress=on_progress,
                    progress=progress,
                    ptask=ptask,
                )
            finally:
                shared_progress.reset(token)

903 

904 

async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Collect task results as they finish, optionally driving a Rich bar."""
    total = len(tasks)
    done_so_far = 0
    for fut in asyncio.as_completed(tasks):
        result = await fut
        done_so_far += 1
        _apply_result(result, added, updated, failed)
        ok = result.error is None
        if progress is not None and ptask is not None:
            verb = "Ingested" if ok else "Failed"
            progress.update(ptask, description=f"{verb} {result.name}")
            progress.advance(ptask)
        on_progress(
            EventType.BATCH_PROGRESS,
            BatchProgressEvent(
                file=result.name,
                status="ingested" if ok else "failed",
                current=done_so_far,
                total=total,
            ),
        )

933 

934 

935def _discard_from_list(lst: list[str], value: str) -> None: 

936 """Remove *value* from *lst* if present.""" 

937 with contextlib.suppress(ValueError): 

938 lst.remove(value) 

939 

940 

def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
) -> None:
    """Record an ingestion result — update store on success, track failure."""
    if result.error is not None:
        # Message only (no traceback) at WARNING: failures are already
        # surfaced to callers via SyncResult.failed, and raw tracebacks
        # bleed into the TUI chat pane through the stderr bridge. Full
        # stacks stay reachable by lowering LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", result.name, result.error)
        log.debug("Traceback for failed ingest of %s", result.name, exc_info=result.error)
        for bucket in (added, updated):
            _discard_from_list(bucket, result.name)
        failed.append(result.name)
        return
    if result.chunk_count == 0:
        # Nothing produced (e.g. scanned PDF without vision model): leave
        # the source unrecorded so the next sync retries it.
        for bucket in (added, updated):
            _discard_from_list(bucket, result.name)
        return

    digest = result.file_hash or file_hash(result.path)
    get_services().store.upsert_source(result.name, digest, result.chunk_count)