6 Commits

Author SHA1 Message Date
klemek 225b13a7e3 chore: release 1.5.1
Python Lint CI / ruff (push) Successful in 2m1s
Docker CI / docker-build (push) Successful in 3m13s
Python Lint CI / ruff-format-check (push) Successful in 1m39s
Python Lint CI / ty (push) Successful in 1m57s
Python Test CI / coverage (push) Successful in 1m38s
2026-06-03 19:51:21 +02:00
klemek c186668208 fix: check loopback before cert creation 2026-06-03 19:51:08 +02:00
klemek 5f95afd0a3 chore: release 1.5.0
Python Lint CI / ruff (push) Successful in 1m59s
Docker CI / docker-build (push) Successful in 3m6s
Python Lint CI / ruff-format-check (push) Successful in 1m50s
Python Lint CI / ty (push) Successful in 3m57s
Python Test CI / coverage (push) Successful in 3m21s
2026-06-03 19:32:58 +02:00
klemek 132cbe9b20 feat: rate limit 2026-06-03 19:32:18 +02:00
klemek ada69f773e fix: force upgrade for all http methods
Python Lint CI / ruff-format-check (push) Successful in 1m2s
Python Lint CI / ruff (push) Successful in 1m4s
Python Lint CI / ty (push) Successful in 1m10s
Docker CI / docker-build (push) Successful in 2m32s
Python Test CI / coverage (push) Successful in 3m26s
2026-06-03 18:59:59 +02:00
klemek c5f3e53d88 feat: stapler challenge for self discovery 2026-06-03 18:56:07 +02:00
6 changed files with 155 additions and 20 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "stapler"
version = "1.4.2"
version = "1.5.1"
description = "Static pages as simple as a gzip file"
requires-python = ">=3.14"
dependencies = [
+17 -1
View File
@@ -5,6 +5,8 @@ import ssl
import subprocess
import typing
import requests
from stapler.strings import valid_host
if typing.TYPE_CHECKING:
@@ -49,6 +51,18 @@ class CertManager:
def exists(self, host: str) -> bool:
return self.__exists_certbot(host) or self.__exists_self_signed(host)
def valid_host(self, host: str) -> bool:
try:
response = requests.head(
url=f"http://{host}/.well-known/stapler",
allow_redirects=True,
timeout=5,
stream=False,
)
return type(response.status_code) is int and response.status_code < 400
except Exception:
return False
def init_cert(self, host: str) -> bool:
if not self.exists(host):
return self.__create_self_signed(host)
@@ -196,7 +210,9 @@ class CertManager:
if host is None or not valid_host(host):
return None
self.logger.debug("servername callback: %s", host)
if not self.exists(host) and not self.create_or_update(host):
if not self.exists(host) and (
not self.valid_host(host) or not self.create_or_update(host)
):
return None
cert_file = self.get_cert(host)
key_file = self.get_key(host)
+55 -2
View File
@@ -1,5 +1,6 @@
import abc
import contextlib
import datetime
import http
import http.cookiejar
import http.server
@@ -289,6 +290,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
protocol_version = "HTTP/1.1"
server_version = "StaplerServer/" + PKG_VERSION
CERTBOT_CHALLENGE_PATH = "/.well-known/acme-challenge"
STAPLER_CHALLENGE_PATH = "/.well-known/stapler"
UPDATE_PATH_REGEX = re.compile(r"^\/([\w-]+)\/?$")
GET_PATH_REGEX = re.compile(r"^\/([\w-]+)($|\/)")
HOST_PART_REGEX = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9-]{,61}[a-z0-9])$")
@@ -299,6 +301,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
REDIRECT_HEADER = "X-Redirect"
PROXY_HEADER = "X-Proxy"
SPA_HEADER = "X-SPA"
RATE_LIMIT = datetime.timedelta(seconds=1)
@typing.override
def __init__(
@@ -322,6 +325,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
self.__target_redirect: str | None = None
self.__target_proxy: str | None = None
self.__target_spa: str | None = None
self.rate_limits: dict[str, datetime.datetime] = {}
try:
super().__init__(*args, directory=params.data_dir, **kwargs, params=params) # ty:ignore[unknown-argument]
except (BrokenPipeError, ConnectionResetError) as e:
@@ -405,6 +409,8 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
self._pre_log_request()
if self._proxy_or_redirect():
return None
if self._is_stapler_challenge(self.path):
return self.send_status_only(http.HTTPStatus.NO_CONTENT)
if self.path == "/" and self.host == self.default_host:
return self.send_basic_body(self.server_signature())
super().do_HEAD()
@@ -417,6 +423,8 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
self._pre_log_request()
if self._proxy_or_redirect():
return None
if self._is_stapler_challenge(self.path):
return self.send_status_only(http.HTTPStatus.NO_CONTENT)
if self.path == "/" and self.host == self.default_host:
return self.send_basic_body(self.server_signature())
super().do_GET()
@@ -545,7 +553,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
return True
def _proxy_or_redirect(self) -> bool:
if self.has_token or self._is_certbot_challenge(self.path):
if (
self.has_token
or self._is_certbot_challenge(self.path)
or self._is_stapler_challenge(self.path)
):
return False
if (page := self.__get_page(self.path)) is None:
return False
@@ -570,6 +582,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
self.certbot_www + path
).resolve().is_relative_to(self.certbot_www)
def _is_stapler_challenge(self, path: str) -> bool:
return path.startswith(self.STAPLER_CHALLENGE_PATH)
@typing.override
def translate_path(self, path: str) -> str:
if self._is_certbot_challenge(path):
@@ -596,7 +611,20 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
path = f"/{page.path}/{page.spa}"
return super().translate_path(path)
def __check_rate_limit(self) -> bool:
now = datetime.datetime.now(tz=datetime.UTC)
address = self.address_string()
last_atempt = self.rate_limits.get(address, None)
self.rate_limits[address] = now
return last_atempt is None or now - last_atempt > self.RATE_LIMIT
def __clear_rate_limit(self) -> None:
del self.rate_limits[self.address_string()]
def __check_update_request(self) -> str | None:
if not self.__check_rate_limit():
self.send_error(http.HTTPStatus.TOO_MANY_REQUESTS, "Rate limit exceeded")
return None
if not self._has_header(self.TOKEN_HEADER):
self.send_error(
http.HTTPStatus.BAD_REQUEST, f"No {self.TOKEN_HEADER} header in request"
@@ -605,6 +633,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
if not self.token_manager.is_valid(self.token):
self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token")
return None
self.__clear_rate_limit()
if (sub_path := self.__get_path(self.path, self.UPDATE_PATH_REGEX)) is None:
self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path")
return None
@@ -664,7 +693,10 @@ class UpgradeHandler(RequestHandler):
def do_HEAD(self) -> None:
with self.handle_errors():
self._pre_log_request()
self.send_redirect(f"https://{self.host}{self.path}")
if self._is_stapler_challenge(self.path):
self.send_status_only(http.HTTPStatus.NO_CONTENT)
else:
self.send_redirect(f"https://{self.host}{self.path}")
self.close_connection = True
def do_GET(self) -> None:
@@ -674,3 +706,24 @@ class UpgradeHandler(RequestHandler):
self.close_connection = True
else:
self.do_HEAD()
def do_POST(self) -> None:
self.do_HEAD()
def do_PUT(self) -> None:
self.do_HEAD()
def do_DELETE(self) -> None:
self.do_HEAD()
def do_OPTIONS(self) -> None:
self.do_HEAD()
def do_PATCH(self) -> None:
self.do_HEAD()
def do_CONNECT(self) -> None:
self.do_HEAD()
def do_TRACE(self) -> None:
self.do_HEAD()
+17 -1
View File
@@ -4,6 +4,8 @@ import subprocess
import typing
import unittest.mock
import requests
from stapler.cert_manager import CertManager, CertManagerError
from stapler.params import Parameters
@@ -170,10 +172,24 @@ class TestRegistry(BaseTestCase):
self.socket_mock, None, self.context_mock
)
def test_servername_callback_fail(self) -> None:
def test_servername_callback_fail_no_valid_host(self) -> None:
self._make_self_signed("example.com")
with (
self.patch("requests.head") as request_mock,
self.patch("ssl.create_default_context", count=0),
):
request_mock.side_effect = Exception()
self.cert_manager.servername_callback(
self.socket_mock, "example.fr", self.context_mock
)
def test_servername_callback_fail_no_binaries(self) -> None:
self._make_self_signed("example.com")
response = requests.Response()
response.status_code = 200
with (
self.patch("shutil.which", count=3),
self.patch("requests.head", response),
self.patch("ssl.create_default_context", count=0),
):
self.cert_manager.servername_callback(
+64 -14
View File
@@ -1,6 +1,7 @@
import abc
import collections
import contextlib
import datetime
import http
import http.server
import io
@@ -204,6 +205,17 @@ class TestRequestHandler(BaseHandlerTestCase):
):
handler.do_HEAD()
def test_do_head_stapler(self) -> None:
handler = self._get_handler("/.well-known/stapler/something")
with (
self.expects_status_only(
handler,
http.HTTPStatus.NO_CONTENT,
),
self.seal_mocks(),
):
handler.do_HEAD()
def test_do_head_forward(self) -> None:
handler = self._get_handler("/file")
with (
@@ -222,6 +234,17 @@ class TestRequestHandler(BaseHandlerTestCase):
):
handler.do_GET()
def test_do_get_stapler(self) -> None:
handler = self._get_handler("/.well-known/stapler/something")
with (
self.expects_status_only(
handler,
http.HTTPStatus.NO_CONTENT,
),
self.seal_mocks(),
):
handler.do_GET()
def test_do_get_forward_on_other_path(self) -> None:
handler = self._get_handler("/file")
with (
@@ -253,6 +276,7 @@ class TestRequestHandler(BaseHandlerTestCase):
self.seal_mocks(),
):
handler.do_PUT()
assert "127.0.0.1" in handler.rate_limits
def test_do_post_is_do_put(self) -> None:
handler = self._get_handler("/path")
@@ -284,6 +308,18 @@ class TestRequestHandler(BaseHandlerTestCase):
self.seal_mocks(),
):
handler.do_PUT()
assert "127.0.0.1" in handler.rate_limits
def test_do_put_rate_limit(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
handler.rate_limits["127.0.0.1"] = datetime.datetime.now(tz=datetime.UTC)
with (
self.expects_error(
handler, http.HTTPStatus.TOO_MANY_REQUESTS, "Rate limit exceeded"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_path(self) -> None:
handler = self._get_handler("/pa.th", {"X-Token": "secret"})
@@ -770,6 +806,7 @@ class TestRequestHandler(BaseHandlerTestCase):
self.seal_mocks(),
):
handler.do_DELETE()
assert "127.0.0.1" in handler.rate_limits
def test_do_delete_invalid_token(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
@@ -779,6 +816,7 @@ class TestRequestHandler(BaseHandlerTestCase):
self.seal_mocks(),
):
handler.do_DELETE()
assert "127.0.0.1" in handler.rate_limits
def test_do_delete_invalid_path(self) -> None:
handler = self._get_handler("/pa.th", {"X-Token": "secret"})
@@ -1336,21 +1374,22 @@ class TestUpgradeHandler(BaseHandlerTestCase):
handler.data_dir = self.data_dir
return handler
def test_do_get(self) -> None:
def test_do_upgrade(self) -> None:
handler = self._get_handler("/file")
with (
self.expects_status_only(
handler,
http.HTTPStatus.MOVED_PERMANENTLY,
headers={"Location": "https://localhost/file"},
),
self.patch(
"http.server.SimpleHTTPRequestHandler.do_GET",
count=0,
),
self.seal_mocks(),
):
handler.do_GET()
for method in [method.value for method in http.HTTPMethod]:
with (
self.subTest(None, method=method),
self.expects_status_only(
handler,
http.HTTPStatus.MOVED_PERMANENTLY,
headers={"Location": "https://localhost/file"},
),
self.patch(
f"http.server.SimpleHTTPRequestHandler.do_{method}", count=0
),
self.seal_mocks(),
):
getattr(handler, f"do_{method}")()
def test_do_get_certbot(self) -> None:
handler = self._get_handler("/.well-known/acme-challenge/abcde")
@@ -1373,3 +1412,14 @@ class TestUpgradeHandler(BaseHandlerTestCase):
self.seal_mocks(),
):
handler.do_HEAD()
def test_do_head_stapler(self) -> None:
handler = self._get_handler("/.well-known/stapler/something")
with (
self.expects_status_only(
handler,
http.HTTPStatus.NO_CONTENT,
),
self.seal_mocks(),
):
handler.do_HEAD()
Generated
+1 -1
View File
@@ -212,7 +212,7 @@ wheels = [
[[package]]
name = "stapler"
version = "1.4.2"
version = "1.5.1"
source = { editable = "." }
dependencies = [
{ name = "requests" },