feat: better tokens

This commit is contained in:
2026-04-17 00:48:44 +02:00
parent b103890009
commit 1f1f142446
12 changed files with 137 additions and 38 deletions
+1 -1
View File
@@ -1,2 +1,2 @@
HOST=example.com
TOKEN=secret
TOKEN_SALT=secret
+1 -2
View File
@@ -12,7 +12,6 @@ RUFF ?= $(UV) run --active ruff
TY ?= $(UV) run --active ty
DOCKER ?= docker
DOCKER_TAG ?= localhost/stapler:latest
TOKEN ?= secret
PORT ?= 8080
# DOCS
@@ -72,7 +71,7 @@ docker-build: ## docker build
.PHONY: docker-run
docker-run: docker-build ## docker run
@$(DOCKER) run -it -p $(PORT):80 -v ./data:/data $(DOCKER_TAG) --debug --no-certbot --no-https --token $(TOKEN) --host localhost:$(PORT) run
@$(DOCKER) run -it -p $(PORT):80 -v ./data:/data $(DOCKER_TAG) --debug --no-certbot --no-https --host localhost:$(PORT) run
# ACTIONS
+8 -4
View File
@@ -3,8 +3,10 @@
![logo.svg](logo.svg)
```txt
usage: stapler [-h] [--debug | --no-debug] [-d DATA_DIR] [--certificates | --no-certificates] [--certbot | --no-certbot] [--self-signed-path SELF_SIGNED_PATH] [--certbot-conf CERTBOT_CONF]
[--certbot-www CERTBOT_WWW] [--host HOST] [--http-port HTTP_PORT] [--https-port HTTPS_PORT] [--https | --no-https] [-t TOKEN] [--max-size-bytes MAX_SIZE] [-b BIND]
usage: stapler [-h] [--debug | --no-debug] [-d DATA_DIR] [--certificates | --no-certificates] [--certbot | --no-certbot]
[--self-signed-path SELF_SIGNED_PATH] [--certbot-conf CERTBOT_CONF] [--certbot-www CERTBOT_WWW]
[--host HOST] [--http-port HTTP_PORT] [--https-port HTTPS_PORT] [--https | --no-https] [-t TOKEN_SALT]
[--max-size-bytes MAX_SIZE] [-b BIND]
COMMAND ...
Static pages as simple as a gzip file
@@ -13,6 +15,7 @@ positional arguments:
COMMAND
run Run Stapler server
renew Renew certificates
token Generate a new token
options:
-h, --help show this help message and exit
@@ -35,7 +38,8 @@ options:
--https-port HTTPS_PORT
server https port (default: 443)
--https, --no-https Use https (implies --certificates) (default: true)
-t, --token TOKEN secret token for update requests (default: )
-t, --token-salt TOKEN_SALT
salt for tokens generation
--max-size-bytes MAX_SIZE
max size of accepted archives (in bytes) (default: 2000000)
-b, --bind BIND server bind address (default: 0.0.0.0)
@@ -104,7 +108,7 @@ curl -X DELETE \
- [x] add favicon.ico + special path
- [x] [http.server security](https://docs.python.org/3/library/http.server.html#http-server-security)
- [x] launch separate upgrade 80->443 server when https
- [ ] token management with "generate" command and bind path to specific token
- [x] token management with "generate" command and bind path to specific token
- [x] docker compose example + .env
- [ ] proper doc
+1 -1
View File
@@ -12,5 +12,5 @@ services:
- "./letsencrypt:/etc/letsencrypt"
environment:
- HOST=${HOST}
- TOKEN=${TOKEN}
- TOKEN_SALT=${TOKEN_SALT}
command: run
+1
View File
@@ -26,6 +26,7 @@ class CertManager:
self.with_certbot = params.with_certbot
def init(self, hosts: list[str]) -> None:
self.logger.debug("Initializing...")
if not self.certbot_www.exists():
self.certbot_www.mkdir(parents=True)
self.logger.debug("Created %s", self.certbot_www)
+14 -14
View File
@@ -10,7 +10,6 @@ if typing.TYPE_CHECKING:
class DataDir:
HOST_FILE = ".host"
PATH_REGEX = re.compile(r"^[\w-]+$")
NEEDED_FILES: typing.ClassVar[list[str]] = ["favicon.ico"]
@@ -19,6 +18,7 @@ class DataDir:
self.root_path = pathlib.Path(root_path)
def init(self) -> None:
self.logger.debug("Initializing...")
for file in self.NEEDED_FILES:
if not (self.root_path / file).is_file():
(pathlib.Path.cwd() / file).copy_into(self.root_path)
@@ -34,28 +34,28 @@ class DataDir:
def __valid_path(self, path: str) -> bool:
return self.PATH_REGEX.match(path) is not None
def set_host(self, path: str, host: str) -> None:
if self.exists(path):
path_host = self.root_path / path / self.HOST_FILE
with path_host.open(mode="w") as host_file:
host_file.write(host)
self.logger.debug("Wrote %s", path_host)
def has_index(self, path: str) -> bool:
if self.exists(path):
path_index = self.root_path / path / "index.html"
return path_index.is_file()
return False
def get_host(self, path: str) -> str | None:
def set_file(self, path: str, file_name: str, value: str) -> None:
if self.exists(path):
path_host = self.root_path / path / self.HOST_FILE
if path_host.is_file():
file_path = self.root_path / path / file_name
with file_path.open(mode="w") as file:
file.write(value)
self.logger.debug("Wrote %s", file_path)
def get_file(self, path: str, file_name: str) -> str | None:
if self.exists(path):
file_path = self.root_path / path / file_name
if file_path.is_file():
try:
with path_host.open() as host_file:
return host_file.read().split("\n")[0].strip()
with file_path.open() as file:
return file.read().split("\n")[0].strip()
except Exception:
self.logger.exception("Cannot read %s", path_host)
self.logger.exception("Cannot read %s", file_path)
return None
return None
+11 -4
View File
@@ -9,7 +9,7 @@ import re
import tarfile
import typing
from . import STAPLER_ASCII, cert, data_dir, logs, project
from . import STAPLER_ASCII, cert, data_dir, logs, project, tokens
if typing.TYPE_CHECKING:
from . import params, registry
@@ -150,10 +150,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, _BaseHandler):
params: params.Parameters,
registry: registry.Registry,
cert_manager: cert.CertManager,
token_manager: tokens.TokenManager,
**kwargs: dict[str, typing.Any],
) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.token = params.token
self.token_manager = token_manager
self.data_dir = data_dir.DataDir(params.data_dir)
self.max_size_bytes = params.max_size_bytes
self.registry = registry
@@ -202,9 +203,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, _BaseHandler):
f"Resource /{sub_path}/ updated",
)
self.registry.add(sub_path)
self.token_manager.set_token(self.headers["X-Token"], sub_path)
if host is not None and self.cert_manager.create_or_update(host):
self.registry.set_host(sub_path, host)
self.registry.add(sub_path)
return None
def do_DELETE(self) -> None:
@@ -245,12 +246,18 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, _BaseHandler):
return super().translate_path(path)
def __check_update_request(self) -> str | None:
if len(self.token) and self.headers["X-Token"] != self.token:
if (token := self.headers["X-Token"]) is None:
self.send_error(http.HTTPStatus.BAD_REQUEST, "No X-Token header in request")
return None
if not self.token_manager.is_valid(token):
self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token")
return None
if (sub_path := self.__get_subpath_full(self.path)) is None:
self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path")
return None
if not self.token_manager.is_valid_for_path(token, sub_path):
self.send_error(http.HTTPStatus.FORBIDDEN, "Path forbidden for this token")
return None
return sub_path
def __get_subpath(self, path: str) -> str | None:
+1
View File
@@ -6,6 +6,7 @@ class Page:
path: str
with_index: bool
host: str | None = None
token_hash: str | None = None
def get_url_path(self) -> str:
return f"/{self.path}/"
+6 -5
View File
@@ -15,7 +15,7 @@ class Parameters:
host: str
data_dir: str
bind: str
token: str
token_salt: str
max_size_bytes: int
certbot_conf: str
certbot_www: str
@@ -55,7 +55,7 @@ def __add_arg_str(
*flags,
metavar=env_var,
default=__get_env_str(env_var, default),
help=f"{help_txt} (default: {default})",
help=f"{help_txt} (default: {default})" if len(default) else help_txt,
)
@@ -171,10 +171,10 @@ def parse_parameters() -> Parameters:
__add_arg_str(
parser,
"-t",
"--token",
env_var="TOKEN",
"--token-salt",
env_var="TOKEN_SALT",
default="",
help_txt="secret token for update requests",
help_txt="salt for tokens generation",
)
__add_arg_int(
parser,
@@ -194,6 +194,7 @@ def parse_parameters() -> Parameters:
subparsers = parser.add_subparsers(dest="command", required=True, metavar="COMMAND")
subparsers.add_parser("run", help="Run Stapler server")
subparsers.add_parser("renew", help="Renew certificates")
subparsers.add_parser("token", help="Generate a new token")
args = parser.parse_args()
if args.https:
args.with_certificates = True
+19 -2
View File
@@ -8,6 +8,9 @@ if typing.TYPE_CHECKING:
class Registry:
HOST_FILE = ".host"
TOKEN_FILE = ".token" # noqa: S105
def __init__(self, params: params.Parameters) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.pages: dict[str, page.Page] = {}
@@ -25,19 +28,33 @@ class Registry:
self.pages[path] = page.Page(
path,
self.data_dir.has_index(path),
self.data_dir.get_host(path),
self.data_dir.get_file(path, self.HOST_FILE),
self.data_dir.get_file(path, self.TOKEN_FILE),
)
self.logger.info("Updated %s", self.pages[path])
def set_host(self, path: str, host: str) -> None:
self.data_dir.set_host(path, host)
if self.pages[path].host != host:
self.data_dir.set_file(path, self.HOST_FILE, host)
self.pages[path].host = host
self.logger.debug("Updated %s", self.pages[path])
def set_token_hash(self, path: str, token_hash: str) -> None:
if self.pages[path].token_hash != token_hash:
self.data_dir.set_file(path, self.TOKEN_FILE, token_hash)
self.pages[path].token_hash = token_hash
self.logger.debug("Updated %s", self.pages[path])
def remove(self, path: str) -> None:
page = self.pages[path]
del self.pages[path]
self.logger.info("Removed %s", page)
def get_from_path(self, path: str) -> page.Page | None:
if path in self.pages:
return self.pages[path]
return None
def get_from_host(self, host: str) -> page.Page | None:
for p in self.pages.values():
if p.host == host:
+11 -3
View File
@@ -4,7 +4,7 @@ import logging
import threading
import typing
from . import STAPLER_ASCII, cert, data_dir, handlers, project, registry
from . import STAPLER_ASCII, cert, data_dir, handlers, project, registry, tokens
if typing.TYPE_CHECKING:
from . import params
@@ -16,6 +16,7 @@ class StaplerServer:
self.params = params
self.registry = registry.Registry(params)
self.cert_manager = cert.CertManager(params)
self.token_manager = tokens.TokenManager(params, self.registry)
self.data_dir = data_dir.DataDir(params.data_dir)
self.default_host = params.host.split(":", maxsplit=2)[0]
@@ -28,8 +29,7 @@ class StaplerServer:
if self.params.with_certificates:
self.cert_manager.init(self.__get_all_hosts())
self.data_dir.init()
if not len(self.params.token):
self.logger.warning("No token provided update requests will fail")
self.token_manager.init()
def __create_https_context(self, server: http.server.HTTPServer) -> bool:
https = False
@@ -48,6 +48,7 @@ class StaplerServer:
params=self.params,
registry=self.registry,
cert_manager=self.cert_manager,
token_manager=self.token_manager,
)
def __create_base_server(self) -> tuple[http.server.ThreadingHTTPServer, bool]:
@@ -133,3 +134,10 @@ class StaplerServer:
for host in self.__get_all_hosts():
self.cert_manager.create_or_update(host)
return 0
def token(self) -> int:
self.logger.info("Starting up...")
self.registry.load_pages()
self.token_manager.init()
self.token_manager.new_token()
return 0
+61
View File
@@ -0,0 +1,61 @@
import hashlib
import logging
import pathlib
import secrets
import typing
if typing.TYPE_CHECKING:
from . import params, registry
class TokenManager:
FILE = ".tokens"
def __init__(self, params: params.Parameters, registry: registry.Registry) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.token_salt = params.token_salt
self.tokens_file = pathlib.Path(params.data_dir) / self.FILE
self.registry = registry
self.token_hashes: list[str] = []
def init(self) -> None:
self.logger.debug("Initializing...")
if not len(self.token_salt):
self.logger.warning(
"No salt provided, tokens will be cryptographically weak"
)
self.token_hashes = self.__load_hashes()
def is_valid(self, token: str) -> bool:
return self.__hash_token(token) in self.token_hashes
def is_valid_for_path(self, token: str, path: str) -> bool:
return (page := self.registry.get_from_path(path)) is None or (
page.token_hash is None or page.token_hash == self.__hash_token(token)
)
def set_token(self, token: str, path: str) -> None:
self.registry.set_token_hash(path, self.__hash_token(token))
def new_token(self) -> None:
new_token = secrets.token_hex(16)
self.token_hashes += [self.__hash_token(new_token)]
self.__save_hashes()
self.logger.warning("NEW TOKEN: %s", new_token)
self.logger.warning("Please copy this secret value before it disappears")
def __hash_token(self, token: str) -> str:
return hashlib.sha512(
(self.token_salt + token).encode(), usedforsecurity=True
).hexdigest()
def __load_hashes(self) -> list[str]:
if self.tokens_file.is_file():
with self.tokens_file.open() as file:
return [line.strip() for line in file]
return []
def __save_hashes(self) -> None:
with self.tokens_file.open(mode="w") as file:
file.write("\n".join(self.token_hashes))
self.logger.debug("Updated %s", self.tokens_file)