diff --git a/main.py b/main.py index e0c98df..1156348 100644 --- a/main.py +++ b/main.py @@ -3,7 +3,7 @@ from src.params import parse_parameters from src.server import StaplerServer -def main(): +def main() -> None: params = parse_parameters() setup_logs(params) server = StaplerServer(params) diff --git a/pyproject.toml b/pyproject.toml index 7f50ba6..aa0481c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,5 +15,5 @@ dev = [ ] [tool.ruff.lint] -select = ["E", "F", "C", "W", "I"] -ignore = ["E501"] +select = ["ALL"] +ignore = ["D", "E501", "S104", "PLR2004", "ANN401", "BLE001", "COM812"] \ No newline at end of file diff --git a/src/data_dir.py b/src/data_dir.py index d300ced..78c8ffa 100644 --- a/src/data_dir.py +++ b/src/data_dir.py @@ -1,69 +1,74 @@ -import io import logging -import os +import pathlib import re import shutil import tarfile +import typing + +if typing.TYPE_CHECKING: + import io class DataDir: HOST_FILE = ".host" PATH_REGEX = re.compile(r"^[\w-]+$") - def __init__(self, root_path: str): + def __init__(self, root_path: str) -> None: self.logger = logging.getLogger(self.__class__.__name__) - self.root_path = root_path + self.root_path = pathlib.Path(root_path) def list_paths(self) -> list[str]: paths: list[str] = [] - for path in os.listdir(self.root_path): - if self.__valid_path(path): - paths += [path] + for path in self.root_path.iterdir(): + if self.exists(path.name): + paths += [path.name] return paths - def __valid_path(self, path: str, exists: bool = True) -> bool: - return ( - not exists or os.path.isdir(os.path.join(self.root_path, path)) - ) and self.PATH_REGEX.match(path) is not None + def __valid_path(self, path: str) -> bool: + return self.PATH_REGEX.match(path) is not None - def set_host(self, path: str, host: str): - if self.__valid_path(path): - path_host = os.path.join(self.root_path, path, self.HOST_FILE) - with open(path_host, mode="w") as host_file: + def set_host(self, path: str, host: str) -> None: + if self.exists(path): 
+ path_host = self.root_path / path / self.HOST_FILE + with path_host.open(mode="w") as host_file: host_file.write(host) self.logger.debug("Wrote %s", path_host) - def has_index(self, path: str): - if self.__valid_path(path): - path_index = os.path.join(self.root_path, path, "index.html") - return os.path.exists(path_index) and os.path.isfile(path_index) + def has_index(self, path: str) -> bool: + if self.exists(path): + path_index = self.root_path / path / "index.html" + return path_index.is_file() + return False - def get_host(self, path: str): - if self.__valid_path(path): - path_host = os.path.join(self.root_path, path, self.HOST_FILE) - if os.path.exists(path_host) and os.path.isfile(path_host): + def get_host(self, path: str) -> str | None: + if self.exists(path): + path_host = self.root_path / path / self.HOST_FILE + if path_host.is_file(): try: - with open(path_host) as host_file: + with path_host.open() as host_file: return host_file.read().split("\n")[0].strip() except Exception: - pass + self.logger.exception("Cannot read %s", path_host) return None + return None - def extract_tar_bytes(self, path: str, tar_bytes: io.BytesIO): - if self.__valid_path(path, exists=False): - target_path = os.path.join(self.root_path, path) + def extract_tar_bytes(self, path: str, tar_bytes: io.BytesIO) -> None: + if self.__valid_path(path): + target_path = self.root_path / path with tarfile.open(fileobj=tar_bytes) as tar_file: - if os.path.exists(target_path): + if target_path.exists(): shutil.rmtree(target_path) self.logger.debug("Deleted %s", target_path) - tar_file.extractall(target_path) + tar_file.extractall(target_path, filter="data") self.logger.debug("Extracted tar to %s", target_path) - def remove(self, path: str): - if self.__valid_path(path): - target_path = os.path.join(self.root_path, path) + def remove(self, path: str) -> None: + if self.exists(path): + target_path = self.root_path / path shutil.rmtree(target_path) self.logger.debug("Deleted %s", target_path) 
- def exists(self, path: str): - return self.__valid_path(path) + def exists(self, path: str) -> bool: + return ( + self.PATH_REGEX.match(path) is not None and (self.root_path / path).is_dir() + ) diff --git a/src/handler.py b/src/handler.py index c57dd4b..5ee9080 100644 --- a/src/handler.py +++ b/src/handler.py @@ -3,11 +3,15 @@ import http.server import io import logging import os +import pathlib import re import tarfile import typing -from . import data_dir, logs, params, project, registry +from . import data_dir, logs, project + +if typing.TYPE_CHECKING: + from . import params, registry class RequestHandler(http.server.SimpleHTTPRequestHandler): @@ -18,8 +22,12 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): @typing.override def __init__( - self, *args, params: params.Parameters, registry: registry.Registry, **kwargs - ): + self, + *args: typing.Any, + params: params.Parameters, + registry: registry.Registry, + **kwargs: typing.Any, + ) -> None: self.logger = logging.getLogger(self.__class__.__name__) self.default_host = params.host self.token = params.token @@ -31,28 +39,28 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): super().__init__(*args, directory=params.data_dir, **kwargs) @typing.override - def do_HEAD(self): + def do_HEAD(self) -> None: self.__pre_log_request() super().do_HEAD() @typing.override - def do_GET(self): + def do_GET(self) -> None: self.__pre_log_request() if self.path == "/" and self.__get_host() == self.default_host: return self.__server_index() super().do_GET() + return None - def do_PUT(self): + def do_PUT(self) -> None: self.__pre_log_request() - if self.headers["X-Token"] != self.token: - return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token") - if (sub_path := self.__get_subpath()) is None: - return self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path") + if (sub_path := self.__check_update_request()) is None: + return None if (content_length := self.__get_length()) == 0:
return self.send_error(http.HTTPStatus.LENGTH_REQUIRED, "No body found") if content_length > self.max_size_bytes: return self.send_error( - http.HTTPStatus.CONTENT_TOO_LARGE, "Archive too large" + http.HTTPStatus.CONTENT_TOO_LARGE, + "Archive too large", ) try: file_bytes = io.BytesIO(self.rfile.read(content_length)) @@ -62,32 +70,35 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): except Exception as e: return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e)) self.__send_status_only( - http.HTTPStatus.CREATED, f"Resource /{sub_path}/ updated" + http.HTTPStatus.CREATED, + f"Resource /{sub_path}/ updated", ) if self.headers["X-Host"] is not None: self.registry.set_host(sub_path, self.headers["X-Host"]) self.registry.add(sub_path) + return None - def do_DELETE(self): + def do_DELETE(self) -> None: self.__pre_log_request() - if self.headers["X-Token"] != self.token: - return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token") - if (sub_path := self.__get_subpath()) is None: - return self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path") + if (sub_path := self.__check_update_request()) is None: + return None if not self.data_dir.exists(sub_path): - return self.send_error(http.HTTPStatus.NOT_FOUND, "Not found") + self.send_error(http.HTTPStatus.NOT_FOUND, "Not found") + return None try: self.data_dir.remove(sub_path) except Exception as e: return self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, str(e)) self.__send_status_only( - http.HTTPStatus.NO_CONTENT, f"Resource /{sub_path}/ removed" + http.HTTPStatus.NO_CONTENT, + f"Resource /{sub_path}/ removed", ) self.registry.remove(sub_path) + return None @typing.override - def list_directory(self, *_, **__): - """Disable default directory listing""" + def list_directory(self, *_: typing.Any, **__: typing.Any) -> None: + """Disable default directory listing.""" self.send_error(http.HTTPStatus.NOT_FOUND, "File not found") @typing.override @@ -97,16 +108,19 @@ class 
RequestHandler(http.server.SimpleHTTPRequestHandler): if (page := self.registry.get_from_host(self.__get_host())) is not None: path = f"/{page.path}" + path path = super().translate_path(path) - if self.__get_subpath(match_full=False) is None: # not a valid path + if self.__get_subpath() is None: # not a valid path return "" - if os.path.basename(path).startswith("."): # hidden files + if pathlib.Path(path).name.startswith("."): # hidden files return "" return path @typing.override def send_error( - self, code: int, message: str | None = None, explain: str | None = None - ): + self, + code: int, + message: str | None = None, + explain: str | None = None, + ) -> None: shortmsg, longmsg = self.responses[code] if message is None: message = shortmsg @@ -122,15 +136,17 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): self.__send_status_only(code, message) @typing.override - def log_message(self, format: str, *args): - self.logger.info("%s - " + format, self.address_string(), *args) + def log_message(self, format: str, *args: typing.Any) -> None: + fmt = "%s - " + format + self.logger.info(fmt, self.address_string(), *args) @typing.override - def log_error(self, format: str, *args): - self.logger.error("%s - " + format, self.address_string(), *args) + def log_error(self, format: str, *args: typing.Any) -> None: + fmt = "%s - " + format + self.logger.error(fmt, self.address_string(), *args) @typing.override - def log_request(self, code="?", size=""): + def log_request(self, code: str = "?", size: str = "") -> None: # ty:ignore[invalid-method-override] if isinstance(code, http.HTTPStatus): color = logs.TermColor.RED if 100 <= code < 200: @@ -145,26 +161,36 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): if size == "" and self.out_size > 0: size = str(self.out_size) args = (code, self.address_string(), self.__get_host(), self.requestline) - format = "→ %s - %s - %s - %s" + fmt = "→ %s - %s - %s - %s" if size != "": args = (*args, size) - format
+= " - %s" - self.logger.info(format, *args) + fmt += " - %s" + self.logger.info(fmt, *args) - def __pre_log_request(self): + def __pre_log_request(self) -> None: args = ("...", self.address_string(), self.__get_host(), self.requestline) - format = "← %s - %s - %s - %s" + fmt = "← %s - %s - %s - %s" if (size := self.__get_length()) > 0: args = (*args, size) - format += " - %s" - self.logger.debug(format, *args) + fmt += " - %s" + self.logger.debug(fmt, *args) - def __get_subpath(self, match_full: bool = True) -> str | None: - if match_full: - match = self.PATH_REGEX.fullmatch(self.path) - else: - match = self.PATH_REGEX.match(self.path) - if match is not None: + def __check_update_request(self) -> str | None: + if self.headers["X-Token"] != self.token: + self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token") + return None + if (sub_path := self.__get_subpath_full()) is None: + self.send_error(http.HTTPStatus.BAD_REQUEST, "Invalid path") + return None + return sub_path + + def __get_subpath(self) -> str | None: + if (match := self.PATH_REGEX.match(self.path)) is not None: + return match.group(1) + return None + + def __get_subpath_full(self) -> str | None: + if (match := self.PATH_REGEX.fullmatch(self.path)) is not None: return match.group(1) return None @@ -176,10 +202,9 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): def __get_length(self) -> int: if not self.headers["Content-Length"]: return 0 - else: - return int(self.headers["Content-Length"]) + return int(self.headers["Content-Length"]) - def __server_index(self): + def __server_index(self) -> None: self.__send_basic_body(self.server_version + "\n") def __send_basic_body( @@ -188,7 +213,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): content_type: str = "text/plain", code: int = http.HTTPStatus.OK, message: str | None = None, - ): + ) -> None: encoded: bytes = body.encode() self.out_size = len(encoded) self.send_response(code, message) @@ -197,7 +222,7 @@ class 
RequestHandler(http.server.SimpleHTTPRequestHandler): self.end_headers() self.wfile.write(encoded) - def __send_status_only(self, code: int, message: str | None = None): + def __send_status_only(self, code: int, message: str | None = None) -> None: self.send_response(code, message) self.send_header("Content-Length", "0") self.end_headers() diff --git a/src/logs.py b/src/logs.py index 25503d0..32a54c5 100644 --- a/src/logs.py +++ b/src/logs.py @@ -2,7 +2,8 @@ import enum import logging import typing -from . import params +if typing.TYPE_CHECKING: + from . import params class TermColor(enum.StrEnum): @@ -24,11 +25,12 @@ class TermColor(enum.StrEnum): def __str__(self) -> str: return f"\033[{self.value}m" - def __add__(self, second): - return str(self) + str(second) + @typing.override + def __add__(self, value: typing.Any, /) -> str: # ty:ignore[invalid-method-override] + return str(self) + str(value) - def __radd__(self, second): - return str(second) + str(self) + def __radd__(self, value: typing.Any, /) -> str: + return str(value) + str(self) class ColoredLoggingFormatter(logging.Formatter): @@ -42,7 +44,7 @@ class ColoredLoggingFormatter(logging.Formatter): + TermColor.RESET ) - FORMAT_COLORS = { + FORMAT_COLORS: typing.ClassVar[dict[int, TermColor | str]] = { logging.DEBUG: TermColor.FEINT + TermColor.GREY, logging.INFO: TermColor.GREEN, logging.WARNING: TermColor.YELLOW, @@ -51,16 +53,15 @@ class ColoredLoggingFormatter(logging.Formatter): } @typing.override - def __init__(self, trace: bool): + def __init__(self, trace: bool) -> None: self.trace = trace super().__init__() @typing.override - def format(self, record: logging.LogRecord): - log_color: TermColor = ( - self.FORMAT_COLORS[record.levelno] - if record.levelno in self.FORMAT_COLORS - else TermColor.MAGENTA + def format(self, record: logging.LogRecord) -> str: + log_color: TermColor | str = self.FORMAT_COLORS.get( + record.levelno, + TermColor.MAGENTA, ) formatter = logging.Formatter( self.pre_format @@ 
-69,12 +70,12 @@ class ColoredLoggingFormatter(logging.Formatter): + self.level_format + TermColor.RESET + self.post_format - + (self.trace_format if self.trace else "") + + (self.trace_format if self.trace else ""), ) return formatter.format(record) -def setup_logs(params: params.Parameters): +def setup_logs(params: params.Parameters) -> None: stream_handler = logging.StreamHandler() stream_handler.setFormatter(ColoredLoggingFormatter(trace=params.debug)) log_level = logging.INFO diff --git a/src/params.py b/src/params.py index f83f2ed..15087d1 100644 --- a/src/params.py +++ b/src/params.py @@ -1,7 +1,6 @@ import argparse import dataclasses import os -import os.path from . import project @@ -19,7 +18,7 @@ class Parameters: debug: bool @classmethod - def from_namespace(cls, args: argparse.Namespace) -> "Parameters": + def from_namespace(cls, args: argparse.Namespace) -> Parameters: return Parameters(**vars(args)) @@ -37,37 +36,48 @@ def __get_env_int(var: str, default: int) -> int: def __add_arg_str( - parser: argparse.ArgumentParser, *flags: str, env_var: str, default: str, help: str -): + parser: argparse.ArgumentParser, + *flags: str, + env_var: str, + default: str, + help_txt: str, +) -> None: parser.add_argument( *flags, metavar=env_var, default=__get_env_str(env_var, default), - help=f"{help} (default: {default})", + help=f"{help_txt} (default: {default})", ) def __add_arg_int( - parser: argparse.ArgumentParser, *flags: str, env_var: str, default: int, help: str -): + parser: argparse.ArgumentParser, + *flags: str, + env_var: str, + default: int, + help_txt: str, +) -> None: parser.add_argument( *flags, type=int, metavar=env_var, default=__get_env_int(env_var, default), - help=f"{help} (default: {default})", + help=f"{help_txt} (default: {default})", ) def __add_arg_str_required( - parser: argparse.ArgumentParser, *flags: str, env_var: str, help: str -): + parser: argparse.ArgumentParser, + *flags: str, + env_var: str, + help_txt: str, +) -> None: 
parser.add_argument( *flags, metavar=env_var, required=os.getenv(env_var) is None, default=os.getenv(env_var), - help=f"{help}", + help=help_txt, ) @@ -78,36 +88,41 @@ def parse_parameters() -> Parameters: epilog="(Each option can be supplied with equivalent environment variable.)", ) __add_arg_int( - parser, "-p", "--port", env_var="PORT", default=8080, help="server port" + parser, + "-p", + "--port", + env_var="PORT", + default=8080, + help_txt="server port", ) __add_arg_str( parser, "--host", env_var="HOST", default="localhost:8080", - help="server default host", + help_txt="server default host", ) __add_arg_str( parser, "-d", "--data-dir", env_var="DATA_DIR", - default=os.path.join(".", "data"), - help="directory where pages are/will be stored", + default="./data", + help_txt="directory where pages are/will be stored", ) __add_arg_str_required( parser, "-t", "--token", env_var="TOKEN", - help="secret token for update requests", + help_txt="secret token for update requests", ) __add_arg_int( parser, "--max-size-bytes", env_var="MAX_SIZE", default=2_000_000, - help="max size of accepted archives (in bytes)", + help_txt="max size of accepted archives (in bytes)", ) __add_arg_str( parser, @@ -115,21 +130,21 @@ def parse_parameters() -> Parameters: "--bind", env_var="BIND", default="0.0.0.0", - help="server bind address", + help_txt="server bind address", ) __add_arg_str( parser, "--certbot-conf", env_var="CERTBOT_CONF", default="/etc/letsencrypt", - help="Certbot config dir", + help_txt="Certbot config dir", ) __add_arg_str( parser, "--certbot-www", env_var="CERTBOT_WWW", - default=os.path.join(".", "data", ".certbot"), - help="Certbot www dir", + default="./data/.certbot", + help_txt="Certbot www dir", ) parser.add_argument("--debug", action=argparse.BooleanOptionalAction) args = parser.parse_args() diff --git a/src/project.py b/src/project.py index 80358f3..b2cdb27 100644 --- a/src/project.py +++ b/src/project.py @@ -1,29 +1,26 @@ -import os.path +import pathlib 
import typing import toml -__project_data = None - def __get_project_data() -> None | dict[str, typing.Any]: - global __project_data - if __project_data is None: - pyproject_toml_file = os.path.join( - os.path.dirname(__file__), "..", "pyproject.toml" - ) - if os.path.exists(pyproject_toml_file) and os.path.isfile(pyproject_toml_file): - try: - data = toml.load(pyproject_toml_file) - if "project" in data: - __project_data = data["project"] - except TypeError, toml.TomlDecodeError, FileNotFoundError: - pass - return __project_data + pyproject_toml_file = pathlib.Path(__file__).resolve().parent.parent / "pyproject.toml" + if pyproject_toml_file.is_file(): + try: + data = toml.load(pyproject_toml_file) + if "project" in data: + return data["project"] + except TypeError, toml.TomlDecodeError, FileNotFoundError: + pass + return None + + +__project_data = __get_project_data() def __get_str_value(key: str) -> str: - project_data = __get_project_data() + project_data = __project_data if project_data is not None and key in project_data: return project_data[key] return "unknown" diff --git a/src/registry.py b/src/registry.py index e62fb35..21237b2 100644 --- a/src/registry.py +++ b/src/registry.py @@ -1,31 +1,37 @@ import logging +import typing -from . import data_dir, page, params +from . import data_dir, page + +if typing.TYPE_CHECKING: + from . 
import params class Registry: - def __init__(self, params: params.Parameters): + def __init__(self, params: params.Parameters) -> None: self.logger = logging.getLogger(self.__class__.__name__) self.pages: dict[str, page.Page] = {} self.data_dir = data_dir.DataDir(params.data_dir) self.prefix = f"http://{params.host}" - def load_pages(self): + def load_pages(self) -> None: self.pages = {} for path in self.data_dir.list_paths(): self.add(path) - def add(self, path: str): + def add(self, path: str) -> None: self.pages[path] = page.Page( - path, self.data_dir.has_index(path), self.data_dir.get_host(path) + path, + self.data_dir.has_index(path), + self.data_dir.get_host(path), ) self.logger.info("Updated %s%s", self.prefix, str(self.pages[path])) - def set_host(self, path: str, host: str): + def set_host(self, path: str, host: str) -> None: self.data_dir.set_host(path, host) self.pages[path].host = host - def remove(self, path: str): + def remove(self, path: str) -> None: page = self.pages[path] del self.pages[path] self.logger.info("Removed %s%s", self.prefix, str(page)) @@ -34,3 +40,4 @@ class Registry: for p in self.pages.values(): if p.host == host: return p + return None diff --git a/src/server.py b/src/server.py index ea15fbc..3a45a43 100644 --- a/src/server.py +++ b/src/server.py @@ -1,12 +1,17 @@ +import contextlib import http.server import logging -import os +import pathlib +import typing -from . import handler, params, project, registry +from . import handler, project, registry + +if typing.TYPE_CHECKING: + from . 
import params class StaplerServer: - def __init__(self, params: params.Parameters): + def __init__(self, params: params.Parameters) -> None: self.logger = logging.getLogger(self.__class__.__name__) self.params = params self.registry = registry.Registry(params) @@ -15,18 +20,19 @@ class StaplerServer: self.request_handler, ) - def request_handler(self, *args) -> http.server.BaseHTTPRequestHandler: + def request_handler(self, *args: typing.Any) -> http.server.BaseHTTPRequestHandler: return handler.RequestHandler(*args, params=self.params, registry=self.registry) - def __init_certbot_www(self): - os.makedirs(self.params.certbot_www, exist_ok=True) + def __init_certbot_www(self) -> None: + certbot_www_path = pathlib.Path(self.params.certbot_www) + certbot_www_path.mkdir(parents=True, exist_ok=True) - def __startup(self): + def __startup(self) -> None: self.logger.info("Starting up...") self.registry.load_pages() self.__init_certbot_www() - def start(self): + def start(self) -> None: self.logger.info("Version %s", project.get_version()) self.__startup() self.logger.info( @@ -38,7 +44,5 @@ class StaplerServer: "Server up and ready on http://%s", self.params.host, ) - try: + with contextlib.suppress(KeyboardInterrupt): self.server.serve_forever() - except KeyboardInterrupt: - pass