Coverage for src / lilbee / server / auth.py: 100%
81 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-29 19:16 +0000
1"""Session token auth middleware with decorator-based read-only marking."""
3from __future__ import annotations
5import hmac
6import json
7import logging
8import secrets
9import sys
10from collections.abc import Callable
11from pathlib import Path
12from typing import Any, TypeVar
14from litestar.exceptions import NotAuthorizedException
15from litestar.types import ASGIApp, Receive, Scope, Send
17from lilbee.config import cfg
19log = logging.getLogger(__name__)
21_TOKEN_BYTES = 32
23F = TypeVar("F", bound=Callable[..., Any])
def read_only(fn: F) -> F:
    """Tag *fn* as a read-only route handler and hand it back unchanged.

    The decorator sets the ``_lilbee_read_only`` attribute on the function
    object itself; ``AuthMiddleware`` looks for that attribute to decide
    whether a request may skip the bearer-token check.
    """
    setattr(fn, "_lilbee_read_only", True)
    return fn
def server_json_path() -> Path:
    """Build the location of the server session file (``server.json``).

    The file lives directly inside the configured ``cfg.data_dir``.
    """
    data_dir = cfg.data_dir
    return data_dir / "server.json"
class SessionManager:
    """Manages the server session token lifecycle.

    Replaces the old module-level ``_session_token`` global so that auth
    state is explicit and injectable rather than hidden mutable state.

    ``token`` starts as ``None`` and is reset to ``None`` by :meth:`cleanup`;
    while it is ``None``, :meth:`validate` accepts every request.
    """

    def __init__(self) -> None:
        self.token: str | None = None

    def load_or_generate(self) -> str:
        """Return the persisted token if shape-valid; generate a new one otherwise.

        A freshly generated token is stored on ``self.token`` and written to
        the session file as ``{"token": ...}``.

        Returns:
            The token now held in ``self.token``.
        """
        path = server_json_path()
        existing = self._read_persisted_token(path)
        if existing is not None:
            self.token = existing
            return existing
        self.token = secrets.token_urlsafe(_TOKEN_BYTES)
        path.parent.mkdir(parents=True, exist_ok=True)
        if sys.platform != "win32":
            # Create/restrict the file *before* the secret is written so the
            # token never sits in a file with default (wider) permissions.
            path.touch(mode=0o600, exist_ok=True)
            path.chmod(0o600)
        path.write_text(json.dumps({"token": self.token}), encoding="utf-8")
        return self.token

    @staticmethod
    def _read_persisted_token(path: Path) -> str | None:
        """Return a previously-persisted token if shape-valid, else None.

        "Shape-valid" means the file is readable text containing a JSON object
        whose ``"token"`` value is a string of at least ``_TOKEN_BYTES``
        characters. Any other condition (missing/unreadable file, undecodable
        bytes, invalid JSON, wrong structure) yields ``None``.
        """
        try:
            raw = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # OSError covers a missing or unreadable file; UnicodeDecodeError
            # covers a file whose bytes are not valid text (corruption).
            return None
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return None
        if not isinstance(data, dict):
            return None
        token = data.get("token")
        if not isinstance(token, str) or len(token) < _TOKEN_BYTES:
            return None
        return token

    def cleanup(self) -> None:
        """Remove server.json on shutdown and clear the in-memory token."""
        self.token = None
        path = server_json_path()
        path.unlink(missing_ok=True)

    def validate(self, auth_header: str) -> bool:
        """Check whether *auth_header* carries a valid bearer token.

        Args:
            auth_header: The full ``Authorization`` header value, expected to
                be exactly ``"Bearer <token>"``.

        Returns:
            ``True`` when the header matches the current token, or when no
            token is set at all (``self.token is None``); ``False`` otherwise.

        Raises:
            NotAuthorizedException: If the token is set but empty.
        """
        if self.token is None:
            return True  # auth disabled (tests)
        if not self.token:
            raise NotAuthorizedException("Server token not initialized")
        expected = f"Bearer {self.token}"
        # compare_digest() rejects str arguments containing non-ASCII
        # characters with TypeError, so compare the encoded bytes instead;
        # a header with unexpected characters is then simply a mismatch.
        return hmac.compare_digest(
            auth_header.encode("utf-8", errors="surrogatepass"),
            expected.encode("utf-8", errors="surrogatepass"),
        )
93# Singleton instance — used by AuthMiddleware and the app lifespan.
94session_manager = SessionManager()
class AuthMiddleware:
    """Bearer token auth middleware for mutating endpoints.

    A request is forwarded to the wrapped app without any token check when
    its scope type is not ``"http"``, when its method is ``OPTIONS``, or when
    its route handler function carries a truthy ``_lilbee_read_only``
    attribute. Every other request must present an ``Authorization`` header
    accepted by ``session_manager.validate``.
    """

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the request to the wrapped app or reject it.

        Raises:
            NotAuthorizedException: If a token check is required and the
                ``Authorization`` header is missing or does not validate.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        method = scope.get("method", "")
        if method == "OPTIONS":
            await self.app(scope, receive, send)
            return

        handler = scope.get("route_handler")
        if handler and getattr(handler.fn, "_lilbee_read_only", False):
            await self.app(scope, receive, send)
            return

        headers = dict(scope.get("headers", []))
        raw_header = headers.get(b"authorization", b"")
        # Header bytes are client-controlled. A strict decode would raise
        # UnicodeDecodeError on malformed input and surface as a server error;
        # escaping non-ASCII bytes instead always yields an ASCII string, so a
        # garbage header is just treated as a non-matching credential.
        auth_header = raw_header.decode("ascii", errors="backslashreplace")
        if session_manager.validate(auth_header):
            await self.app(scope, receive, send)
            return
        raise NotAuthorizedException("Missing or invalid bearer token")