Coverage for src/lilbee/ingest.py: 100%
385 statements
coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
"""Document sync engine — keeps documents/ dir in sync with LanceDB."""

from __future__ import annotations

import asyncio
import contextlib
import hashlib
import logging
import os
import threading
from collections.abc import Callable, Generator
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple, TypedDict, cast

if TYPE_CHECKING:
    from kreuzberg import ExtractionConfig, ExtractionResult

from pydantic import BaseModel
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
)

from lilbee.chunk import build_chunking_config, chunk_text
from lilbee.code_chunker import CodeChunk, chunk_code, is_code_file
from lilbee.config import cfg
from lilbee.platform import is_ignored_dir
from lilbee.progress import (
    BatchProgressEvent,
    DetailedProgressCallback,
    EventType,
    FileDoneEvent,
    FileStartEvent,
    SyncDoneEvent,
    noop_callback,
    shared_progress,
)
from lilbee.security import validate_path_within
from lilbee.services import get_services
from lilbee.store import CHUNK_TYPE_RAW
from lilbee.vision import extract_pdf_vision

log = logging.getLogger(__name__)


class FileToProcess(NamedTuple):
    """A file queued for ingestion with its metadata."""

    name: str
    path: Path
    content_type: str
    file_hash: str
    needs_cleanup: bool


# Minimum total chars for extracted text to be considered meaningful.
# 50 chars ≈ 12 words — if a PDF yields less, it's almost certainly a scanned
# document with no embedded text layer. Text PDFs with even just a title page
# easily exceed this threshold; blank/scan-only PDFs yield 0 chars.
_MIN_MEANINGFUL_CHARS = 50

_PDF_CONTENT_TYPE = "pdf"
_MARKDOWN_OUTPUT = "markdown"
_TESSERACT_BACKEND = "tesseract"


class ExtractMode(StrEnum):
    """Extraction topology: pagination / OCR / output format."""

    MARKDOWN = "markdown"
    PAGINATED = "paginated"
    PAGINATED_OCR = "paginated_ocr"


def _has_meaningful_text(result: Any) -> bool:
    """Check if extraction produced meaningful text."""
    chunks = getattr(result, "chunks", None)
    if chunks:
        total = sum(len(c.content.strip()) for c in chunks)
        return total > _MIN_MEANINGFUL_CHARS
    return False


class ChunkRecord(TypedDict):
    """A single store-ready chunk record matching store.CHUNKS_SCHEMA."""

    source: str
    content_type: str
    chunk_type: str
    page_start: int
    page_end: int
    line_start: int
    line_end: int
    chunk: str
    chunk_index: int
    vector: list[float]
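
# Illustrative example of a record (field values are hypothetical, not taken from
# a real run): a markdown chunk produced by ingest_markdown() below would look
# roughly like
#   ChunkRecord(source="notes/setup.md", content_type="text", chunk_type=CHUNK_TYPE_RAW,
#               page_start=0, page_end=0, line_start=0, line_end=0,
#               chunk="# Setup > ## Install\n...", chunk_index=0, vector=[0.12, ...])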


class SyncResult(BaseModel):
    """Summary of a sync operation."""

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged: int = 0
    failed: list[str] = []

    def __str__(self) -> str:
        lines = [
            f"Added: {len(self.added)}",
            f"Updated: {len(self.updated)}",
            f"Removed: {len(self.removed)}",
            f"Unchanged: {self.unchanged}",
            f"Failed: {len(self.failed)}",
        ]
        for f in self.failed:
            lines.append(f" [red]{f}[/red]")
        return "\n".join(lines)

    def __repr__(self) -> str:
        return (
            f"SyncResult(added={len(self.added)}, updated={len(self.updated)}, "
            f"removed={len(self.removed)}, unchanged={self.unchanged}, "
            f"failed={len(self.failed)})"
        )

    def __rich__(self) -> str:
        return self.__str__()


@dataclass
class _IngestResult:
    """Outcome of a single file ingestion attempt."""

    name: str
    path: Path
    chunk_count: int
    error: Exception | None
    file_hash: str = ""


# Extension → content_type string for document formats handled by kreuzberg
_DOCUMENT_EXTENSION_MAP: dict[str, str] = {
    **{ext: "text" for ext in (".md", ".txt", ".html", ".rst", ".yaml", ".yml")},
    ".pdf": _PDF_CONTENT_TYPE,
    **{ext: ext.lstrip(".") for ext in (".docx", ".xlsx", ".pptx")},
    ".epub": "epub",
    **{ext: "image" for ext in (".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp")},
    **{ext: "data" for ext in (".csv", ".tsv")},
    ".xml": "xml",
    **{ext: "json" for ext in (".json", ".jsonl")},
}


def file_hash(path: Path) -> str:
    """Compute SHA-256 hex digest of a file."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(8192), b""):
            h.update(block)
    return h.hexdigest()


def _relative_name(path: Path) -> str:
    """Get path relative to documents dir as a forward-slash string (portable across OS)."""
    return path.relative_to(cfg.documents_dir).as_posix()


def discover_files() -> dict[str, Path]:
    """Scan documents/ recursively, return {relative_name: absolute_path}."""
    if not cfg.documents_dir.exists():
        return {}
    docs_resolved = cfg.documents_dir.resolve()
    files: dict[str, Path] = {}
    for root, dirs, filenames in os.walk(cfg.documents_dir, topdown=True):
        dirs[:] = [d for d in dirs if not is_ignored_dir(d, cfg.ignore_dirs)]
        for fname in filenames:
            if fname.startswith("."):
                continue
            path = Path(root) / fname
            try:
                validate_path_within(path, docs_resolved)
            except ValueError:
                log.warning("Symlink escapes documents dir, skipping: %s", path)
                continue
            if classify_file(path) is not None:
                files[_relative_name(path)] = path
    return files


def classify_file(path: Path) -> str | None:
    """Classify file by extension. Returns content_type or None if unsupported."""
    doc_type = _DOCUMENT_EXTENSION_MAP.get(path.suffix.lower())
    if doc_type is not None:
        return doc_type
    if is_code_file(path):
        return "code"
    return None
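
# Classification sketch (illustrative examples, assuming the default extension
# map above and that is_code_file() recognizes the language in question):
#   classify_file(Path("notes.md"))     -> "text"
#   classify_file(Path("report.PDF"))   -> "pdf"   (suffix is lower-cased first)
#   classify_file(Path("app/main.py"))  -> "code"  (via is_code_file)
#   classify_file(Path("archive.zip"))  -> None    (unsupported; skipped by discovery)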


def content_type_to_mode(content_type: str) -> ExtractMode:
    """Map a content_type to the extraction mode."""
    return ExtractMode.PAGINATED if content_type == _PDF_CONTENT_TYPE else ExtractMode.MARKDOWN


def extraction_config(mode: ExtractMode) -> ExtractionConfig:
    """Build ExtractionConfig for the given extraction mode."""
    from kreuzberg import ExtractionConfig, OcrConfig, PageConfig

    chunking = build_chunking_config()
    pages = PageConfig(extract_pages=True, insert_page_markers=False)
    ocr = OcrConfig(backend=_TESSERACT_BACKEND)
    builders: dict[ExtractMode, Callable[[], ExtractionConfig]] = {
        ExtractMode.MARKDOWN: lambda: ExtractionConfig(
            chunking=chunking,
            output_format=_MARKDOWN_OUTPUT,
        ),
        ExtractMode.PAGINATED: lambda: ExtractionConfig(
            chunking=chunking,
            pages=pages,
        ),
        ExtractMode.PAGINATED_OCR: lambda: ExtractionConfig(
            chunking=chunking,
            pages=pages,
            ocr=ocr,
        ),
    }
    return builders[mode]()


@contextlib.contextmanager
def suppress_fd_stderr() -> Generator[None, None, None]:
    """Suppress stderr at the file-descriptor level.

    Catches subprocess output (e.g. Tesseract's "Detected N diacritics")
    that ``contextlib.redirect_stderr`` cannot intercept.
    """
    old_stderr = os.dup(2)
    try:
        devnull = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(devnull, 2)
            yield
        finally:
            os.dup2(old_stderr, 2)
            os.close(devnull)
    finally:
        os.close(old_stderr)


async def _try_tesseract_ocr(
    path: Path, source_name: str, fallback: ExtractionResult
) -> ExtractionResult:
    """Attempt Tesseract OCR on a scanned PDF. Returns the OCR result or *fallback* on failure.

    Wraps extraction in ``asyncio.wait_for(cfg.tesseract_timeout)`` so a
    huge scanned document can't monopolize an ingest worker for many
    minutes (which the caller perceives as a UI lockup). The timeout is
    configurable via ``LILBEE_TESSERACT_TIMEOUT``; 0 disables the cap.
    """
    try:
        from kreuzberg import extract_file

        log.info("PDF text extraction empty, trying Tesseract OCR: %s", source_name)
        with suppress_fd_stderr():
            coro = extract_file(str(path), config=extraction_config(ExtractMode.PAGINATED_OCR))
            if cfg.tesseract_timeout > 0:
                return await asyncio.wait_for(coro, timeout=cfg.tesseract_timeout)
            return await coro
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return fallback
    except Exception:
        log.debug("Tesseract OCR unavailable or failed for %s, skipping", source_name)
        return fallback


def _should_run_ocr() -> bool:
    """Decide whether to attempt vision-based OCR on scanned PDFs.

    Uses ``cfg.enable_ocr`` and ``cfg.vision_model``:
    True = force on (requires ``cfg.vision_model`` to be set for a real
    vision run; otherwise the caller falls back to Tesseract).
    False = force off.
    None = auto-detect: run vision OCR when ``cfg.vision_model`` is set.
    """
    if cfg.enable_ocr is True:
        return True
    if cfg.enable_ocr is False:
        return False
    return bool(cfg.vision_model)
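
# Decision table for the tri-state cfg.enable_ocr, derived from the branches
# above (illustrative summary, not additional behavior):
#   enable_ocr=True                           -> True  (a real vision run still needs cfg.vision_model)
#   enable_ocr=False                          -> False
#   enable_ocr=None and cfg.vision_model set  -> True
#   enable_ocr=None and no vision model       -> False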


async def _vision_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback = noop_callback,
    *,
    quiet: bool = False,
) -> list[ChunkRecord]:
    """OCR a scanned PDF via the configured vision model, chunk, and embed.

    Uses ``cfg.vision_model`` unconditionally. The chat model is never
    loaded as a vision backend. If ``cfg.vision_model`` is empty this
    returns an empty list; callers should fall back to Tesseract via
    ``_handle_scanned_pdf_fallback``.
    """
    if not cfg.vision_model:
        return []
    try:
        page_texts = await asyncio.to_thread(
            extract_pdf_vision,
            path,
            cfg.vision_model,
            quiet=quiet,
            timeout=cfg.ocr_timeout,
            on_progress=on_progress,
        )
    except Exception:
        log.warning(
            "Vision OCR failed for %s using vision model %s.",
            source_name,
            cfg.vision_model,
            exc_info=True,
        )
        return []
    if not page_texts:
        return []

    # Single OCR page rarely spans multiple topics; skip the semantic round-trip.
    all_chunks = [
        (page_num, chunk)
        for page_num, text in page_texts
        for chunk in chunk_text(text, use_semantic=False)
    ]
    if not all_chunks:
        return []

    texts = [c for _, c in all_chunks]

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )
    return [
        ChunkRecord(
            source=source_name,
            content_type=content_type,
            chunk_type=CHUNK_TYPE_RAW,
            page_start=page_num,
            page_end=page_num,
            line_start=0,
            line_end=0,
            chunk=text,
            chunk_index=i,
            vector=vec,
        )
        for i, ((page_num, text), vec) in enumerate(zip(all_chunks, vectors, strict=True))
    ]


async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord] | ExtractionResult:
    """Handle the scanned-PDF fallback: vision OCR when configured, else Tesseract.

    Returns chunk records if a fallback produced final results, or an
    updated ExtractionResult when Tesseract OCR succeeded (so the
    caller can proceed with normal chunking/embedding).

    When vision OCR is available (``_should_run_ocr()`` True) we go
    straight to it. Tesseract is only attempted when vision isn't an
    option at all. Running a huge scanned PDF through vision *and then*
    through Tesseract would double the wall-clock cost for no reason,
    and Tesseract on a 50+ MB document otherwise feels like a TUI
    lockup to the user.
    """
    use_ocr = _should_run_ocr()

    if use_ocr and cfg.vision_model:
        log.info(
            "Scanned PDF: using vision OCR for %s (model=%s)",
            source_name,
            cfg.vision_model,
        )
        return await _vision_fallback(path, source_name, content_type, on_progress, quiet=quiet)

    result = await _try_tesseract_ocr(path, source_name, result)

    if not _has_meaningful_text(result):
        log.warning(
            "Skipped %s: text extraction produced no usable text. "
            "For better results on scanned PDFs, configure a vision model "
            "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
            source_name,
        )
        return []

    log.info(
        "Scanned PDF detected — extracted with Tesseract OCR: %s. "
        "For structured markdown output (tables, headings), "
        "configure a vision model via PUT /api/models/vision.",
        source_name,
    )
    return result


async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Extract and chunk a document, embed, return records.

    Vision OCR is controlled by ``cfg.enable_ocr`` (see ``_should_run_ocr``).
    """
    from kreuzberg import extract_file

    config = extraction_config(content_type_to_mode(content_type))
    result = await extract_file(str(path), config=config)

    if content_type == _PDF_CONTENT_TYPE and not _has_meaningful_text(result):
        fallback = await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
        )
        if isinstance(fallback, list):
            return fallback
        # Tesseract OCR succeeded — use the updated ExtractionResult
        result = fallback

    if not result.chunks:
        return []

    texts = [chunk.content for chunk in result.chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    return [
        ChunkRecord(
            source=source_name,
            content_type=content_type,
            chunk_type=CHUNK_TYPE_RAW,
            page_start=chunk.metadata.get("first_page") or 0,
            page_end=chunk.metadata.get("last_page") or 0,
            line_start=0,
            line_end=0,
            chunk=text,
            chunk_index=chunk.metadata.get("chunk_index", idx),
            vector=vec,
        )
        for idx, (chunk, text, vec) in enumerate(zip(result.chunks, texts, vectors, strict=True))
    ]


def ingest_code_sync(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Parse code with tree-sitter, chunk, embed, and return store-ready records."""
    code_chunks: list[CodeChunk] = chunk_code(path)
    if not code_chunks:
        return []

    texts = [cc.chunk for cc in code_chunks]
    embedder = get_services().embedder
    vectors = embedder.embed_batch(texts, source=source_name, on_progress=on_progress)

    return [
        ChunkRecord(
            source=source_name,
            content_type="code",
            chunk_type=CHUNK_TYPE_RAW,
            page_start=0,
            page_end=0,
            line_start=cc.line_start,
            line_end=cc.line_end,
            chunk=cc.chunk,
            chunk_index=cc.chunk_index,
            vector=vec,
        )
        for cc, vec in zip(code_chunks, vectors, strict=True)
    ]


async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Chunk a markdown file with heading context prepended to each chunk.

    Each chunk gets the heading hierarchy path (e.g. "# Setup > ## Install")
    prepended for better retrieval context.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    texts = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not texts:
        return []

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )
    return [
        ChunkRecord(
            source=source_name,
            content_type="text",
            chunk_type=CHUNK_TYPE_RAW,
            page_start=0,
            page_end=0,
            line_start=0,
            line_end=0,
            chunk=t,
            chunk_index=idx,
            vector=vec,
        )
        for idx, (t, vec) in enumerate(zip(texts, vectors, strict=True))
    ]


async def _rebuild_concept_clusters() -> None:
    """Re-run Leiden clustering after sync. No-op if disabled."""
    if not cfg.concept_graph:
        return
    from lilbee.concepts import concepts_available

    if not concepts_available():
        return
    try:
        cg = get_services().concepts
        if not cg.get_graph():
            return
        await asyncio.to_thread(cg.rebuild_clusters)
    except Exception:
        log.warning("Concept cluster rebuild failed", exc_info=True)


async def _incremental_wiki_update(changed_sources: set[str]) -> None:
    """Regenerate only the wiki pages touched by *changed_sources*.

    Runs after a successful sync. Builds a fresh ``ExtractedEntity``
    set from the current corpus, keeps the records that either have no
    page on disk yet or whose chunk trail includes one of the changed
    sources, and regenerates just those. When more than
    ``cfg.wiki_ingest_update_cap`` pages are touched, the auto-update
    bails out and logs a manual-update hint instead.
    """
    if not cfg.wiki or not changed_sources:
        return
    # circular: the wiki layer imports lilbee.ingest.file_hash, so these
    # stay function-local to break the cycle at the hook-entry boundary.
    from lilbee.store import SearchChunk
    from lilbee.wiki import append_wiki_log, build_wiki, update_wiki_index
    from lilbee.wiki.entity_extractor import EntityKind, get_entity_extractor
    from lilbee.wiki.shared import (
        CONCEPTS_SUBDIR,
        ENTITIES_SUBDIR,
        WIKI_LOG_ACTION_INGEST,
    )

    svc = get_services()
    extractor = get_entity_extractor(cfg.wiki_entity_mode, svc.provider, cfg)

    chunks: list[SearchChunk] = []
    for record in svc.store.get_sources():
        chunks.extend(svc.store.get_chunks_by_source(record["filename"]))
    entities = await asyncio.to_thread(extractor.extract, chunks)

    wiki_root = cfg.data_root / cfg.wiki_dir
    touched = []
    for entity in entities:
        # Phase D: the extractor emits only ENTITY kind; CONCEPT is
        # reserved for LLM-curated pages produced inside the batched
        # call and is intentionally not considered here. Keeping the
        # dispatch neutral guards against a future extractor that
        # re-introduces CONCEPT.
        subdir = CONCEPTS_SUBDIR if entity.kind is EntityKind.CONCEPT else ENTITIES_SUBDIR
        page_path = wiki_root / subdir / f"{entity.slug}.md"
        if not page_path.exists():
            touched.append(entity)
            continue
        if any(ref.source in changed_sources for ref in entity.chunk_refs):
            touched.append(entity)

    if not touched:
        return

    if len(touched) > cfg.wiki_ingest_update_cap:
        # warning, not info: the default LILBEE_LOG_LEVEL is WARNING, so
        # log.info would silently drop the manual-update hint and the user
        # would see no signal at all during `lilbee sync` when the cap trips.
        log.warning(
            "Wiki auto-update skipped: %d pages touched (cap %d). "
            "Run 'lilbee wiki update' to refresh.",
            len(touched),
            cfg.wiki_ingest_update_cap,
        )
        append_wiki_log(
            WIKI_LOG_ACTION_INGEST,
            f"skipped: {len(touched)} pages exceeds cap {cfg.wiki_ingest_update_cap}",
        )
        return

    # extract_concepts=False so an incremental sync does not churn
    # concept slugs. Concept curation is a deliberate, user-invoked
    # refresh (full `lilbee wiki build`).
    pages = await asyncio.to_thread(
        build_wiki, touched, svc.provider, svc.store, cfg, extract_concepts=False
    )
    update_wiki_index()
    append_wiki_log(
        WIKI_LOG_ACTION_INGEST,
        f"{len(pages)} pages regenerated for {', '.join(sorted(changed_sources))}",
    )


async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
    """Extract and index concepts for ingested chunks. No-op if disabled."""
    if not cfg.concept_graph or not records:
        return
    from lilbee.concepts import concepts_available

    if not concepts_available():
        return
    try:
        cg = get_services().concepts
        texts = [r["chunk"] for r in records]
        concept_lists = await asyncio.to_thread(cg.extract_concepts_batch, texts)
        chunk_ids = [(source_name, r["chunk_index"]) for r in records]
        await asyncio.to_thread(cg.build_from_chunks, chunk_ids, concept_lists)
    except Exception:
        log.warning("Concept indexing failed for %s", source_name, exc_info=True)


async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Ingest a single file. Returns chunk count."""
    records: list[ChunkRecord]
    if content_type == "code":
        records = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() == ".md":
        records = await ingest_markdown(path, source_name, on_progress)
    else:
        records = await ingest_document(
            path,
            source_name,
            content_type,
            quiet=quiet,
            on_progress=on_progress,
        )

    store = get_services().store
    chunk_count = await asyncio.to_thread(store.add_chunks, cast(list[dict], records))
    await _index_concepts(records, source_name)
    return chunk_count


async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> SyncResult:
    """Sync documents/ with the vector store.

    Returns a SyncResult summarizing added, updated, removed, unchanged, and failed files.
    When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
    When *cancel* is set, processing stops between files without data loss.
    """
    _store = get_services().store

    if force_rebuild:
        _store.drop_all()

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    existing_sources = {s["filename"]: s["file_hash"] for s in _store.get_sources()}

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged = 0
    failed: list[str] = []

    # Find files to remove (in DB but not on disk)
    for name in existing_sources:
        if name not in disk_files:
            _store.delete_by_source(name)
            _store.delete_source(name)
            removed.append(name)

    files_to_process: list[FileToProcess] = []

    for name, path in sorted(disk_files.items()):
        if cancel and cancel.is_set():
            break

        content_type = classify_file(path)
        if content_type is None:
            raise ValueError(f"Unsupported file slipped through discovery: {name}")

        old_hash = existing_sources.get(name)

        current_hash = file_hash(path)

        if old_hash == current_hash:
            unchanged += 1
            continue

        # needs_cleanup=True unconditionally: delete_by_source is idempotent,
        # and this closes the race where a prior ingest wrote chunks but died
        # before upsert_source, leaving orphaned chunks that would duplicate.
        files_to_process.append(
            FileToProcess(name, path, content_type, current_hash, needs_cleanup=True)
        )
        if old_hash is not None:
            updated.append(name)
        else:
            added.append(name)

    # Ingest files (with optional progress bar)
    if files_to_process:
        get_services().embedder.validate_model()
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

    if files_to_process or removed:
        _store.ensure_fts_index()
        await _rebuild_concept_clusters()
        await _incremental_wiki_update(set(added) | set(updated) | set(removed))

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
    )
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
        ),
    )
    return result
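
# Usage sketch (illustrative, not part of the module): sync() is a coroutine, so
# a one-off caller would drive it with an event loop, e.g.
#   result = asyncio.run(sync(quiet=True))
#   print(result)   # "Added: 2\nUpdated: 0\nRemoved: 1\n..." (counts here are hypothetical)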


# Limit concurrent ingestion to avoid overwhelming I/O
_MAX_CONCURRENT = os.cpu_count() or 4

# Concurrent.futures raises this exact RuntimeError message when submitting to
# a shutdown executor (Python 3.11+). There is no dedicated exception class to
# catch, so callers have to string-match the message.
_EXECUTOR_SHUTDOWN_MSG = "cannot schedule new futures after shutdown"


def _is_executor_shutdown(exc: BaseException) -> bool:
    """True if ``exc`` is the concurrent.futures shutdown-race RuntimeError."""
    return isinstance(exc, RuntimeError) and _EXECUTOR_SHUTDOWN_MSG in str(exc)


async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
    """Ingest a batch of files, optionally showing a Rich progress bar.

    When *needs_cleanup* is True, old chunks are deleted immediately before
    ingesting new ones so the two operations are atomic per file.
    When *cancel* is set, pending files raise CancelledError before starting.
    """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        async with semaphore:
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            on_progress(
                EventType.FILE_START,
                FileStartEvent(file=name, total_files=total_files, current_file=file_index),
            )
            try:
                if needs_cleanup:
                    get_services().store.delete_by_source(name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or _is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="error", chunks=0),
                )
                return _IngestResult(name, path, 0, error=exc)

    if quiet:
        tasks = [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]
        await _collect_results(tasks, added, updated, failed, on_progress=on_progress)
    else:
        with Progress(
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            ptask = progress.add_task("Ingesting documents...", total=total_files)
            token = shared_progress.set((progress, ptask))
            try:
                tasks = [
                    asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
                    for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
                ]
                await _collect_results(
                    tasks,
                    added,
                    updated,
                    failed,
                    on_progress=on_progress,
                    progress=progress,
                    ptask=ptask,
                )
            finally:
                shared_progress.reset(token)


async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Collect task results, optionally updating a Rich progress bar."""
    for completed_count, fut in enumerate(asyncio.as_completed(tasks), 1):
        result = await fut
        _apply_result(result, added, updated, failed)
        if progress is not None and ptask is not None:
            desc = f"Ingested {result.name}" if result.error is None else f"Failed {result.name}"
            progress.update(ptask, description=desc)
            progress.advance(ptask)
        progress_status = "failed" if result.error is not None else "ingested"
        on_progress(
            EventType.BATCH_PROGRESS,
            BatchProgressEvent(
                file=result.name,
                status=progress_status,
                current=completed_count,
                total=len(tasks),
            ),
        )


def _discard_from_list(lst: list[str], value: str) -> None:
    """Remove *value* from *lst* if present."""
    with contextlib.suppress(ValueError):
        lst.remove(value)


def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
) -> None:
    """Record an ingestion result — update store on success, track failure."""
    if result.error is not None:
        # Log the error message without the traceback: ingest failures are
        # already surfaced to callers via SyncResult.failed, and the raw
        # traceback from log.exception bleeds into the TUI chat pane via the
        # stderr bridge. Full stack traces stay reachable by
        # lowering LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", result.name, result.error)
        log.debug("Traceback for failed ingest of %s", result.name, exc_info=result.error)
        _discard_from_list(added, result.name)
        _discard_from_list(updated, result.name)
        failed.append(result.name)
        return
    if result.chunk_count == 0:
        # No chunks produced (e.g. scanned PDF without vision model).
        # Don't record as a source so it gets retried on next sync.
        _discard_from_list(added, result.name)
        _discard_from_list(updated, result.name)
        return

    fhash = result.file_hash or file_hash(result.path)
    get_services().store.upsert_source(result.name, fhash, result.chunk_count)