Coverage for src / lilbee / cli / sync.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Background sync, executor management, and sync status for chat mode.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6from concurrent.futures import Future, ThreadPoolExecutor 

7from typing import TYPE_CHECKING 

8 

9from rich.console import Console 

10 

11from lilbee.cli import theme 

12from lilbee.ingest import sync 

13from lilbee.progress import EventType, ExtractEvent, FileStartEvent, ProgressEvent, SyncDoneEvent 

14 

15if TYPE_CHECKING: 

16 from lilbee.progress import DetailedProgressCallback 

17 

18 

def _format_sync_summary(added: int, updated: int, removed: int, failed: int) -> str | None:
    """Render the non-zero sync counters as one comma-separated phrase.

    Counters are reported in the fixed order added, updated, removed, failed;
    any counter that is falsy (zero) is left out.

    Returns:
        A string such as ``"1 added, 2 removed"``, or ``None`` when every
        counter is zero.
    """
    labelled = (
        ("added", added),
        ("updated", updated),
        ("removed", removed),
        ("failed", failed),
    )
    fragments = []
    for label, count in labelled:
        if count:
            fragments.append(f"{count} {label}")
    if not fragments:
        return None
    return ", ".join(fragments)

24 

25 

def _sync_progress_printer(con: Console) -> DetailedProgressCallback:
    """Build a progress callback that prints sync status lines to *con*.

    The returned callback reacts to two event types and ignores all others:

    * ``FILE_START`` prints a muted "Syncing [i/n]: file" line.
    * ``DONE`` prints a muted "Synced: ..." summary, but only when at least
      one counter is non-zero.

    The callback raises ``TypeError`` when the payload class does not match
    the event type.
    """

    def _callback(event_type: EventType, data: ProgressEvent) -> None:
        if event_type == EventType.FILE_START:
            if not isinstance(data, FileStartEvent):
                raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
            muted = theme.MUTED
            con.print(
                f"[{muted}]Syncing [{data.current_file}/{data.total_files}]: {data.file}[/{muted}]"
            )
            return

        if event_type == EventType.DONE:
            if not isinstance(data, SyncDoneEvent):
                raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
            summary = _format_sync_summary(data.added, data.updated, data.removed, data.failed)
            # A sync that changed nothing yields None and stays silent.
            if summary:
                con.print(f"[{theme.MUTED}]Synced: {summary}[/{theme.MUTED}]")

    return _callback

43 

44 

# Process-wide background executor; created on first use by _get_executor()
# and reset to None by shutdown_executor().
_bg_executor: ThreadPoolExecutor | None = None


def _get_executor() -> ThreadPoolExecutor:
    """Return the shared single-worker executor, creating it on first call.

    Subsequent calls return the same instance until the module-level
    ``_bg_executor`` is reset to ``None``.
    """
    global _bg_executor
    executor = _bg_executor
    if executor is None:
        executor = ThreadPoolExecutor(max_workers=1)
        _bg_executor = executor
    return executor

54 

55 

def shutdown_executor() -> None:
    """Tear down the background executor without blocking the caller.

    Does nothing when no executor has been created. Otherwise the executor is
    shut down with ``wait=False`` and ``cancel_futures=True`` and the
    module-level reference is reset to ``None``, so the next
    ``_get_executor()`` call builds a fresh one.
    """
    global _bg_executor
    executor = _bg_executor
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
        _bg_executor = None

66 

67 

def _on_sync_done(con: Console, future: Future[object], *, chat_mode: bool = False) -> None:
    """Done-callback for background sync futures: report unexpected failures.

    Args:
        con: Rich console used for the error line when not in chat mode.
        future: The completed (or cancelled) background sync future.
        chat_mode: When true, the error is emitted with plain ``print()``
            instead of ``con.print()``.

    Nothing is printed when the future succeeded, was cancelled, failed with
    ``asyncio.CancelledError``, or failed with the executor's
    "cannot schedule new futures" ``RuntimeError``.
    """
    # A future cancelled before it ran (shutdown_executor passes
    # cancel_futures=True) still triggers its done-callbacks, and calling
    # future.exception() on it raises concurrent.futures.CancelledError.
    # Check first so the callback itself never raises.
    if future.cancelled():
        return

    exc = future.exception()
    if exc is None:
        return
    if isinstance(exc, asyncio.CancelledError):
        return
    # Raised when work is submitted to an executor that is shutting down.
    if isinstance(exc, RuntimeError) and "cannot schedule new futures" in str(exc):
        return

    if chat_mode:
        print(f"Background sync error: {exc}")
    else:
        con.print(f"[{theme.ERROR}]Background sync error:[/{theme.ERROR}] {exc}")

81 

82 

class SyncStatus:
    """Mutable holder for the background sync status shown in chat mode.

    The chat-mode progress callback writes ``text``; per the original design
    notes, prompt_toolkit's ``bottom_toolbar`` reads it on every render cycle,
    which avoids cursor manipulation and flickering. ``pending`` counts sync
    runs that have been queued but have not started yet.

    NOTE(review): no locking is performed here; the fields are plain
    attributes updated by simple assignment.
    """

    text: str
    pending: int

    def __init__(self) -> None:
        self.pending = 0
        self.clear()

    def clear(self) -> None:
        """Reset the status text to an empty string; ``pending`` is untouched."""
        self.text = ""

96 

97 

def _chat_sync_callback(status: SyncStatus) -> DetailedProgressCallback:
    """Build the progress callback used for chat-mode background sync.

    *status* is cleared immediately, when the callback is built. Afterwards:

    * ``FILE_START`` and ``EXTRACT`` overwrite ``status.text`` with a one-line
      progress message, suffixed with the number of queued syncs when
      ``status.pending`` is positive.
    * ``DONE`` clears the status text and prints the summary with ``print()``
      (per the original notes this goes through prompt_toolkit's StdoutProxy
      and so appears above the prompt). Nothing is printed when no counter is
      non-zero.

    Any other event type is ignored. The callback raises ``TypeError`` when
    the payload class does not match the event type.
    """
    status.clear()

    def _callback(event_type: EventType, data: ProgressEvent) -> None:
        if status.pending > 0:
            queue_suffix = f" (+{status.pending} queued)"
        else:
            queue_suffix = ""

        if event_type == EventType.FILE_START:
            if not isinstance(data, FileStartEvent):
                raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
            status.text = (
                f"⟳ Syncing [{data.current_file}/{data.total_files}]: {data.file}{queue_suffix}"
            )
            return

        if event_type == EventType.EXTRACT:
            if not isinstance(data, ExtractEvent):
                raise TypeError(f"Expected ExtractEvent, got {type(data).__name__}")
            status.text = (
                f"⟳ Vision OCR [{data.page}/{data.total_pages}]: {data.file}{queue_suffix}"
            )
            return

        if event_type == EventType.DONE:
            # Clear before validating so the status text is reset even when
            # the payload turns out to be malformed.
            status.clear()
            if not isinstance(data, SyncDoneEvent):
                raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
            summary = _format_sync_summary(data.added, data.updated, data.removed, data.failed)
            if summary:
                print(f"✓ Synced: {summary}")

    return _callback

129 

130 

def run_sync_background(
    con: Console,
    *,
    chat_mode: bool = False,
    sync_status: SyncStatus | None = None,
) -> Future[object]:
    """Queue a sync run on the background executor and return its future.

    Args:
        con: Rich console used for progress and error output outside chat mode.
        chat_mode: When true, progress is written to the status holder and
            messages are emitted with ``print()`` instead of *con*.
        sync_status: Status holder to update in chat mode; a private one is
            created when omitted.

    Returns:
        The future for the submitted sync. ``_on_sync_done`` is attached as a
        done-callback to report failures.
    """
    status = sync_status or SyncStatus()

    if chat_mode:
        callback = _chat_sync_callback(status)
    else:
        callback = _sync_progress_printer(con)

    def _run() -> object:
        # The task is no longer merely queued once it starts executing.
        if chat_mode:
            status.pending -= 1
        return asyncio.run(sync(quiet=True, on_progress=callback))

    def _report(done: Future[object]) -> None:
        _on_sync_done(con, done, chat_mode=chat_mode)

    # Count the run as queued before handing it to the executor.
    if chat_mode:
        status.pending += 1

    future = _get_executor().submit(_run)
    future.add_done_callback(_report)
    return future