fix: don't reload cert manager, use only sni callback

This commit is contained in:
2026-04-20 21:14:17 +02:00
parent d7bca9dc10
commit e7e8c9f141
4 changed files with 51 additions and 160 deletions
+12 -33
View File
@@ -183,44 +183,23 @@ class CertManager:
return False return False
return self.__exists_certbot(host) return self.__exists_certbot(host)
def get_https_context(self, default_host: str) -> ssl.SSLContext | None: def sni_callback(
if not self.exists(default_host):
self.logger.warning("Cannot create HTTPS context for %s", default_host)
return None
self.logger.debug("Creating HTTP context...")
cert_file = self.get_cert(default_host)
key_file = self.get_key(default_host)
self.last_file_change = cert_file.stat().st_mtime
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
cert_file,
key_file,
)
context.sni_callback = self.__sni_callback
return context
def detect_default_cert_change(self, default_host: str) -> bool:
cert_file = self.get_cert(default_host)
if cert_file.exists() and cert_file.stat().st_mtime != self.last_file_change:
self.logger.debug("Detected change: %s", cert_file)
self.last_file_change = cert_file.stat().st_mtime
return True
return False
def __sni_callback(
self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, / self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, /
) -> None | int: ) -> None | int:
if host is None: if host is None:
return None return None
if not self.exists(host) and not self.create_or_update(host): if not self.exists(host) and not self.create_or_update(host):
msg = "Could not get certificate for %s" return None
raise CertManagerError(msg, host)
new_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
cert_file = self.get_cert(host) cert_file = self.get_cert(host)
key_file = self.get_key(host) key_file = self.get_key(host)
new_context.load_cert_chain( try:
cert_file, new_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
key_file, new_context.load_cert_chain(
) cert_file,
socket.context = new_context key_file,
)
socket.context = new_context
except Exception:
self.logger.exception("Could not create HTTPS context for %s", host)
return None
return None return None
+9 -30
View File
@@ -1,6 +1,7 @@
import contextlib import contextlib
import http.server import http.server
import logging import logging
import ssl
import threading import threading
import time import time
import typing import typing
@@ -42,7 +43,6 @@ class StaplerServer:
self.data_dir: DataDir = DataDir(params.data_dir) self.data_dir: DataDir = DataDir(params.data_dir)
self.default_host: str = params.host.split(":", maxsplit=2)[0] self.default_host: str = params.host.split(":", maxsplit=2)[0]
self.server: http.server.ThreadingHTTPServer | None = None self.server: http.server.ThreadingHTTPServer | None = None
self.https = params.https
def __get_all_hosts(self) -> list[str]: def __get_all_hosts(self) -> list[str]:
return [self.default_host, *self.registry.get_hosts()] return [self.default_host, *self.registry.get_hosts()]
@@ -66,13 +66,8 @@ class StaplerServer:
token_manager=self.token_manager, token_manager=self.token_manager,
) )
def __create_base_server(self) -> tuple[http.server.ThreadingHTTPServer, bool]: def __create_base_server(self) -> http.server.ThreadingHTTPServer:
context = ( if self.params.https:
self.cert_manager.get_https_context(self.default_host)
if self.params.https
else None
)
if context is not None:
server = http.server.ThreadingHTTPServer( server = http.server.ThreadingHTTPServer(
( (
self.params.bind, self.params.bind,
@@ -80,7 +75,9 @@ class StaplerServer:
), ),
self.__request_handler, self.__request_handler,
) )
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server.socket = context.wrap_socket(server.socket, server_side=True) server.socket = context.wrap_socket(server.socket, server_side=True)
context.sni_callback = self.cert_manager.sni_callback
else: else:
server = http.server.ThreadingHTTPServer( server = http.server.ThreadingHTTPServer(
( (
@@ -94,7 +91,7 @@ class StaplerServer:
server.server_address[0], server.server_address[0],
server.server_port, server.server_port,
) )
return server, context is not None return server
def __upgrade_handler( def __upgrade_handler(
self, *args: typing.Any self, *args: typing.Any
@@ -127,37 +124,19 @@ class StaplerServer:
self.token_manager.init() self.token_manager.init()
time.sleep(1) time.sleep(1)
def __cert_manager_background(self) -> None:
with contextlib.suppress(KeyboardInterrupt):
while True:
if (
self.server is not None
and self.cert_manager.detect_default_cert_change(self.default_host)
and (
context := self.cert_manager.get_https_context(
self.default_host
)
)
is not None
):
self.server.socket = context.wrap_socket(self.server.socket)
time.sleep(1)
def __start_background_tasks(self) -> None: def __start_background_tasks(self) -> None:
threading.Thread(target=self.__token_manager_background, daemon=True).start() threading.Thread(target=self.__token_manager_background, daemon=True).start()
if self.https:
threading.Thread(target=self.__cert_manager_background, daemon=True).start()
def run(self) -> int: def run(self) -> int:
self.logger.info("Version %s", project.get_version()) self.logger.info("Version %s", project.get_version())
for line in STAPLER_ASCII.split("\n"): for line in STAPLER_ASCII.split("\n"):
self.logger.debug(line.ljust(36)) self.logger.debug(line.ljust(36))
self.__startup() self.__startup()
self.server, self.https = self.__create_base_server() self.server = self.__create_base_server()
upgrade_server = self.__start_upgrade_server() if self.https else None upgrade_server = self.__start_upgrade_server() if self.params.https else None
self.logger.info( self.logger.info(
"Server up and ready on %s://%s", "Server up and ready on %s://%s",
"https" if self.https else "http", "https" if self.params.https else "http",
self.params.host, self.params.host,
) )
self.__start_background_tasks() self.__start_background_tasks()
+27 -71
View File
@@ -167,41 +167,24 @@ class TestRegistry(BaseTestCase):
lambda: self.cert_manager.get_key("localhost"), lambda: self.cert_manager.get_key("localhost"),
) )
def test_get_https_context_fail(self) -> None:
self.assertIsNone(self.cert_manager.get_https_context("localhost"))
def test_get_https_context(self) -> None:
self._make_self_signed("localhost")
with (
self.patch("ssl.create_default_context", return_value=self.context_mock),
self.mock_call(
self.context_mock.load_cert_chain,
[
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
],
),
):
self.assertEqual(
self.cert_manager.get_https_context("localhost"), self.context_mock
)
def test_sni_callback_no_host(self) -> None: def test_sni_callback_no_host(self) -> None:
self._make_self_signed("localhost") self._make_self_signed("localhost")
with ( with (
self.patch("ssl.create_default_context", return_value=self.context_mock), self.patch("ssl.create_default_context", count=0),
self.mock_call(
self.context_mock.load_cert_chain,
[
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
],
),
): ):
self.cert_manager.get_https_context("localhost") self.cert_manager.sni_callback(self.socket_mock, None, self.context_mock)
self.context_mock.sni_callback(self.socket_mock, None, self.context_mock)
def test_sni_callback_fail(self) -> None: def test_sni_callback_fail(self) -> None:
self._make_self_signed("localhost")
with (
self.patch("shutil.which", count=3),
self.patch("ssl.create_default_context", count=0),
):
self.cert_manager.sni_callback(
self.socket_mock, "new_host", self.context_mock
)
def test_sni_callback_create_context(self) -> None:
self._make_self_signed("localhost") self._make_self_signed("localhost")
with ( with (
self.patch("ssl.create_default_context", return_value=self.context_mock), self.patch("ssl.create_default_context", return_value=self.context_mock),
@@ -212,53 +195,26 @@ class TestRegistry(BaseTestCase):
self.self_signed_path / "localhost" / CertManager.KEY_FILE, self.self_signed_path / "localhost" / CertManager.KEY_FILE,
], ],
), ),
self.patch("shutil.which", count=3),
):
self.cert_manager.get_https_context("localhost")
self.assertRaises(
CertManagerError,
lambda: self.context_mock.sni_callback(
self.socket_mock, "new_host", self.context_mock
),
)
def test_sni_callback_change_context(self) -> None:
self._make_self_signed("localhost")
self._make_self_signed("new_host")
with (
self.patch(
"ssl.create_default_context", return_value=self.context_mock, count=2
),
self.mock_calls(
self.context_mock.load_cert_chain,
[
[
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
],
[
self.self_signed_path / "new_host" / CertManager.CRT_FILE,
self.self_signed_path / "new_host" / CertManager.KEY_FILE,
],
],
),
self.patch("shutil.which", count=0), self.patch("shutil.which", count=0),
): ):
self.cert_manager.get_https_context("localhost") self.cert_manager.sni_callback(
self.context_mock.sni_callback( self.socket_mock, "localhost", self.context_mock
self.socket_mock, "new_host", self.context_mock
) )
def test_detect_default_cert_change(self) -> None: def test_sni_callback_create_context_fail(self) -> None:
self._make_self_signed("localhost") self._make_self_signed("localhost")
assert self.cert_manager.detect_default_cert_change("localhost") with (
self.patch("ssl.create_default_context", return_value=self.context_mock),
def test_detect_default_cert_change_nothing(self) -> None: self.patch("shutil.which", count=0),
self._make_self_signed("localhost") ):
self.cert_manager.last_file_change = ( self.context_mock.load_cert_chain.side_effect = Exception
(self.self_signed_path / "localhost" / CertManager.CRT_FILE).stat().st_mtime self.cert_manager.sni_callback(
) self.socket_mock, "localhost", self.context_mock
assert not self.cert_manager.detect_default_cert_change("localhost") )
self.context_mock.load_cert_chain.assert_called_once_with(
self.self_signed_path / "localhost" / CertManager.CRT_FILE,
self.self_signed_path / "localhost" / CertManager.KEY_FILE,
)
def _make_self_signed(self, host: str) -> None: def _make_self_signed(self, host: str) -> None:
(self.self_signed_path / host).mkdir(parents=True, exist_ok=True) (self.self_signed_path / host).mkdir(parents=True, exist_ok=True)
+3 -26
View File
@@ -65,44 +65,21 @@ class TestStaplerServer(BaseTestCase):
self.assertEqual(self.server.run(), 0) self.assertEqual(self.server.run(), 0)
self.token_manager.detect_file_change.assert_called_once() self.token_manager.detect_file_change.assert_called_once()
def test_run_https_fail(self) -> None:
self.token_manager.detect_file_change.side_effect = KeyboardInterrupt
with (
self.mock_call(self.registry.load_pages),
self.mock_call(self.registry.get_hosts, [], []),
self.mock_call(self.cert_manager.init, [["localhost"]]),
self.mock_call(self.data_dir.init),
self.mock_call(self.token_manager.init),
self.mock_call(self.cert_manager.get_https_context, ["localhost"]),
self.patch("http.server.ThreadingHTTPServer", self.server_mock),
self.mock_call(self.server_mock.serve_forever),
self.seal_mocks(),
):
self.assertEqual(self.server.run(), 0)
self.token_manager.detect_file_change.assert_called_once()
def test_run_https(self) -> None: def test_run_https(self) -> None:
self.token_manager.detect_file_change.side_effect = KeyboardInterrupt self.token_manager.detect_file_change.side_effect = KeyboardInterrupt
self.cert_manager.detect_default_cert_change.side_effect = KeyboardInterrupt self.cert_manager.sni_callback = unittest.mock.Mock()
with ( with (
self.mock_call(self.registry.load_pages), self.mock_call(self.registry.load_pages),
self.mock_call(self.registry.get_hosts, [], []), self.mock_call(self.registry.get_hosts, [], []),
self.mock_call(self.cert_manager.init, [["localhost"]]), self.mock_call(self.cert_manager.init, [["localhost"]]),
self.mock_call(self.data_dir.init), self.mock_call(self.data_dir.init),
self.mock_call(self.token_manager.init), self.mock_call(self.token_manager.init),
self.mock_call( self.patch("ssl.create_default_context", return_value=self.context_mock),
self.cert_manager.get_https_context,
["localhost"],
self.context_mock,
),
self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2), self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2),
self.mock_call_unchecked(self.context_mock.wrap_socket), self.mock_call_unchecked(self.context_mock.wrap_socket),
self.mock_calls_unchecked(self.server_mock.serve_forever, 2), self.mock_calls_unchecked(self.server_mock.serve_forever, 2),
self.mock_call(self.server_mock.shutdown), self.mock_call(self.server_mock.shutdown),
self.seal_mocks(self.context_mock), self.seal_mocks(),
): ):
self.assertEqual(self.server.run(), 0) self.assertEqual(self.server.run(), 0)
self.token_manager.detect_file_change.assert_called_once() self.token_manager.detect_file_change.assert_called_once()
self.cert_manager.detect_default_cert_change.assert_called_once_with(
"localhost"
)