feat: certificate manager
This commit is contained in:
@@ -11,6 +11,7 @@ ENV MAX_SIZE=2000000
|
||||
ENV BIND=0.0.0.0
|
||||
ENV CERTBOT_CONF=/etc/letsencrypt
|
||||
ENV CERTBOT_WWW=/data/.certbot
|
||||
ENV SELF_SIGNED_PATH=/data/.certificates
|
||||
|
||||
RUN apk add --no-cache \
|
||||
openssl \
|
||||
|
||||
@@ -74,7 +74,7 @@ curl -X DELETE \
|
||||
- [x] ignore .gitignore/.host etc at root
|
||||
- [x] cerbot install in container + path env/arg
|
||||
- [x] redirect /.well-known/acme-challenge to specific path
|
||||
- [ ] certbot/self-signed create/renew in specific dir
|
||||
- [x] certbot/self-signed create/renew in specific dir
|
||||
- [x] better logger
|
||||
- [ ] renew command
|
||||
- [ ] https mode w/ multiple hosts
|
||||
@@ -84,6 +84,7 @@ curl -X DELETE \
|
||||
- [ ] deliver visits in /page/visits
|
||||
- [x] better error page
|
||||
- [ ] add favicon.ico + special path
|
||||
- [ ] [http.server security](https://docs.python.org/3/library/http.server.html#http-server-security)
|
||||
|
||||
### Makefile targets
|
||||
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
*
|
||||
!.gitignore
|
||||
@@ -1,3 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
||||
!.certbot
|
||||
+1
-1
@@ -16,4 +16,4 @@ dev = [
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["ALL"]
|
||||
ignore = ["D", "E501", "S104", "PLR2004", "ANN401", "BLE001", "COM812"]
|
||||
ignore = ["D", "E501", "S104", "PLR2004", "ANN401", "BLE001", "COM812", "S603"]
|
||||
+162
@@ -0,0 +1,162 @@
|
||||
import logging
|
||||
import pathlib
|
||||
import shutil
|
||||
import subprocess
|
||||
import typing
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from . import params
|
||||
|
||||
|
||||
class CertManagerError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CertManager:
|
||||
SELF_SIGNED_DAYS = 30
|
||||
CRT_FILE = "fullchain.pem"
|
||||
KEY_FILE = "privkey.pem"
|
||||
|
||||
def __init__(self, params: params.Parameters) -> None:
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
self.certbot_conf = pathlib.Path(params.certbot_conf)
|
||||
self.certbot_www = pathlib.Path(params.certbot_www)
|
||||
self.self_signed_path = pathlib.Path(params.self_signed_path)
|
||||
self.with_certbot = params.with_certbot
|
||||
|
||||
def init(self, hosts: list[str]) -> None:
|
||||
if not self.certbot_www.exists():
|
||||
self.certbot_www.mkdir(parents=True)
|
||||
self.logger.debug("Created %s", self.certbot_www)
|
||||
if not self.self_signed_path.exists():
|
||||
self.self_signed_path.mkdir(parents=True)
|
||||
self.logger.debug("Created %s", self.self_signed_path)
|
||||
for host in hosts:
|
||||
self.init_cert(host)
|
||||
|
||||
def exists(self, host: str) -> bool:
|
||||
return self.__exists_certbot(host) or self.__exists_self_signed(host)
|
||||
|
||||
def init_cert(self, host: str) -> bool:
|
||||
if not self.exists(host):
|
||||
return self.__create_self_signed(host)
|
||||
self.logger.debug("Certificate exists for %s", host)
|
||||
return False
|
||||
|
||||
def create_or_update(self, host: str) -> bool:
|
||||
created = self.init_cert(host)
|
||||
if self.with_certbot and self.__create_certbot(host):
|
||||
return True
|
||||
return created or self.__create_self_signed(host)
|
||||
|
||||
def get_pem(self, host: str) -> pathlib.Path | None:
|
||||
if self.__exists_certbot(host):
|
||||
return self.__certbot_file(host, self.CRT_FILE)
|
||||
if self.__exists_self_signed(host):
|
||||
return self.__self_signed_file(host, self.CRT_FILE)
|
||||
return None
|
||||
|
||||
def get_key(self, host: str) -> pathlib.Path | None:
|
||||
if self.__exists_certbot(host):
|
||||
return self.__certbot_file(host, self.KEY_FILE)
|
||||
if self.__exists_self_signed(host):
|
||||
return self.__self_signed_file(host, self.KEY_FILE)
|
||||
return None
|
||||
|
||||
def __self_signed_file(self, host: str, file: str) -> pathlib.Path:
|
||||
return self.self_signed_path / host / file
|
||||
|
||||
def __exists_self_signed(self, host: str) -> bool:
|
||||
return (
|
||||
self.__self_signed_file(host, self.CRT_FILE).is_file()
|
||||
and self.__self_signed_file(host, self.KEY_FILE).is_file()
|
||||
)
|
||||
|
||||
def __get_openssl_bin(self) -> str:
|
||||
binary_path = shutil.which("openssl")
|
||||
if binary_path is None:
|
||||
msg = "Cannot find 'openssl' binary in PATH"
|
||||
raise CertManagerError(msg)
|
||||
return binary_path
|
||||
|
||||
def __create_self_signed(self, host: str) -> bool:
|
||||
cert_path = self.self_signed_path / host
|
||||
if not cert_path.exists():
|
||||
cert_path.mkdir(parents=True)
|
||||
cert_host: str = host
|
||||
if ":" in host:
|
||||
cert_host = host.split(":", maxsplit=2)[0]
|
||||
try:
|
||||
# openssl req -new -newkey rsa:2048 -days 30 -nodes -x509 -keyout server.key -out server.crt
|
||||
subprocess.run(
|
||||
[
|
||||
self.__get_openssl_bin(),
|
||||
"req",
|
||||
"-new",
|
||||
"-newkey",
|
||||
"rsa:4096",
|
||||
"-days",
|
||||
str(self.SELF_SIGNED_DAYS),
|
||||
"-nodes",
|
||||
"-x509",
|
||||
"-keyout",
|
||||
cert_path / "privkey.pem",
|
||||
"-out",
|
||||
cert_path / "fullchain.pem",
|
||||
"-subj",
|
||||
f"/C=/ST=/L=/O=/OU=/CN={cert_host}",
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
self.logger.info("Created self-signed certificate for %s", host)
|
||||
except subprocess.CalledProcessError:
|
||||
self.logger.exception(
|
||||
"Could not create self-signed certificate for %s", host
|
||||
)
|
||||
return False
|
||||
return self.__exists_self_signed(host)
|
||||
|
||||
def __certbot_file(self, host: str, file: str) -> pathlib.Path:
|
||||
return self.certbot_conf / "live" / host / file
|
||||
|
||||
def __exists_certbot(self, host: str) -> bool:
|
||||
return (
|
||||
self.with_certbot
|
||||
and self.__certbot_file(host, self.CRT_FILE).is_file()
|
||||
and self.__certbot_file(host, self.KEY_FILE).is_file()
|
||||
)
|
||||
|
||||
def __get_certbot_bin(self) -> str:
|
||||
binary_path = shutil.which("certbot")
|
||||
if binary_path is None:
|
||||
msg = "Cannot find 'certbot' binary in PATH"
|
||||
raise CertManagerError(msg)
|
||||
return binary_path
|
||||
|
||||
def __create_certbot(self, host: str) -> bool:
|
||||
cert_host: str = host
|
||||
if ":" in host:
|
||||
cert_host = host.split(":", maxsplit=2)[0]
|
||||
try:
|
||||
# certonly -v --webroot --webroot-path=/var/www/certbot --agree-tos --no-eff-email -n --force-renewal --expand
|
||||
subprocess.run(
|
||||
[
|
||||
self.__get_certbot_bin(),
|
||||
"--non-interactive",
|
||||
"--agree-tos",
|
||||
"--webroot",
|
||||
"--webroot-path",
|
||||
self.certbot_www,
|
||||
"--no-eff-email",
|
||||
"--cert-name",
|
||||
host,
|
||||
"--domain",
|
||||
cert_host,
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
self.logger.info("Created certbot certificate for %s", host)
|
||||
except subprocess.CalledProcessError:
|
||||
self.logger.exception("Could not create certbot certificate for %s", host)
|
||||
return False
|
||||
return self.__exists_certbot(host)
|
||||
@@ -15,6 +15,9 @@ class Parameters:
|
||||
max_size_bytes: int
|
||||
certbot_conf: str
|
||||
certbot_www: str
|
||||
self_signed_path: str
|
||||
with_certbot: bool
|
||||
with_certificates: bool
|
||||
debug: bool
|
||||
|
||||
@classmethod
|
||||
@@ -146,6 +149,27 @@ def parse_parameters() -> Parameters:
|
||||
default="./data/.certbot",
|
||||
help_txt="Certbot www dir",
|
||||
)
|
||||
__add_arg_str(
|
||||
parser,
|
||||
"--self-signed-path",
|
||||
env_var="SELF_SIGNED_PATH",
|
||||
default="./data/.certificates",
|
||||
help_txt="Self-signed certificates dir",
|
||||
)
|
||||
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
|
||||
parser.add_argument(
|
||||
"--certbot",
|
||||
action=argparse.BooleanOptionalAction,
|
||||
help="Use Certbot (default: true)",
|
||||
default=True,
|
||||
dest="with_certbot",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--certificates",
|
||||
action=argparse.BooleanOptionalAction,
|
||||
help="Handle certificates (default: true)",
|
||||
default=True,
|
||||
dest="with_certificates",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return Parameters.from_namespace(args)
|
||||
|
||||
@@ -19,6 +19,9 @@ class Registry:
|
||||
for path in self.data_dir.list_paths():
|
||||
self.add(path)
|
||||
|
||||
def get_hosts(self) -> list[str]:
|
||||
return [p.host for p in self.pages.values() if p.host is not None]
|
||||
|
||||
def add(self, path: str) -> None:
|
||||
self.pages[path] = page.Page(
|
||||
path,
|
||||
|
||||
+4
-8
@@ -1,10 +1,9 @@
|
||||
import contextlib
|
||||
import http.server
|
||||
import logging
|
||||
import pathlib
|
||||
import typing
|
||||
|
||||
from . import handler, project, registry
|
||||
from . import cert, handler, project, registry
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from . import params
|
||||
@@ -15,6 +14,7 @@ class StaplerServer:
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
self.params = params
|
||||
self.registry = registry.Registry(params)
|
||||
self.cert_manager = cert.CertManager(params)
|
||||
self.server = http.server.ThreadingHTTPServer(
|
||||
(params.bind, params.port),
|
||||
self.request_handler,
|
||||
@@ -23,15 +23,11 @@ class StaplerServer:
|
||||
def request_handler(self, *args: typing.Any) -> http.server.BaseHTTPRequestHandler:
|
||||
return handler.RequestHandler(*args, params=self.params, registry=self.registry)
|
||||
|
||||
def __init_certbot_www(self) -> None:
|
||||
certbot_www_path = pathlib.Path(self.params.certbot_www)
|
||||
if not certbot_www_path.exists():
|
||||
certbot_www_path.mkdir(parents=True)
|
||||
|
||||
def __startup(self) -> None:
|
||||
self.logger.info("Starting up...")
|
||||
self.registry.load_pages()
|
||||
self.__init_certbot_www()
|
||||
if self.params.with_certificates:
|
||||
self.cert_manager.init([self.params.host, *self.registry.get_hosts()])
|
||||
|
||||
def start(self) -> None:
|
||||
self.logger.info("Version %s", project.get_version())
|
||||
|
||||
Reference in New Issue
Block a user