Coverage for src / lilbee / asyncio_loop.py: 100%
51 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Process-lifetime background asyncio loop for TUI workers.
3One loop on a daemon thread, used by every @work(thread=True) worker.
4CLI one-shots and the server own their own loops — don't route them here.
5"""
7from __future__ import annotations
9import asyncio
10import atexit
11import concurrent.futures
12import contextlib
13import threading
14from collections.abc import Coroutine
15from typing import Any, TypeVar
17T = TypeVar("T")
19_loop: asyncio.AbstractEventLoop | None = None
20_thread: threading.Thread | None = None
21_lock = threading.Lock()
22_atexit_registered = False
25def get_loop() -> asyncio.AbstractEventLoop:
26 """Return the background loop, starting it on a daemon thread if needed."""
27 global _loop, _thread, _atexit_registered
28 with _lock:
29 if _loop is not None and not _loop.is_closed():
30 return _loop
31 loop = asyncio.new_event_loop()
32 thread = threading.Thread(
33 target=loop.run_forever,
34 name="lilbee-bg-loop",
35 daemon=True,
36 )
37 thread.start()
38 _loop = loop
39 _thread = thread
40 if not _atexit_registered:
41 # Register once per process; shutdown is idempotent, so restarting
42 # the loop later doesn't need a second registration.
43 atexit.register(shutdown)
44 _atexit_registered = True
45 return loop
48def run(coro: Coroutine[Any, Any, T]) -> T:
49 """Submit *coro* to the background loop from any thread; block for result.
51 Exceptions raised inside *coro* propagate unchanged, including
52 asyncio.CancelledError.
53 """
54 loop = get_loop()
55 try:
56 return asyncio.run_coroutine_threadsafe(coro, loop).result()
57 except concurrent.futures.CancelledError as exc:
58 # run_coroutine_threadsafe re-raises cancellation as the concurrent
59 # flavour; rewrap so `except asyncio.CancelledError` still matches.
60 raise asyncio.CancelledError(*exc.args) from None
63def shutdown() -> None:
64 """Cancel pending tasks, stop the loop, join the thread. Idempotent."""
65 global _loop, _thread
66 with _lock:
67 loop, _loop = _loop, None
68 thread, _thread = _thread, None
69 if loop is None or loop.is_closed():
70 return
71 # Best-effort drain; always stop the loop even if drain raised.
72 with contextlib.suppress(Exception):
73 asyncio.run_coroutine_threadsafe(_drain(loop), loop).result(timeout=10.0)
74 loop.call_soon_threadsafe(loop.stop)
75 if thread is not None:
76 thread.join(timeout=10.0)
77 loop.close()
80async def _drain(loop: asyncio.AbstractEventLoop) -> None:
81 pending = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
82 for task in pending:
83 task.cancel()
84 if pending:
85 await asyncio.gather(*pending, return_exceptions=True)
86 # Give scheduled close callbacks a chance to run before we stop the loop.
87 await asyncio.sleep(0.05)