Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b6d751a97a | |||
| 3f0490ebc9 | |||
| 04360b42d8 | |||
| 4edcc6acc7 | |||
| d9b559d13d | |||
| b0d98dd48b | |||
| 64f45e9779 | |||
| 2dd48042e7 | |||
| 74ceb0f677 | |||
| e7abe7924f | |||
| 8f7e4c8a91 | |||
| ab6879d54f | |||
| 8c93b9a015 |
+2
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "stapler"
|
||||
version = "1.2.5"
|
||||
version = "1.3.2"
|
||||
description = "Static pages as simple as a gzip file"
|
||||
requires-python = ">=3.14"
|
||||
dependencies = [
|
||||
@@ -21,6 +21,7 @@ module-name = "stapler"
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"coverage>=7.13.5",
|
||||
"parameterized>=0.9.0",
|
||||
"pytest>=9.0.3",
|
||||
"ruff>=0.15.10",
|
||||
"ty>=0.0.29",
|
||||
|
||||
+12
-6
@@ -5,6 +5,8 @@ import ssl
|
||||
import subprocess
|
||||
import typing
|
||||
|
||||
from stapler.strings import valid_host
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from .params import Parameters
|
||||
|
||||
@@ -35,7 +37,7 @@ class CertManager:
|
||||
self.with_certbot: bool = params.with_certbot
|
||||
self.last_file_change: int | float = 0
|
||||
|
||||
def init(self, hosts: list[str]) -> None:
|
||||
def init(self) -> None:
|
||||
self.logger.debug("Initializing...")
|
||||
if not self.certbot_www.exists():
|
||||
self.certbot_www.mkdir(parents=True)
|
||||
@@ -43,8 +45,6 @@ class CertManager:
|
||||
if not self.self_signed_path.exists():
|
||||
self.self_signed_path.mkdir(parents=True)
|
||||
self.logger.debug("Created %s", self.self_signed_path)
|
||||
for host in hosts:
|
||||
self.init_cert(host)
|
||||
|
||||
def exists(self, host: str) -> bool:
|
||||
return self.__exists_certbot(host) or self.__exists_self_signed(host)
|
||||
@@ -57,7 +57,7 @@ class CertManager:
|
||||
|
||||
def create_or_update(self, host: str) -> bool:
|
||||
created = self.init_cert(host)
|
||||
if self.with_certbot and self.__create_certbot(host):
|
||||
if self.with_certbot and valid_host(host) and self.__create_certbot(host):
|
||||
return True
|
||||
return created or self.__create_self_signed(host)
|
||||
|
||||
@@ -185,11 +185,16 @@ class CertManager:
|
||||
return False
|
||||
return self.__exists_certbot(host)
|
||||
|
||||
def sni_callback(
|
||||
self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, /
|
||||
def servername_callback(
|
||||
self,
|
||||
socket: ssl.SSLSocket | ssl.SSLObject,
|
||||
host: str | None,
|
||||
_: ssl.SSLSocket,
|
||||
/,
|
||||
) -> None | int:
|
||||
if host is None:
|
||||
return None
|
||||
self.logger.debug("servername callback: %s", host)
|
||||
if not self.exists(host) and not self.create_or_update(host):
|
||||
return None
|
||||
cert_file = self.get_cert(host)
|
||||
@@ -200,6 +205,7 @@ class CertManager:
|
||||
cert_file,
|
||||
key_file,
|
||||
)
|
||||
new_context.set_alpn_protocols(["http/1.1"])
|
||||
socket.context = new_context
|
||||
except Exception:
|
||||
self.logger.exception("Could not create HTTPS context for %s", host)
|
||||
|
||||
+1
-1
@@ -16,7 +16,7 @@ class DataDir:
|
||||
]
|
||||
|
||||
PATH_REGEX = re.compile(r"^[\w-]+$")
|
||||
NEEDED_FILES: typing.ClassVar[list[str]] = ["favicon.ico"]
|
||||
NEEDED_FILES: typing.ClassVar[list[str]] = ["favicon.ico", "robots.txt"]
|
||||
|
||||
def __init__(self, root_path: str) -> None:
|
||||
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
|
||||
|
||||
+42
-27
@@ -16,6 +16,7 @@ import requests
|
||||
|
||||
from . import PKG_VERSION, STAPLER_ASCII, logs
|
||||
from .data_dir import DataDir
|
||||
from .strings import sanitize_string, valid_host
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from .page import Page
|
||||
@@ -25,8 +26,9 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
|
||||
class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
SANITIZE_REGEX = re.compile(r"[^\x20-\x7F]+")
|
||||
timeout = 10
|
||||
protocol_version = "HTTP/1.1"
|
||||
REQUEST_COUNT = 0
|
||||
|
||||
@typing.override
|
||||
def __init__(
|
||||
@@ -41,6 +43,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
self.__host: str | None = None
|
||||
self.__in_size: int | None = None
|
||||
self.https: bool = params.https
|
||||
self.__class__.REQUEST_COUNT += 1
|
||||
super().__init__(*args, **kwargs)
|
||||
with contextlib.suppress(Exception):
|
||||
self.connection.close()
|
||||
@@ -80,7 +83,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
|
||||
@typing.override
|
||||
def address_string(self) -> str: # pragma: no cover
|
||||
return self.SANITIZE_REGEX.sub("?", super().address_string())
|
||||
return sanitize_string(super().address_string())
|
||||
|
||||
@typing.override
|
||||
def log_message(self, format: str, *args: typing.Any) -> None: # pragma: no cover
|
||||
@@ -92,9 +95,25 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
fmt = "%s - " + format
|
||||
self.logger.error(fmt, self.address_string(), *args)
|
||||
|
||||
def _pre_log_request(self) -> None: # pragma: no cover
|
||||
args = (
|
||||
"...",
|
||||
self.address_string(),
|
||||
self.host,
|
||||
format(self.__class__.REQUEST_COUNT, "07_d"),
|
||||
sanitize_string(self.requestline),
|
||||
)
|
||||
fmt = "← %s - %s - %s - %s - %s"
|
||||
if self.in_size > 0:
|
||||
args = (*args, self.in_size)
|
||||
fmt += " - %s"
|
||||
self.logger.debug(fmt, *args)
|
||||
|
||||
@typing.override
|
||||
def log_request(self, code: str = "?", size: str = "-") -> None: # ty:ignore[invalid-method-override] # pragma: no cover
|
||||
if isinstance(code, http.HTTPStatus):
|
||||
code = code.value
|
||||
if isinstance(code, int):
|
||||
color = logs.TermColor.RED
|
||||
if 100 <= code < 200:
|
||||
color = logs.TermColor.CYAN
|
||||
@@ -104,16 +123,17 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
color = logs.TermColor.BLUE
|
||||
elif 400 <= code < 500:
|
||||
color = logs.TermColor.YELLOW
|
||||
code = color + str(code.value) + logs.TermColor.RESET
|
||||
code = color + str(code) + logs.TermColor.RESET
|
||||
if size == "" and self.out_size > 0:
|
||||
size = str(self.out_size)
|
||||
args = (
|
||||
code,
|
||||
self.address_string(),
|
||||
self.host,
|
||||
self.SANITIZE_REGEX.sub("?", self.requestline),
|
||||
format(self.__class__.REQUEST_COUNT, "07_d"),
|
||||
sanitize_string(self.requestline),
|
||||
)
|
||||
fmt = "→ %s - %s - %s - %s"
|
||||
fmt = "→ %s - %s - %s - %s - %s"
|
||||
if size != "":
|
||||
args = (*args, size)
|
||||
fmt += " - %s"
|
||||
@@ -176,6 +196,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
headers=headers,
|
||||
allow_redirects=False,
|
||||
timeout=480,
|
||||
stream=False,
|
||||
)
|
||||
except Exception as e:
|
||||
self.send_error(
|
||||
@@ -236,19 +257,6 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||
and len(self.headers[key]) > 0
|
||||
)
|
||||
|
||||
def _pre_log_request(self) -> None: # pragma: no cover
|
||||
args = (
|
||||
"...",
|
||||
self.address_string(),
|
||||
self.host,
|
||||
self.SANITIZE_REGEX.sub("?", self.requestline),
|
||||
)
|
||||
fmt = "← %s - %s - %s - %s"
|
||||
if self.in_size > 0:
|
||||
args = (*args, self.in_size)
|
||||
fmt += " - %s"
|
||||
self.logger.debug(fmt, *args)
|
||||
|
||||
def server_signature(self) -> str:
|
||||
return self.server_version + "\n\n" + STAPLER_ASCII + "\n"
|
||||
|
||||
@@ -268,7 +276,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
UPDATE_PATH_REGEX = re.compile(r"^\/([\w-]+)\/?$")
|
||||
GET_PATH_REGEX = re.compile(r"^\/([\w-]+)($|\/)")
|
||||
HOST_PART_REGEX = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9-]{,61}[a-z0-9])$")
|
||||
AUTHORIZED_PATHS: typing.ClassVar[list[str]] = ["/favicon.ico"]
|
||||
AUTHORIZED_PATHS: typing.ClassVar[list[str]] = ["/favicon.ico", "/robots.txt"]
|
||||
TOKEN_HEADER = "X-Token" # noqa: S105
|
||||
HOST_HEADER = "X-Host"
|
||||
HOST_ONLY_HEADER = "X-Host-Only"
|
||||
@@ -300,6 +308,8 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
self.__target_spa: str | None = None
|
||||
try:
|
||||
super().__init__(*args, directory=params.data_dir, **kwargs, params=params) # ty:ignore[unknown-argument]
|
||||
except (BrokenPipeError, ConnectionResetError) as e:
|
||||
self.logger.error("Connection lost: %s", str(e)) # noqa: TRY400
|
||||
except:
|
||||
self.logger.exception("Could not handle request")
|
||||
|
||||
@@ -379,6 +389,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
self._pre_log_request()
|
||||
if not self._proxy_or_redirect():
|
||||
super().do_HEAD()
|
||||
self.close_connection = True
|
||||
|
||||
@typing.override
|
||||
def do_GET(self) -> None:
|
||||
@@ -388,7 +399,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
return None
|
||||
if self.path == "/" and self.host == self.default_host:
|
||||
return self.send_basic_body(self.server_signature())
|
||||
return super().do_GET()
|
||||
super().do_GET()
|
||||
self.close_connection = True
|
||||
return None
|
||||
|
||||
def do_PUT(self) -> None:
|
||||
with self.handle_errors():
|
||||
@@ -536,6 +549,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
return super().translate_path(path)
|
||||
return ""
|
||||
if self.host != self.default_host:
|
||||
if (
|
||||
not (self.root_path / page.path / path).is_file()
|
||||
and path in self.AUTHORIZED_PATHS
|
||||
):
|
||||
return super().translate_path(path)
|
||||
path = f"/{page.path}" + path
|
||||
if pathlib.Path(path).name.startswith("."): # hidden files
|
||||
return ""
|
||||
@@ -573,7 +591,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
f"Cannot use {self.HOST_ONLY_HEADER} with {self.HOST_HEADER}",
|
||||
)
|
||||
return None
|
||||
if self.has_target_host and not self.__valid_host(self.target_host):
|
||||
if self.has_target_host and not valid_host(self.target_host):
|
||||
self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid requested host")
|
||||
return None
|
||||
if self.has_target_proxy and self.has_target_redirect:
|
||||
@@ -596,12 +614,6 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
def __valid_host(self, host: str) -> bool:
|
||||
return (
|
||||
all(self.HOST_PART_REGEX.fullmatch(part) for part in host.split("."))
|
||||
and len(host) < 256
|
||||
)
|
||||
|
||||
def __get_page(self, src_path: str) -> Page | None:
|
||||
if self.host == self.default_host:
|
||||
if (
|
||||
@@ -615,16 +627,19 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
||||
|
||||
|
||||
class UpgradeHandler(RequestHandler):
|
||||
protocol_version = "HTTP/1.0"
|
||||
server_version = "StaplerUpgradeServer/" + PKG_VERSION
|
||||
|
||||
def do_HEAD(self) -> None:
|
||||
with self.handle_errors():
|
||||
self._pre_log_request()
|
||||
self.send_redirect(f"https://{self.host}{self.path}")
|
||||
self.close_connection = True
|
||||
|
||||
def do_GET(self) -> None:
|
||||
with self.handle_errors():
|
||||
if self.path.startswith(self.CERTBOT_CHALLENGE_PATH):
|
||||
super().do_GET()
|
||||
self.close_connection = True
|
||||
else:
|
||||
self.do_HEAD()
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
User-agent: *
|
||||
Disallow: /
|
||||
+5
-7
@@ -29,7 +29,6 @@ class StaplerServer:
|
||||
"logger",
|
||||
"params",
|
||||
"registry",
|
||||
"server",
|
||||
"token_manager",
|
||||
]
|
||||
|
||||
@@ -41,7 +40,6 @@ class StaplerServer:
|
||||
self.token_manager: TokenManager = TokenManager(params, self.registry)
|
||||
self.data_dir: DataDir = DataDir(params.data_dir)
|
||||
self.default_host: str = params.host.split(":", maxsplit=2)[0]
|
||||
self.server: http.server.ThreadingHTTPServer | None = None
|
||||
|
||||
def __get_all_hosts(self) -> list[str]:
|
||||
return [self.default_host, *self.registry.get_hosts()]
|
||||
@@ -50,7 +48,7 @@ class StaplerServer:
|
||||
self.logger.info("Starting up...")
|
||||
self.registry.load_pages()
|
||||
if self.params.with_certificates:
|
||||
self.cert_manager.init(self.__get_all_hosts())
|
||||
self.cert_manager.init()
|
||||
self.data_dir.init()
|
||||
self.token_manager.init()
|
||||
|
||||
@@ -75,7 +73,7 @@ class StaplerServer:
|
||||
)
|
||||
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||
context.sni_callback = self.cert_manager.sni_callback
|
||||
context.set_servername_callback(self.cert_manager.servername_callback)
|
||||
else:
|
||||
server = http.server.ThreadingHTTPServer(
|
||||
(
|
||||
@@ -131,7 +129,7 @@ class StaplerServer:
|
||||
for line in STAPLER_ASCII.split("\n"):
|
||||
self.logger.debug(line.ljust(36))
|
||||
self.__startup()
|
||||
self.server = self.__create_base_server()
|
||||
base_server = self.__create_base_server()
|
||||
upgrade_server = self.__start_upgrade_server() if self.params.https else None
|
||||
self.logger.info(
|
||||
"Server up and ready on %s://%s",
|
||||
@@ -140,7 +138,7 @@ class StaplerServer:
|
||||
)
|
||||
self.__start_background_tasks()
|
||||
with contextlib.suppress(KeyboardInterrupt):
|
||||
self.server.serve_forever()
|
||||
base_server.serve_forever()
|
||||
self.logger.info("Shutting down...")
|
||||
if upgrade_server is not None:
|
||||
upgrade_server.shutdown()
|
||||
@@ -152,7 +150,7 @@ class StaplerServer:
|
||||
self.logger.warning("Cannot renew without certificates")
|
||||
return 1
|
||||
self.registry.load_pages()
|
||||
self.cert_manager.init(self.__get_all_hosts())
|
||||
self.cert_manager.init()
|
||||
for host in self.__get_all_hosts():
|
||||
self.cert_manager.create_or_update(host)
|
||||
return 0
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
import re
|
||||
|
||||
__HOST_PART_REGEX = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9-]{,61}[a-z0-9])$")
|
||||
__SANITIZE_REGEX = re.compile(r"[^\x20-\x7F]")
|
||||
|
||||
|
||||
def valid_host(host: str) -> bool:
|
||||
parts = host.split(".")
|
||||
return (
|
||||
len(parts) > 1
|
||||
and len(parts[-1]) > 1
|
||||
and all(__HOST_PART_REGEX.fullmatch(part) for part in parts)
|
||||
and not all(part.isnumeric() for part in parts)
|
||||
and len(host) < 256
|
||||
)
|
||||
|
||||
|
||||
def sanitize_string(raw: str) -> str:
|
||||
return __SANITIZE_REGEX.sub("?", raw)
|
||||
+60
-64
@@ -35,41 +35,33 @@ class TestRegistry(BaseTestCase):
|
||||
self.patch("shutil.which", count=0),
|
||||
self.patch("subprocess.check_output", count=0),
|
||||
):
|
||||
self.cert_manager.init([])
|
||||
self.cert_manager.init()
|
||||
assert self.self_signed_path.is_dir()
|
||||
assert self.certbot_www.is_dir()
|
||||
|
||||
def test_init_with_hosts(self) -> None:
|
||||
with (
|
||||
self.patch("shutil.which", count=0),
|
||||
self.patch("subprocess.check_output", count=0),
|
||||
):
|
||||
self._make_self_signed("localhost")
|
||||
self.cert_manager.init(["localhost"])
|
||||
|
||||
def test_exists_self_signed(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
assert self.cert_manager.exists("localhost")
|
||||
self._make_self_signed("example.com")
|
||||
assert self.cert_manager.exists("example.com")
|
||||
|
||||
def test_exists_certbot(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
assert self.cert_manager.exists("localhost")
|
||||
self._make_certbot("example.com")
|
||||
assert self.cert_manager.exists("example.com")
|
||||
|
||||
def test_exists_fail(self) -> None:
|
||||
assert not self.cert_manager.exists("localhost")
|
||||
assert not self.cert_manager.exists("example.com")
|
||||
|
||||
def test_exists_fail_without_certbot(self) -> None:
|
||||
self.cert_manager.with_certbot = False
|
||||
self._make_certbot("localhost")
|
||||
assert not self.cert_manager.exists("localhost")
|
||||
self._make_certbot("example.com")
|
||||
assert not self.cert_manager.exists("example.com")
|
||||
|
||||
def test_init_cert_existing(self) -> None:
|
||||
with (
|
||||
self.patch("shutil.which", count=0),
|
||||
self.patch("subprocess.check_output", count=0),
|
||||
):
|
||||
self._make_self_signed("localhost")
|
||||
assert not self.cert_manager.init_cert("localhost")
|
||||
self._make_self_signed("example.com")
|
||||
assert not self.cert_manager.init_cert("example.com")
|
||||
|
||||
def test_init_cert_fail(self) -> None:
|
||||
with (
|
||||
@@ -77,7 +69,7 @@ class TestRegistry(BaseTestCase):
|
||||
self.patch("subprocess.check_output") as process_mock,
|
||||
):
|
||||
process_mock.side_effect = subprocess.CalledProcessError(1, "", output=b"")
|
||||
assert not self.cert_manager.init_cert("localhost")
|
||||
assert not self.cert_manager.init_cert("example.com")
|
||||
|
||||
def test_init_cert_new(self) -> None:
|
||||
with (
|
||||
@@ -85,135 +77,139 @@ class TestRegistry(BaseTestCase):
|
||||
self.patch("subprocess.check_output") as process_mock,
|
||||
):
|
||||
process_mock.side_effect = lambda *_, **__: self._make_self_signed(
|
||||
"localhost"
|
||||
"example.com"
|
||||
)
|
||||
assert self.cert_manager.init_cert("localhost")
|
||||
assert self.cert_manager.init_cert("example.com")
|
||||
|
||||
def test_create_or_update_existing_no_certbot(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
self._make_self_signed("example.com")
|
||||
self.cert_manager.with_certbot = False
|
||||
with (
|
||||
self.patch("shutil.which", return_value=""),
|
||||
self.patch("subprocess.check_output") as process_mock,
|
||||
):
|
||||
process_mock.side_effect = lambda *_, **__: self._make_self_signed(
|
||||
"localhost"
|
||||
"example.com"
|
||||
)
|
||||
assert self.cert_manager.create_or_update("localhost")
|
||||
assert self.cert_manager.create_or_update("example.com")
|
||||
|
||||
def test_create_or_update_existing_certbot(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
self._make_certbot("example.com")
|
||||
with (
|
||||
self.patch("shutil.which", return_value=""),
|
||||
self.patch("subprocess.check_output") as process_mock,
|
||||
):
|
||||
process_mock.side_effect = lambda *_, **__: self._make_certbot("localhost")
|
||||
assert self.cert_manager.create_or_update("localhost")
|
||||
process_mock.side_effect = lambda *_, **__: self._make_certbot(
|
||||
"example.com"
|
||||
)
|
||||
assert self.cert_manager.create_or_update("example.com")
|
||||
|
||||
def test_create_or_update_existing_fail_both(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
self._make_certbot("example.com")
|
||||
with (
|
||||
self.patch("shutil.which", return_value="", count=2),
|
||||
self.patch("subprocess.check_output", count=2) as process_mock,
|
||||
):
|
||||
process_mock.side_effect = subprocess.CalledProcessError(1, "", output=b"")
|
||||
assert not self.cert_manager.create_or_update("localhost")
|
||||
assert not self.cert_manager.create_or_update("example.com")
|
||||
|
||||
def test_create_or_update_existing_fail_both_binary(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
self._make_certbot("example.com")
|
||||
with (
|
||||
self.patch("shutil.which", count=2),
|
||||
self.patch("subprocess.check_output", count=0),
|
||||
):
|
||||
assert not self.cert_manager.create_or_update("localhost")
|
||||
assert not self.cert_manager.create_or_update("example.com")
|
||||
|
||||
def test_get_cert_certbot(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
self._make_certbot("example.com")
|
||||
self.assertEqual(
|
||||
self.cert_manager.get_cert("localhost"),
|
||||
self.certbot_conf / "live" / "localhost" / CertManager.CRT_FILE,
|
||||
self.cert_manager.get_cert("example.com"),
|
||||
self.certbot_conf / "live" / "example.com" / CertManager.CRT_FILE,
|
||||
)
|
||||
|
||||
def test_get_cert_self_signed(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
self._make_self_signed("example.com")
|
||||
self.assertEqual(
|
||||
self.cert_manager.get_cert("localhost"),
|
||||
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
|
||||
self.cert_manager.get_cert("example.com"),
|
||||
self.self_signed_path / "example.com" / CertManager.CRT_FILE,
|
||||
)
|
||||
|
||||
def test_get_cert_fail(self) -> None:
|
||||
self.assertRaises(
|
||||
CertManagerError,
|
||||
lambda: self.cert_manager.get_cert("localhost"),
|
||||
lambda: self.cert_manager.get_cert("example.com"),
|
||||
)
|
||||
|
||||
def test_get_key_certbot(self) -> None:
|
||||
self._make_certbot("localhost")
|
||||
self._make_certbot("example.com")
|
||||
self.assertEqual(
|
||||
self.cert_manager.get_key("localhost"),
|
||||
self.certbot_conf / "live" / "localhost" / CertManager.KEY_FILE,
|
||||
self.cert_manager.get_key("example.com"),
|
||||
self.certbot_conf / "live" / "example.com" / CertManager.KEY_FILE,
|
||||
)
|
||||
|
||||
def test_get_key_self_signed(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
self._make_self_signed("example.com")
|
||||
self.assertEqual(
|
||||
self.cert_manager.get_key("localhost"),
|
||||
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
|
||||
self.cert_manager.get_key("example.com"),
|
||||
self.self_signed_path / "example.com" / CertManager.KEY_FILE,
|
||||
)
|
||||
|
||||
def test_get_key_fail(self) -> None:
|
||||
self.assertRaises(
|
||||
CertManagerError,
|
||||
lambda: self.cert_manager.get_key("localhost"),
|
||||
lambda: self.cert_manager.get_key("example.com"),
|
||||
)
|
||||
|
||||
def test_sni_callback_no_host(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
def test_servername_callback_no_host(self) -> None:
|
||||
self._make_self_signed("example.com")
|
||||
with (
|
||||
self.patch("ssl.create_default_context", count=0),
|
||||
):
|
||||
self.cert_manager.sni_callback(self.socket_mock, None, self.context_mock)
|
||||
self.cert_manager.servername_callback(
|
||||
self.socket_mock, None, self.context_mock
|
||||
)
|
||||
|
||||
def test_sni_callback_fail(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
def test_servername_callback_fail(self) -> None:
|
||||
self._make_self_signed("example.com")
|
||||
with (
|
||||
self.patch("shutil.which", count=3),
|
||||
self.patch("ssl.create_default_context", count=0),
|
||||
):
|
||||
self.cert_manager.sni_callback(
|
||||
self.socket_mock, "new_host", self.context_mock
|
||||
self.cert_manager.servername_callback(
|
||||
self.socket_mock, "example.fr", self.context_mock
|
||||
)
|
||||
|
||||
def test_sni_callback_create_context(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
def test_servername_callback_create_context(self) -> None:
|
||||
self._make_self_signed("example.com")
|
||||
with (
|
||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||
self.mock_call(
|
||||
self.context_mock.load_cert_chain,
|
||||
[
|
||||
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
|
||||
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
|
||||
self.self_signed_path / "example.com" / CertManager.CRT_FILE,
|
||||
self.self_signed_path / "example.com" / CertManager.KEY_FILE,
|
||||
],
|
||||
),
|
||||
self.patch("shutil.which", count=0),
|
||||
):
|
||||
self.cert_manager.sni_callback(
|
||||
self.socket_mock, "localhost", self.context_mock
|
||||
self.cert_manager.servername_callback(
|
||||
self.socket_mock, "example.com", self.context_mock
|
||||
)
|
||||
|
||||
def test_sni_callback_create_context_fail(self) -> None:
|
||||
self._make_self_signed("localhost")
|
||||
def test_servername_callback_create_context_fail(self) -> None:
|
||||
self._make_self_signed("example.com")
|
||||
with (
|
||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||
self.patch("shutil.which", count=0),
|
||||
):
|
||||
self.context_mock.load_cert_chain.side_effect = Exception
|
||||
self.cert_manager.sni_callback(
|
||||
self.socket_mock, "localhost", self.context_mock
|
||||
self.cert_manager.servername_callback(
|
||||
self.socket_mock, "example.com", self.context_mock
|
||||
)
|
||||
self.context_mock.load_cert_chain.assert_called_once_with(
|
||||
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
|
||||
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
|
||||
self.self_signed_path / "example.com" / CertManager.CRT_FILE,
|
||||
self.self_signed_path / "example.com" / CertManager.KEY_FILE,
|
||||
)
|
||||
|
||||
def _make_self_signed(self, host: str) -> None:
|
||||
|
||||
@@ -177,6 +177,21 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
token_manager=self.token_manager,
|
||||
)
|
||||
|
||||
def test_handle_disconnect_silently(self) -> None:
|
||||
with self.patch("http.server.BaseHTTPRequestHandler.__init__") as mock:
|
||||
mock.side_effect = BrokenPipeError
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
RequestHandler(
|
||||
unittest.mock.MagicMock(),
|
||||
"127.0.0.1",
|
||||
unittest.mock.MagicMock(),
|
||||
params=Parameters(
|
||||
data_dir=self.get_tmp_dir(), certbot_www=str(self.certbot_www)
|
||||
),
|
||||
registry=self.registry,
|
||||
token_manager=self.token_manager,
|
||||
)
|
||||
|
||||
def test_do_head_forward(self) -> None:
|
||||
handler = self._get_handler()
|
||||
with (
|
||||
@@ -844,6 +859,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
),
|
||||
self.expects_status_only(handler, 200, "OK"),
|
||||
@@ -888,6 +904,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
),
|
||||
self.expects_status_only(handler, 200, "OK"),
|
||||
@@ -930,6 +947,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
),
|
||||
self.expects_basic_body(handler, "hello", message="OK"),
|
||||
@@ -964,6 +982,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
) as request_mock,
|
||||
self.expects_status_only(
|
||||
@@ -1007,6 +1026,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
),
|
||||
self.expects_status_only(handler, 200, "OK"),
|
||||
@@ -1047,6 +1067,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
},
|
||||
"allow_redirects": False,
|
||||
"timeout": 480,
|
||||
"stream": False,
|
||||
},
|
||||
),
|
||||
self.expects_status_only(handler, 200, "OK"),
|
||||
@@ -1177,6 +1198,21 @@ class TestRequestHandler(BaseHandlerTestCase):
|
||||
None,
|
||||
)
|
||||
|
||||
def test_translate_path_with_host_favicon(self) -> None:
|
||||
handler = self._get_handler(headers={"Host": "example.com"})
|
||||
with (
|
||||
self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")),
|
||||
self.patch_call(
|
||||
"http.server.SimpleHTTPRequestHandler.translate_path",
|
||||
["/favicon.ico"],
|
||||
),
|
||||
self.seal_mocks(),
|
||||
):
|
||||
self.assertEqual(
|
||||
handler.translate_path("/favicon.ico"),
|
||||
None,
|
||||
)
|
||||
|
||||
def test_translate_path_default_host(self) -> None:
|
||||
handler = self._get_handler()
|
||||
with (
|
||||
|
||||
@@ -26,10 +26,8 @@ class TestStaplerServer(BaseTestCase):
|
||||
def test_renew(self) -> None:
|
||||
with (
|
||||
self.mock_call(self.registry.load_pages),
|
||||
self.mock_calls(
|
||||
self.registry.get_hosts, [[], []], [["host_1"], ["host_1"]]
|
||||
),
|
||||
self.mock_call(self.cert_manager.init, [["localhost", "host_1"]]),
|
||||
self.mock_calls(self.registry.get_hosts, [[]], [["host_1"]]),
|
||||
self.mock_call(self.cert_manager.init),
|
||||
self.mock_calls(
|
||||
self.cert_manager.create_or_update, [["localhost"], ["host_1"]]
|
||||
),
|
||||
@@ -67,16 +65,16 @@ class TestStaplerServer(BaseTestCase):
|
||||
|
||||
def test_run_https(self) -> None:
|
||||
self.token_manager.detect_file_change.side_effect = KeyboardInterrupt
|
||||
self.cert_manager.sni_callback = unittest.mock.Mock()
|
||||
self.cert_manager.servername_callback = unittest.mock.Mock()
|
||||
with (
|
||||
self.mock_call(self.registry.load_pages),
|
||||
self.mock_call(self.registry.get_hosts, [], []),
|
||||
self.mock_call(self.cert_manager.init, [["localhost"]]),
|
||||
self.mock_call(self.cert_manager.init),
|
||||
self.mock_call(self.data_dir.init),
|
||||
self.mock_call(self.token_manager.init),
|
||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||
self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2),
|
||||
self.mock_call_unchecked(self.context_mock.wrap_socket),
|
||||
self.mock_call_unchecked(self.context_mock.set_servername_callback),
|
||||
self.mock_calls_unchecked(self.server_mock.serve_forever, 2),
|
||||
self.mock_call(self.server_mock.shutdown),
|
||||
self.seal_mocks(),
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
import parameterized
|
||||
|
||||
from stapler.strings import sanitize_string, valid_host
|
||||
|
||||
from . import BaseTestCase
|
||||
|
||||
|
||||
class TestStrings(BaseTestCase):
|
||||
def test_sanitize(self) -> None:
|
||||
self.assertEqual("??A??", sanitize_string("\n\tA\x00\x99"))
|
||||
|
||||
@parameterized.parameterized.expand(
|
||||
[("example.com"), ("test-test.com"), ("subdomain.example.com")]
|
||||
)
|
||||
def test_valid_host(self, host: str) -> None:
|
||||
self.assertTrue(valid_host(host), host)
|
||||
|
||||
@parameterized.parameterized.expand(
|
||||
[("example.c"), ("localhost"), ("127.0.0.1"), ("test..com"), ("www-.test.com")]
|
||||
)
|
||||
def test_invalid_host(self, host: str) -> None:
|
||||
self.assertFalse(valid_host(host), host)
|
||||
@@ -127,6 +127,15 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/df/b2/87e62e8c3e2f4b32e5fe99e0b86d576da1312593b39f47d8ceef365e95ed/packaging-26.2-py3-none-any.whl", hash = "sha256:5fc45236b9446107ff2415ce77c807cee2862cb6fac22b8a73826d0693b0980e", size = 100195, upload-time = "2026-04-24T20:15:22.081Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parameterized"
|
||||
version = "0.9.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ea/49/00c0c0cc24ff4266025a53e41336b79adaa5a4ebfad214f433d623f9865e/parameterized-0.9.0.tar.gz", hash = "sha256:7fc905272cefa4f364c1a3429cbbe9c0f98b793988efb5bf90aac80f08db09b1", size = 24351, upload-time = "2023-03-27T02:01:11.592Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/00/2f/804f58f0b856ab3bf21617cccf5b39206e6c4c94c2cd227bde125ea6105f/parameterized-0.9.0-py2.py3-none-any.whl", hash = "sha256:4e0758e3d41bea3bbd05ec14fc2c24736723f243b28d702081aef438c9372b1b", size = 20475, upload-time = "2023-03-27T02:01:09.31Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pluggy"
|
||||
version = "1.6.0"
|
||||
@@ -203,7 +212,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "stapler"
|
||||
version = "1.2.5"
|
||||
version = "1.3.2"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "requests" },
|
||||
@@ -212,6 +221,7 @@ dependencies = [
|
||||
[package.dev-dependencies]
|
||||
dev = [
|
||||
{ name = "coverage" },
|
||||
{ name = "parameterized" },
|
||||
{ name = "pytest" },
|
||||
{ name = "ruff" },
|
||||
{ name = "ty" },
|
||||
@@ -223,6 +233,7 @@ requires-dist = [{ name = "requests", specifier = ">=2.33.1" }]
|
||||
[package.metadata.requires-dev]
|
||||
dev = [
|
||||
{ name = "coverage", specifier = ">=7.13.5" },
|
||||
{ name = "parameterized", specifier = ">=0.9.0" },
|
||||
{ name = "pytest", specifier = ">=9.0.3" },
|
||||
{ name = "ruff", specifier = ">=0.15.10" },
|
||||
{ name = "ty", specifier = ">=0.0.29" },
|
||||
|
||||
Reference in New Issue
Block a user