Coverage for src / lilbee / cli / sync.py: 100%
85 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Background sync, executor management, and sync status for chat mode."""
3from __future__ import annotations
5import asyncio
6from concurrent.futures import Future, ThreadPoolExecutor
7from typing import TYPE_CHECKING
9from rich.console import Console
11from lilbee.cli import theme
12from lilbee.ingest import sync
13from lilbee.progress import EventType, ExtractEvent, FileStartEvent, ProgressEvent, SyncDoneEvent
15if TYPE_CHECKING:
16 from lilbee.progress import DetailedProgressCallback
19def _format_sync_summary(added: int, updated: int, removed: int, failed: int) -> str | None:
20 """Format sync counts into a human-readable summary, or None if nothing changed."""
21 counts = {"added": added, "updated": updated, "removed": removed, "failed": failed}
22 parts = [f"{n} {label}" for label, n in counts.items() if n]
23 return ", ".join(parts) if parts else None
26def _sync_progress_printer(con: Console) -> DetailedProgressCallback:
27 """Return a callback that prints one-line status for FILE_START and DONE events."""
29 def _callback(event_type: EventType, data: ProgressEvent) -> None:
30 if event_type == EventType.FILE_START:
31 if not isinstance(data, FileStartEvent):
32 raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
33 m = theme.MUTED
34 con.print(f"[{m}]Syncing [{data.current_file}/{data.total_files}]: {data.file}[/{m}]")
35 elif event_type == EventType.DONE:
36 if not isinstance(data, SyncDoneEvent):
37 raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
38 summary = _format_sync_summary(data.added, data.updated, data.removed, data.failed)
39 if summary:
40 con.print(f"[{theme.MUTED}]Synced: {summary}[/{theme.MUTED}]")
42 return _callback
45_bg_executor: ThreadPoolExecutor | None = None
48def _get_executor() -> ThreadPoolExecutor:
49 """Lazy-init a single-worker executor."""
50 global _bg_executor
51 if _bg_executor is None:
52 _bg_executor = ThreadPoolExecutor(max_workers=1)
53 return _bg_executor
56def shutdown_executor() -> None:
57 """Shut down the background executor without blocking.
58 Uses wait=False + cancel_futures to avoid blocking the main thread.
59 """
60 global _bg_executor
61 if _bg_executor is None:
62 return
64 _bg_executor.shutdown(wait=False, cancel_futures=True)
65 _bg_executor = None
68def _on_sync_done(con: Console, future: Future[object], *, chat_mode: bool = False) -> None:
69 """Callback attached to background sync futures — logs errors."""
70 exc = future.exception()
71 if exc is None:
72 return
73 if isinstance(exc, asyncio.CancelledError):
74 return
75 if isinstance(exc, RuntimeError) and "cannot schedule new futures" in str(exc):
76 return
77 if chat_mode:
78 print(f"Background sync error: {exc}")
79 else:
80 con.print(f"[{theme.ERROR}]Background sync error:[/{theme.ERROR}] {exc}")
83class SyncStatus:
84 """Thread-safe holder for background sync status text.
85 The background sync callback writes here; prompt_toolkit's
86 ``bottom_toolbar`` reads it on every render cycle — no cursor
87 manipulation, no flickering.
88 """
90 def __init__(self) -> None:
91 self.text: str = ""
92 self.pending: int = 0
94 def clear(self) -> None:
95 self.text = ""
98def _chat_sync_callback(status: SyncStatus) -> DetailedProgressCallback:
99 """Return a progress callback for chat-mode background sync.
100 FILE_START updates *status.text* (rendered by prompt_toolkit's bottom
101 toolbar). On DONE the status is cleared and the summary is printed via
102 ``print()`` (goes through StdoutProxy → appears above the prompt).
103 """
104 status.clear()
106 def _callback(event_type: EventType, data: ProgressEvent) -> None:
107 queue_suffix = f" (+{status.pending} queued)" if status.pending > 0 else ""
108 if event_type == EventType.FILE_START:
109 if not isinstance(data, FileStartEvent):
110 raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
111 status.text = (
112 f"⟳ Syncing [{data.current_file}/{data.total_files}]: {data.file}{queue_suffix}"
113 )
114 elif event_type == EventType.EXTRACT:
115 if not isinstance(data, ExtractEvent):
116 raise TypeError(f"Expected ExtractEvent, got {type(data).__name__}")
117 status.text = (
118 f"⟳ Vision OCR [{data.page}/{data.total_pages}]: {data.file}{queue_suffix}"
119 )
120 elif event_type == EventType.DONE:
121 status.clear()
122 if not isinstance(data, SyncDoneEvent):
123 raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
124 summary = _format_sync_summary(data.added, data.updated, data.removed, data.failed)
125 if summary:
126 print(f"✓ Synced: {summary}")
128 return _callback
131def run_sync_background(
132 con: Console,
133 *,
134 chat_mode: bool = False,
135 sync_status: SyncStatus | None = None,
136) -> Future[object]:
137 """Submit sync to a background thread. Returns the Future."""
138 status = sync_status or SyncStatus()
140 callback = _chat_sync_callback(status) if chat_mode else _sync_progress_printer(con)
142 def _run() -> object:
143 if chat_mode:
144 status.pending -= 1
145 return asyncio.run(sync(quiet=True, on_progress=callback))
147 if chat_mode:
148 status.pending += 1
150 future = _get_executor().submit(_run)
151 future.add_done_callback(lambda f: _on_sync_done(con, f, chat_mode=chat_mode))
152 return future