feat: cert_manager detect file change

This commit is contained in:
2026-04-20 19:55:35 +02:00
parent 5ff397e6d1
commit 7f02abca1a
7 changed files with 65 additions and 21 deletions
+10
View File
@@ -17,6 +17,7 @@ class CertManager:
__slots__ = [
"certbot_conf",
"certbot_www",
"last_file_change",
"logger",
"self_signed_path",
"with_certbot",
@@ -32,6 +33,7 @@ class CertManager:
self.certbot_www: pathlib.Path = pathlib.Path(params.certbot_www)
self.self_signed_path: pathlib.Path = pathlib.Path(params.self_signed_path)
self.with_certbot: bool = params.with_certbot
self.last_file_change: int | float = 0
def init(self, hosts: list[str]) -> None:
self.logger.debug("Initializing...")
@@ -187,6 +189,7 @@ class CertManager:
return None
cert_file = self.get_cert(default_host)
key_file = self.get_key(default_host)
self.last_file_change = cert_file.stat().st_mtime
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
cert_file,
@@ -195,6 +198,13 @@ class CertManager:
context.sni_callback = self.__sni_callback
return context
def detect_default_cert_change(self, default_host: str) -> bool:
cert_file = self.get_cert(default_host)
if cert_file.exists() and cert_file.stat().st_mtime != self.last_file_change:
self.logger.debug("Detected change: %s", cert_file)
return True
return False
def __sni_callback(
self, socket: ssl.SSLObject, host: str | None, _: ssl.SSLContext, /
) -> None | int:
+31 -6
View File
@@ -2,6 +2,7 @@ import contextlib
import http.server
import logging
import threading
import time
import typing
from . import (
@@ -24,9 +25,11 @@ class StaplerServer:
"cert_manager",
"data_dir",
"default_host",
"https",
"logger",
"params",
"registry",
"server",
"token_manager",
]
@@ -38,6 +41,8 @@ class StaplerServer:
self.token_manager: TokenManager = TokenManager(params, self.registry)
self.data_dir: DataDir = DataDir(params.data_dir)
self.default_host: str = params.host.split(":", maxsplit=2)[0]
self.server: http.server.ThreadingHTTPServer | None = None
self.https = params.https
def __get_all_hosts(self) -> list[str]:
return [self.default_host, *self.registry.get_hosts()]
@@ -115,29 +120,49 @@ class StaplerServer:
threading.Thread(target=server.serve_forever, daemon=True).start()
return server
def __token_manager_background(self) -> None:
def __token_manager_background(self) -> None: # pragma: no cover
with contextlib.suppress(KeyboardInterrupt):
while True:
self.token_manager.detect_file_change()
if self.token_manager.detect_file_change():
self.token_manager.init()
time.sleep(1)
def __cert_manager_background(self) -> None: # pragma: no cover
with contextlib.suppress(KeyboardInterrupt):
while True:
if (
self.server is not None
and self.cert_manager.detect_default_cert_change(self.default_host)
and (
context := self.cert_manager.get_https_context(
self.default_host
)
)
is not None
):
self.server.socket = context.wrap_socket(self.server.socket)
time.sleep(1)
def __start_background_tasks(self) -> None:
threading.Thread(target=self.__token_manager_background, daemon=True).start()
if self.https:
threading.Thread(target=self.__cert_manager_background, daemon=True).start()
def run(self) -> int:
self.logger.info("Version %s", project.get_version())
for line in STAPLER_ASCII.split("\n"):
self.logger.debug(line.ljust(36))
self.__startup()
base_server, https = self.__create_base_server()
upgrade_server = self.__start_upgrade_server() if https else None
self.server, self.https = self.__create_base_server()
upgrade_server = self.__start_upgrade_server() if self.https else None
self.logger.info(
"Server up and ready on %s://%s",
"https" if https else "http",
"https" if self.https else "http",
self.params.host,
)
self.__start_background_tasks()
with contextlib.suppress(KeyboardInterrupt):
base_server.serve_forever()
self.server.serve_forever()
self.logger.info("Shutting down...")
if upgrade_server is not None:
upgrade_server.shutdown()
+4 -6
View File
@@ -60,13 +60,11 @@ class TokenManager:
self.logger.warning("NEW TOKEN: %s", new_token)
self.logger.warning("Please copy this secret value before it disappears")
def detect_file_change(self) -> None:
if (
not self.tokens_file.exists()
or self.tokens_file.stat().st_mtime != self.last_file_change
):
def detect_file_change(self) -> bool:
if self.tokens_file.stat().st_mtime != self.last_file_change:
self.logger.debug("Detected change: %s", self.tokens_file)
self.init()
return True
return False
def __hash_token(self, token: str) -> str:
return hashlib.sha512(