Files
stapler/tests/test_handlers.py
T

665 lines
24 KiB
Python

import abc
import collections
import contextlib
import http
import http.server
import io
import logging
import tarfile
import typing
import unittest.mock
from src.cert_manager import CertManager
from src.data_dir import DataDir
from src.handlers import BaseHandler, RequestHandler, UpgradeHandler
from src.page import Page
from src.params import Parameters
from src.registry import Registry
from src.token_manager import TokenManager
from . import BaseTestCase
class BaseHandlerTestCase(BaseTestCase, abc.ABC):
@abc.abstractmethod
def _get_handler(
self,
path: str = "/",
headers: dict[str, str | None] | None = None,
rfile: io.BufferedIOBase | None = None,
) -> BaseHandler:
pass
@contextlib.contextmanager
def expects_status_only(
self,
handler: BaseHandler,
code: int,
message: str | None = None,
headers: dict[str, str] | None = None,
) -> typing.Iterator[None]:
if headers is None:
headers = {}
send_response_mock = handler.send_response = unittest.mock.Mock() # ty:ignore[invalid-assignment]
send_header_mock = handler.send_header = unittest.mock.Mock() # ty:ignore[invalid-assignment]
end_headers_mock = handler.end_headers = unittest.mock.Mock() # ty:ignore[invalid-assignment]
yield
send_response_mock.assert_called_once_with(code, message)
send_header_mock.assert_has_calls(
[
unittest.mock.call("Content-Length", "0"),
]
+ [unittest.mock.call(header, value) for header, value in headers.items()],
any_order=True,
)
end_headers_mock.assert_called_once()
@contextlib.contextmanager
def expects_basic_body( # noqa: PLR0913
self,
handler: BaseHandler,
body: str,
content_type: str = "text/plain",
code: int = http.HTTPStatus.OK,
message: str | None = None,
headers: dict[str, str] | None = None,
) -> typing.Iterator[None]:
if headers is None:
headers = {}
send_response_mock = handler.send_response = unittest.mock.Mock() # ty:ignore[invalid-assignment]
send_header_mock = handler.send_header = unittest.mock.Mock() # ty:ignore[invalid-assignment]
end_headers_mock = handler.end_headers = unittest.mock.Mock() # ty:ignore[invalid-assignment]
yield
send_response_mock.assert_called_once_with(code, message)
send_header_mock.assert_has_calls(
[
unittest.mock.call("Content-Length", str(len(body.encode()))),
unittest.mock.call("Content-type", f"{content_type}; charset=UTF-8"),
]
+ [unittest.mock.call(header, value) for header, value in headers.items()],
any_order=True,
)
end_headers_mock.assert_called_once()
handler.wfile.seek(0)
self.assertEqual(handler.wfile.read(), body.encode())
@contextlib.contextmanager
def expects_error(
self,
handler: BaseHandler,
code: int,
message: str | None = None,
) -> typing.Iterator[None]:
shortmsg, _ = RequestHandler.responses[code]
if message is None:
message = shortmsg
with self.expects_status_only(handler, code, message):
yield
@contextlib.contextmanager
def expects_error_full(
self,
handler: BaseHandler,
code: int,
message: str | None = None,
explain: str | None = None,
) -> typing.Iterator[None]:
shortmsg, longmsg = http.server.BaseHTTPRequestHandler.responses[code]
if message is None:
message = shortmsg
if explain is None:
explain = longmsg
with self.expects_basic_body(
handler,
body=f"{code} {message}\n{explain}\n\n{handler.server_signature()}",
code=code,
message=message,
):
yield
class TestRequestHandler(BaseHandlerTestCase):
@typing.override
def setUp(self) -> None:
self.get_tmp_dir()
self.registry = self.mock(Registry)
self.cert_manager = self.mock(CertManager)
self.token_manager = self.mock(TokenManager)
self.certbot_www = self.tmp_path / "certbot_www"
self.data_dir = self.mock(DataDir)
super().setUp()
def _get_handler(
self,
path: str = "/",
headers: dict[str, str | None] | None = None,
rfile: io.BufferedIOBase | None = None,
) -> RequestHandler:
if headers is None:
headers = {}
with self.patch("http.server.BaseHTTPRequestHandler.__init__"):
handler = RequestHandler(
unittest.mock.MagicMock(),
"127.0.0.1",
unittest.mock.MagicMock(),
params=Parameters(
data_dir=self.get_tmp_dir(), certbot_www=str(self.certbot_www)
),
registry=self.registry,
cert_manager=self.cert_manager,
token_manager=self.token_manager,
)
handler.address_string = lambda: "127.0.0.1" # ty:ignore[invalid-assignment]
handler.requestline = "GET /"
handler.path = path
handler.request_version = "HTTP/0.9"
handler.headers = collections.defaultdict(lambda: None, headers) # ty:ignore[invalid-assignment]
handler.rfile = rfile if rfile is not None else io.BytesIO()
handler.wfile = io.BytesIO()
handler.logger = unittest.mock.Mock(logging.Logger)
handler.data_dir = self.data_dir
return handler
def test_do_head_proxy(self) -> None:
handler = self._get_handler()
with (
self.patch("http.server.SimpleHTTPRequestHandler.do_HEAD"),
self.seal_mocks(),
):
handler.do_HEAD()
def test_do_get_index(self) -> None:
handler = self._get_handler("/")
with (
self.expects_basic_body(handler, handler.server_signature()),
self.seal_mocks(),
):
handler.do_GET()
def test_do_get_proxy_on_other_path(self) -> None:
handler = self._get_handler("/file")
with (
self.patch("http.server.SimpleHTTPRequestHandler.do_GET"),
self.seal_mocks(),
):
handler.do_GET()
def test_do_get_proxy_on_other_host(self) -> None:
handler = self._get_handler("/", {"Host": "other_host"})
with (
self.patch("http.server.SimpleHTTPRequestHandler.do_GET"),
self.seal_mocks(),
):
handler.do_GET()
def test_do_put_no_token(self) -> None:
handler = self._get_handler("/path")
with (
self.expects_error(
handler, http.HTTPStatus.BAD_REQUEST, "No X-Token header in request"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_token(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], False), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.UNAUTHORIZED, "Invalid token"),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_path(self) -> None:
handler = self._get_handler("/pa.th", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.BAD_REQUEST, "Invalid path"),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_token_for_path(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
False, # noqa: FBT003
),
self.expects_error(
handler, http.HTTPStatus.FORBIDDEN, "Path forbidden for this token"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_host(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "X-Host": "invalid_host"}
)
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.expects_error(
handler, http.HTTPStatus.BAD_REQUEST, "Invalid requested host"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_invalid_host_for_path(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "X-Host": "example.com"}
)
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(
self.registry.get_from_host, ["example.com"], Page("other_path")
),
self.expects_error(
handler, http.HTTPStatus.FORBIDDEN, "Host already taken"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_no_content(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.expects_error(
handler, http.HTTPStatus.LENGTH_REQUIRED, "No body found"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_content_too_large(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "Content-Length": "999999999"}
)
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.expects_error(
handler,
http.HTTPStatus.CONTENT_TOO_LARGE,
"Archive too large",
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_tar_error(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "Content-Length": "1"}
)
handler.rfile.write(b"\0")
self.data_dir.extract_tar_bytes.side_effect = tarfile.TarError
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.expects_error(
handler, http.HTTPStatus.BAD_REQUEST, "Invalid tar archive"
),
self.seal_mocks(),
):
handler.do_PUT()
self.data_dir.extract_tar_bytes.assert_called_once()
def test_do_put_extract_error(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "Content-Length": "1"}
)
handler.rfile.write(b"\0")
self.data_dir.extract_tar_bytes.side_effect = Exception
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.expects_error(handler, http.HTTPStatus.INTERNAL_SERVER_ERROR, ""),
self.seal_mocks(),
):
handler.do_PUT()
self.data_dir.extract_tar_bytes.assert_called_once()
def test_do_put_ok(self) -> None:
handler = self._get_handler(
"/path", {"X-Token": "secret", "Content-Length": "1"}
)
handler.rfile.write(b"\0")
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call_unchecked(self.data_dir.extract_tar_bytes),
self.mock_call(self.registry.add, ["path"]),
self.mock_call(self.token_manager.set_token, ["secret", "path"]),
self.expects_status_only(
handler, http.HTTPStatus.CREATED, "Resource /path/ updated"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_ok_with_host_fail_init(self) -> None:
handler = self._get_handler(
"/path",
{"X-Token": "secret", "Content-Length": "1", "X-Host": "example.com"},
)
handler.rfile.write(b"\0")
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")),
self.mock_call_unchecked(self.data_dir.extract_tar_bytes),
self.mock_call(self.registry.add, ["path"]),
self.mock_call(self.token_manager.set_token, ["secret", "path"]),
self.mock_call(self.cert_manager.create_or_update, ["example.com"], False), # noqa: FBT003
self.expects_status_only(
handler, http.HTTPStatus.CREATED, "Resource /path/ updated"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_put_ok_with_host(self) -> None:
handler = self._get_handler(
"/path",
{"X-Token": "secret", "Content-Length": "1", "X-Host": "example.com"},
)
handler.rfile.write(b"\0")
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")),
self.mock_call_unchecked(self.data_dir.extract_tar_bytes),
self.mock_call(self.registry.add, ["path"]),
self.mock_call(self.token_manager.set_token, ["secret", "path"]),
self.mock_call(self.cert_manager.create_or_update, ["example.com"], True), # noqa: FBT003
self.mock_call(self.registry.set_host, ["path", "example.com"]),
self.expects_status_only(
handler, http.HTTPStatus.CREATED, "Resource /path/ updated"
),
self.seal_mocks(),
):
handler.do_PUT()
def test_do_delete_no_token(self) -> None:
handler = self._get_handler("/path")
with (
self.expects_error(
handler, http.HTTPStatus.BAD_REQUEST, "No X-Token header in request"
),
self.seal_mocks(),
):
handler.do_DELETE()
def test_do_delete_invalid_token(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], False), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.UNAUTHORIZED, "Invalid token"),
self.seal_mocks(),
):
handler.do_DELETE()
def test_do_delete_invalid_path(self) -> None:
handler = self._get_handler("/pa.th", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.BAD_REQUEST, "Invalid path"),
self.seal_mocks(),
):
handler.do_DELETE()
def test_do_delete_invalid_token_for_path(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
False, # noqa: FBT003
),
self.expects_error(
handler, http.HTTPStatus.FORBIDDEN, "Path forbidden for this token"
),
self.seal_mocks(),
):
handler.do_DELETE()
def test_do_delete_not_found(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(self.data_dir.exists, ["path"], False), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.NOT_FOUND, "Not found"),
self.seal_mocks(),
):
handler.do_DELETE()
def test_do_delete_remove_error(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
self.data_dir.remove.side_effect = Exception
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003
self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003
self.expects_error(handler, http.HTTPStatus.INTERNAL_SERVER_ERROR, ""),
self.seal_mocks(),
):
handler.do_DELETE()
self.data_dir.remove.assert_called_once_with("path")
def test_do_delete_ok(self) -> None:
handler = self._get_handler("/path", {"X-Token": "secret"})
with (
self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003
self.mock_call(
self.token_manager.is_valid_for_path,
["secret", "path"],
True, # noqa: FBT003
),
self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003
self.mock_call(self.data_dir.remove, ["path"]),
self.mock_call(self.registry.remove, ["path"]),
self.expects_error(
handler, http.HTTPStatus.NO_CONTENT, "Resource /path/ removed"
),
self.seal_mocks(),
):
handler.do_DELETE()
def test_list_directory(self) -> None:
handler = self._get_handler("/path/", {"Accept": "text/html"})
with (
self.expects_error_full(
handler, http.HTTPStatus.NOT_FOUND, "File not found"
),
self.seal_mocks(),
):
handler.list_directory()
def test_translate_path_certbot(self) -> None:
handler = self._get_handler()
with (
self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/.well-known/acme-challenge/abcde"),
str(self.certbot_www / ".well-known" / "acme-challenge" / "abcde"),
)
def test_translate_path_host_not_found(self) -> None:
handler = self._get_handler(headers={"Host": "example.com"})
with (
self.mock_call(self.registry.get_from_host, ["example.com"]),
self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/"),
"",
)
def test_translate_path_invalid(self) -> None:
handler = self._get_handler()
with (
self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/invalid.path"),
"",
)
def test_translate_path_favicon(self) -> None:
handler = self._get_handler()
with (
self.patch_call(
"http.server.SimpleHTTPRequestHandler.translate_path",
["/favicon.ico"],
),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/favicon.ico"),
None,
)
def test_translate_path_dotfile(self) -> None:
handler = self._get_handler()
with (
self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/path/.token"),
"",
)
def test_translate_path_with_host(self) -> None:
handler = self._get_handler(headers={"Host": "example.com"})
with (
self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")),
self.patch_call(
"http.server.SimpleHTTPRequestHandler.translate_path",
["/path/index.html"],
),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/index.html"),
None,
)
def test_translate_path_default_host(self) -> None:
handler = self._get_handler()
with (
self.patch_call(
"http.server.SimpleHTTPRequestHandler.translate_path",
["/path/index.html"],
),
self.seal_mocks(),
):
self.assertEqual(
handler.translate_path("/path/index.html"),
None,
)
class TestUpgradeHandler(BaseHandlerTestCase):
def _get_handler(
self,
path: str = "/",
headers: dict[str, str | None] | None = None,
rfile: io.BufferedIOBase | None = None,
) -> UpgradeHandler:
if headers is None:
headers = {}
with self.patch("http.server.BaseHTTPRequestHandler.__init__"):
handler = UpgradeHandler(
unittest.mock.MagicMock(),
"127.0.0.1",
unittest.mock.MagicMock(),
params=Parameters(),
)
handler.address_string = lambda: "127.0.0.1" # ty:ignore[invalid-assignment]
handler.requestline = "GET /"
handler.path = path
handler.request_version = "HTTP/0.9"
handler.headers = collections.defaultdict(lambda: None, headers) # ty:ignore[invalid-assignment]
handler.rfile = rfile if rfile is not None else io.BytesIO()
handler.wfile = io.BytesIO()
handler.logger = unittest.mock.Mock(logging.Logger)
return handler
def test_do_get(self) -> None:
handler = self._get_handler("/file")
with self.expects_status_only(
handler,
http.HTTPStatus.MOVED_PERMANENTLY,
headers={"Location": "https://localhost/file"},
):
handler.do_GET()
def test_do_head(self) -> None:
handler = self._get_handler("/file")
with self.expects_status_only(
handler,
http.HTTPStatus.MOVED_PERMANENTLY,
headers={"Location": "https://localhost/file"},
):
handler.do_HEAD()