Coverage for src / lilbee / chunker.py: 100%
65 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 08:27 +0000
1"""Token-based recursive text chunking (used by code_chunker fallback)."""
3import tiktoken
5from lilbee.config import cfg
7_enc = tiktoken.get_encoding("cl100k_base")
9# Separators tried in order from coarsest to finest
10_SEPARATORS = ("\n\n", ". ", " ")
def _token_len(text: str) -> int:
    """Count tokens in *text* using the module-level cl100k_base encoder."""
    encoded = _enc.encode(text)
    return len(encoded)
17def _split_nonempty(text: str, sep: str) -> list[str]:
18 """Split text on separator, dropping empty/whitespace-only parts."""
19 return [p for p in text.split(sep) if p.strip()]
def _split_to_segments(text: str, max_tokens: int) -> list[str]:
    """Recursively split *text* into pieces of at most *max_tokens* tokens.

    Separators are tried coarsest-first (paragraphs, then sentence-ish
    ". " boundaries, then single spaces); pieces that are still too large
    recurse and get split on the next-finer separator.  When no separator
    produces more than one part, falls back to word-by-word packing.
    """
    if _token_len(text) <= max_tokens:
        return [text]

    for sep in _SEPARATORS:
        parts = _split_nonempty(text, sep)
        if len(parts) < 2:
            # This separator did not actually split the text; try a finer one.
            continue
        segments: list[str] = []
        for part in parts:
            segments.extend(_split_to_segments(part, max_tokens))
        return segments

    # Even the finest separator failed: resort to word-level packing.
    return hard_split_words(text, max_tokens)
def hard_split_words(text: str, max_tokens: int) -> list[str]:
    """Last-resort split: pack whitespace-separated words into segments.

    Each word's cost is measured with a trailing space appended, so the
    per-segment token estimate is slightly conservative.  NOTE(review): a
    single word whose token count exceeds *max_tokens* still becomes its
    own over-budget segment — there is no sub-word split here.
    """
    segments: list[str] = []
    current: list[str] = []
    current_tokens = 0

    for word in text.split():
        word_tokens = _token_len(word + " ")
        if current and current_tokens + word_tokens > max_tokens:
            # Budget would be exceeded: close out the segment in progress.
            segments.append(" ".join(current))
            current, current_tokens = [], 0
        current.append(word)
        current_tokens += word_tokens

    if current:
        segments.append(" ".join(current))
    return segments
def _tail_overlap(segments: list[str], max_tokens: int) -> list[str]:
    """Return the longest suffix of *segments* that fits in *max_tokens* tokens.

    Walks the list from the end, taking whole segments until adding the
    next one would exceed the budget.  Used to carry trailing context from
    a finished chunk into the start of the next one.
    """
    tail: list[str] = []
    tokens = 0
    for seg in reversed(segments):
        seg_tokens = _token_len(seg)
        if tokens + seg_tokens > max_tokens:
            break
        tail.append(seg)
        tokens += seg_tokens
    # Collected back-to-front; restore original order with one O(n) reverse
    # instead of repeated O(n) list.insert(0, ...) calls (was O(n^2) overall).
    tail.reverse()
    return tail
def chunk_text(
    text: str,
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
) -> list[str]:
    """Split *text* into overlapping chunks of roughly *chunk_size* tokens.

    The text is first broken on paragraph/sentence/word boundaries, then
    the resulting segments are greedily packed into chunks joined with
    blank lines; each new chunk is seeded with up to *chunk_overlap*
    tokens of trailing segments from the previous chunk.  Unspecified
    budgets default to ``cfg.chunk_size`` / ``cfg.chunk_overlap``.
    """
    chunk_size = cfg.chunk_size if chunk_size is None else chunk_size
    chunk_overlap = cfg.chunk_overlap if chunk_overlap is None else chunk_overlap

    if not text or not text.strip():
        return []

    segments = _split_to_segments(text, chunk_size)
    if not segments:
        return []

    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0

    for seg in segments:
        seg_tokens = _token_len(seg)
        if current and current_tokens + seg_tokens > chunk_size:
            # Flush the full chunk, then keep an overlap-sized tail of it
            # so the next chunk starts with shared context.
            chunks.append("\n\n".join(current))
            current = _tail_overlap(current, chunk_overlap)
            current_tokens = sum(_token_len(s) for s in current)
        current.append(seg)
        current_tokens += seg_tokens

    if current:
        chunks.append("\n\n".join(current))
    return chunks