Coverage for src / lilbee / _splash_runner.py: 100%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Standalone splash animation process — zero lilbee imports, stdlib only.
3Launched as a subprocess by ``splash.start()``. Reads a pipe fd from argv
4and animates until the pipe signals EOF (parent closed its write end, or
5parent died). This guarantees no orphan/zombie animation processes.
6"""
8from __future__ import annotations
10import contextlib
11import os
12import select
13import signal
14import sys
15import time
# ANSI escape sequences for cursor visibility, erasing the current line and
# moving the cursor up one row.
17HIDE_CURSOR = "\033[?25l"
18SHOW_CURSOR = "\033[?25h"
19CLEAR_LINE = "\033[2K"
20MOVE_UP = "\033[A"
# 256-colour foreground escapes (palette indices 214, 172 and 94) used for the
# amber pulse, plus the attribute reset that ends every coloured run.
22AMBER_BRIGHT = "\033[38;5;214m"
23AMBER_MID = "\033[38;5;172m"
24AMBER_DIM = "\033[38;5;94m"
25RESET = "\033[0m"
# Timing, in seconds (the values are fed to time.sleep in POLL_INTERVAL
# slices): FRAME_INTERVAL is how long each frame stays on screen,
# STARTUP_DELAY is the wait before the first frame is drawn, and
# POLL_INTERVAL is how often the pipe is re-checked while waiting.
27FRAME_INTERVAL = 0.15
28STARTUP_DELAY = 0.08
29POLL_INTERVAL = 0.01
# ASCII-art logo, one string per terminal row. The first and last entries are
# whitespace-only spacer rows (apply_color leaves such rows uncoloured).
31BEE_LINES = [
32 " ",
33 "@@@ @@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@@ ",
34 "@@@ @@@ @@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ ",
35 "@@@ @@@ @@@ @@! @@@ @@! @@! ",
36 "@! !@! !@! !@ @!@ !@! !@! ",
37 "@!! !!@ @!! @!@!@!@ @!!!:! @!!!:! ",
38 "!!! !!! !!! !!!@!!!! !!!!!: !!!!!: ",
39 "!!: !!: !!: !!: !!! !!: !!: ",
40 " :!: :!: :!: :!: !:! :!: :!: ",
41 " :: :::: :: :: :::: :: :::: :: :::: :: :::: ",
42 ": :: : : : : :: : : :: : :: : :: :: : :: :: ",
43 " ",
44]
# Width of the logo, measured on the first art row; build_knight_rider_frames
# uses it as the width of the sweep bar.
46LOGO_WIDTH = len(BEE_LINES[1])
# Colour cycle for the logo pulse: bright -> mid -> dim -> mid, then repeat.
# build_logo_frames renders one tinted logo per entry.
48COLOR_SEQUENCE = [AMBER_BRIGHT, AMBER_MID, AMBER_DIM, AMBER_MID]
def apply_color(line: str, color: str) -> str:
    """Wrap *line* in *color* and a trailing reset.

    Lines that are empty or contain only whitespace are returned unchanged,
    so blank spacer rows never carry escape sequences.
    """
    has_visible_text = bool(line.strip())
    return f"{color}{line}{RESET}" if has_visible_text else line
def build_logo_frames() -> list[list[str]]:
    """Pre-render the logo once per entry of ``COLOR_SEQUENCE``.

    Returns a list of frames in sequence order; each frame is the list of
    ``BEE_LINES`` rows passed through ``apply_color`` with that frame's tint.
    """
    frames: list[list[str]] = []
    for tint in COLOR_SEQUENCE:
        tinted_rows = [apply_color(row, tint) for row in BEE_LINES]
        frames.append(tinted_rows)
    return frames
def build_knight_rider_frames() -> list[str]:
    """Build the Knight Rider sweep bar, one pre-rendered string per frame.

    The bright head travels from column 0 to the last column and back again.
    The cycle therefore holds ``2 * (LOGO_WIDTH - 1)`` frames: neither end
    column is repeated, so the animation stays seamless when the caller wraps
    around with a modulo. Every frame is ``LOGO_WIDTH`` visible cells wide.

    Returns:
        The frames in playback order.
    """
    # Render each glyph once, keyed by its distance from the head; any cell
    # further away than the trail is a plain space.
    trail = {
        0: AMBER_BRIGHT + "\u2593" + RESET,
        1: AMBER_DIM + "\u2592" + RESET,
        2: AMBER_DIM + "\u2591" + RESET,
    }
    sweep_range = LOGO_WIDTH - 1
    total_frames = sweep_range * 2

    frames: list[str] = []
    for pos in range(total_frames):
        # First half sweeps right; second half mirrors back towards column 0.
        head_pos = pos if pos < sweep_range else total_frames - pos
        frames.append(
            "".join(trail.get(abs(col - head_pos), " ") for col in range(LOGO_WIDTH))
        )
    return frames
def render_frame(logo_lines: list[str], loading_bar: str) -> bytes:
    """Serialise one frame to bytes ready for ``os.write()``.

    The frame is the logo rows, one empty row, then the indented loading bar;
    every row, including the last, is terminated by a newline.
    """
    rows = list(logo_lines)
    rows.append("")
    rows.append(f" {loading_bar}")
    text = "".join(row + "\n" for row in rows)
    return text.encode()
def move_up_and_clear(n: int) -> bytes:
    """Return the escape bytes that step the cursor up *n* rows, erasing each.

    One "cursor up" followed by "erase line" is emitted per row, so the
    result is empty when *n* is zero or negative.
    """
    one_row = f"{MOVE_UP}{CLEAR_LINE}"
    return (n * one_row).encode()
def clear_screen(frame_height: int) -> bytes:
    """Return the bytes that wipe the splash area and make the cursor visible.

    The area is erased row by row (cursor-up + erase-line) rather than with
    ``\\033[2J\\033[H``, so this process never emits a cursor-home escape.
    If the TUI starts before this subprocess has finished, a cursor-home
    would land on the Textual alt-screen and leave a stuck cursor artifact
    at (0,0).
    """
    erase_rows = move_up_and_clear(frame_height)
    show_cursor = SHOW_CURSOR.encode()
    return erase_rows + show_cursor
110def _read_eof(pipe_fd: int) -> bool:
111 """Try to read one byte — returns True if EOF, False if data available."""
112 try:
113 return len(os.read(pipe_fd, 1)) == 0
114 except OSError:
115 return True
def pipe_closed(pipe_fd: int) -> bool:
    """Report, without blocking, whether the pipe has reached EOF.

    Windows: the pipe is peeked with ``PeekNamedPipe``; a bad descriptor or a
    failed peek counts as closed, and zero pending bytes counts as still open.
    Elsewhere: a zero-timeout ``select`` is used; a descriptor that select
    rejects counts as closed, and one that is not readable counts as open.
    When data appears to be pending, one byte is read to tell data from EOF.
    """
    if sys.platform == "win32":
        import ctypes
        import msvcrt

        try:
            handle = msvcrt.get_osfhandle(pipe_fd)
        except OSError:
            return True  # bad fd, pipe is gone
        pending = ctypes.c_ulong(0)
        peek_ok = ctypes.windll.kernel32.PeekNamedPipe(
            handle, None, 0, None, ctypes.byref(pending), None
        )
        if not peek_ok:
            return True
        return _read_eof(pipe_fd) if pending.value != 0 else False
    if sys.platform != "win32":
        try:
            ready = select.select([pipe_fd], [], [], 0)[0]
        except (ValueError, OSError):
            return True
        return _read_eof(pipe_fd) if ready else False
    return True  # pragma: no cover
def animation_loop(pipe_fd: int) -> None:
    """Run the splash animation on stderr until the pipe signals EOF.

    On non-Windows platforms a SIGTERM handler is installed that only sets a
    flag, so the loop can exit and clean up instead of dying mid-frame.
    Nothing is drawn during the first ``STARTUP_DELAY`` seconds; if the pipe
    closes (or SIGTERM arrives) in that window the function returns without
    touching the terminal.

    On exit the cursor is always made visible again. The frame area is only
    erased when a frame is actually on screen: erasing unconditionally would
    move the cursor above the splash and wipe unrelated terminal rows
    whenever the loop stops right after a frame was already cleared, or
    before the first frame was drawn.

    Args:
        pipe_fd: Read end of the pipe whose EOF tells the animation to stop.
    """
    fd = 2  # stderr

    logo_frames = build_logo_frames()
    knight_frames = build_knight_rider_frames()
    # Logo rows plus the blank spacer row and the loading-bar row.
    frame_height = len(BEE_LINES) + 2

    got_signal = False

    if sys.platform != "win32":

        def handle_term(signum: int, frame: object) -> None:
            nonlocal got_signal
            got_signal = True

        signal.signal(signal.SIGTERM, handle_term)

    # Startup grace period: skip the splash entirely for very fast startups.
    for _ in range(int(STARTUP_DELAY / POLL_INTERVAL)):
        if got_signal or pipe_closed(pipe_fd):
            return
        time.sleep(POLL_INTERVAL)

    # True only while a rendered frame occupies the rows above the cursor.
    frame_on_screen = False
    try:
        os.write(fd, HIDE_CURSOR.encode())
        frame_idx = 0
        knight_idx = 0

        while not got_signal and not pipe_closed(pipe_fd):
            logo = logo_frames[frame_idx % len(logo_frames)]
            knight = knight_frames[knight_idx % len(knight_frames)]
            rendered = render_frame(logo, knight)
            os.write(fd, rendered)
            frame_on_screen = True

            # Hold the frame, polling so shutdown is noticed promptly.
            for _ in range(int(FRAME_INTERVAL / POLL_INTERVAL)):
                if got_signal or pipe_closed(pipe_fd):
                    break
                time.sleep(POLL_INTERVAL)

            if not got_signal and not pipe_closed(pipe_fd):
                os.write(fd, move_up_and_clear(frame_height))  # pragma: no cover
                frame_on_screen = False

            frame_idx += 1
            knight_idx += 1
    except OSError:
        # stderr went away; fall through to best-effort cleanup.
        pass
    finally:
        # Erase only what is really drawn; a height of 0 yields just the
        # show-cursor escape.
        rows_to_erase = frame_height if frame_on_screen else 0
        with contextlib.suppress(OSError):
            os.write(fd, clear_screen(rows_to_erase))
def main() -> None:
    """Entry point for ``python -m lilbee._splash_runner <pipe_fd>``.

    Exits with status 1 unless exactly one integer argument is given.
    Otherwise runs the animation on that descriptor and closes it afterwards,
    ignoring any ``OSError`` from the close.
    """
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit(1)

    raw_fd = args[0]
    try:
        pipe_fd = int(raw_fd)
    except ValueError:
        sys.exit(1)

    try:
        animation_loop(pipe_fd)
    finally:
        with contextlib.suppress(OSError):
            os.close(pipe_fd)
# Script guard: start the animation only when this module is executed
# directly, never as a side effect of importing it.
215if __name__ == "__main__":
216 main()