Coverage for src / lilbee / lock.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Write locking for LanceDB access. 

2 

3Combines an in-process mutex with a cross-process file lock (filelock) 

4so separate processes also coordinate writes. Read consistency is handled 

5by LanceDB's built-in MVCC via read_consistency_interval in store.py. 

6""" 

7 

8import logging 

9import threading 

10from collections.abc import Generator 

11from contextlib import contextmanager 

12from pathlib import Path 

13 

14from filelock import FileLock 

15from filelock import Timeout as FileLockTimeout 

16 

17from lilbee.config import cfg 

18 

log = logging.getLogger(__name__)

# Default value, in seconds, of the ``timeout`` argument of ``write_lock``.
LOCK_TIMEOUT = 30.0

23 

24 

class LockTimeoutError(TimeoutError):
    """Signal that a lock was not acquired before its timeout elapsed.

    Derives from the built-in ``TimeoutError``, so handlers written for
    ``TimeoutError`` catch it as well.
    """

27 

28 

# Process-local mutex: threads of this process take it to write one at a time.
_write_mutex = threading.Lock()

31 

32 

def _lock_path() -> Path:
    """Return the lock-file location: ``.lock`` inside ``cfg.lancedb_dir``."""
    lock_file = cfg.lancedb_dir / ".lock"
    return lock_file

35 

36 

@contextmanager
def write_lock(timeout: float = LOCK_TIMEOUT) -> Generator[None, None, None]:
    """Hold the file lock and the in-process mutex around the ``with`` body.

    The file lock at ``_lock_path()`` is acquired first, then
    ``_write_mutex``. On exit they are released in the opposite order,
    whether the body finishes normally or raises.

    Args:
        timeout: Seconds to wait for each lock. The same value is passed
            to both acquire calls, so the worst-case total wait is twice
            this value.

    Raises:
        LockTimeoutError: If either lock is not acquired within ``timeout``.
    """
    file_lock = FileLock(_lock_path())

    try:
        file_lock.acquire(timeout=timeout)
    except FileLockTimeout:
        # ``from None`` drops the filelock exception from the traceback chain.
        raise LockTimeoutError("Timed out waiting for exclusive file lock") from None

    # From here on the file lock is held and must be released on every path.
    try:
        if not _write_mutex.acquire(timeout=timeout):
            raise LockTimeoutError("Timed out waiting for write lock")
        try:
            yield
        finally:
            _write_mutex.release()
    finally:
        file_lock.release()