refactor!: use all ruff rules

This commit is contained in:
2026-04-12 16:46:12 +02:00
parent 1a165876fe
commit 5bb9725000
9 changed files with 211 additions and 156 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ from src.params import parse_parameters
from src.server import StaplerServer from src.server import StaplerServer
def main(): def main() -> None:
params = parse_parameters() params = parse_parameters()
setup_logs(params) setup_logs(params)
server = StaplerServer(params) server = StaplerServer(params)
+2 -2
View File
@@ -15,5 +15,5 @@ dev = [
] ]
[tool.ruff.lint] [tool.ruff.lint]
select = ["E", "F", "C", "W", "I"] select = ["ALL"]
ignore = ["E501"] ignore = ["D", "E501", "S104", "PLR2004", "ANN401", "BLE001", "COM812"]
+40 -35
View File
@@ -1,69 +1,74 @@
import io
import logging import logging
import os import pathlib
import re import re
import shutil import shutil
import tarfile import tarfile
import typing
if typing.TYPE_CHECKING:
import io
class DataDir: class DataDir:
HOST_FILE = ".host" HOST_FILE = ".host"
PATH_REGEX = re.compile(r"^[\w-]+$") PATH_REGEX = re.compile(r"^[\w-]+$")
def __init__(self, root_path: str): def __init__(self, root_path: str) -> None:
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.root_path = root_path self.root_path = pathlib.Path(root_path)
def list_paths(self) -> list[str]: def list_paths(self) -> list[str]:
paths: list[str] = [] paths: list[str] = []
for path in os.listdir(self.root_path): for path in self.root_path.iterdir():
if self.__valid_path(path): if self.exists(path.name):
paths += [path] paths += [path.name]
return paths return paths
def __valid_path(self, path: str, exists: bool = True) -> bool: def __valid_path(self, path: str) -> bool:
return ( return self.PATH_REGEX.match(path) is not None
not exists or os.path.isdir(os.path.join(self.root_path, path))
) and self.PATH_REGEX.match(path) is not None
def set_host(self, path: str, host: str): def set_host(self, path: str, host: str) -> None:
if self.__valid_path(path): if self.exists(path):
path_host = os.path.join(self.root_path, path, self.HOST_FILE) path_host = self.root_path / path / self.HOST_FILE
with open(path_host, mode="w") as host_file: with path_host.open(mode="w") as host_file:
host_file.write(host) host_file.write(host)
self.logger.debug("Wrote %s", path_host) self.logger.debug("Wrote %s", path_host)
def has_index(self, path: str): def has_index(self, path: str) -> bool:
if self.__valid_path(path): if self.exists(path):
path_index = os.path.join(self.root_path, path, "index.html") path_index = self.root_path / path / "index.html"
return os.path.exists(path_index) and os.path.isfile(path_index) return path_index.is_file()
return False
def get_host(self, path: str): def get_host(self, path: str) -> str | None:
if self.__valid_path(path): if self.exists(path):
path_host = os.path.join(self.root_path, path, self.HOST_FILE) path_host = self.root_path / path / self.HOST_FILE
if os.path.exists(path_host) and os.path.isfile(path_host): if path_host.is_file():
try: try:
with open(path_host) as host_file: with path_host.open() as host_file:
return host_file.read().split("\n")[0].strip() return host_file.read().split("\n")[0].strip()
except Exception: except Exception:
pass self.logger.exception("Cannot read %s", path_host)
return None return None
return None
def extract_tar_bytes(self, path: str, tar_bytes: io.BytesIO): def extract_tar_bytes(self, path: str, tar_bytes: io.BytesIO) -> None:
if self.__valid_path(path, exists=False): if self.__valid_path(path):
target_path = os.path.join(self.root_path, path) target_path = self.root_path / path
with tarfile.open(fileobj=tar_bytes) as tar_file: with tarfile.open(fileobj=tar_bytes) as tar_file:
if os.path.exists(target_path): if target_path.exists():
shutil.rmtree(target_path) shutil.rmtree(target_path)
self.logger.debug("Deleted %s", target_path) self.logger.debug("Deleted %s", target_path)
tar_file.extractall(target_path) tar_file.extractall(target_path, filter="data")
self.logger.debug("Extracted tar to %s", target_path) self.logger.debug("Extracted tar to %s", target_path)
def remove(self, path: str): def remove(self, path: str) -> None:
if self.__valid_path(path): if self.exists(path):
target_path = os.path.join(self.root_path, path) target_path = self.root_path / path
shutil.rmtree(target_path) shutil.rmtree(target_path)
self.logger.debug("Deleted %s", target_path) self.logger.debug("Deleted %s", target_path)
def exists(self, path: str): def exists(self, path: str) -> bool:
return self.__valid_path(path) return (
self.PATH_REGEX.match(path) is not None and (self.root_path / path).is_dir()
)
+73 -48
View File
@@ -3,11 +3,15 @@ import http.server
import io import io
import logging import logging
import os import os
import pathlib
import re import re
import tarfile import tarfile
import typing import typing
from . import data_dir, logs, params, project, registry from . import data_dir, logs, project
if typing.TYPE_CHECKING:
from . import params, registry
class RequestHandler(http.server.SimpleHTTPRequestHandler): class RequestHandler(http.server.SimpleHTTPRequestHandler):
@@ -18,8 +22,12 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
@typing.override @typing.override
def __init__( def __init__(
self, *args, params: params.Parameters, registry: registry.Registry, **kwargs self,
): *args: typing.Any,
params: params.Parameters,
registry: registry.Registry,
**kwargs: dict[str, typing.Any],
) -> None:
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.default_host = params.host self.default_host = params.host
self.token = params.token self.token = params.token
@@ -31,28 +39,28 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
super().__init__(*args, directory=params.data_dir, **kwargs) super().__init__(*args, directory=params.data_dir, **kwargs)
@typing.override @typing.override
def do_HEAD(self): def do_HEAD(self) -> None:
self.__pre_log_request() self.__pre_log_request()
super().do_HEAD() super().do_HEAD()
@typing.override @typing.override
def do_GET(self): def do_GET(self) -> None:
self.__pre_log_request() self.__pre_log_request()
if self.path == "/" and self.__get_host() == self.default_host: if self.path == "/" and self.__get_host() == self.default_host:
return self.__server_index() return self.__server_index()
super().do_GET() super().do_GET()
return None
def do_PUT(self): def do_PUT(self) -> None:
self.__pre_log_request() self.__pre_log_request()
if self.headers["X-Token"] != self.token: if (sub_path := self.__check_update_request()) is None:
return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token") return None
if (sub_path := self.__get_subpath()) is None:
return self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path")
if (content_length := self.__get_length()) == 0: if (content_length := self.__get_length()) == 0:
return self.send_error(http.HTTPStatus.LENGTH_REQUIRED, "No body found") return self.send_error(http.HTTPStatus.LENGTH_REQUIRED, "No body found")
if content_length > self.max_size_bytes: if content_length > self.max_size_bytes:
return self.send_error( return self.send_error(
http.HTTPStatus.CONTENT_TOO_LARGE, "Archive too large" http.HTTPStatus.CONTENT_TOO_LARGE,
"Archive too large",
) )
try: try:
file_bytes = io.BytesIO(self.rfile.read(content_length)) file_bytes = io.BytesIO(self.rfile.read(content_length))
@@ -62,32 +70,35 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
except Exception as e: except Exception as e:
return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e)) return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e))
self.__send_status_only( self.__send_status_only(
http.HTTPStatus.CREATED, f"Resource /{sub_path}/ updated" http.HTTPStatus.CREATED,
f"Resource /{sub_path}/ updated",
) )
if self.headers["X-Host"] is not None: if self.headers["X-Host"] is not None:
self.registry.set_host(sub_path, self.headers["X-Host"]) self.registry.set_host(sub_path, self.headers["X-Host"])
self.registry.add(sub_path) self.registry.add(sub_path)
return None
def do_DELETE(self): def do_DELETE(self) -> None:
self.__pre_log_request() self.__pre_log_request()
if self.headers["X-Token"] != self.token: if (sub_path := self.__check_update_request()) is None:
return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token") return None
if (sub_path := self.__get_subpath()) is None:
return self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path")
if not self.data_dir.exists(sub_path): if not self.data_dir.exists(sub_path):
return self.send_error(http.HTTPStatus.NOT_FOUND, "Not found") self.send_error(http.HTTPStatus.NOT_FOUND, "Not found")
return None
try: try:
self.data_dir.remove(sub_path) self.data_dir.remove(sub_path)
except Exception as e: except Exception as e:
return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e)) return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e))
self.__send_status_only( self.__send_status_only(
http.HTTPStatus.NO_CONTENT, f"Resource /{sub_path}/ removed" http.HTTPStatus.NO_CONTENT,
f"Resource /{sub_path}/ removed",
) )
self.registry.remove(sub_path) self.registry.remove(sub_path)
return None
@typing.override @typing.override
def list_directory(self, *_, **__): def list_directory(self, *_: typing.Any, **__: typing.Any) -> None:
"""Disable default directory listing""" """Disable default directory listing."""
self.send_error(http.HTTPStatus.NOT_FOUND, "File not found") self.send_error(http.HTTPStatus.NOT_FOUND, "File not found")
@typing.override @typing.override
@@ -97,16 +108,19 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
if (page := self.registry.get_from_host(self.__get_host())) is not None: if (page := self.registry.get_from_host(self.__get_host())) is not None:
path = f"/{page.path}" + path path = f"/{page.path}" + path
path = super().translate_path(path) path = super().translate_path(path)
if self.__get_subpath(match_full=False) is None: # not a valid path if self.__get_subpath() is None: # not a valid path
return "" return ""
if os.path.basename(path).startswith("."): # hidden files if pathlib.Path(path).name.startswith("."): # hidden files
return "" return ""
return path return path
@typing.override @typing.override
def send_error( def send_error(
self, code: int, message: str | None = None, explain: str | None = None self,
): code: int,
message: str | None = None,
explain: str | None = None,
) -> None:
shortmsg, longmsg = self.responses[code] shortmsg, longmsg = self.responses[code]
if message is None: if message is None:
message = shortmsg message = shortmsg
@@ -122,15 +136,17 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
self.__send_status_only(code, message) self.__send_status_only(code, message)
@typing.override @typing.override
def log_message(self, format: str, *args): def log_message(self, format: str, *args: typing.Any) -> None:
self.logger.info("%s - " + format, self.address_string(), *args) fmt = "%s - " + format
self.logger.info(fmt, self.address_string(), *args)
@typing.override @typing.override
def log_error(self, format: str, *args): def log_error(self, format: str, *args: typing.Any) -> None:
self.logger.error("%s - " + format, self.address_string(), *args) fmt = "%s - " + format
self.logger.error(fmt, self.address_string(), *args)
@typing.override @typing.override
def log_request(self, code="?", size=""): def log_request(self, code: str = "?", size: str = "-") -> None: # ty:ignore[invalid-method-override]
if isinstance(code, http.HTTPStatus): if isinstance(code, http.HTTPStatus):
color = logs.TermColor.RED color = logs.TermColor.RED
if 100 <= code < 200: if 100 <= code < 200:
@@ -145,26 +161,36 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
if size == "" and self.out_size > 0: if size == "" and self.out_size > 0:
size = str(self.out_size) size = str(self.out_size)
args = (code, self.address_string(), self.__get_host(), self.requestline) args = (code, self.address_string(), self.__get_host(), self.requestline)
format = "%s - %s - %s - %s" fmt = "%s - %s - %s - %s"
if size != "": if size != "":
args = (*args, size) args = (*args, size)
format += " - %s" fmt += " - %s"
self.logger.info(format, *args) self.logger.info(fmt, *args)
def __pre_log_request(self): def __pre_log_request(self) -> None:
args = ("...", self.address_string(), self.__get_host(), self.requestline) args = ("...", self.address_string(), self.__get_host(), self.requestline)
format = "%s - %s - %s - %s" fmt = "%s - %s - %s - %s"
if (size := self.__get_length()) > 0: if (size := self.__get_length()) > 0:
args = (*args, size) args = (*args, size)
format += " - %s" fmt += " - %s"
self.logger.debug(format, *args) self.logger.debug(fmt, *args)
def __get_subpath(self, match_full: bool = True) -> str | None: def __check_update_request(self) -> str | None:
if match_full: if self.headers["X-Token"] != self.token:
match = self.PATH_REGEX.fullmatch(self.path) self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token")
else: return None
match = self.PATH_REGEX.match(self.path) if (sub_path := self.__get_subpath_full()) is None:
if match is not None: self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path")
return None
return sub_path
def __get_subpath(self) -> str | None:
if (match := self.PATH_REGEX.match(self.path)) is not None:
return match.group(1)
return None
def __get_subpath_full(self) -> str | None:
if (match := self.PATH_REGEX.fullmatch(self.path)) is not None:
return match.group(1) return match.group(1)
return None return None
@@ -176,10 +202,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
def __get_length(self) -> int: def __get_length(self) -> int:
if not self.headers["Content-Length"]: if not self.headers["Content-Length"]:
return 0 return 0
else: return int(self.headers["Content-Length"])
return int(self.headers["Content-Length"])
def __server_index(self): def __server_index(self) -> None:
self.__send_basic_body(self.server_version + "\n") self.__send_basic_body(self.server_version + "\n")
def __send_basic_body( def __send_basic_body(
@@ -188,7 +213,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
content_type: str = "text/plain", content_type: str = "text/plain",
code: int = http.HTTPStatus.OK, code: int = http.HTTPStatus.OK,
message: str | None = None, message: str | None = None,
): ) -> None:
encoded: bytes = body.encode() encoded: bytes = body.encode()
self.out_size = len(encoded) self.out_size = len(encoded)
self.send_response(code, message) self.send_response(code, message)
@@ -197,7 +222,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(encoded) self.wfile.write(encoded)
def __send_status_only(self, code: int, message: str | None = None): def __send_status_only(self, code: int, message: str | None = None) -> None:
self.send_response(code, message) self.send_response(code, message)
self.send_header("Content-Length", "0") self.send_header("Content-Length", "0")
self.end_headers() self.end_headers()
+15 -14
View File
@@ -2,7 +2,8 @@ import enum
import logging import logging
import typing import typing
from . import params if typing.TYPE_CHECKING:
from . import params
class TermColor(enum.StrEnum): class TermColor(enum.StrEnum):
@@ -24,11 +25,12 @@ class TermColor(enum.StrEnum):
def __str__(self) -> str: def __str__(self) -> str:
return f"\033[{self.value}m" return f"\033[{self.value}m"
def __add__(self, second): @typing.override
return str(self) + str(second) def __add__(self, value: typing.Any, /) -> str: # ty:ignore[invalid-method-override]
return str(self) + str(value)
def __radd__(self, second): def __radd__(self, value: typing.Any, /) -> str:
return str(second) + str(self) return str(value) + str(self)
class ColoredLoggingFormatter(logging.Formatter): class ColoredLoggingFormatter(logging.Formatter):
@@ -42,7 +44,7 @@ class ColoredLoggingFormatter(logging.Formatter):
+ TermColor.RESET + TermColor.RESET
) )
FORMAT_COLORS = { FORMAT_COLORS: typing.ClassVar[dict[int, TermColor | str]] = {
logging.DEBUG: TermColor.FEINT + TermColor.GREY, logging.DEBUG: TermColor.FEINT + TermColor.GREY,
logging.INFO: TermColor.GREEN, logging.INFO: TermColor.GREEN,
logging.WARNING: TermColor.YELLOW, logging.WARNING: TermColor.YELLOW,
@@ -51,16 +53,15 @@ class ColoredLoggingFormatter(logging.Formatter):
} }
@typing.override @typing.override
def __init__(self, trace: bool): def __init__(self, trace: bool) -> None:
self.trace = trace self.trace = trace
super().__init__() super().__init__()
@typing.override @typing.override
def format(self, record: logging.LogRecord): def format(self, record: logging.LogRecord) -> str:
log_color: TermColor = ( log_color: TermColor | str = self.FORMAT_COLORS.get(
self.FORMAT_COLORS[record.levelno] record.levelno,
if record.levelno in self.FORMAT_COLORS TermColor.MAGENTA,
else TermColor.MAGENTA
) )
formatter = logging.Formatter( formatter = logging.Formatter(
self.pre_format self.pre_format
@@ -69,12 +70,12 @@ class ColoredLoggingFormatter(logging.Formatter):
+ self.level_format + self.level_format
+ TermColor.RESET + TermColor.RESET
+ self.post_format + self.post_format
+ (self.trace_format if self.trace else "") + (self.trace_format if self.trace else ""),
) )
return formatter.format(record) return formatter.format(record)
def setup_logs(params: params.Parameters): def setup_logs(params: params.Parameters) -> None:
stream_handler = logging.StreamHandler() stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ColoredLoggingFormatter(trace=params.debug)) stream_handler.setFormatter(ColoredLoggingFormatter(trace=params.debug))
log_level = logging.INFO log_level = logging.INFO
+36 -21
View File
@@ -1,7 +1,6 @@
import argparse import argparse
import dataclasses import dataclasses
import os import os
import os.path
from . import project from . import project
@@ -19,7 +18,7 @@ class Parameters:
debug: bool debug: bool
@classmethod @classmethod
def from_namespace(cls, args: argparse.Namespace) -> "Parameters": def from_namespace(cls, args: argparse.Namespace) -> Parameters:
return Parameters(**vars(args)) return Parameters(**vars(args))
@@ -37,37 +36,48 @@ def __get_env_int(var: str, default: int) -> int:
def __add_arg_str( def __add_arg_str(
parser: argparse.ArgumentParser, *flags: str, env_var: str, default: str, help: str parser: argparse.ArgumentParser,
): *flags: str,
env_var: str,
default: str,
help_txt: str,
) -> None:
parser.add_argument( parser.add_argument(
*flags, *flags,
metavar=env_var, metavar=env_var,
default=__get_env_str(env_var, default), default=__get_env_str(env_var, default),
help=f"{help} (default: {default})", help=f"{help_txt} (default: {default})",
) )
def __add_arg_int( def __add_arg_int(
parser: argparse.ArgumentParser, *flags: str, env_var: str, default: int, help: str parser: argparse.ArgumentParser,
): *flags: str,
env_var: str,
default: int,
help_txt: str,
) -> None:
parser.add_argument( parser.add_argument(
*flags, *flags,
type=int, type=int,
metavar=env_var, metavar=env_var,
default=__get_env_int(env_var, default), default=__get_env_int(env_var, default),
help=f"{help} (default: {default})", help=f"{help_txt} (default: {default})",
) )
def __add_arg_str_required( def __add_arg_str_required(
parser: argparse.ArgumentParser, *flags: str, env_var: str, help: str parser: argparse.ArgumentParser,
): *flags: str,
env_var: str,
help_txt: str,
) -> None:
parser.add_argument( parser.add_argument(
*flags, *flags,
metavar=env_var, metavar=env_var,
required=os.getenv(env_var) is None, required=os.getenv(env_var) is None,
default=os.getenv(env_var), default=os.getenv(env_var),
help=f"{help}", help=help_txt,
) )
@@ -78,36 +88,41 @@ def parse_parameters() -> Parameters:
epilog="(Each option can be supplied with equivalent environment variable.)", epilog="(Each option can be supplied with equivalent environment variable.)",
) )
__add_arg_int( __add_arg_int(
parser, "-p", "--port", env_var="PORT", default=8080, help="server port" parser,
"-p",
"--port",
env_var="PORT",
default=8080,
help_txt="server port",
) )
__add_arg_str( __add_arg_str(
parser, parser,
"--host", "--host",
env_var="HOST", env_var="HOST",
default="localhost:8080", default="localhost:8080",
help="server default host", help_txt="server default host",
) )
__add_arg_str( __add_arg_str(
parser, parser,
"-d", "-d",
"--data-dir", "--data-dir",
env_var="DATA_DIR", env_var="DATA_DIR",
default=os.path.join(".", "data"), default="./data",
help="directory where pages are/will be stored", help_txt="directory where pages are/will be stored",
) )
__add_arg_str_required( __add_arg_str_required(
parser, parser,
"-t", "-t",
"--token", "--token",
env_var="TOKEN", env_var="TOKEN",
help="secret token for update requests", help_txt="secret token for update requests",
) )
__add_arg_int( __add_arg_int(
parser, parser,
"--max-size-bytes", "--max-size-bytes",
env_var="MAX_SIZE", env_var="MAX_SIZE",
default=2_000_000, default=2_000_000,
help="max size of accepted archives (in bytes)", help_txt="max size of accepted archives (in bytes)",
) )
__add_arg_str( __add_arg_str(
parser, parser,
@@ -115,21 +130,21 @@ def parse_parameters() -> Parameters:
"--bind", "--bind",
env_var="BIND", env_var="BIND",
default="0.0.0.0", default="0.0.0.0",
help="server bind address", help_txt="server bind address",
) )
__add_arg_str( __add_arg_str(
parser, parser,
"--certbot-conf", "--certbot-conf",
env_var="CERTBOT_CONF", env_var="CERTBOT_CONF",
default="/etc/letsencrypt", default="/etc/letsencrypt",
help="Certbot config dir", help_txt="Certbot config dir",
) )
__add_arg_str( __add_arg_str(
parser, parser,
"--certbot-www", "--certbot-www",
env_var="CERTBOT_WWW", env_var="CERTBOT_WWW",
default=os.path.join(".", "data", ".certbot"), default="./data/.certbot",
help="Certbot www dir", help_txt="Certbot www dir",
) )
parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
args = parser.parse_args() args = parser.parse_args()
+14 -17
View File
@@ -1,29 +1,26 @@
import os.path import pathlib
import typing import typing
import toml import toml
__project_data = None
def __get_project_data() -> None | dict[str, typing.Any]: def __get_project_data() -> None | dict[str, typing.Any]:
global __project_data pyproject_toml_file = pathlib.Path(__file__) / ".." / ".." / "pyproject.toml"
if __project_data is None: if pyproject_toml_file.is_file():
pyproject_toml_file = os.path.join( try:
os.path.dirname(__file__), "..", "pyproject.toml" data = toml.load(pyproject_toml_file)
) if "project" in data:
if os.path.exists(pyproject_toml_file) and os.path.isfile(pyproject_toml_file): return data["project"]
try: except TypeError, toml.TomlDecodeError, FileNotFoundError:
data = toml.load(pyproject_toml_file) pass
if "project" in data: return None
__project_data = data["project"]
except TypeError, toml.TomlDecodeError, FileNotFoundError:
pass __project_data = __get_project_data()
return __project_data
def __get_str_value(key: str) -> str: def __get_str_value(key: str) -> str:
project_data = __get_project_data() project_data = __project_data
if project_data is not None and key in project_data: if project_data is not None and key in project_data:
return project_data[key] return project_data[key]
return "unknown" return "unknown"
+14 -7
View File
@@ -1,31 +1,37 @@
import logging import logging
import typing
from . import data_dir, page, params from . import data_dir, page
if typing.TYPE_CHECKING:
from . import params
class Registry: class Registry:
def __init__(self, params: params.Parameters): def __init__(self, params: params.Parameters) -> None:
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.pages: dict[str, page.Page] = {} self.pages: dict[str, page.Page] = {}
self.data_dir = data_dir.DataDir(params.data_dir) self.data_dir = data_dir.DataDir(params.data_dir)
self.prefix = f"http://{params.host}" self.prefix = f"http://{params.host}"
def load_pages(self): def load_pages(self) -> None:
self.pages = {} self.pages = {}
for path in self.data_dir.list_paths(): for path in self.data_dir.list_paths():
self.add(path) self.add(path)
def add(self, path: str): def add(self, path: str) -> None:
self.pages[path] = page.Page( self.pages[path] = page.Page(
path, self.data_dir.has_index(path), self.data_dir.get_host(path) path,
self.data_dir.has_index(path),
self.data_dir.get_host(path),
) )
self.logger.info("Updated %s%s", self.prefix, str(self.pages[path])) self.logger.info("Updated %s%s", self.prefix, str(self.pages[path]))
def set_host(self, path: str, host: str): def set_host(self, path: str, host: str) -> None:
self.data_dir.set_host(path, host) self.data_dir.set_host(path, host)
self.pages[path].host = host self.pages[path].host = host
def remove(self, path: str): def remove(self, path: str) -> None:
page = self.pages[path] page = self.pages[path]
del self.pages[path] del self.pages[path]
self.logger.info("Removed %s%s", self.prefix, str(page)) self.logger.info("Removed %s%s", self.prefix, str(page))
@@ -34,3 +40,4 @@ class Registry:
for p in self.pages.values(): for p in self.pages.values():
if p.host == host: if p.host == host:
return p return p
return None
+16 -11
View File
@@ -1,12 +1,17 @@
import contextlib
import http.server import http.server
import logging import logging
import os import pathlib
import typing
from . import handler, params, project, registry from . import handler, project, registry
if typing.TYPE_CHECKING:
from . import params
class StaplerServer: class StaplerServer:
def __init__(self, params: params.Parameters): def __init__(self, params: params.Parameters) -> None:
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.params = params self.params = params
self.registry = registry.Registry(params) self.registry = registry.Registry(params)
@@ -15,18 +20,20 @@ class StaplerServer:
self.request_handler, self.request_handler,
) )
def request_handler(self, *args) -> http.server.BaseHTTPRequestHandler: def request_handler(self, *args: typing.Any) -> http.server.BaseHTTPRequestHandler:
return handler.RequestHandler(*args, params=self.params, registry=self.registry) return handler.RequestHandler(*args, params=self.params, registry=self.registry)
def __init_certbot_www(self): def __init_certbot_www(self) -> None:
os.makedirs(self.params.certbot_www, exist_ok=True) certbot_www_path = pathlib.Path(self.params.certbot_www)
if not certbot_www_path.exists():
certbot_www_path.mkdir(parents=True)
def __startup(self): def __startup(self) -> None:
self.logger.info("Starting up...") self.logger.info("Starting up...")
self.registry.load_pages() self.registry.load_pages()
self.__init_certbot_www() self.__init_certbot_www()
def start(self): def start(self) -> None:
self.logger.info("Version %s", project.get_version()) self.logger.info("Version %s", project.get_version())
self.__startup() self.__startup()
self.logger.info( self.logger.info(
@@ -38,7 +45,5 @@ class StaplerServer:
"Server up and ready on http://%s", "Server up and ready on http://%s",
self.params.host, self.params.host,
) )
try: with contextlib.suppress(KeyboardInterrupt):
self.server.serve_forever() self.server.serve_forever()
except KeyboardInterrupt:
pass