Compare commits
9 Commits
2dd48042e7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1e5f3ba986 | |||
| d3d98bd9b2 | |||
| b6d751a97a | |||
| 3f0490ebc9 | |||
| 04360b42d8 | |||
| 4edcc6acc7 | |||
| d9b559d13d | |||
| b0d98dd48b | |||
| 64f45e9779 |
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "stapler"
|
name = "stapler"
|
||||||
version = "1.2.8"
|
version = "1.4.0"
|
||||||
description = "Static pages as simple as a gzip file"
|
description = "Static pages as simple as a gzip file"
|
||||||
requires-python = ">=3.14"
|
requires-python = ">=3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ class CertManager:
|
|||||||
self.with_certbot: bool = params.with_certbot
|
self.with_certbot: bool = params.with_certbot
|
||||||
self.last_file_change: int | float = 0
|
self.last_file_change: int | float = 0
|
||||||
|
|
||||||
def init(self, hosts: list[str]) -> None:
|
def init(self) -> None:
|
||||||
self.logger.debug("Initializing...")
|
self.logger.debug("Initializing...")
|
||||||
if not self.certbot_www.exists():
|
if not self.certbot_www.exists():
|
||||||
self.certbot_www.mkdir(parents=True)
|
self.certbot_www.mkdir(parents=True)
|
||||||
@@ -45,8 +45,6 @@ class CertManager:
|
|||||||
if not self.self_signed_path.exists():
|
if not self.self_signed_path.exists():
|
||||||
self.self_signed_path.mkdir(parents=True)
|
self.self_signed_path.mkdir(parents=True)
|
||||||
self.logger.debug("Created %s", self.self_signed_path)
|
self.logger.debug("Created %s", self.self_signed_path)
|
||||||
for host in hosts:
|
|
||||||
self.init_cert(host)
|
|
||||||
|
|
||||||
def exists(self, host: str) -> bool:
|
def exists(self, host: str) -> bool:
|
||||||
return self.__exists_certbot(host) or self.__exists_self_signed(host)
|
return self.__exists_certbot(host) or self.__exists_self_signed(host)
|
||||||
@@ -187,11 +185,16 @@ class CertManager:
|
|||||||
return False
|
return False
|
||||||
return self.__exists_certbot(host)
|
return self.__exists_certbot(host)
|
||||||
|
|
||||||
def sni_callback(
|
def servername_callback(
|
||||||
self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, /
|
self,
|
||||||
|
socket: ssl.SSLSocket | ssl.SSLObject,
|
||||||
|
host: str | None,
|
||||||
|
_: ssl.SSLSocket,
|
||||||
|
/,
|
||||||
) -> None | int:
|
) -> None | int:
|
||||||
if host is None:
|
if host is None:
|
||||||
return None
|
return None
|
||||||
|
self.logger.debug("servername callback: %s", host)
|
||||||
if not self.exists(host) and not self.create_or_update(host):
|
if not self.exists(host) and not self.create_or_update(host):
|
||||||
return None
|
return None
|
||||||
cert_file = self.get_cert(host)
|
cert_file = self.get_cert(host)
|
||||||
@@ -202,6 +205,7 @@ class CertManager:
|
|||||||
cert_file,
|
cert_file,
|
||||||
key_file,
|
key_file,
|
||||||
)
|
)
|
||||||
|
new_context.set_alpn_protocols(["http/1.1"])
|
||||||
socket.context = new_context
|
socket.context = new_context
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception("Could not create HTTPS context for %s", host)
|
self.logger.exception("Could not create HTTPS context for %s", host)
|
||||||
|
|||||||
+1
-1
@@ -16,7 +16,7 @@ class DataDir:
|
|||||||
]
|
]
|
||||||
|
|
||||||
PATH_REGEX = re.compile(r"^[\w-]+$")
|
PATH_REGEX = re.compile(r"^[\w-]+$")
|
||||||
NEEDED_FILES: typing.ClassVar[list[str]] = ["favicon.ico"]
|
NEEDED_FILES: typing.ClassVar[list[str]] = ["favicon.ico", "robots.txt"]
|
||||||
|
|
||||||
def __init__(self, root_path: str) -> None:
|
def __init__(self, root_path: str) -> None:
|
||||||
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
|
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
|
||||||
|
|||||||
+27
-7
@@ -27,6 +27,7 @@ if typing.TYPE_CHECKING:
|
|||||||
|
|
||||||
class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
||||||
timeout = 10
|
timeout = 10
|
||||||
|
protocol_version = "HTTP/1.1"
|
||||||
REQUEST_COUNT = 0
|
REQUEST_COUNT = 0
|
||||||
|
|
||||||
@typing.override
|
@typing.override
|
||||||
@@ -111,6 +112,8 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
|||||||
@typing.override
|
@typing.override
|
||||||
def log_request(self, code: str = "?", size: str = "-") -> None: # ty:ignore[invalid-method-override] # pragma: no cover
|
def log_request(self, code: str = "?", size: str = "-") -> None: # ty:ignore[invalid-method-override] # pragma: no cover
|
||||||
if isinstance(code, http.HTTPStatus):
|
if isinstance(code, http.HTTPStatus):
|
||||||
|
code = code.value
|
||||||
|
if isinstance(code, int):
|
||||||
color = logs.TermColor.RED
|
color = logs.TermColor.RED
|
||||||
if 100 <= code < 200:
|
if 100 <= code < 200:
|
||||||
color = logs.TermColor.CYAN
|
color = logs.TermColor.CYAN
|
||||||
@@ -120,7 +123,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
|||||||
color = logs.TermColor.BLUE
|
color = logs.TermColor.BLUE
|
||||||
elif 400 <= code < 500:
|
elif 400 <= code < 500:
|
||||||
color = logs.TermColor.YELLOW
|
color = logs.TermColor.YELLOW
|
||||||
code = color + str(code.value) + logs.TermColor.RESET
|
code = color + str(code) + logs.TermColor.RESET
|
||||||
if size == "" and self.out_size > 0:
|
if size == "" and self.out_size > 0:
|
||||||
size = str(self.out_size)
|
size = str(self.out_size)
|
||||||
args = (
|
args = (
|
||||||
@@ -150,7 +153,8 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
|||||||
self.send_header("Content-Length", str(len(encoded)))
|
self.send_header("Content-Length", str(len(encoded)))
|
||||||
self.send_header("Connection", "close")
|
self.send_header("Connection", "close")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(encoded)
|
if self.command != http.HTTPMethod.HEAD:
|
||||||
|
self.wfile.write(encoded)
|
||||||
self.close_connection = True
|
self.close_connection = True
|
||||||
|
|
||||||
def send_status_only(
|
def send_status_only(
|
||||||
@@ -193,6 +197,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
|||||||
headers=headers,
|
headers=headers,
|
||||||
allow_redirects=False,
|
allow_redirects=False,
|
||||||
timeout=480,
|
timeout=480,
|
||||||
|
stream=False,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.send_error(
|
self.send_error(
|
||||||
@@ -218,7 +223,7 @@ class BaseHandler(abc.ABC, http.server.BaseHTTPRequestHandler):
|
|||||||
self.send_header("Content-Length", str(out_size := len(response.content)))
|
self.send_header("Content-Length", str(out_size := len(response.content)))
|
||||||
self.send_header("Connection", "close")
|
self.send_header("Connection", "close")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
if out_size > 0:
|
if out_size > 0 and self.command != http.HTTPMethod.HEAD:
|
||||||
self.wfile.write(response.content)
|
self.wfile.write(response.content)
|
||||||
self.close_connection = True
|
self.close_connection = True
|
||||||
|
|
||||||
@@ -272,7 +277,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
|||||||
UPDATE_PATH_REGEX = re.compile(r"^\/([\w-]+)\/?$")
|
UPDATE_PATH_REGEX = re.compile(r"^\/([\w-]+)\/?$")
|
||||||
GET_PATH_REGEX = re.compile(r"^\/([\w-]+)($|\/)")
|
GET_PATH_REGEX = re.compile(r"^\/([\w-]+)($|\/)")
|
||||||
HOST_PART_REGEX = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9-]{,61}[a-z0-9])$")
|
HOST_PART_REGEX = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9-]{,61}[a-z0-9])$")
|
||||||
AUTHORIZED_PATHS: typing.ClassVar[list[str]] = ["/favicon.ico"]
|
AUTHORIZED_PATHS: typing.ClassVar[list[str]] = ["/favicon.ico", "/robots.txt"]
|
||||||
TOKEN_HEADER = "X-Token" # noqa: S105
|
TOKEN_HEADER = "X-Token" # noqa: S105
|
||||||
HOST_HEADER = "X-Host"
|
HOST_HEADER = "X-Host"
|
||||||
HOST_ONLY_HEADER = "X-Host-Only"
|
HOST_ONLY_HEADER = "X-Host-Only"
|
||||||
@@ -383,8 +388,13 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
|||||||
def do_HEAD(self) -> None:
|
def do_HEAD(self) -> None:
|
||||||
with self.handle_errors():
|
with self.handle_errors():
|
||||||
self._pre_log_request()
|
self._pre_log_request()
|
||||||
if not self._proxy_or_redirect():
|
if self._proxy_or_redirect():
|
||||||
super().do_HEAD()
|
return None
|
||||||
|
if self.path == "/" and self.host == self.default_host:
|
||||||
|
return self.send_basic_body(self.server_signature())
|
||||||
|
super().do_HEAD()
|
||||||
|
self.close_connection = True
|
||||||
|
return None
|
||||||
|
|
||||||
@typing.override
|
@typing.override
|
||||||
def do_GET(self) -> None:
|
def do_GET(self) -> None:
|
||||||
@@ -394,7 +404,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
|||||||
return None
|
return None
|
||||||
if self.path == "/" and self.host == self.default_host:
|
if self.path == "/" and self.host == self.default_host:
|
||||||
return self.send_basic_body(self.server_signature())
|
return self.send_basic_body(self.server_signature())
|
||||||
return super().do_GET()
|
super().do_GET()
|
||||||
|
self.close_connection = True
|
||||||
|
return None
|
||||||
|
|
||||||
def do_PUT(self) -> None:
|
def do_PUT(self) -> None:
|
||||||
with self.handle_errors():
|
with self.handle_errors():
|
||||||
@@ -542,6 +554,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
|||||||
return super().translate_path(path)
|
return super().translate_path(path)
|
||||||
return ""
|
return ""
|
||||||
if self.host != self.default_host:
|
if self.host != self.default_host:
|
||||||
|
if (
|
||||||
|
not (self.root_path / page.path / path).is_file()
|
||||||
|
and path in self.AUTHORIZED_PATHS
|
||||||
|
):
|
||||||
|
return super().translate_path(path)
|
||||||
path = f"/{page.path}" + path
|
path = f"/{page.path}" + path
|
||||||
if pathlib.Path(path).name.startswith("."): # hidden files
|
if pathlib.Path(path).name.startswith("."): # hidden files
|
||||||
return ""
|
return ""
|
||||||
@@ -615,16 +632,19 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler, BaseHandler):
|
|||||||
|
|
||||||
|
|
||||||
class UpgradeHandler(RequestHandler):
|
class UpgradeHandler(RequestHandler):
|
||||||
|
protocol_version = "HTTP/1.0"
|
||||||
server_version = "StaplerUpgradeServer/" + PKG_VERSION
|
server_version = "StaplerUpgradeServer/" + PKG_VERSION
|
||||||
|
|
||||||
def do_HEAD(self) -> None:
|
def do_HEAD(self) -> None:
|
||||||
with self.handle_errors():
|
with self.handle_errors():
|
||||||
self._pre_log_request()
|
self._pre_log_request()
|
||||||
self.send_redirect(f"https://{self.host}{self.path}")
|
self.send_redirect(f"https://{self.host}{self.path}")
|
||||||
|
self.close_connection = True
|
||||||
|
|
||||||
def do_GET(self) -> None:
|
def do_GET(self) -> None:
|
||||||
with self.handle_errors():
|
with self.handle_errors():
|
||||||
if self.path.startswith(self.CERTBOT_CHALLENGE_PATH):
|
if self.path.startswith(self.CERTBOT_CHALLENGE_PATH):
|
||||||
super().do_GET()
|
super().do_GET()
|
||||||
|
self.close_connection = True
|
||||||
else:
|
else:
|
||||||
self.do_HEAD()
|
self.do_HEAD()
|
||||||
|
|||||||
@@ -0,0 +1,2 @@
|
|||||||
|
User-agent: *
|
||||||
|
Disallow: /
|
||||||
+5
-7
@@ -29,7 +29,6 @@ class StaplerServer:
|
|||||||
"logger",
|
"logger",
|
||||||
"params",
|
"params",
|
||||||
"registry",
|
"registry",
|
||||||
"server",
|
|
||||||
"token_manager",
|
"token_manager",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -41,7 +40,6 @@ class StaplerServer:
|
|||||||
self.token_manager: TokenManager = TokenManager(params, self.registry)
|
self.token_manager: TokenManager = TokenManager(params, self.registry)
|
||||||
self.data_dir: DataDir = DataDir(params.data_dir)
|
self.data_dir: DataDir = DataDir(params.data_dir)
|
||||||
self.default_host: str = params.host.split(":", maxsplit=2)[0]
|
self.default_host: str = params.host.split(":", maxsplit=2)[0]
|
||||||
self.server: http.server.ThreadingHTTPServer | None = None
|
|
||||||
|
|
||||||
def __get_all_hosts(self) -> list[str]:
|
def __get_all_hosts(self) -> list[str]:
|
||||||
return [self.default_host, *self.registry.get_hosts()]
|
return [self.default_host, *self.registry.get_hosts()]
|
||||||
@@ -50,7 +48,7 @@ class StaplerServer:
|
|||||||
self.logger.info("Starting up...")
|
self.logger.info("Starting up...")
|
||||||
self.registry.load_pages()
|
self.registry.load_pages()
|
||||||
if self.params.with_certificates:
|
if self.params.with_certificates:
|
||||||
self.cert_manager.init(self.__get_all_hosts())
|
self.cert_manager.init()
|
||||||
self.data_dir.init()
|
self.data_dir.init()
|
||||||
self.token_manager.init()
|
self.token_manager.init()
|
||||||
|
|
||||||
@@ -75,7 +73,7 @@ class StaplerServer:
|
|||||||
)
|
)
|
||||||
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||||
server.socket = context.wrap_socket(server.socket, server_side=True)
|
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||||
context.sni_callback = self.cert_manager.sni_callback
|
context.set_servername_callback(self.cert_manager.servername_callback)
|
||||||
else:
|
else:
|
||||||
server = http.server.ThreadingHTTPServer(
|
server = http.server.ThreadingHTTPServer(
|
||||||
(
|
(
|
||||||
@@ -131,7 +129,7 @@ class StaplerServer:
|
|||||||
for line in STAPLER_ASCII.split("\n"):
|
for line in STAPLER_ASCII.split("\n"):
|
||||||
self.logger.debug(line.ljust(36))
|
self.logger.debug(line.ljust(36))
|
||||||
self.__startup()
|
self.__startup()
|
||||||
self.server = self.__create_base_server()
|
base_server = self.__create_base_server()
|
||||||
upgrade_server = self.__start_upgrade_server() if self.params.https else None
|
upgrade_server = self.__start_upgrade_server() if self.params.https else None
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"Server up and ready on %s://%s",
|
"Server up and ready on %s://%s",
|
||||||
@@ -140,7 +138,7 @@ class StaplerServer:
|
|||||||
)
|
)
|
||||||
self.__start_background_tasks()
|
self.__start_background_tasks()
|
||||||
with contextlib.suppress(KeyboardInterrupt):
|
with contextlib.suppress(KeyboardInterrupt):
|
||||||
self.server.serve_forever()
|
base_server.serve_forever()
|
||||||
self.logger.info("Shutting down...")
|
self.logger.info("Shutting down...")
|
||||||
if upgrade_server is not None:
|
if upgrade_server is not None:
|
||||||
upgrade_server.shutdown()
|
upgrade_server.shutdown()
|
||||||
@@ -152,7 +150,7 @@ class StaplerServer:
|
|||||||
self.logger.warning("Cannot renew without certificates")
|
self.logger.warning("Cannot renew without certificates")
|
||||||
return 1
|
return 1
|
||||||
self.registry.load_pages()
|
self.registry.load_pages()
|
||||||
self.cert_manager.init(self.__get_all_hosts())
|
self.cert_manager.init()
|
||||||
for host in self.__get_all_hosts():
|
for host in self.__get_all_hosts():
|
||||||
self.cert_manager.create_or_update(host)
|
self.cert_manager.create_or_update(host)
|
||||||
return 0
|
return 0
|
||||||
|
|||||||
+11
-17
@@ -35,18 +35,10 @@ class TestRegistry(BaseTestCase):
|
|||||||
self.patch("shutil.which", count=0),
|
self.patch("shutil.which", count=0),
|
||||||
self.patch("subprocess.check_output", count=0),
|
self.patch("subprocess.check_output", count=0),
|
||||||
):
|
):
|
||||||
self.cert_manager.init([])
|
self.cert_manager.init()
|
||||||
assert self.self_signed_path.is_dir()
|
assert self.self_signed_path.is_dir()
|
||||||
assert self.certbot_www.is_dir()
|
assert self.certbot_www.is_dir()
|
||||||
|
|
||||||
def test_init_with_hosts(self) -> None:
|
|
||||||
with (
|
|
||||||
self.patch("shutil.which", count=0),
|
|
||||||
self.patch("subprocess.check_output", count=0),
|
|
||||||
):
|
|
||||||
self._make_self_signed("example.com")
|
|
||||||
self.cert_manager.init(["example.com"])
|
|
||||||
|
|
||||||
def test_exists_self_signed(self) -> None:
|
def test_exists_self_signed(self) -> None:
|
||||||
self._make_self_signed("example.com")
|
self._make_self_signed("example.com")
|
||||||
assert self.cert_manager.exists("example.com")
|
assert self.cert_manager.exists("example.com")
|
||||||
@@ -169,24 +161,26 @@ class TestRegistry(BaseTestCase):
|
|||||||
lambda: self.cert_manager.get_key("example.com"),
|
lambda: self.cert_manager.get_key("example.com"),
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_sni_callback_no_host(self) -> None:
|
def test_servername_callback_no_host(self) -> None:
|
||||||
self._make_self_signed("example.com")
|
self._make_self_signed("example.com")
|
||||||
with (
|
with (
|
||||||
self.patch("ssl.create_default_context", count=0),
|
self.patch("ssl.create_default_context", count=0),
|
||||||
):
|
):
|
||||||
self.cert_manager.sni_callback(self.socket_mock, None, self.context_mock)
|
self.cert_manager.servername_callback(
|
||||||
|
self.socket_mock, None, self.context_mock
|
||||||
|
)
|
||||||
|
|
||||||
def test_sni_callback_fail(self) -> None:
|
def test_servername_callback_fail(self) -> None:
|
||||||
self._make_self_signed("example.com")
|
self._make_self_signed("example.com")
|
||||||
with (
|
with (
|
||||||
self.patch("shutil.which", count=3),
|
self.patch("shutil.which", count=3),
|
||||||
self.patch("ssl.create_default_context", count=0),
|
self.patch("ssl.create_default_context", count=0),
|
||||||
):
|
):
|
||||||
self.cert_manager.sni_callback(
|
self.cert_manager.servername_callback(
|
||||||
self.socket_mock, "example.fr", self.context_mock
|
self.socket_mock, "example.fr", self.context_mock
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_sni_callback_create_context(self) -> None:
|
def test_servername_callback_create_context(self) -> None:
|
||||||
self._make_self_signed("example.com")
|
self._make_self_signed("example.com")
|
||||||
with (
|
with (
|
||||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||||
@@ -199,18 +193,18 @@ class TestRegistry(BaseTestCase):
|
|||||||
),
|
),
|
||||||
self.patch("shutil.which", count=0),
|
self.patch("shutil.which", count=0),
|
||||||
):
|
):
|
||||||
self.cert_manager.sni_callback(
|
self.cert_manager.servername_callback(
|
||||||
self.socket_mock, "example.com", self.context_mock
|
self.socket_mock, "example.com", self.context_mock
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_sni_callback_create_context_fail(self) -> None:
|
def test_servername_callback_create_context_fail(self) -> None:
|
||||||
self._make_self_signed("example.com")
|
self._make_self_signed("example.com")
|
||||||
with (
|
with (
|
||||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||||
self.patch("shutil.which", count=0),
|
self.patch("shutil.which", count=0),
|
||||||
):
|
):
|
||||||
self.context_mock.load_cert_chain.side_effect = Exception
|
self.context_mock.load_cert_chain.side_effect = Exception
|
||||||
self.cert_manager.sni_callback(
|
self.cert_manager.servername_callback(
|
||||||
self.socket_mock, "example.com", self.context_mock
|
self.socket_mock, "example.com", self.context_mock
|
||||||
)
|
)
|
||||||
self.context_mock.load_cert_chain.assert_called_once_with(
|
self.context_mock.load_cert_chain.assert_called_once_with(
|
||||||
|
|||||||
+36
-2
@@ -36,6 +36,7 @@ class BaseHandlerTestCase(BaseTestCase, abc.ABC):
|
|||||||
code: int,
|
code: int,
|
||||||
message: str | None = None,
|
message: str | None = None,
|
||||||
headers: dict[str, str] | None = None,
|
headers: dict[str, str] | None = None,
|
||||||
|
content_length: int = 0,
|
||||||
) -> typing.Iterator[None]:
|
) -> typing.Iterator[None]:
|
||||||
if headers is None:
|
if headers is None:
|
||||||
headers = {}
|
headers = {}
|
||||||
@@ -46,7 +47,7 @@ class BaseHandlerTestCase(BaseTestCase, abc.ABC):
|
|||||||
send_response_mock.assert_called_once_with(code, message)
|
send_response_mock.assert_called_once_with(code, message)
|
||||||
send_header_mock.assert_has_calls(
|
send_header_mock.assert_has_calls(
|
||||||
[
|
[
|
||||||
unittest.mock.call("Content-Length", "0"),
|
unittest.mock.call("Content-Length", str(content_length)),
|
||||||
]
|
]
|
||||||
+ [unittest.mock.call(header, value) for header, value in headers.items()],
|
+ [unittest.mock.call(header, value) for header, value in headers.items()],
|
||||||
any_order=True,
|
any_order=True,
|
||||||
@@ -192,9 +193,21 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
token_manager=self.token_manager,
|
token_manager=self.token_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_do_head_forward(self) -> None:
|
def test_do_head_index(self) -> None:
|
||||||
handler = self._get_handler()
|
handler = self._get_handler()
|
||||||
with (
|
with (
|
||||||
|
self.expects_status_only(
|
||||||
|
handler, 200, content_length=len(handler.server_signature())
|
||||||
|
),
|
||||||
|
self.patch("http.server.SimpleHTTPRequestHandler.do_HEAD", count=0),
|
||||||
|
self.seal_mocks(),
|
||||||
|
):
|
||||||
|
handler.do_HEAD()
|
||||||
|
|
||||||
|
def test_do_head_forward(self) -> None:
|
||||||
|
handler = self._get_handler("/file")
|
||||||
|
with (
|
||||||
|
self.mock_call(self.registry.get_from_path, ["file"], Page("file")),
|
||||||
self.patch("http.server.SimpleHTTPRequestHandler.do_HEAD"),
|
self.patch("http.server.SimpleHTTPRequestHandler.do_HEAD"),
|
||||||
self.seal_mocks(),
|
self.seal_mocks(),
|
||||||
):
|
):
|
||||||
@@ -859,6 +872,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
self.expects_status_only(handler, 200, "OK"),
|
self.expects_status_only(handler, 200, "OK"),
|
||||||
@@ -903,6 +917,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
self.expects_status_only(handler, 200, "OK"),
|
self.expects_status_only(handler, 200, "OK"),
|
||||||
@@ -945,6 +960,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
self.expects_basic_body(handler, "hello", message="OK"),
|
self.expects_basic_body(handler, "hello", message="OK"),
|
||||||
@@ -979,6 +995,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
) as request_mock,
|
) as request_mock,
|
||||||
self.expects_status_only(
|
self.expects_status_only(
|
||||||
@@ -1022,6 +1039,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
self.expects_status_only(handler, 200, "OK"),
|
self.expects_status_only(handler, 200, "OK"),
|
||||||
@@ -1062,6 +1080,7 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
},
|
},
|
||||||
"allow_redirects": False,
|
"allow_redirects": False,
|
||||||
"timeout": 480,
|
"timeout": 480,
|
||||||
|
"stream": False,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
self.expects_status_only(handler, 200, "OK"),
|
self.expects_status_only(handler, 200, "OK"),
|
||||||
@@ -1192,6 +1211,21 @@ class TestRequestHandler(BaseHandlerTestCase):
|
|||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_translate_path_with_host_favicon(self) -> None:
|
||||||
|
handler = self._get_handler(headers={"Host": "example.com"})
|
||||||
|
with (
|
||||||
|
self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")),
|
||||||
|
self.patch_call(
|
||||||
|
"http.server.SimpleHTTPRequestHandler.translate_path",
|
||||||
|
["/favicon.ico"],
|
||||||
|
),
|
||||||
|
self.seal_mocks(),
|
||||||
|
):
|
||||||
|
self.assertEqual(
|
||||||
|
handler.translate_path("/favicon.ico"),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
def test_translate_path_default_host(self) -> None:
|
def test_translate_path_default_host(self) -> None:
|
||||||
handler = self._get_handler()
|
handler = self._get_handler()
|
||||||
with (
|
with (
|
||||||
|
|||||||
@@ -26,10 +26,8 @@ class TestStaplerServer(BaseTestCase):
|
|||||||
def test_renew(self) -> None:
|
def test_renew(self) -> None:
|
||||||
with (
|
with (
|
||||||
self.mock_call(self.registry.load_pages),
|
self.mock_call(self.registry.load_pages),
|
||||||
self.mock_calls(
|
self.mock_calls(self.registry.get_hosts, [[]], [["host_1"]]),
|
||||||
self.registry.get_hosts, [[], []], [["host_1"], ["host_1"]]
|
self.mock_call(self.cert_manager.init),
|
||||||
),
|
|
||||||
self.mock_call(self.cert_manager.init, [["localhost", "host_1"]]),
|
|
||||||
self.mock_calls(
|
self.mock_calls(
|
||||||
self.cert_manager.create_or_update, [["localhost"], ["host_1"]]
|
self.cert_manager.create_or_update, [["localhost"], ["host_1"]]
|
||||||
),
|
),
|
||||||
@@ -67,16 +65,16 @@ class TestStaplerServer(BaseTestCase):
|
|||||||
|
|
||||||
def test_run_https(self) -> None:
|
def test_run_https(self) -> None:
|
||||||
self.token_manager.detect_file_change.side_effect = KeyboardInterrupt
|
self.token_manager.detect_file_change.side_effect = KeyboardInterrupt
|
||||||
self.cert_manager.sni_callback = unittest.mock.Mock()
|
self.cert_manager.servername_callback = unittest.mock.Mock()
|
||||||
with (
|
with (
|
||||||
self.mock_call(self.registry.load_pages),
|
self.mock_call(self.registry.load_pages),
|
||||||
self.mock_call(self.registry.get_hosts, [], []),
|
self.mock_call(self.cert_manager.init),
|
||||||
self.mock_call(self.cert_manager.init, [["localhost"]]),
|
|
||||||
self.mock_call(self.data_dir.init),
|
self.mock_call(self.data_dir.init),
|
||||||
self.mock_call(self.token_manager.init),
|
self.mock_call(self.token_manager.init),
|
||||||
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
self.patch("ssl.create_default_context", return_value=self.context_mock),
|
||||||
self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2),
|
self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2),
|
||||||
self.mock_call_unchecked(self.context_mock.wrap_socket),
|
self.mock_call_unchecked(self.context_mock.wrap_socket),
|
||||||
|
self.mock_call_unchecked(self.context_mock.set_servername_callback),
|
||||||
self.mock_calls_unchecked(self.server_mock.serve_forever, 2),
|
self.mock_calls_unchecked(self.server_mock.serve_forever, 2),
|
||||||
self.mock_call(self.server_mock.shutdown),
|
self.mock_call(self.server_mock.shutdown),
|
||||||
self.seal_mocks(),
|
self.seal_mocks(),
|
||||||
|
|||||||
Reference in New Issue
Block a user