Coverage for src / lilbee / ingest.py: 100%
254 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""Document sync engine — keeps documents/ dir in sync with LanceDB."""
3import asyncio
4import hashlib
5import logging
6import os
7from collections.abc import Callable
8from dataclasses import dataclass
9from pathlib import Path
10from typing import Any, TypedDict, cast
12from pydantic import BaseModel
13from rich.progress import Progress, SpinnerColumn, TextColumn
15from lilbee import embedder, store
16from lilbee.chunker import chunk_text
17from lilbee.code_chunker import CodeChunk, chunk_code, supported_extensions
18from lilbee.config import cfg
19from lilbee.platform import is_ignored_dir
20from lilbee.preprocessors import preprocess_csv, preprocess_json, preprocess_xml
21from lilbee.progress import (
22 BatchProgressEvent,
23 DetailedProgressCallback,
24 EventType,
25 FileDoneEvent,
26 FileStartEvent,
27 SyncDoneEvent,
28 noop_callback,
29)
log = logging.getLogger(__name__)

# Minimum total chars for kreuzberg text to be considered meaningful.
# 50 chars ≈ 12 words — if a PDF yields less, it's almost certainly a scanned
# document with no embedded text layer. Text PDFs with even just a title page
# easily exceed this threshold; blank/scan-only PDFs yield 0 chars.
_MIN_MEANINGFUL_CHARS = 50

# Approximate chars-per-token ratio: cfg.chunk_size / cfg.chunk_overlap are
# expressed in tokens, but kreuzberg's ChunkingConfig takes character counts.
_CHARS_PER_TOKEN = 4
def _has_meaningful_text(result: Any) -> bool:
    """Check if kreuzberg extraction produced meaningful text.

    A result without a ``chunks`` attribute (or with an empty chunk list) is
    treated as empty; otherwise the stripped chunk contents must total more
    than ``_MIN_MEANINGFUL_CHARS`` characters.
    """
    chunks = getattr(result, "chunks", None)
    if not chunks:
        return False
    stripped_total = sum(len(piece.content.strip()) for piece in chunks)
    return stripped_total > _MIN_MEANINGFUL_CHARS
class ChunkRecord(TypedDict):
    """A single store-ready chunk record matching store.CHUNKS_SCHEMA."""

    # Relative source path (documents/-relative, forward slashes).
    source: str
    # Classification from _EXTENSION_MAP, e.g. "pdf", "code", "text", "data".
    content_type: str
    # Page span from kreuzberg chunk metadata; 0 when pages don't apply.
    page_start: int
    page_end: int
    # Line span for code chunks (from tree-sitter); 0 for non-code content.
    line_start: int
    line_end: int
    # The chunk text that was embedded.
    chunk: str
    # Position of this chunk within its source file.
    chunk_index: int
    # Embedding vector produced by embedder.embed_batch.
    vector: list[float]
class SyncResult(BaseModel):
    """Summary of a sync operation."""

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged: int = 0
    failed: list[str] = []

    def __str__(self) -> str:
        # One count per category, then each failed file on its own line.
        summary = [
            f"Added: {len(self.added)}",
            f"Updated: {len(self.updated)}",
            f"Removed: {len(self.removed)}",
            f"Unchanged: {self.unchanged}",
            f"Failed: {len(self.failed)}",
        ]
        summary.extend(f" [red]{name}[/red]" for name in self.failed)
        return "\n".join(summary)

    def __repr__(self) -> str:
        return str(self)

    def __rich__(self) -> str:
        # Rich renders the same plain-text summary (failed names in red markup).
        return str(self)
@dataclass
class _IngestResult:
    """Outcome of a single file ingestion attempt."""

    # Relative source name (documents/-relative, forward slashes).
    name: str
    # Absolute path of the ingested file.
    path: Path
    # Number of chunk records written to the store; 0 when nothing usable.
    chunk_count: int
    # Exception that aborted ingestion, or None on success.
    error: Exception | None
# File extensions routed to the code chunker (tree-sitter)
_CODE_EXTENSIONS = supported_extensions()

# All document extensions handled by kreuzberg or structured preprocessors
_DOCUMENT_EXTENSIONS = frozenset(
    {
        ".md",
        ".txt",
        ".html",
        ".rst",
        ".pdf",
        ".docx",
        ".xlsx",
        ".pptx",
        ".epub",
        ".png",
        ".jpg",
        ".jpeg",
        ".tiff",
        ".tif",
        ".bmp",
        ".webp",
        ".csv",
        ".tsv",
        ".xml",
        ".json",
        ".jsonl",
        ".yaml",
        ".yml",
    }
)

# Extension → content_type string for metadata.
# Note: code extensions that also appear in _DOCUMENT_EXTENSIONS are excluded
# from the "code" mapping, so document handling wins for ambiguous extensions.
_EXTENSION_MAP: dict[str, str] = {
    **{ext: "text" for ext in (".md", ".txt", ".html", ".rst", ".yaml", ".yml")},
    ".pdf": "pdf",
    **{ext: "code" for ext in _CODE_EXTENSIONS if ext not in _DOCUMENT_EXTENSIONS},
    **{ext: ext.lstrip(".") for ext in (".docx", ".xlsx", ".pptx")},
    ".epub": "epub",
    **{ext: "image" for ext in (".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp")},
    **{ext: "data" for ext in (".csv", ".tsv")},
    ".xml": "xml",
    **{ext: "json" for ext in (".json", ".jsonl")},
}


# Preprocessors for structured formats: content_type → callable(Path) → str
_PREPROCESSORS: dict[str, Callable[[Path], str]] = {
    "xml": preprocess_xml,
    "json": preprocess_json,
    "data": preprocess_csv,
}
def file_hash(path: Path) -> str:
    """Compute SHA-256 hex digest of a file.

    Reads in 8 KiB blocks so arbitrarily large files hash with constant memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        while block := fh.read(8192):
            digest.update(block)
    return digest.hexdigest()
def _relative_name(path: Path) -> str:
    """Get path relative to documents dir as a forward-slash string (portable across OS)."""
    # as_posix() keeps store keys identical on Windows and POSIX systems.
    return path.relative_to(cfg.documents_dir).as_posix()
def discover_files() -> dict[str, Path]:
    """Scan documents/ recursively, return {relative_name: absolute_path}."""
    docs_root = cfg.documents_dir
    if not docs_root.exists():
        return {}
    found: dict[str, Path] = {}
    for root, dirs, filenames in os.walk(docs_root, topdown=True):
        # Prune ignored directories in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if not is_ignored_dir(d, cfg.ignore_dirs)]
        for fname in filenames:
            # Hidden files (dotfiles) are never indexed.
            if fname.startswith("."):
                continue
            candidate = Path(root) / fname
            # Only extensions with a known content_type are picked up.
            if candidate.suffix.lower() in _EXTENSION_MAP:
                found[_relative_name(candidate)] = candidate
    return found
def classify_file(path: Path) -> str | None:
    """Classify file by extension. Returns content_type or None if unsupported."""
    suffix = path.suffix.lower()
    return _EXTENSION_MAP.get(suffix)
def kreuzberg_config(content_type: str) -> object:
    """Build kreuzberg ExtractionConfig for a given content type."""
    from kreuzberg import ChunkingConfig, ExtractionConfig, PageConfig

    # kreuzberg measures chunks in characters; cfg values are in tokens.
    chunk_cfg = ChunkingConfig(
        max_chars=cfg.chunk_size * _CHARS_PER_TOKEN,
        max_overlap=cfg.chunk_overlap * _CHARS_PER_TOKEN,
    )

    if content_type != "pdf":
        # Non-PDF documents are normalized to markdown.
        return ExtractionConfig(chunking=chunk_cfg, output_format="markdown")
    # PDFs keep per-page extraction so chunk metadata carries page numbers.
    return ExtractionConfig(
        chunking=chunk_cfg,
        pages=PageConfig(extract_pages=True, insert_page_markers=False),
    )
def kreuzberg_ocr_config() -> object:
    """Build kreuzberg ExtractionConfig with Tesseract OCR enabled for scanned PDFs."""
    from kreuzberg import ChunkingConfig, ExtractionConfig, OcrConfig, PageConfig

    # Same token→char conversion as kreuzberg_config.
    chunk_cfg = ChunkingConfig(
        max_chars=cfg.chunk_size * _CHARS_PER_TOKEN,
        max_overlap=cfg.chunk_overlap * _CHARS_PER_TOKEN,
    )
    return ExtractionConfig(
        chunking=chunk_cfg,
        pages=PageConfig(extract_pages=True, insert_page_markers=False),
        ocr=OcrConfig(backend="tesseract"),
    )
async def _try_tesseract_ocr(path: Path, source_name: str, fallback: object) -> object:
    """Attempt Tesseract OCR on a scanned PDF. Returns the OCR result or *fallback* on failure."""
    try:
        from kreuzberg import extract_file

        log.info("PDF text extraction empty, trying Tesseract OCR: %s", source_name)
        # Suppress Tesseract's "Detected N diacritics" stderr noise at the fd level
        # (contextlib.redirect_stderr only catches Python's sys.stderr, not subprocess output)
        old_stderr = os.dup(2)
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, 2)
        try:
            return await extract_file(str(path), config=kreuzberg_ocr_config())
        finally:
            # Restore the real stderr first, then release both duplicated fds.
            os.dup2(old_stderr, 2)
            os.close(devnull)
            os.close(old_stderr)
    except Exception:
        # Best-effort: Tesseract may be missing entirely — degrade silently
        # to the caller-supplied fallback result.
        log.debug("Tesseract OCR unavailable or failed for %s, skipping", source_name)
        return fallback
async def _vision_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """OCR a scanned PDF via vision model, chunk, and embed."""
    from lilbee.vision import extract_pdf_vision

    # Vision extraction is blocking — run it off the event loop.
    page_texts = await asyncio.to_thread(
        extract_pdf_vision,
        path,
        cfg.vision_model,
        quiet=cfg.json_mode,
        timeout=cfg.vision_timeout,
        on_progress=on_progress,
    )
    if not page_texts:
        return []

    # Pair each chunk with its originating page so records keep page metadata.
    paged_chunks: list[tuple[int, str]] = []
    for page_num, text in page_texts:
        for piece in chunk_text(text):
            paged_chunks.append((page_num, piece))
    if not paged_chunks:
        return []

    vectors = await asyncio.to_thread(
        embedder.embed_batch,
        [piece for _, piece in paged_chunks],
        source=source_name,
        on_progress=on_progress,
    )
    records: list[ChunkRecord] = []
    for idx, ((page_num, piece), vec) in enumerate(zip(paged_chunks, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                page_start=page_num,
                page_end=page_num,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=idx,
                vector=vec,
            )
        )
    return records
async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    force_vision: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Extract and chunk a document via kreuzberg, embed, return records.

    When *force_vision* is True (CLI ``--vision``) or a vision model is
    configured, Tesseract OCR is skipped and we go straight to the vision
    model for scanned PDFs.

    Returns an empty list when no usable text could be extracted.
    """
    from kreuzberg import extract_file

    use_vision = force_vision or bool(cfg.vision_model)

    config = kreuzberg_config(content_type)
    result = await extract_file(str(path), config=config)

    # Scanned PDF fallback chain: Tesseract OCR → vision model
    if content_type == "pdf" and not _has_meaningful_text(result):
        # When vision is explicitly enabled, skip Tesseract and go straight to vision
        if not use_vision:
            result = await _try_tesseract_ocr(path, source_name, result)

        if not _has_meaningful_text(result):
            if not cfg.vision_model:
                # No vision model configured — nothing left to try; skip the file.
                log.warning(
                    "Skipped %s: Tesseract OCR produced no usable text. "
                    "For better results on complex scans, set a vision model "
                    "with /vision or LILBEE_VISION_MODEL.",
                    source_name,
                )
                return []
            log.info("PDF text extraction empty, falling back to vision OCR: %s", source_name)
            return await _vision_fallback(path, source_name, content_type, on_progress)

        # Tesseract produced usable text — continue to the chunk/embed path below.
        log.info(
            "Scanned PDF detected — extracted with Tesseract OCR: %s. "
            "For structured markdown output (tables, headings), re-add with --vision.",
            source_name,
        )

    if not result.chunks:
        return []

    texts = [chunk.content for chunk in result.chunks]
    # embed_batch is blocking — run it off the event loop.
    vectors = await asyncio.to_thread(
        embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    return [
        ChunkRecord(
            source=source_name,
            content_type=content_type,
            # Page metadata may be absent for non-paginated formats; default to 0.
            page_start=chunk.metadata.get("first_page") or 0,
            page_end=chunk.metadata.get("last_page") or 0,
            line_start=0,
            line_end=0,
            chunk=text,
            chunk_index=chunk.metadata.get("chunk_index", idx),
            vector=vec,
        )
        for idx, (chunk, text, vec) in enumerate(zip(result.chunks, texts, vectors, strict=True))
    ]
def ingest_code_sync(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Parse code with tree-sitter, chunk, embed, and return store-ready records."""
    parsed: list[CodeChunk] = chunk_code(path)
    if not parsed:
        return []

    vectors = embedder.embed_batch(
        [cc.chunk for cc in parsed], source=source_name, on_progress=on_progress
    )
    records: list[ChunkRecord] = []
    for code_chunk, vec in zip(parsed, vectors, strict=True):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type="code",
                # Code records carry line spans instead of page spans.
                page_start=0,
                page_end=0,
                line_start=code_chunk.line_start,
                line_end=code_chunk.line_end,
                chunk=code_chunk.chunk,
                chunk_index=code_chunk.chunk_index,
                vector=vec,
            )
        )
    return records
async def ingest_structured(
    path: Path,
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Preprocess a structured file, chunk, embed, and return store-ready records."""
    # Flatten the structured format (xml/json/csv) to plain text off the loop.
    flattened = await asyncio.to_thread(_PREPROCESSORS[content_type], path)
    if not flattened.strip():
        return []
    pieces = chunk_text(flattened)
    if not pieces:
        return []
    vectors = await asyncio.to_thread(
        embedder.embed_batch, pieces, source=source_name, on_progress=on_progress
    )
    return [
        ChunkRecord(
            source=source_name,
            content_type=content_type,
            page_start=0,
            page_end=0,
            line_start=0,
            line_end=0,
            chunk=piece,
            chunk_index=idx,
            vector=vec,
        )
        for idx, (piece, vec) in enumerate(zip(pieces, vectors, strict=True))
    ]
async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    force_vision: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Ingest a single file. Returns chunk count."""

    async def _persist(recs: list[ChunkRecord]) -> int:
        # store.add_chunks is blocking — run it off the event loop.
        return await asyncio.to_thread(store.add_chunks, cast(list[dict], recs))

    # Dispatch by content type: tree-sitter for code, preprocessors for
    # structured data, kreuzberg (with OCR/vision fallbacks) for documents.
    if content_type == "code":
        return await _persist(
            await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
        )
    if content_type in _PREPROCESSORS:
        return await _persist(
            await ingest_structured(path, source_name, content_type, on_progress)
        )
    return await _persist(
        await ingest_document(
            path, source_name, content_type, force_vision=force_vision, on_progress=on_progress
        )
    )
async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    force_vision: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> SyncResult:
    """Sync documents/ with the vector store.

    Compares on-disk files (by SHA-256) against sources recorded in the store,
    removes stale entries, ingests new/changed files, and returns a
    :class:`SyncResult` with added, updated, removed, unchanged, and failed.
    When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
    When *force_rebuild* is True, all stored data is dropped first.
    """
    if force_rebuild:
        store.drop_all()

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    # Snapshot of what the store currently knows: {filename: file_hash}.
    existing_sources = {s["filename"]: s["file_hash"] for s in store.get_sources()}

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged = 0
    failed: list[str] = []

    # Find files to remove (in DB but not on disk)
    for name in existing_sources:
        if name not in disk_files:
            store.delete_by_source(name)
            store.delete_source(name)
            removed.append(name)

    # Process files on disk
    files_to_process: list[tuple[str, Path, str]] = []  # (name, path, content_type)

    for name, path in sorted(disk_files.items()):
        content_type = classify_file(path)
        # discover_files only keeps extensions present in _EXTENSION_MAP,
        # so classification can never fail here.
        assert content_type is not None, f"Unsupported file slipped through discovery: {name}"

        current_hash = file_hash(path)
        old_hash = existing_sources.get(name)

        if old_hash == current_hash:
            # Content unchanged since last sync — skip re-ingestion.
            unchanged += 1
            continue

        if old_hash is not None:
            # Modified — remove old data
            store.delete_by_source(name)
            store.delete_source(name)
            files_to_process.append((name, path, content_type))
            updated.append(name)
        else:
            files_to_process.append((name, path, content_type))
            added.append(name)

    # Ingest files (with optional progress bar)
    if files_to_process:
        # Check the embedding model once up front, before any ingestion work.
        embedder.validate_model()
        # ingest_batch mutates added/updated/failed in place as files complete.
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            quiet=quiet,
            force_vision=force_vision,
            on_progress=on_progress,
        )

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
    )
    # Emit the final DONE event with summary counts for JSON/streaming consumers.
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
        ).model_dump(),
    )
    return result
# Limit concurrent ingestion to avoid overwhelming I/O.
# os.cpu_count() may return None (restricted environments) — fall back to 4.
_MAX_CONCURRENT = os.cpu_count() or 4
async def ingest_batch(
    files_to_process: list[tuple[str, Path, str]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    quiet: bool = False,
    force_vision: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> None:
    """Ingest a batch of files, optionally showing a Rich progress bar.

    Mutates *added*, *updated*, and *failed* in place: files that fail are
    moved out of added/updated into failed (see _apply_result).
    """
    # Bound concurrency so at most _MAX_CONCURRENT files are ingested at once.
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str, path: Path, content_type: str, file_index: int
    ) -> _IngestResult:
        # Ingest one file end-to-end, emitting FILE_START/FILE_DONE events.
        # Errors are captured into the returned _IngestResult, never raised
        # (except cancellation).
        async with semaphore:
            on_progress(
                EventType.FILE_START,
                FileStartEvent(
                    file=name, total_files=total_files, current_file=file_index
                ).model_dump(),
            )
            try:
                chunk_count = await _ingest_file(
                    path, name, content_type, force_vision=force_vision, on_progress=on_progress
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count).model_dump(),
                )
                return _IngestResult(name, path, chunk_count, error=None)
            except asyncio.CancelledError:
                # Never swallow cancellation — let it propagate.
                raise
            except Exception as exc:
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="error", chunks=0).model_dump(),
                )
                return _IngestResult(name, path, 0, error=exc)

    # file_index is 1-based for human-friendly "file N of M" progress display.
    tasks = [
        asyncio.ensure_future(_process_one(name, path, ct, idx))
        for idx, (name, path, ct) in enumerate(files_to_process, 1)
    ]

    if quiet:
        await _collect_results(tasks, added, updated, failed, on_progress=on_progress)
    else:
        await _collect_results_with_progress(tasks, added, updated, failed, on_progress=on_progress)
async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
) -> None:
    """Collect task results without progress display."""
    total = len(tasks)
    done_so_far = 0
    # Consume results in completion order, emitting a BATCH_PROGRESS per file.
    for fut in asyncio.as_completed(tasks):
        outcome = await fut
        done_so_far += 1
        _apply_result(outcome, added, updated, failed)
        status = "ingested" if outcome.error is None else "failed"
        on_progress(
            EventType.BATCH_PROGRESS,
            BatchProgressEvent(
                file=outcome.name,
                status=status,
                current=done_so_far,
                total=total,
            ).model_dump(),
        )
async def _collect_results_with_progress(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
) -> None:
    """Collect task results with Rich progress bar."""
    total = len(tasks)
    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        transient=True,
    ) as progress:
        bar = progress.add_task("Ingesting documents...", total=total)
        completed = 0
        # Consume results in completion order, updating the bar and emitting
        # a BATCH_PROGRESS event per file.
        for fut in asyncio.as_completed(tasks):
            outcome = await fut
            completed += 1
            _apply_result(outcome, added, updated, failed)
            if outcome.error is None:
                progress.update(bar, description=f"Ingested {outcome.name}")
            else:
                progress.update(bar, description=f"Failed {outcome.name}")
            progress.advance(bar)
            on_progress(
                EventType.BATCH_PROGRESS,
                BatchProgressEvent(
                    file=outcome.name,
                    status="ingested" if outcome.error is None else "failed",
                    current=completed,
                    total=total,
                ).model_dump(),
            )
def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
) -> None:
    """Record an ingestion result — update store on success, track failure."""

    def _forget(name: str) -> None:
        # Drop the file from whichever pending bucket claimed it.
        for bucket in (added, updated):
            if name in bucket:
                bucket.remove(name)

    if result.error is not None:
        log.exception("Failed to ingest %s", result.name, exc_info=result.error)
        _forget(result.name)
        failed.append(result.name)
        return
    if result.chunk_count == 0:
        # No chunks produced (e.g. scanned PDF without vision model).
        # Don't record as a source so it gets retried on next sync.
        _forget(result.name)
        return
    store.upsert_source(result.name, file_hash(result.path), result.chunk_count)