diff --git a/src/cert_manager.py b/src/cert_manager.py index 2354f1e..cd21372 100644 --- a/src/cert_manager.py +++ b/src/cert_manager.py @@ -183,44 +183,23 @@ class CertManager: return False return self.__exists_certbot(host) - def get_https_context(self, default_host: str) -> ssl.SSLContext | None: - if not self.exists(default_host): - self.logger.warning("Cannot create HTTPS context for %s", default_host) - return None - self.logger.debug("Creating HTTP context...") - cert_file = self.get_cert(default_host) - key_file = self.get_key(default_host) - self.last_file_change = cert_file.stat().st_mtime - context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - context.load_cert_chain( - cert_file, - key_file, - ) - context.sni_callback = self.__sni_callback - return context - - def detect_default_cert_change(self, default_host: str) -> bool: - cert_file = self.get_cert(default_host) - if cert_file.exists() and cert_file.stat().st_mtime != self.last_file_change: - self.logger.debug("Detected change: %s", cert_file) - self.last_file_change = cert_file.stat().st_mtime - return True - return False - - def __sni_callback( + def sni_callback( self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, / ) -> None | int: if host is None: return None if not self.exists(host) and not self.create_or_update(host): - msg = "Could not get certificate for %s" - raise CertManagerError(msg, host) - new_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + return None cert_file = self.get_cert(host) key_file = self.get_key(host) - new_context.load_cert_chain( - cert_file, - key_file, - ) - socket.context = new_context + try: + new_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + new_context.load_cert_chain( + cert_file, + key_file, + ) + socket.context = new_context + except Exception: + self.logger.exception("Could not create HTTPS context for %s", host) + return None return None diff --git a/src/server.py b/src/server.py index daa301a..c81c778 100644 --- a/src/server.py +++ b/src/server.py @@ -1,6 +1,7 @@ import contextlib import http.server import logging +import ssl import threading import time import typing @@ -42,7 +43,6 @@ class StaplerServer: self.data_dir: DataDir = DataDir(params.data_dir) self.default_host: str = params.host.split(":", maxsplit=2)[0] self.server: http.server.ThreadingHTTPServer | None = None - self.https = params.https def __get_all_hosts(self) -> list[str]: return [self.default_host, *self.registry.get_hosts()] @@ -66,13 +66,8 @@ class StaplerServer: token_manager=self.token_manager, ) - def __create_base_server(self) -> tuple[http.server.ThreadingHTTPServer, bool]: - context = ( - self.cert_manager.get_https_context(self.default_host) - if self.params.https - else None - ) - if context is not None: + def __create_base_server(self) -> http.server.ThreadingHTTPServer: + if self.params.https: server = http.server.ThreadingHTTPServer( ( self.params.bind, @@ -80,7 +75,9 @@ class StaplerServer: ), self.__request_handler, ) + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) server.socket = context.wrap_socket(server.socket, server_side=True) + context.sni_callback = self.cert_manager.sni_callback else: server = http.server.ThreadingHTTPServer( ( @@ -94,7 +91,7 @@ class StaplerServer: server.server_address[0], server.server_port, ) - return server, context is not None + return server def __upgrade_handler( self, *args: typing.Any @@ -127,37 +124,19 @@ class StaplerServer: self.token_manager.init() time.sleep(1) - def __cert_manager_background(self) -> None: - with contextlib.suppress(KeyboardInterrupt): - while True: - if ( - self.server is not None - and self.cert_manager.detect_default_cert_change(self.default_host) - and ( - context := self.cert_manager.get_https_context( - self.default_host - ) - ) - is not None - ): - self.server.socket = context.wrap_socket(self.server.socket) - time.sleep(1) - def __start_background_tasks(self) -> None: threading.Thread(target=self.__token_manager_background, daemon=True).start() - if self.https: - threading.Thread(target=self.__cert_manager_background, daemon=True).start() def run(self) -> int: self.logger.info("Version %s", project.get_version()) for line in STAPLER_ASCII.split("\n"): self.logger.debug(line.ljust(36)) self.__startup() - self.server, self.https = self.__create_base_server() - upgrade_server = self.__start_upgrade_server() if self.https else None + self.server = self.__create_base_server() + upgrade_server = self.__start_upgrade_server() if self.params.https else None self.logger.info( "Server up and ready on %s://%s", - "https" if self.https else "http", + "https" if self.params.https else "http", self.params.host, ) self.__start_background_tasks() diff --git a/tests/test_cert_manager.py b/tests/test_cert_manager.py index fe1704d..4cc6d70 100644 --- a/tests/test_cert_manager.py +++ b/tests/test_cert_manager.py @@ -167,41 +167,24 @@ class TestRegistry(BaseTestCase): lambda: self.cert_manager.get_key("localhost"), ) - def test_get_https_context_fail(self) -> None: - self.assertIsNone(self.cert_manager.get_https_context("localhost")) - - def test_get_https_context(self) -> None: - self._make_self_signed("localhost") - with ( - self.patch("ssl.create_default_context", return_value=self.context_mock), - self.mock_call( - self.context_mock.load_cert_chain, - [ - self.self_signed_path / "localhost" / CertManager.CRT_FILE, - self.self_signed_path / "localhost" / CertManager.KEY_FILE, - ], - ), - ): - self.assertEqual( - self.cert_manager.get_https_context("localhost"), self.context_mock - ) - def test_sni_callback_no_host(self) -> None: self._make_self_signed("localhost") with ( - self.patch("ssl.create_default_context", return_value=self.context_mock), - self.mock_call( - self.context_mock.load_cert_chain, - [ - self.self_signed_path / "localhost" / CertManager.CRT_FILE, - self.self_signed_path / "localhost" / CertManager.KEY_FILE, - ], - ), + self.patch("ssl.create_default_context", count=0), ): - self.cert_manager.get_https_context("localhost") - self.context_mock.sni_callback(self.socket_mock, None, self.context_mock) + self.cert_manager.sni_callback(self.socket_mock, None, self.context_mock) def test_sni_callback_fail(self) -> None: + self._make_self_signed("localhost") + with ( + self.patch("shutil.which", count=3), + self.patch("ssl.create_default_context", count=0), + ): + self.cert_manager.sni_callback( + self.socket_mock, "new_host", self.context_mock + ) + + def test_sni_callback_create_context(self) -> None: self._make_self_signed("localhost") with ( self.patch("ssl.create_default_context", return_value=self.context_mock), @@ -212,53 +195,26 @@ class TestRegistry(BaseTestCase): self.self_signed_path / "localhost" / CertManager.KEY_FILE, ], ), - self.patch("shutil.which", count=3), - ): - self.cert_manager.get_https_context("localhost") - self.assertRaises( - CertManagerError, - lambda: self.context_mock.sni_callback( - self.socket_mock, "new_host", self.context_mock - ), - ) - - def test_sni_callback_change_context(self) -> None: - self._make_self_signed("localhost") - self._make_self_signed("new_host") - with ( - self.patch( - "ssl.create_default_context", return_value=self.context_mock, count=2 - ), - self.mock_calls( - self.context_mock.load_cert_chain, - [ - [ - self.self_signed_path / "localhost" / CertManager.CRT_FILE, - self.self_signed_path / "localhost" / CertManager.KEY_FILE, - ], - [ - self.self_signed_path / "new_host" / CertManager.CRT_FILE, - self.self_signed_path / "new_host" / CertManager.KEY_FILE, - ], - ], - ), self.patch("shutil.which", count=0), ): - self.cert_manager.get_https_context("localhost") - self.context_mock.sni_callback( - self.socket_mock, "new_host", self.context_mock + self.cert_manager.sni_callback( + self.socket_mock, "localhost", self.context_mock ) - def test_detect_default_cert_change(self) -> None: + def test_sni_callback_create_context_fail(self) -> None: self._make_self_signed("localhost") - assert self.cert_manager.detect_default_cert_change("localhost") - - def test_detect_default_cert_change_nothing(self) -> None: - self._make_self_signed("localhost") - self.cert_manager.last_file_change = ( - (self.self_signed_path / "localhost" / CertManager.CRT_FILE).stat().st_mtime - ) - assert not self.cert_manager.detect_default_cert_change("localhost") + with ( + self.patch("ssl.create_default_context", return_value=self.context_mock), + self.patch("shutil.which", count=0), + ): + self.context_mock.load_cert_chain.side_effect = Exception + self.cert_manager.sni_callback( + self.socket_mock, "localhost", self.context_mock + ) + self.context_mock.load_cert_chain.assert_called_once_with( + self.self_signed_path / "localhost" / CertManager.CRT_FILE, + self.self_signed_path / "localhost" / CertManager.KEY_FILE, + ) def _make_self_signed(self, host: str) -> None: (self.self_signed_path / host).mkdir(parents=True, exist_ok=True) diff --git a/tests/test_server.py b/tests/test_server.py index a3137b4..c793bfd 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -65,44 +65,21 @@ class TestStaplerServer(BaseTestCase): self.assertEqual(self.server.run(), 0) self.token_manager.detect_file_change.assert_called_once() - def test_run_https_fail(self) -> None: - self.token_manager.detect_file_change.side_effect = KeyboardInterrupt - with ( - self.mock_call(self.registry.load_pages), - self.mock_call(self.registry.get_hosts, [], []), - self.mock_call(self.cert_manager.init, [["localhost"]]), - self.mock_call(self.data_dir.init), - self.mock_call(self.token_manager.init), - self.mock_call(self.cert_manager.get_https_context, ["localhost"]), - self.patch("http.server.ThreadingHTTPServer", self.server_mock), - self.mock_call(self.server_mock.serve_forever), - self.seal_mocks(), - ): - self.assertEqual(self.server.run(), 0) - self.token_manager.detect_file_change.assert_called_once() - def test_run_https(self) -> None: self.token_manager.detect_file_change.side_effect = KeyboardInterrupt - self.cert_manager.detect_default_cert_change.side_effect = KeyboardInterrupt + self.cert_manager.sni_callback = unittest.mock.Mock() with ( self.mock_call(self.registry.load_pages), self.mock_call(self.registry.get_hosts, [], []), self.mock_call(self.cert_manager.init, [["localhost"]]), self.mock_call(self.data_dir.init), self.mock_call(self.token_manager.init), - self.mock_call( - self.cert_manager.get_https_context, - ["localhost"], - self.context_mock, - ), + self.patch("ssl.create_default_context", return_value=self.context_mock), self.patch("http.server.ThreadingHTTPServer", self.server_mock, 2), self.mock_call_unchecked(self.context_mock.wrap_socket), self.mock_calls_unchecked(self.server_mock.serve_forever, 2), self.mock_call(self.server_mock.shutdown), - self.seal_mocks(self.context_mock), + self.seal_mocks(), ): self.assertEqual(self.server.run(), 0) self.token_manager.detect_file_change.assert_called_once() - self.cert_manager.detect_default_cert_change.assert_called_once_with( - "localhost" - )