Coverage for src/lilbee/chunker.py: 100%

65 statements

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 08:27 +0000

1"""Token-based recursive text chunking (used by code_chunker fallback).""" 

2 

3import tiktoken 

4 

5from lilbee.config import cfg 

6 

# Module-wide tokenizer used for all token counting below.
# NOTE(review): cl100k_base is presumably chosen to match the embedding
# model used downstream — confirm against the indexing pipeline.
_enc = tiktoken.get_encoding("cl100k_base")

# Separators tried in order from coarsest to finest:
# paragraph breaks, then sentence boundaries, then single spaces.
_SEPARATORS = ("\n\n", ". ", " ")

11 

12 

def _token_len(text: str) -> int:
    """Count the cl100k_base tokens in *text*."""
    tokens = _enc.encode(text)
    return len(tokens)

15 

16 

17def _split_nonempty(text: str, sep: str) -> list[str]: 

18 """Split text on separator, dropping empty/whitespace-only parts.""" 

19 return [p for p in text.split(sep) if p.strip()] 

20 

21 

def _split_to_segments(text: str, max_tokens: int) -> list[str]:
    """Recursively break *text* into segments of at most *max_tokens* tokens.

    Separators are tried coarsest-first (paragraphs, sentences, words);
    the first one that yields more than one part is used and each part is
    split again.  If no separator helps, fall back to a hard word split.
    """
    # Base case: the whole text already fits.
    if _token_len(text) <= max_tokens:
        return [text]

    for sep in _SEPARATORS:
        pieces = _split_nonempty(text, sep)
        if len(pieces) <= 1:
            # This separator didn't actually divide the text; try a finer one.
            continue
        segments: list[str] = []
        for piece in pieces:
            segments.extend(_split_to_segments(piece, max_tokens))
        return segments

    # No separator produced progress — split on individual words.
    return hard_split_words(text, max_tokens)

36 

37 

def hard_split_words(text: str, max_tokens: int) -> list[str]:
    """Last-resort split by individual words.

    Greedily packs whitespace-separated words into segments whose token
    cost stays within *max_tokens*.  A single word that alone exceeds the
    budget still becomes (the start of) its own segment.
    """
    segments: list[str] = []
    current: list[str] = []
    current_tokens = 0

    for word in text.split():
        # Cost includes the trailing space that joining will reintroduce.
        cost = _token_len(word + " ")
        overflow = current_tokens + cost > max_tokens
        if overflow and current:
            segments.append(" ".join(current))
            current = []
            current_tokens = 0
        current.append(word)
        current_tokens += cost

    if current:
        segments.append(" ".join(current))
    return segments

56 

57 

def _tail_overlap(segments: list[str], max_tokens: int) -> list[str]:
    """Take trailing segments that fit within the overlap token budget.

    Walks *segments* from the end, accumulating whole segments until
    adding another would exceed *max_tokens*, and returns them in their
    original order.  The budget is all-or-nothing per segment: the first
    segment that would overflow stops the walk (it is not split).
    """
    tail: list[str] = []
    tokens = 0
    for seg in reversed(segments):
        seg_t = _token_len(seg)
        if tokens + seg_t > max_tokens:
            break
        tail.append(seg)
        tokens += seg_t
    # Collected back-to-front; one reverse restores order.  (The original
    # used list.insert(0, ...) per element, which is O(n^2) overall.)
    tail.reverse()
    return tail

69 

70 

def chunk_text(
    text: str,
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
) -> list[str]:
    """Split text into overlapping token-sized chunks.

    Recursively splits on paragraph/sentence/word boundaries, then packs
    the resulting segments into chunks of up to *chunk_size* tokens,
    carrying a tail of up to *chunk_overlap* tokens from each chunk into
    the next.  ``None`` for either limit falls back to the values in
    :data:`lilbee.config.cfg`.
    """
    size = cfg.chunk_size if chunk_size is None else chunk_size
    overlap = cfg.chunk_overlap if chunk_overlap is None else chunk_overlap

    # Nothing to chunk: empty or whitespace-only input.
    if not text or not text.strip():
        return []

    segments = _split_to_segments(text, size)
    if not segments:
        return []

    chunks: list[str] = []
    window: list[str] = []          # segments accumulating into the next chunk
    window_tokens = 0               # running token total for `window`

    for seg in segments:
        seg_tokens = _token_len(seg)
        if window and window_tokens + seg_tokens > size:
            # Flush the full window, then seed the next chunk with an
            # overlapping tail taken from the end of this one.
            chunks.append("\n\n".join(window))
            window = _tail_overlap(window, overlap)
            window_tokens = sum(_token_len(s) for s in window)
        window.append(seg)
        window_tokens += seg_tokens

    if window:
        chunks.append("\n\n".join(window))

    return chunks