mirror of https://github.com/home-assistant/core
505 lines
16 KiB
Python
505 lines
16 KiB
Python
"""Test real forwarded middleware."""
|
|
|
|
from http import HTTPStatus
|
|
from ipaddress import ip_network
|
|
from unittest.mock import Mock, patch
|
|
|
|
from aiohttp import web
|
|
from aiohttp.hdrs import X_FORWARDED_FOR, X_FORWARDED_HOST, X_FORWARDED_PROTO
|
|
import pytest
|
|
|
|
from homeassistant.components.http.forwarded import async_setup_forwarded
|
|
|
|
from tests.typing import ClientSessionGenerator
|
|
|
|
|
|
async def mock_handler(request):
    """Reply with the request's ``remote`` value as the response text."""
    remote_address = request.remote
    return web.Response(text=remote_address)
|
|
|
|
|
|
async def test_x_forwarded_for_without_trusted_proxy(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 and an untrusted-proxy log entry with no trusted proxies.

    The middleware is set up with an empty trusted proxy list, then a request
    carrying an X-Forwarded-For header is sent through the test client.
    """

    async def check_request(request):
        # NOTE(review): presumably never reached, since the test expects a 400.
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)

    no_trusted_proxies = []
    async_setup_forwarded(application, True, no_trusted_proxies)

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Received X-Forwarded-For header from an untrusted proxy 127.0.0.1"
    assert expected_log in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("trusted_proxies", "x_forwarded_for", "remote"),
    [
        (
            ["127.0.0.0/24", "1.1.1.1", "10.10.10.0/24"],
            "10.10.10.10, 1.1.1.1",
            "10.10.10.10",
        ),
        (
            ["127.0.0.0/24", "1.1.1.1"],
            "123.123.123.123, 2.2.2.2, 1.1.1.1",
            "2.2.2.2",
        ),
        (
            ["127.0.0.0/24", "1.1.1.1"],
            "123.123.123.123,2.2.2.2,1.1.1.1",
            "2.2.2.2",
        ),
        (
            ["127.0.0.0/24"],
            "123.123.123.123, 2.2.2.2, 1.1.1.1",
            "1.1.1.1",
        ),
        (
            ["127.0.0.0/24"],
            "127.0.0.1",
            "127.0.0.1",
        ),
        (
            ["127.0.0.1", "1.1.1.1"],
            "123.123.123.123, 1.1.1.1",
            "123.123.123.123",
        ),
        (
            ["127.0.0.1", "1.1.1.1"],
            "123.123.123.123, 2.2.2.2, 1.1.1.1",
            "2.2.2.2",
        ),
        (
            ["127.0.0.1"],
            "255.255.255.255",
            "255.255.255.255",
        ),
    ],
)
async def test_x_forwarded_for_with_trusted_proxy(
    trusted_proxies, x_forwarded_for, remote, aiohttp_client: ClientSessionGenerator
) -> None:
    """Check ``request.remote`` for each trusted proxy / header combination.

    Each case supplies the trusted proxy networks, the X-Forwarded-For header
    value to send, and the address the handler must see as ``request.remote``.
    """

    async def check_request(request):
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == remote

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)

    trusted_networks = list(map(ip_network, trusted_proxies))
    async_setup_forwarded(application, True, trusted_networks)

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: x_forwarded_for}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_for_disabled_with_proxy(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 and a log entry when a proxy header arrives while disabled.

    The middleware is set up with ``False`` as its second argument and an
    empty proxy list; a request with X-Forwarded-For must then be rejected.
    """

    async def check_request(request):
        # NOTE(review): presumably never reached, since the test expects a 400.
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)

    async_setup_forwarded(application, False, [])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = (
        "A request from a reverse proxy was received from 127.0.0.1, but your HTTP "
        "integration is not set-up for reverse proxies"
    )
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_x_forwarded_for_with_spoofed_header(
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check that the last X-Forwarded-For entry becomes ``request.remote``.

    The header carries two addresses; the handler must see the right-most one
    ("255.255.255.255"), not the one listed before it.
    """

    async def check_request(request):
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222, 255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
@pytest.mark.parametrize(
    "x_forwarded_for",
    [
        "This value is invalid",
        "1.1.1.1, , 1.2.3.4",
        "1.1.1.1,,1.2.3.4",
        "1.1.1.1, batman, 1.2.3.4",
        "192.168.0.0/24",
        "192.168.0.0/24, 1.1.1.1",
        ",",
        "",
    ],
)
async def test_x_forwarded_for_with_malformed_header(
    x_forwarded_for,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Expect a 400 and an invalid-IP log entry for each malformed header value."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)

    malformed_headers = {X_FORWARDED_FOR: x_forwarded_for}
    response = await client.get("/", headers=malformed_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Invalid IP address in X-Forwarded-For"
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_x_forwarded_for_with_multiple_headers(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 when X-Forwarded-For is sent twice in one request."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)

    # A list of pairs (rather than a dict) allows the same header name twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_FOR, "123.123.123.123"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Too many headers for X-Forwarded-For"
    assert expected_log in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("x_forwarded_for", "remote", "x_forwarded_proto", "secure"),
    [
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "https, http, http",
            True,
        ),
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "https,http,http",
            True,
        ),
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "http",
            False,
        ),
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "http, https, https",
            False,
        ),
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "https",
            True,
        ),
        (
            "255.255.255.255, 10.10.10.10, 127.0.0.1",
            "10.10.10.10",
            "http, https, http",
            True,
        ),
        (
            "255.255.255.255, 10.10.10.10, 127.0.0.1",
            "10.10.10.10",
            "https, http, https",
            False,
        ),
        (
            "255.255.255.255, 10.10.10.10, 127.0.0.1",
            "10.10.10.10",
            "https",
            True,
        ),
    ],
)
async def test_x_forwarded_proto_with_trusted_proxy(
    x_forwarded_for,
    remote,
    x_forwarded_proto,
    secure,
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check the scheme, secure flag and remote seen for each header pair.

    Each case sends X-Forwarded-For together with X-Forwarded-Proto and states
    the ``request.remote`` and ``request.secure`` values the handler must see.
    """

    async def check_request(request):
        expected_scheme = "https" if secure else "http"
        assert request.remote == remote
        assert request.scheme == expected_scheme
        assert request.secure == secure

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.0/24")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: x_forwarded_for,
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_proto_with_trusted_proxy_multiple_for(
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check a single X-Forwarded-Proto value alongside three forwarded-for entries.

    The handler must see an https, secure request whose remote is the first
    address in the X-Forwarded-For header.
    """

    async def check_request(request):
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "https"
        assert request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.0/24")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "255.255.255.255, 127.0.0.1, 127.0.0.2",
        X_FORWARDED_PROTO: "https",
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_proto_not_processed_without_for(
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check that X-Forwarded-Proto alone leaves the request untouched.

    Without an X-Forwarded-For header the handler must still see the plain
    http scheme, the test server's host and the local remote address.
    """

    async def check_request(request):
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    proto_only_headers = {X_FORWARDED_PROTO: "https"}
    response = await client.get("/", headers=proto_only_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_proto_with_multiple_headers(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 when X-Forwarded-Proto is sent twice in one request."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)

    # A list of pairs (rather than a dict) allows the same header name twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_PROTO, "https"),
        (X_FORWARDED_PROTO, "http"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Too many headers for X-Forward-Proto"
    assert expected_log in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    "x_forwarded_proto",
    [
        "",
        ",",
        "https, , https",
        "https, https, ",
    ],
)
async def test_x_forwarded_proto_empty_element(
    x_forwarded_proto,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Expect a 400 when the X-Forwarded-Proto value has an empty element."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "1.1.1.1",
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Empty item received in X-Forward-Proto header"
    assert expected_log in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("x_forwarded_for", "x_forwarded_proto", "expected", "got"),
    [
        (
            "1.1.1.1, 2.2.2.2",
            "https, https, https",
            2,
            3,
        ),
        (
            "1.1.1.1, 2.2.2.2, 3.3.3.3, 4.4.4.4",
            "https, https, https",
            4,
            3,
        ),
    ],
)
async def test_x_forwarded_proto_incorrect_number_of_elements(
    x_forwarded_for,
    x_forwarded_proto,
    expected,
    got,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Expect a 400 when the proto and for headers disagree on element count.

    ``expected`` and ``got`` are the counts that must appear in the logged
    message for the given pair of header values.
    """
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: x_forwarded_for,
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = (
        f"Incorrect number of elements in X-Forward-Proto. Expected 1 or {expected}, got {got}"
    )
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_x_forwarded_host_with_trusted_proxy(
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check that X-Forwarded-Host sets ``request.host`` alongside a for header.

    The handler must see "example.com" as host and the forwarded address as
    remote, while the scheme stays plain http.
    """

    async def check_request(request):
        assert request.host == "example.com"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "255.255.255.255",
        X_FORWARDED_HOST: "example.com",
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_host_not_processed_without_for(
    aiohttp_client: ClientSessionGenerator,
) -> None:
    """Check that X-Forwarded-Host alone leaves the request untouched.

    Without an X-Forwarded-For header the handler must still see the test
    server's own host, the http scheme and the local remote address.
    """

    async def check_request(request):
        url = client.make_url("/")
        assert request.host == f"{url.host}:{url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", check_request)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    host_only_headers = {X_FORWARDED_HOST: "example.com"}
    response = await client.get("/", headers=host_only_headers)

    assert response.status == HTTPStatus.OK
|
|
|
|
|
|
async def test_x_forwarded_host_with_multiple_headers(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 when X-Forwarded-Host is sent twice in one request."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)

    # A list of pairs (rather than a dict) allows the same header name twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_HOST, "example.com"),
        (X_FORWARDED_HOST, "example.spoof"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Too many headers for X-Forwarded-Host"
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_x_forwarded_host_with_empty_header(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Expect a 400 when X-Forwarded-Host is present but empty."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222", X_FORWARDED_HOST: ""}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST

    expected_log = "Empty value received in X-Forward-Host header"
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_x_forwarded_cloud(
    aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture
) -> None:
    """Check that a request succeeds while ``is_cloud_request`` is patched.

    ``hass_nabucasa.remote.is_cloud_request`` is replaced by a mock whose
    ``get()`` returns True, and a request with an empty X-Forwarded-Host is
    expected to return 200.
    """
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)

    cloud_request_flag = Mock(get=Mock(return_value=True))
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222", X_FORWARDED_HOST: ""}
    with patch("hass_nabucasa.remote.is_cloud_request", cloud_request_flag):
        response = await client.get("/", headers=forwarded_headers)

    # This request would normally fail because it's invalid, now it works.
    assert response.status == HTTPStatus.OK
|