Coverage for src/lilbee/preprocessors.py: 100%
85 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
"""Format-specific preprocessors for structured data files.

Convert structured formats (XML, JSON, CSV) into readable prose
that embeds well for vector search. Each preprocessor takes a Path
and returns a string of human-readable text.
"""
import csv
import json
import logging
import xml.etree.ElementTree as ET
from collections.abc import Iterator
from pathlib import Path
from typing import Any
# Module-level logger; the preprocessors use it to report parse fallbacks.
log = logging.getLogger(__name__)
def preprocess_xml(path: Path) -> str:
    """Render an XML document as indented, human-readable text.

    Parses *path* and walks the element tree via ``_walk_element``;
    if the document is not well-formed XML, logs a warning and returns
    the raw file contents instead.
    """
    try:
        root = ET.parse(path).getroot()
    except ET.ParseError:
        log.warning("Malformed XML, falling back to raw text: %s", path)
        return path.read_text(encoding="utf-8", errors="replace")
    return _walk_element(root, depth=0).strip()
29def _walk_element(elem: ET.Element, depth: int) -> str:
30 """Recursively convert an XML element tree to readable text."""
31 parts: list[str] = []
32 tag = elem.tag
33 attrs = " ".join(f"{k}: {v}" for k, v in elem.attrib.items() if not k.startswith("{"))
34 label = f"{tag} ({attrs})" if attrs else tag
36 if depth == 0:
37 parts.append(f"{label}\n")
38 elif depth == 1:
39 parts.append(f"\n{label}\n")
40 else:
41 indent = " " * (depth - 1)
42 parts.append(f"{indent}{label}\n")
44 text = (elem.text or "").strip()
45 if text:
46 indent = " " * max(depth, 1)
47 parts.append(f"{indent}{text}\n")
49 for child in elem:
50 parts.append(_walk_element(child, depth + 1))
52 tail = (elem.tail or "").strip()
53 if tail:
54 indent = " " * max(depth - 1, 0)
55 parts.append(f"{indent}{tail}\n")
57 return "".join(parts)
60def _flatten_tree(data: Any, prefix: str = "", _top: bool = True) -> Iterator[str]:
61 """Walk nested dicts/lists, yielding 'dotted.path: value' lines."""
62 if isinstance(data, dict):
63 for i, (key, val) in enumerate(data.items()):
64 path = f"{prefix}.{key}" if prefix else key
65 if _top and i > 0:
66 yield ""
67 yield from _flatten_tree(val, path, _top=False)
68 elif isinstance(data, list):
69 for i, val in enumerate(data):
70 path = f"{prefix}[{i}]"
71 yield from _flatten_tree(val, path, _top=False)
72 else:
73 yield f"{prefix}: {data}"
def preprocess_csv(path: Path) -> str:
    """Render a CSV (or ``.tsv``) file as 'Header: Value' lines per row.

    Rows become ``Row N:`` sections; cells with a missing header or a
    blank value are skipped, and rows with no printable cells are dropped.
    Returns an empty string for a blank file.
    """
    sep = "\t" if path.suffix == ".tsv" else ","
    raw = path.read_text(encoding="utf-8", errors="replace")
    if not raw.strip():
        return ""
    sections: list[str] = []
    for row_num, record in enumerate(csv.DictReader(raw.splitlines(), delimiter=sep), 1):
        cells = [
            f"  {header}: {value}"
            for header, value in record.items()
            # `header` is None for overflow cells; `value` may be None/blank.
            if header and value and value.strip()
        ]
        if cells:
            sections.append("\n".join([f"Row {row_num}:", *cells]))
    return "\n\n".join(sections)
def preprocess_json(path: Path) -> str:
    """Render JSON (or ``.jsonl``) content as flat 'dotted.path: value' lines.

    JSONL files are flattened record-by-record, separated by blank lines;
    lines that fail to parse are kept verbatim. A whole-file JSON parse
    failure logs a warning and returns the raw text unchanged.
    """
    raw = path.read_text(encoding="utf-8", errors="replace")

    if path.suffix == ".jsonl":
        chunks: list[str] = []
        for raw_line in raw.splitlines():
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError:
                # Preserve unparseable lines rather than dropping them.
                chunks.append(stripped)
            else:
                chunks.append("\n".join(_flatten_tree(record)))
        return "\n\n".join(chunks)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        log.warning("Malformed JSON, falling back to raw text: %s", path)
        return raw
    return "\n".join(_flatten_tree(parsed))