refactor: reorder handler functions
This commit is contained in:
+82
-92
@@ -1,3 +1,4 @@
|
||||
import typing
|
||||
import http.server
|
||||
import http
|
||||
import tarfile
|
||||
@@ -15,6 +16,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
CERTBOT_CHALLENGE_PATH = "/.well-known/acme-challenge"
|
||||
PATH_REGEX = re.compile(r"^\/([\w-]+)\/")
|
||||
|
||||
@typing.override
|
||||
def __init__(
|
||||
self, *args, params: params.Parameters, registry: registry.Registry, **kwargs
|
||||
):
|
||||
@@ -28,89 +30,20 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
self.out_size = 0
|
||||
super().__init__(*args, directory=params.data_dir, **kwargs)
|
||||
|
||||
def log_message(self, format: str, *args):
|
||||
self.logger.info("%s - " + format, self.address_string(), *args)
|
||||
|
||||
def log_error(self, format: str, *args):
|
||||
self.logger.error("%s - " + format, self.address_string(), *args)
|
||||
|
||||
def log_request(self, code="?", size=""):
|
||||
if isinstance(code, http.HTTPStatus):
|
||||
color = logs.TermColor.RED
|
||||
if 100 <= code < 200:
|
||||
color = logs.TermColor.CYAN
|
||||
if 200 <= code < 300:
|
||||
color = logs.TermColor.GREEN
|
||||
elif 300 <= code < 400:
|
||||
color = logs.TermColor.BLUE
|
||||
elif 400 <= code < 500:
|
||||
color = logs.TermColor.YELLOW
|
||||
code = color + str(code.value) + logs.TermColor.RESET
|
||||
if size == "" and self.out_size > 0:
|
||||
size = str(self.out_size)
|
||||
if size != "":
|
||||
self.logger.info(
|
||||
"→ %s - %s - %s - %s - %s",
|
||||
code,
|
||||
self.address_string(),
|
||||
self.__get_host(),
|
||||
self.requestline,
|
||||
size,
|
||||
)
|
||||
else:
|
||||
self.logger.info(
|
||||
"→ %s - %s - %s - %s",
|
||||
code,
|
||||
self.address_string(),
|
||||
self.__get_host(),
|
||||
self.requestline,
|
||||
)
|
||||
|
||||
def pre_log_request(self):
|
||||
if (size := self.__get_length()) > 0:
|
||||
self.logger.debug(
|
||||
"← ... - %s - %s - %s - %d",
|
||||
self.address_string(),
|
||||
self.__get_host(),
|
||||
self.requestline,
|
||||
size,
|
||||
)
|
||||
else:
|
||||
self.logger.debug(
|
||||
"← ... - %s - %s - %s",
|
||||
self.address_string(),
|
||||
self.__get_host(),
|
||||
self.requestline,
|
||||
)
|
||||
|
||||
def list_directory(self, *_, **__):
|
||||
"""Disable default directory listing"""
|
||||
self.send_error(http.HTTPStatus.NOT_FOUND, "File not found")
|
||||
|
||||
def translate_path(self, path: str) -> str:
|
||||
if path.startswith(self.CERTBOT_CHALLENGE_PATH):
|
||||
return self.certbot_www + path.removeprefix(self.CERTBOT_CHALLENGE_PATH)
|
||||
if (page := self.registry.get_from_host(self.__get_host())) is not None:
|
||||
path = f"/{page.path}" + path
|
||||
path = super().translate_path(path)
|
||||
if self.__get_subpath(match_full=False) is None: # not a valid path
|
||||
return ""
|
||||
if os.path.basename(path).startswith("."): # hidden files
|
||||
return ""
|
||||
return path
|
||||
|
||||
@typing.override
|
||||
def do_HEAD(self):
|
||||
self.pre_log_request()
|
||||
self.__pre_log_request()
|
||||
super().do_HEAD()
|
||||
|
||||
@typing.override
|
||||
def do_GET(self):
|
||||
self.pre_log_request()
|
||||
self.__pre_log_request()
|
||||
if self.path == "/" and self.__get_host() == self.default_host:
|
||||
return self.__server_index()
|
||||
super().do_GET()
|
||||
|
||||
def do_PUT(self):
|
||||
self.pre_log_request()
|
||||
self.__pre_log_request()
|
||||
if self.headers["X-Token"] != self.token:
|
||||
return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token")
|
||||
if (sub_path := self.__get_subpath()) is None:
|
||||
@@ -136,7 +69,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
self.registry.add(sub_path)
|
||||
|
||||
def do_DELETE(self):
|
||||
self.pre_log_request()
|
||||
self.__pre_log_request()
|
||||
if self.headers["X-Token"] != self.token:
|
||||
return self.send_error(http.HTTPStatus.UNAUTHORIZED, "Invalid token")
|
||||
if (sub_path := self.__get_subpath()) is None:
|
||||
@@ -152,6 +85,80 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
)
|
||||
self.registry.remove(sub_path)
|
||||
|
||||
@typing.override
|
||||
def list_directory(self, *_, **__):
|
||||
"""Disable default directory listing"""
|
||||
self.send_error(http.HTTPStatus.NOT_FOUND, "File not found")
|
||||
|
||||
@typing.override
|
||||
def translate_path(self, path: str) -> str:
|
||||
if path.startswith(self.CERTBOT_CHALLENGE_PATH):
|
||||
return self.certbot_www + path.removeprefix(self.CERTBOT_CHALLENGE_PATH)
|
||||
if (page := self.registry.get_from_host(self.__get_host())) is not None:
|
||||
path = f"/{page.path}" + path
|
||||
path = super().translate_path(path)
|
||||
if self.__get_subpath(match_full=False) is None: # not a valid path
|
||||
return ""
|
||||
if os.path.basename(path).startswith("."): # hidden files
|
||||
return ""
|
||||
return path
|
||||
|
||||
@typing.override
|
||||
def send_error(
|
||||
self, code: int, message: str | None = None, explain: str | None = None
|
||||
):
|
||||
shortmsg, longmsg = self.responses[code]
|
||||
if message is None:
|
||||
message = shortmsg
|
||||
if explain is None:
|
||||
explain = longmsg
|
||||
if "Accept" not in self.headers["Accept"] or "text/" in self.headers["Accept"]:
|
||||
self.__send_basic_body(
|
||||
f"{code} {message}\n{explain}\n{self.server_version}\n",
|
||||
code=code,
|
||||
message=message,
|
||||
)
|
||||
else:
|
||||
self.__send_status_only(code, message)
|
||||
|
||||
@typing.override
|
||||
def log_message(self, format: str, *args):
|
||||
self.logger.info("%s - " + format, self.address_string(), *args)
|
||||
|
||||
@typing.override
|
||||
def log_error(self, format: str, *args):
|
||||
self.logger.error("%s - " + format, self.address_string(), *args)
|
||||
|
||||
@typing.override
|
||||
def log_request(self, code="?", size=""):
|
||||
if isinstance(code, http.HTTPStatus):
|
||||
color = logs.TermColor.RED
|
||||
if 100 <= code < 200:
|
||||
color = logs.TermColor.CYAN
|
||||
if 200 <= code < 300:
|
||||
color = logs.TermColor.GREEN
|
||||
elif 300 <= code < 400:
|
||||
color = logs.TermColor.BLUE
|
||||
elif 400 <= code < 500:
|
||||
color = logs.TermColor.YELLOW
|
||||
code = color + str(code.value) + logs.TermColor.RESET
|
||||
if size == "" and self.out_size > 0:
|
||||
size = str(self.out_size)
|
||||
args = (code, self.address_string(), self.__get_host(), self.requestline)
|
||||
format = "→ %s - %s - %s - %s"
|
||||
if size != "":
|
||||
args = (*args, size)
|
||||
format += " - %s"
|
||||
self.logger.info(format, *args)
|
||||
|
||||
def __pre_log_request(self):
|
||||
args = ("...", self.address_string(), self.__get_host(), self.requestline)
|
||||
format = "← %s - %s - %s - %s"
|
||||
if (size := self.__get_length()) > 0:
|
||||
args = (*args, size)
|
||||
format += " - %s"
|
||||
self.logger.debug(format, *args)
|
||||
|
||||
def __get_subpath(self, match_full: bool = True) -> str | None:
|
||||
if match_full:
|
||||
match = self.PATH_REGEX.fullmatch(self.path)
|
||||
@@ -194,20 +201,3 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
self.send_response(code, message)
|
||||
self.send_header("Content-Length", "0")
|
||||
self.end_headers()
|
||||
|
||||
def send_error(
|
||||
self, code: int, message: str | None = None, explain: str | None = None
|
||||
):
|
||||
shortmsg, longmsg = self.responses[code]
|
||||
if message is None:
|
||||
message = shortmsg
|
||||
if explain is None:
|
||||
explain = longmsg
|
||||
if "Accept" not in self.headers["Accept"] or "text/" in self.headers["Accept"]:
|
||||
self.__send_basic_body(
|
||||
f"{code} {message}\n{explain}\n{self.server_version}",
|
||||
code=code,
|
||||
message=message,
|
||||
)
|
||||
else:
|
||||
self.__send_status_only(code, message)
|
||||
|
||||
Reference in New Issue
Block a user