import abc import collections import contextlib import http import http.server import io import logging import tarfile import typing import unittest.mock from src.handlers import BaseHandler, RequestHandler, UpgradeHandler from src.page import Page from src.params import Parameters from . import BaseTestCase class BaseHandlerTestCase(BaseTestCase, abc.ABC): @abc.abstractmethod def _get_handler( self, path: str = "/", headers: dict[str, str | None] | None = None, rfile: io.BufferedIOBase | None = None, ) -> BaseHandler: pass @contextlib.contextmanager def expects_status_only( self, handler: BaseHandler, code: int, message: str | None = None, headers: dict[str, str] | None = None, ) -> typing.Iterator[None]: if headers is None: headers = {} send_response_mock = handler.send_response = unittest.mock.Mock() # ty:ignore[invalid-assignment] send_header_mock = handler.send_header = unittest.mock.Mock() # ty:ignore[invalid-assignment] end_headers_mock = handler.end_headers = unittest.mock.Mock() # ty:ignore[invalid-assignment] yield send_response_mock.assert_called_once_with(code, message) send_header_mock.assert_has_calls( [ unittest.mock.call("Content-Length", "0"), ] + [unittest.mock.call(header, value) for header, value in headers.items()], any_order=True, ) end_headers_mock.assert_called_once() @contextlib.contextmanager def expects_basic_body( # noqa: PLR0913 self, handler: BaseHandler, body: str, content_type: str = "text/plain", code: int = http.HTTPStatus.OK, message: str | None = None, headers: dict[str, str] | None = None, ) -> typing.Iterator[None]: if headers is None: headers = {} send_response_mock = handler.send_response = unittest.mock.Mock() # ty:ignore[invalid-assignment] send_header_mock = handler.send_header = unittest.mock.Mock() # ty:ignore[invalid-assignment] end_headers_mock = handler.end_headers = unittest.mock.Mock() # ty:ignore[invalid-assignment] yield send_response_mock.assert_called_once_with(code, message) send_header_mock.assert_has_calls( [ unittest.mock.call("Content-Length", str(len(body.encode()))), unittest.mock.call("Content-type", f"{content_type}; charset=UTF-8"), ] + [unittest.mock.call(header, value) for header, value in headers.items()], any_order=True, ) end_headers_mock.assert_called_once() handler.wfile.seek(0) self.assertEqual(handler.wfile.read(), body.encode()) @contextlib.contextmanager def expects_error( self, handler: BaseHandler, code: int, message: str | None = None, ) -> typing.Iterator[None]: shortmsg, _ = RequestHandler.responses[code] if message is None: message = shortmsg with self.expects_status_only(handler, code, message): yield @contextlib.contextmanager def expects_error_full( self, handler: BaseHandler, code: int, message: str | None = None, explain: str | None = None, ) -> typing.Iterator[None]: shortmsg, longmsg = http.server.BaseHTTPRequestHandler.responses[code] if message is None: message = shortmsg if explain is None: explain = longmsg with self.expects_basic_body( handler, body=f"{code} {message}\n{explain}\n\n{handler.server_signature()}", code=code, message=message, ): yield class TestRequestHandler(BaseHandlerTestCase): @typing.override def setUp(self) -> None: self.get_tmp_dir() self.registry = self.new_mock() self.cert_manager = self.new_mock() self.token_manager = self.new_mock() self.certbot_www = self.tmp_path / "certbot_www" self.data_dir = self.new_mock() super().setUp() def _get_handler( self, path: str = "/", headers: dict[str, str | None] | None = None, rfile: io.BufferedIOBase | None = None, ) -> RequestHandler: if headers is None: headers = {} with self.patch("http.server.BaseHTTPRequestHandler.__init__"): handler = RequestHandler( unittest.mock.MagicMock(), "127.0.0.1", unittest.mock.MagicMock(), params=Parameters( data_dir=self.get_tmp_dir(), certbot_www=str(self.certbot_www) ), registry=self.registry, cert_manager=self.cert_manager, token_manager=self.token_manager, ) handler.address_string = lambda: "127.0.0.1" # ty:ignore[invalid-assignment] handler.requestline = "GET /" handler.path = path handler.request_version = "HTTP/0.9" handler.headers = collections.defaultdict(lambda: None, headers) # ty:ignore[invalid-assignment] handler.rfile = rfile if rfile is not None else io.BytesIO() handler.wfile = io.BytesIO() handler.logger = unittest.mock.Mock(logging.Logger) handler.data_dir = self.data_dir return handler def test_do_head_proxy(self) -> None: handler = self._get_handler() with ( self.patch("http.server.SimpleHTTPRequestHandler.do_HEAD"), self.seal_mocks(), ): handler.do_HEAD() def test_do_get_index(self) -> None: handler = self._get_handler("/") with ( self.expects_basic_body(handler, handler.server_signature()), self.seal_mocks(), ): handler.do_GET() def test_do_get_proxy_on_other_path(self) -> None: handler = self._get_handler("/file") with ( self.patch("http.server.SimpleHTTPRequestHandler.do_GET"), self.seal_mocks(), ): handler.do_GET() def test_do_get_proxy_on_other_host(self) -> None: handler = self._get_handler("/", {"Host": "other_host"}) with ( self.patch("http.server.SimpleHTTPRequestHandler.do_GET"), self.seal_mocks(), ): handler.do_GET() def test_do_put_no_token(self) -> None: handler = self._get_handler("/path") with ( self.expects_error( handler, http.HTTPStatus.BAD_REQUEST, "No X-Token header in request" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_invalid_token(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], False), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.UNAUTHORIZED, "Invalid token"), self.seal_mocks(), ): handler.do_PUT() def test_do_put_invalid_path(self) -> None: handler = self._get_handler("/pa.th", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.BAD_REQUEST, "Invalid path"), self.seal_mocks(), ): handler.do_PUT() def test_do_put_invalid_token_for_path(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], False, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.FORBIDDEN, "Path forbidden for this token" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_invalid_host(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "X-Host": "invalid_host"} ) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.BAD_REQUEST, "Invalid requested host" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_invalid_host_for_path(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "X-Host": "example.com"} ) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call( self.registry.get_from_host, ["example.com"], Page("other_path") ), self.expects_error( handler, http.HTTPStatus.FORBIDDEN, "Host already taken" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_no_content(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.LENGTH_REQUIRED, "No body found" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_content_too_large(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "999999999"} ) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.CONTENT_TOO_LARGE, "Archive too large", ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_tar_error(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "1"} ) handler.rfile.write(b"\0") self.data_dir.extract_tar_bytes.side_effect = tarfile.TarError with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.BAD_REQUEST, "Invalid tar archive" ), self.seal_mocks(), ): handler.do_PUT() self.data_dir.extract_tar_bytes.assert_called_once() def test_do_put_extract_error(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "1"} ) handler.rfile.write(b"\0") self.data_dir.extract_tar_bytes.side_effect = Exception with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.expects_error(handler, http.HTTPStatus.INTERNAL_SERVER_ERROR, ""), self.seal_mocks(), ): handler.do_PUT() self.data_dir.extract_tar_bytes.assert_called_once() def test_do_put_ok(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "1"} ) handler.rfile.write(b"\0") with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call_unchecked(self.data_dir.extract_tar_bytes), self.mock_call(self.registry.add, ["path"]), self.mock_call(self.token_manager.set_token, ["secret", "path"]), self.expects_status_only( handler, http.HTTPStatus.CREATED, "Resource /path/ updated" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_ok_with_host_fail_init(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "1", "X-Host": "example.com"}, ) handler.rfile.write(b"\0") with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")), self.mock_call_unchecked(self.data_dir.extract_tar_bytes), self.mock_call(self.registry.add, ["path"]), self.mock_call(self.token_manager.set_token, ["secret", "path"]), self.mock_call(self.cert_manager.create_or_update, ["example.com"], False), # noqa: FBT003 self.expects_status_only( handler, http.HTTPStatus.CREATED, "Resource /path/ updated" ), self.seal_mocks(), ): handler.do_PUT() def test_do_put_ok_with_host(self) -> None: handler = self._get_handler( "/path", {"X-Token": "secret", "Content-Length": "1", "X-Host": "example.com"}, ) handler.rfile.write(b"\0") with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")), self.mock_call_unchecked(self.data_dir.extract_tar_bytes), self.mock_call(self.registry.add, ["path"]), self.mock_call(self.token_manager.set_token, ["secret", "path"]), self.mock_call(self.cert_manager.create_or_update, ["example.com"], True), # noqa: FBT003 self.mock_call(self.registry.set_host, ["path", "example.com"]), self.expects_status_only( handler, http.HTTPStatus.CREATED, "Resource /path/ updated" ), self.seal_mocks(), ): handler.do_PUT() def test_do_delete_no_token(self) -> None: handler = self._get_handler("/path") with ( self.expects_error( handler, http.HTTPStatus.BAD_REQUEST, "No X-Token header in request" ), self.seal_mocks(), ): handler.do_DELETE() def test_do_delete_invalid_token(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], False), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.UNAUTHORIZED, "Invalid token"), self.seal_mocks(), ): handler.do_DELETE() def test_do_delete_invalid_path(self) -> None: handler = self._get_handler("/pa.th", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.BAD_REQUEST, "Invalid path"), self.seal_mocks(), ): handler.do_DELETE() def test_do_delete_invalid_token_for_path(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], False, # noqa: FBT003 ), self.expects_error( handler, http.HTTPStatus.FORBIDDEN, "Path forbidden for this token" ), self.seal_mocks(), ): handler.do_DELETE() def test_do_delete_not_found(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call(self.data_dir.exists, ["path"], False), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.NOT_FOUND, "Not found"), self.seal_mocks(), ): handler.do_DELETE() def test_do_delete_remove_error(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) self.data_dir.remove.side_effect = Exception with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003 self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003 self.expects_error(handler, http.HTTPStatus.INTERNAL_SERVER_ERROR, ""), self.seal_mocks(), ): handler.do_DELETE() self.data_dir.remove.assert_called_once_with("path") def test_do_delete_ok(self) -> None: handler = self._get_handler("/path", {"X-Token": "secret"}) with ( self.mock_call(self.token_manager.is_valid, ["secret"], True), # noqa: FBT003 self.mock_call( self.token_manager.is_valid_for_path, ["secret", "path"], True, # noqa: FBT003 ), self.mock_call(self.data_dir.exists, ["path"], True), # noqa: FBT003 self.mock_call(self.data_dir.remove, ["path"]), self.mock_call(self.registry.remove, ["path"]), self.expects_error( handler, http.HTTPStatus.NO_CONTENT, "Resource /path/ removed" ), self.seal_mocks(), ): handler.do_DELETE() def test_list_directory(self) -> None: handler = self._get_handler("/path/", {"Accept": "text/html"}) with ( self.expects_error_full( handler, http.HTTPStatus.NOT_FOUND, "File not found" ), self.seal_mocks(), ): handler.list_directory() def test_translate_path_certbot(self) -> None: handler = self._get_handler() with ( self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/.well-known/acme-challenge/abcde"), str(self.certbot_www / ".well-known" / "acme-challenge" / "abcde"), ) def test_translate_path_host_not_found(self) -> None: handler = self._get_handler(headers={"Host": "example.com"}) with ( self.mock_call(self.registry.get_from_host, ["example.com"]), self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/"), "", ) def test_translate_path_invalid(self) -> None: handler = self._get_handler() with ( self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/invalid.path"), "", ) def test_translate_path_favicon(self) -> None: handler = self._get_handler() with ( self.patch_call( "http.server.SimpleHTTPRequestHandler.translate_path", ["/favicon.ico"], ), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/favicon.ico"), None, ) def test_translate_path_dotfile(self) -> None: handler = self._get_handler() with ( self.patch("http.server.SimpleHTTPRequestHandler.translate_path", count=0), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/path/.token"), "", ) def test_translate_path_with_host(self) -> None: handler = self._get_handler(headers={"Host": "example.com"}) with ( self.mock_call(self.registry.get_from_host, ["example.com"], Page("path")), self.patch_call( "http.server.SimpleHTTPRequestHandler.translate_path", ["/path/index.html"], ), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/index.html"), None, ) def test_translate_path_default_host(self) -> None: handler = self._get_handler() with ( self.patch_call( "http.server.SimpleHTTPRequestHandler.translate_path", ["/path/index.html"], ), self.seal_mocks(), ): self.assertEqual( handler.translate_path("/path/index.html"), None, ) class TestUpgradeHandler(BaseHandlerTestCase): def _get_handler( self, path: str = "/", headers: dict[str, str | None] | None = None, rfile: io.BufferedIOBase | None = None, ) -> UpgradeHandler: if headers is None: headers = {} with self.patch("http.server.BaseHTTPRequestHandler.__init__"): handler = UpgradeHandler( unittest.mock.MagicMock(), "127.0.0.1", unittest.mock.MagicMock(), params=Parameters(), ) handler.address_string = lambda: "127.0.0.1" # ty:ignore[invalid-assignment] handler.requestline = "GET /" handler.path = path handler.request_version = "HTTP/0.9" handler.headers = collections.defaultdict(lambda: None, headers) # ty:ignore[invalid-assignment] handler.rfile = rfile if rfile is not None else io.BytesIO() handler.wfile = io.BytesIO() handler.logger = unittest.mock.Mock(logging.Logger) return handler def test_do_get(self) -> None: handler = self._get_handler("/file") with self.expects_status_only( handler, http.HTTPStatus.MOVED_PERMANENTLY, headers={"Location": "https://localhost/file"}, ): handler.do_GET() def test_do_head(self) -> None: handler = self._get_handler("/file") with self.expects_status_only( handler, http.HTTPStatus.MOVED_PERMANENTLY, headers={"Location": "https://localhost/file"}, ): handler.do_HEAD()