Coverage for src / lilbee / _splash_runner.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Standalone splash animation process — zero lilbee imports, stdlib only. 

2 

3Launched as a subprocess by ``splash.start()``. Reads a pipe fd from argv 

4and animates until the pipe signals EOF (parent closed its write end, or 

5parent died). This guarantees no orphan/zombie animation processes. 

6""" 

7 

8from __future__ import annotations 

9 

10import contextlib 

11import os 

12import select 

13import signal 

14import sys 

15import time 

16 

# ANSI control sequences written to the terminal: hide/show the cursor,
# erase the current line, and move the cursor up one line.
17HIDE_CURSOR = "\033[?25l" 

18SHOW_CURSOR = "\033[?25h" 

19CLEAR_LINE = "\033[2K" 

20MOVE_UP = "\033[A" 

21 

# 256-color foreground escapes (bright / mid / dim amber) and the attribute
# reset that apply_color() and the loading bar append after colored text.
22AMBER_BRIGHT = "\033[38;5;214m" 

23AMBER_MID = "\033[38;5;172m" 

24AMBER_DIM = "\033[38;5;94m" 

25RESET = "\033[0m" 

26 

# Timing values in seconds: POLL_INTERVAL is what animation_loop() passes to
# time.sleep() between pipe checks; FRAME_INTERVAL and STARTUP_DELAY are
# divided by POLL_INTERVAL to get the number of polls per frame / before the
# first frame.
27FRAME_INTERVAL = 0.15 

28STARTUP_DELAY = 0.08 

29POLL_INTERVAL = 0.01 

30 

# Rows of the ASCII-art logo, top to bottom. The first and last rows are
# whitespace-only padding, which apply_color() leaves uncolored.
31BEE_LINES = [ 

32 " ", 

33 "@@@ @@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@@ ", 

34 "@@@ @@@ @@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ ", 

35 "@@@ @@@ @@@ @@! @@@ @@! @@! ", 

36 "@! !@! !@! !@ @!@ !@! !@! ", 

37 "@!! !!@ @!! @!@!@!@ @!!!:! @!!!:! ", 

38 "!!! !!! !!! !!!@!!!! !!!!!: !!!!!: ", 

39 "!!: !!: !!: !!: !!! !!: !!: ", 

40 " :!: :!: :!: :!: !:! :!: :!: ", 

41 " :: :::: :: :: :::: :: :::: :: :::: :: :::: ", 

42 ": :: : : : : :: : : :: : :: : :: :: : :: :: ", 

43 " ", 

44] 

45 

# Width, in characters, of the first art row; build_knight_rider_frames()
# uses it as the width of the loading bar.
46LOGO_WIDTH = len(BEE_LINES[1]) 

47 

# Order in which build_logo_frames() tints the logo: bright -> mid -> dim ->
# mid, so cycling through the frames fades down and back up.
48COLOR_SEQUENCE = [AMBER_BRIGHT, AMBER_MID, AMBER_DIM, AMBER_MID] 

49 

50 

def apply_color(line: str, color: str) -> str:
    """Wrap *line* in *color* ... ``RESET``, leaving whitespace-only lines as they are.

    Blank padding rows are returned unchanged so no escape sequences are
    emitted for rows that have nothing visible to tint.
    """
    has_visible_text = bool(line.strip())
    return "".join((color, line, RESET)) if has_visible_text else line

56 

57 

def build_logo_frames() -> list[list[str]]:
    """Return one tinted copy of the logo per entry in ``COLOR_SEQUENCE``.

    Each copy is a list of rows in ``BEE_LINES`` order, colored with
    ``apply_color``; the copies are precomputed so the animation loop only
    has to index into them.
    """
    frames: list[list[str]] = []
    for tint in COLOR_SEQUENCE:
        tinted_rows = [apply_color(row, tint) for row in BEE_LINES]
        frames.append(tinted_rows)
    return frames

61 

62 

def build_knight_rider_frames() -> list[str]:
    """Build the Knight Rider loading-bar frames spanning the full logo width.

    The bright head cell sweeps from column 0 to column ``LOGO_WIDTH - 1``
    and back down to column 1, so the list holds ``2 * (LOGO_WIDTH - 1)``
    frames and cycling through it loops without repeating an end position.
    Each frame is ``LOGO_WIDTH`` cells wide: the head, one medium-shade cell
    on each side of it, one light-shade cell beyond those, and spaces
    elsewhere.

    Returns:
        The frames in playback order. At least one frame is always returned,
        even for a logo width of 0 or 1, so callers can safely index the
        list modulo its length.
    """
    # Pre-render the three kinds of lit cell once, keyed by their distance
    # from the head; every other distance renders as a plain space.
    glow_by_distance = {
        0: AMBER_BRIGHT + "\u2593" + RESET,
        1: AMBER_DIM + "\u2592" + RESET,
        2: AMBER_DIM + "\u2591" + RESET,
    }

    sweep_range = LOGO_WIDTH - 1
    total_frames = sweep_range * 2

    frames: list[str] = []
    # max(..., 1): a degenerate width would otherwise produce no frames at all.
    for pos in range(max(total_frames, 1)):
        # First half sweeps right (head == pos); second half mirrors back left.
        head_pos = pos if pos < sweep_range else total_frames - pos
        frames.append(
            "".join(
                glow_by_distance.get(abs(col - head_pos), " ")
                for col in range(LOGO_WIDTH)
            )
        )
    return frames

86 

87 

def render_frame(logo_lines: list[str], loading_bar: str) -> bytes:
    """Assemble one animation frame as bytes ready for ``os.write()``.

    The frame is the logo rows, an empty spacer row, and the loading bar
    prefixed by a space; every row, including the last, ends in a newline.
    """
    rows = list(logo_lines)
    rows.append("")
    rows.append(f" {loading_bar}")
    text = "\n".join(rows) + "\n"
    return text.encode()

92 

93 

def move_up_and_clear(n: int) -> bytes:
    """Return the bytes that step the cursor up *n* lines, erasing each line reached."""
    one_line = (MOVE_UP + CLEAR_LINE).encode()
    return one_line * n

97 

98 

def clear_screen(frame_height: int) -> bytes:
    """Return the bytes that wipe a drawn frame and make the cursor visible again.

    The frame is erased row by row (cursor-up + erase-line, ``frame_height``
    times) rather than with ``\\033[2J\\033[H``, so this process never emits
    a cursor-home escape. A cursor-home could land on the Textual alt-screen
    if the TUI starts before this subprocess has finished, leaving a stuck
    cursor artifact at (0,0).
    """
    erase_frame = move_up_and_clear(frame_height)
    reveal_cursor = SHOW_CURSOR.encode()
    return erase_frame + reveal_cursor

108 

109 

def _read_eof(pipe_fd: int) -> bool:
    """Read a single byte from *pipe_fd* and report whether the pipe is at EOF.

    Returns True when the read yields nothing or fails with ``OSError``;
    False when a byte was read (that byte is consumed and discarded).
    """
    try:
        chunk = os.read(pipe_fd, 1)
    except OSError:
        return True
    return not chunk

116 

117 

def pipe_closed(pipe_fd: int) -> bool:
    """Report, without blocking, whether the pipe behind *pipe_fd* has hit EOF.

    Both platform branches follow the same plan: first ask the OS whether a
    read would return immediately, and only then do the one-byte read in
    ``_read_eof``. Any failure while probing is treated as "pipe is gone".
    """
    if sys.platform == "win32":
        import ctypes
        import msvcrt

        try:
            os_handle = msvcrt.get_osfhandle(pipe_fd)
        except OSError:
            # The descriptor is no longer valid, so the pipe is gone.
            return True

        pending = ctypes.c_ulong(0)
        peek_ok = ctypes.windll.kernel32.PeekNamedPipe(
            os_handle, None, 0, None, ctypes.byref(pending), None
        )
        if not peek_ok:
            return True
        # Nothing buffered: the writer is still there and simply silent.
        return _read_eof(pipe_fd) if pending.value != 0 else False
    else:
        try:
            ready = select.select([pipe_fd], [], [], 0)[0]
        except (ValueError, OSError):
            return True
        # Not readable yet: neither data nor EOF is pending.
        return _read_eof(pipe_fd) if ready else False

145 

146 

def animation_loop(pipe_fd: int) -> None:
    """Animate the splash on stderr until the pipe reports EOF or SIGTERM arrives.

    Sequence of events:

    1. Wait ``STARTUP_DELAY`` (polling every ``POLL_INTERVAL``); if the pipe
       closes or SIGTERM arrives during that time, return without writing
       anything to the terminal.
    2. Hide the cursor, then repeatedly draw a frame, wait ``FRAME_INTERVAL``
       (again polling), and erase the frame before drawing the next one.
    3. On the way out, erase the frame that is still displayed (if any) and
       show the cursor again.

    Args:
        pipe_fd: Read end of the pipe whose EOF tells this process to stop.
    """
    fd = 2  # stderr

    logo_frames = build_logo_frames()
    knight_frames = build_knight_rider_frames()
    # Logo rows plus the spacer row and loading-bar row added by render_frame().
    frame_height = len(BEE_LINES) + 2

    got_signal = False

    if sys.platform != "win32":

        def handle_term(signum: int, frame: object) -> None:
            # Only record the request; the loops below notice the flag at
            # their next poll and unwind through the normal cleanup path.
            nonlocal got_signal
            got_signal = True

        signal.signal(signal.SIGTERM, handle_term)

    for _ in range(int(STARTUP_DELAY / POLL_INTERVAL)):
        if got_signal or pipe_closed(pipe_fd):
            return
        time.sleep(POLL_INTERVAL)

    # True only while a rendered frame occupies the frame_height lines
    # directly above the cursor. Cleanup must not erase those lines
    # otherwise: they would be terminal output the splash never drew.
    frame_on_screen = False

    try:
        os.write(fd, HIDE_CURSOR.encode())
        tick = 0

        while not got_signal and not pipe_closed(pipe_fd):
            logo = logo_frames[tick % len(logo_frames)]
            knight = knight_frames[tick % len(knight_frames)]
            os.write(fd, render_frame(logo, knight))
            frame_on_screen = True

            for _ in range(int(FRAME_INTERVAL / POLL_INTERVAL)):
                if got_signal or pipe_closed(pipe_fd):
                    break
                time.sleep(POLL_INTERVAL)

            # Erase only when another frame will follow; on shutdown the
            # frame is left for the cleanup below to remove.
            if not got_signal and not pipe_closed(pipe_fd):  # pragma: no cover
                os.write(fd, move_up_and_clear(frame_height))
                frame_on_screen = False

            tick += 1
    except OSError:
        # Writing to stderr failed; there is nowhere left to report it, so
        # fall through to the best-effort cleanup.
        pass
    finally:
        if frame_on_screen:
            cleanup = clear_screen(frame_height)
        else:
            # Nothing of ours is displayed: just undo HIDE_CURSOR.
            cleanup = SHOW_CURSOR.encode()
        with contextlib.suppress(OSError):
            os.write(fd, cleanup)

196 

197 

def main() -> None:
    """Entry point when run as ``python -m lilbee._splash_runner <pipe_fd>``.

    Exits with status 1 unless exactly one argument is given and it parses
    as an integer file descriptor. Otherwise runs the animation and closes
    the descriptor afterwards, ignoring any error from the close.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        sys.exit(1)

    try:
        pipe_fd = int(cli_args[0])
    except ValueError:
        sys.exit(1)

    try:
        animation_loop(pipe_fd)
    finally:
        try:
            os.close(pipe_fd)
        except OSError:
            pass

213 

214 

215if __name__ == "__main__": 

216 main()