Coverage for src / lilbee / preprocessors.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 08:27 +0000

1"""Format-specific preprocessors for structured data files. 

2 

3Convert structured formats (XML, JSON, CSV) into readable prose 

4that embeds well for vector search. Each preprocessor takes a Path 

5and returns a string of human-readable text. 

6""" 

7 

8import csv 

9import json 

10import logging 

11import xml.etree.ElementTree as ET 

12from collections.abc import Iterator 

13from pathlib import Path 

14from typing import Any 

15 

16log = logging.getLogger(__name__) 

17 

18 

def preprocess_xml(path: Path) -> str:
    """Render an XML file as readable prose, using element tags as labels.

    Falls back to the raw file contents (with undecodable bytes replaced)
    when the XML cannot be parsed.
    """
    try:
        root = ET.parse(path).getroot()
    except ET.ParseError:
        log.warning("Malformed XML, falling back to raw text: %s", path)
        return path.read_text(encoding="utf-8", errors="replace")
    return _walk_element(root, depth=0).strip()

27 

28 

29def _walk_element(elem: ET.Element, depth: int) -> str: 

30 """Recursively convert an XML element tree to readable text.""" 

31 parts: list[str] = [] 

32 tag = elem.tag 

33 attrs = " ".join(f"{k}: {v}" for k, v in elem.attrib.items() if not k.startswith("{")) 

34 label = f"{tag} ({attrs})" if attrs else tag 

35 

36 if depth == 0: 

37 parts.append(f"{label}\n") 

38 elif depth == 1: 

39 parts.append(f"\n{label}\n") 

40 else: 

41 indent = " " * (depth - 1) 

42 parts.append(f"{indent}{label}\n") 

43 

44 text = (elem.text or "").strip() 

45 if text: 

46 indent = " " * max(depth, 1) 

47 parts.append(f"{indent}{text}\n") 

48 

49 for child in elem: 

50 parts.append(_walk_element(child, depth + 1)) 

51 

52 tail = (elem.tail or "").strip() 

53 if tail: 

54 indent = " " * max(depth - 1, 0) 

55 parts.append(f"{indent}{tail}\n") 

56 

57 return "".join(parts) 

58 

59 

60def _flatten_tree(data: Any, prefix: str = "", _top: bool = True) -> Iterator[str]: 

61 """Walk nested dicts/lists, yielding 'dotted.path: value' lines.""" 

62 if isinstance(data, dict): 

63 for i, (key, val) in enumerate(data.items()): 

64 path = f"{prefix}.{key}" if prefix else key 

65 if _top and i > 0: 

66 yield "" 

67 yield from _flatten_tree(val, path, _top=False) 

68 elif isinstance(data, list): 

69 for i, val in enumerate(data): 

70 path = f"{prefix}[{i}]" 

71 yield from _flatten_tree(val, path, _top=False) 

72 else: 

73 yield f"{prefix}: {data}" 

74 

75 

def preprocess_csv(path: Path) -> str:
    """Render a CSV/TSV file as readable 'Header: Value' lines per row.

    Rows whose cells are all empty are omitted; blank-ish values and
    header-less columns are skipped within each row.
    """
    sep = "\t" if path.suffix == ".tsv" else ","
    raw = path.read_text(encoding="utf-8", errors="replace")
    if not raw.strip():
        return ""
    blocks: list[str] = []
    for row_num, record in enumerate(csv.DictReader(raw.splitlines(), delimiter=sep), 1):
        fields = [
            f" {header}: {value}"
            for header, value in record.items()
            if header and value and value.strip()
        ]
        if fields:
            blocks.append("\n".join([f"Row {row_num}:"] + fields))
    return "\n\n".join(blocks)

92 

93 

def preprocess_json(path: Path) -> str:
    """Render a JSON/JSONL file as readable 'dotted.path: value' lines.

    For ``.jsonl``, each non-blank line is flattened independently and
    sections are joined with blank lines; unparseable lines are kept
    verbatim.  For plain JSON, malformed input falls back to the raw text.
    """
    raw = path.read_text(encoding="utf-8", errors="replace")

    if path.suffix == ".jsonl":
        chunks: list[str] = []
        for raw_line in raw.splitlines():
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError:
                chunks.append(stripped)  # keep unparseable lines verbatim
            else:
                chunks.append("\n".join(_flatten_tree(record)))
        return "\n\n".join(chunks)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        log.warning("Malformed JSON, falling back to raw text: %s", path)
        return raw
    return "\n".join(_flatten_tree(parsed))