81b3007efd
Python Lint CI / ruff-format-check (push) Successful in 9m32s
Python Lint CI / ruff (push) Successful in 9m33s
Python Lint CI / ty (push) Successful in 9m18s
Docker CI / docker-build (push) Successful in 10m35s
Python Test CI / coverage (push) Successful in 2m20s
101 lines
3.4 KiB
Python
101 lines
3.4 KiB
Python
import hashlib
|
|
import logging
|
|
import pathlib
|
|
import secrets
|
|
import typing
|
|
|
|
from stapler import PKG_NAME
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from .params import Parameters
|
|
from .registry import Registry
|
|
|
|
|
|
class TokenManager:
|
|
__slots__ = [
|
|
"last_file_change",
|
|
"logger",
|
|
"pbkdf2_iterations",
|
|
"registry",
|
|
"token_hashes",
|
|
"token_salt",
|
|
"tokens_file",
|
|
]
|
|
|
|
FILE = ".tokens"
|
|
|
|
def __init__(
|
|
self, params: Parameters, registry: Registry, pbkdf2_iterations: int = 500_000
|
|
) -> None:
|
|
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
|
|
self.token_salt: bytes = params.token_salt.encode()
|
|
self.tokens_file: pathlib.Path = pathlib.Path(params.data_dir) / self.FILE
|
|
self.registry: Registry = registry
|
|
self.pbkdf2_iterations: int = pbkdf2_iterations
|
|
self.token_hashes: list[str] = []
|
|
self.last_file_change: int | float = 0
|
|
|
|
def init(self) -> None:
|
|
self.logger.debug("Initializing...")
|
|
if not len(self.token_salt):
|
|
self.logger.warning(
|
|
"No salt provided, tokens will be cryptographically weak"
|
|
)
|
|
self.token_hashes = self.__load_hashes()
|
|
if not self.tokens_file.exists():
|
|
self.__save_hashes()
|
|
self.last_file_change = self.tokens_file.stat().st_mtime
|
|
|
|
def is_valid(self, token: str) -> bool:
|
|
return self.__hash_token(token) in self.token_hashes
|
|
|
|
def is_valid_for_path(self, token: str, path: str) -> bool:
|
|
return (page := self.registry.get_from_path(path)) is None or (
|
|
page.token_hash is None or page.token_hash == self.__hash_token(token)
|
|
)
|
|
|
|
def set_token(self, path: str, token: str) -> None:
|
|
self.registry.set_token_hash(path, self.__hash_token(token))
|
|
|
|
def new_token(self) -> None:
|
|
new_token = secrets.token_hex(16)
|
|
self.token_hashes += [self.__hash_token(new_token)]
|
|
self.__save_hashes()
|
|
self.logger.warning("NEW TOKEN: %s", new_token)
|
|
self.logger.warning("Please copy this secret value before it disappears")
|
|
|
|
def detect_file_change(self) -> bool:
|
|
if (
|
|
self.tokens_file.exists()
|
|
and (file_change := self.tokens_file.stat().st_mtime)
|
|
!= self.last_file_change
|
|
):
|
|
self.logger.debug("Detected change: %s", self.tokens_file)
|
|
self.last_file_change = file_change
|
|
return True
|
|
return False
|
|
|
|
def __hash_token(self, token: str) -> str:
|
|
return hashlib.pbkdf2_hmac(
|
|
"sha256", token.encode(), self.token_salt, self.pbkdf2_iterations
|
|
).hex()
|
|
|
|
def __load_hashes(self) -> list[str]:
|
|
if self.tokens_file.is_file():
|
|
with self.tokens_file.open() as file:
|
|
hashes = [line.strip() for line in file]
|
|
if len(hashes) == 0 or hashes[0] != self.__control_hash():
|
|
self.logger.critical("TOKEN_SALT CHANGED HASHES NOT LOADED")
|
|
return []
|
|
return hashes[1:]
|
|
return []
|
|
|
|
def __save_hashes(self) -> None:
|
|
with self.tokens_file.open(mode="w") as file:
|
|
file.write("\n".join([self.__control_hash(), *self.token_hashes]))
|
|
self.tokens_file.chmod(0o600)
|
|
self.logger.debug("Updated %s", self.tokens_file)
|
|
|
|
def __control_hash(self) -> str:
|
|
return self.__hash_token(PKG_NAME)
|