Coverage for src / lilbee / asyncio_loop.py: 100%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Process-lifetime background asyncio loop for TUI workers. 

2 

3One loop on a daemon thread, used by every @work(thread=True) worker. 

4CLI one-shots and the server own their own loops — don't route them here. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import atexit 

11import concurrent.futures 

12import contextlib 

13import threading 

14from collections.abc import Coroutine 

15from typing import Any, TypeVar 

16 

# Result type of the coroutine handed to run(); run() returns the same type.
T = TypeVar("T")

# The shared background event loop. Assigned by get_loop(), reset to None by
# shutdown(). None means no loop has been started (or it was shut down).
_loop: asyncio.AbstractEventLoop | None = None
# The daemon thread executing _loop.run_forever(); set and cleared together
# with _loop.
_thread: threading.Thread | None = None
# Guards reads and writes of the module-level loop/thread state.
_lock = threading.Lock()
# True once shutdown() has been registered with atexit, so that restarting
# the loop does not register it a second time.
_atexit_registered = False

23 

24 

def get_loop() -> asyncio.AbstractEventLoop:
    """Hand back the shared background loop, creating it on first use.

    When no loop exists yet, or the previous one has been closed, a new
    event loop is created and started with ``run_forever`` on a daemon
    thread named ``lilbee-bg-loop``. The first successful start also
    registers ``shutdown`` with ``atexit``.

    Returns:
        The running background event loop.
    """
    global _loop, _thread, _atexit_registered
    with _lock:
        current = _loop
        if current is None or current.is_closed():
            current = asyncio.new_event_loop()
            worker = threading.Thread(
                target=current.run_forever,
                name="lilbee-bg-loop",
                daemon=True,
            )
            worker.start()
            _loop, _thread = current, worker
            # One atexit hook per process is enough: shutdown() is
            # idempotent, so a restarted loop reuses the same registration.
            if not _atexit_registered:
                atexit.register(shutdown)
                _atexit_registered = True
        return current

46 

47 

def run(coro: Coroutine[Any, Any, T]) -> T:
    """Run *coro* on the background loop and block until it finishes.

    May be called from any thread. The coroutine is scheduled on the loop
    returned by ``get_loop()`` and the calling thread waits for its result.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever *coro* returns.

    Raises:
        asyncio.CancelledError: If the submitted work was cancelled.
        Exception: Any exception raised inside *coro* propagates unchanged.
    """
    bg_loop = get_loop()
    try:
        submitted = asyncio.run_coroutine_threadsafe(coro, bg_loop)
        outcome = submitted.result()
    except concurrent.futures.CancelledError as cancelled:
        # The concurrent.futures future reports cancellation with its own
        # exception type; translate it so callers that write
        # `except asyncio.CancelledError` keep working.
        raise asyncio.CancelledError(*cancelled.args) from None
    return outcome

61 

62 

def shutdown() -> None:
    """Cancel pending tasks, stop the loop, join the thread. Idempotent.

    The module-level loop/thread references are detached under the lock
    first, so a concurrent or repeated call finds nothing to do and returns
    immediately. Draining and joining are each bounded by a 10 second
    timeout.

    If the loop thread has not finished by the time the join times out
    (for example because a callback is blocking the loop), the loop is left
    open rather than closed: ``loop.close()`` raises ``RuntimeError`` on a
    running loop, and this function is also invoked as an ``atexit`` hook
    where such an error would only produce a spurious traceback.
    """
    global _loop, _thread
    with _lock:
        loop, _loop = _loop, None
        thread, _thread = _thread, None
    if loop is None or loop.is_closed():
        return
    # Best-effort drain; always stop the loop even if drain raised.
    with contextlib.suppress(Exception):
        asyncio.run_coroutine_threadsafe(_drain(loop), loop).result(timeout=10.0)
    loop.call_soon_threadsafe(loop.stop)
    if thread is not None:
        thread.join(timeout=10.0)
    if loop.is_running():
        # The join timed out and run_forever() is still executing; closing
        # now would raise "Cannot close a running event loop". The thread is
        # a daemon, so it cannot keep the process alive.
        return
    loop.close()

78 

79 

80async def _drain(loop: asyncio.AbstractEventLoop) -> None: 

81 pending = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()] 

82 for task in pending: 

83 task.cancel() 

84 if pending: 

85 await asyncio.gather(*pending, return_exceptions=True) 

86 # Give scheduled close callbacks a chance to run before we stop the loop. 

87 await asyncio.sleep(0.05)