Coverage for src / lilbee / server / auth.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-29 19:16 +0000

1"""Session token auth middleware with decorator-based read-only marking.""" 

2 

3from __future__ import annotations 

4 

5import hmac 

6import json 

7import logging 

8import secrets 

9import sys 

10from collections.abc import Callable 

11from pathlib import Path 

12from typing import Any, TypeVar 

13 

14from litestar.exceptions import NotAuthorizedException 

15from litestar.types import ASGIApp, Receive, Scope, Send 

16 

17from lilbee.config import cfg 

18 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Size argument passed to ``secrets.token_urlsafe`` when a new session token
# is generated; also the minimum string length a persisted token must have
# to be accepted as shape-valid.
_TOKEN_BYTES = 32

# Generic callable type so that decorators in this module return the exact
# type they were given.
F = TypeVar("F", bound=Callable[..., Any])

24 

25 

def read_only(fn: F) -> F:
    """Flag *fn* as a read-only route handler and return it unchanged.

    The decorator sets ``_lilbee_read_only = True`` on the function object.
    ``AuthMiddleware`` looks for that attribute on the resolved handler's
    function and skips bearer-token validation when it is truthy.
    """
    setattr(fn, "_lilbee_read_only", True)
    return fn

30 

31 

def server_json_path() -> Path:
    """Locate the server session file: ``server.json`` under ``cfg.data_dir``."""
    session_file = cfg.data_dir / "server.json"
    return session_file

35 

36 

class SessionManager:
    """Manage the lifecycle of the server's bearer session token.

    The token is held in memory on ``self.token`` and persisted as JSON
    (``{"token": "<value>"}``) in the file returned by ``server_json_path()``.
    Keeping this state on an instance replaces the old module-level
    ``_session_token`` global, so auth state is explicit and injectable
    rather than hidden mutable state.

    Attributes:
        token: The active token, or ``None`` when none has been loaded.
            ``validate`` treats ``None`` as "auth disabled".
    """

    def __init__(self) -> None:
        self.token: str | None = None

    def load_or_generate(self) -> str:
        """Return the persisted token if shape-valid; generate a new one otherwise.

        A freshly generated token is stored on ``self.token`` and written to
        the session file, whose parent directory is created if needed.

        Returns:
            The token now held in ``self.token``.

        Raises:
            OSError: If the session file or its directory cannot be written.
        """
        path = server_json_path()
        existing = self._read_persisted_token(path)
        if existing is not None:
            self.token = existing
            return existing
        self.token = secrets.token_urlsafe(_TOKEN_BYTES)
        path.parent.mkdir(parents=True, exist_ok=True)
        self._write_token_file(path, json.dumps({"token": self.token}))
        return self.token

    @staticmethod
    def _write_token_file(path: Path, payload: str) -> None:
        """Write *payload* to *path* so the secret is never readable by others."""
        import os  # local import: ``os`` is not in this module's import block

        # Create the file with owner-only permissions from the start instead
        # of writing it with the default mode and tightening it afterwards.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            if sys.platform != "win32":
                # A pre-existing file keeps its old mode when reopened, so
                # tighten it before the secret is written.
                os.fchmod(fd, 0o600)
            fh.write(payload)

    @staticmethod
    def _read_persisted_token(path: Path) -> str | None:
        """Return a previously-persisted token if shape-valid, else None.

        "Shape-valid" means the file is readable text containing a JSON
        object whose ``"token"`` value is a string of at least
        ``_TOKEN_BYTES`` characters. Any other condition (missing file,
        unreadable or undecodable content, malformed JSON, wrong types)
        yields ``None`` so the caller generates a fresh token.
        """
        try:
            raw = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # OSError covers a missing or unreadable file; UnicodeDecodeError
            # covers a corrupt file that is not valid text.
            return None
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return None
        if not isinstance(data, dict):
            return None
        token = data.get("token")
        if not isinstance(token, str) or len(token) < _TOKEN_BYTES:
            return None
        return token

    def cleanup(self) -> None:
        """Remove server.json on shutdown and clear the in-memory token."""
        self.token = None
        path = server_json_path()
        path.unlink(missing_ok=True)

    def validate(self, auth_header: str) -> bool:
        """Check whether *auth_header* carries a valid bearer token.

        Args:
            auth_header: The full ``Authorization`` header value; it must
                equal ``"Bearer <token>"`` exactly to be accepted.

        Returns:
            ``True`` when the header matches, or when ``self.token`` is
            ``None`` (auth disabled); ``False`` otherwise.

        Raises:
            NotAuthorizedException: If ``self.token`` is an empty string.
        """
        if self.token is None:
            return True  # auth disabled (tests)
        if not self.token:
            raise NotAuthorizedException("Server token not initialized")
        # Compare as bytes: hmac.compare_digest raises TypeError for str
        # arguments containing non-ASCII characters, which would turn a
        # malformed header into a crash instead of a rejection.
        supplied = auth_header.encode("utf-8", "surrogatepass")
        expected = f"Bearer {self.token}".encode("utf-8", "surrogatepass")
        return hmac.compare_digest(supplied, expected)

91 

92 

# Process-wide SessionManager shared by AuthMiddleware and the app lifespan.
session_manager: SessionManager = SessionManager()

95 

96 

class AuthMiddleware:
    """Bearer token auth middleware for mutating endpoints.

    A request is forwarded to the wrapped app without any token check when:

    * the ASGI scope type is not ``"http"``;
    * the HTTP method is ``OPTIONS``; or
    * the resolved route handler's function carries a truthy
      ``_lilbee_read_only`` attribute (set by ``read_only``).

    Every other request must carry an ``Authorization`` header accepted by
    ``session_manager.validate``.
    """

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the request if it is exempt or authenticated.

        Raises:
            NotAuthorizedException: If the request is not exempt and its
                ``Authorization`` header is missing, undecodable, or does
                not match the session token.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        method = scope.get("method", "")
        if method == "OPTIONS":
            await self.app(scope, receive, send)
            return

        handler = scope.get("route_handler")
        if handler and getattr(handler.fn, "_lilbee_read_only", False):
            await self.app(scope, receive, send)
            return

        headers = dict(scope.get("headers", []))
        raw_header = headers.get(b"authorization", b"")
        try:
            auth_header = raw_header.decode()
        except UnicodeDecodeError:
            # Header bytes that are not valid UTF-8 would otherwise escape
            # as an unhandled error; treat them as a missing header so the
            # request is rejected through the normal path.
            auth_header = ""
        if session_manager.validate(auth_header):
            await self.app(scope, receive, send)
            return
        raise NotAuthorizedException("Missing or invalid bearer token")