mirror of https://github.com/home-assistant/core
413 lines
13 KiB
Python
413 lines
13 KiB
Python
"""The tests for the Home Assistant HTTP component."""
|
|
|
|
from http import HTTPStatus
|
|
from ipaddress import ip_address
|
|
import os
|
|
from unittest.mock import AsyncMock, Mock, mock_open, patch
|
|
|
|
from aiohttp import web
|
|
from aiohttp.web_exceptions import HTTPUnauthorized
|
|
from aiohttp.web_middlewares import middleware
|
|
import pytest
|
|
|
|
from homeassistant.components import http
|
|
from homeassistant.components.http import KEY_AUTHENTICATED, KEY_HASS
|
|
from homeassistant.components.http.ban import (
|
|
IP_BANS_FILE,
|
|
KEY_BAN_MANAGER,
|
|
KEY_FAILED_LOGIN_ATTEMPTS,
|
|
process_success_login,
|
|
setup_bans,
|
|
)
|
|
from homeassistant.components.http.view import request_handler_factory
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.setup import async_setup_component
|
|
|
|
from tests.common import async_get_persistent_notifications
|
|
from tests.test_util import mock_real_ip
|
|
from tests.typing import ClientSessionGenerator
|
|
|
|
SUPERVISOR_IP = "1.2.3.4"
|
|
BANNED_IPS = ["200.201.202.203", "100.64.0.2"]
|
|
BANNED_IPS_WITH_SUPERVISOR = [*BANNED_IPS, SUPERVISOR_IP]
|
|
|
|
|
|
@pytest.fixture(name="hassio_env")
|
|
def hassio_env_fixture(supervisor_is_connected: AsyncMock):
|
|
"""Fixture to inject hassio env."""
|
|
with (
|
|
patch.dict(os.environ, {"SUPERVISOR": "127.0.0.1"}),
|
|
patch.dict(os.environ, {"SUPERVISOR_TOKEN": "123456"}),
|
|
):
|
|
yield
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def gethostbyaddr_mock():
|
|
"""Fixture to mock out I/O on getting host by address."""
|
|
with patch(
|
|
"homeassistant.components.http.ban.gethostbyaddr",
|
|
return_value=("example.com", ["0.0.0.0.in-addr.arpa"], ["0.0.0.0"]),
|
|
):
|
|
yield
|
|
|
|
|
|
async def test_access_from_banned_ip(
|
|
hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
|
|
) -> None:
|
|
"""Test accessing to server from banned IP. Both trusted and not."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
setup_bans(hass, app, 5)
|
|
set_real_ip = mock_real_ip(app)
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
return_value={
|
|
banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS
|
|
},
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
for remote_addr in BANNED_IPS:
|
|
set_real_ip(remote_addr)
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.FORBIDDEN
|
|
|
|
|
|
async def test_access_from_banned_ip_with_partially_broken_yaml_file(
|
|
hass: HomeAssistant,
|
|
aiohttp_client: ClientSessionGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Test accessing to server from banned IP. Both trusted and not.
|
|
|
|
We inject some garbage into the yaml file to make sure it can
|
|
still load the bans.
|
|
"""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
setup_bans(hass, app, 5)
|
|
set_real_ip = mock_real_ip(app)
|
|
|
|
data = {banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS}
|
|
data["5.3.3.3"] = {"banned_at": "garbage"}
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
return_value=data,
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
for remote_addr in BANNED_IPS:
|
|
set_real_ip(remote_addr)
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.FORBIDDEN
|
|
|
|
# Ensure garbage data is ignored
|
|
set_real_ip("5.3.3.3")
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.NOT_FOUND
|
|
|
|
assert "Failed to load IP ban" in caplog.text
|
|
|
|
|
|
async def test_no_ip_bans_file(
|
|
hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
|
|
) -> None:
|
|
"""Test no ip bans file."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
setup_bans(hass, app, 5)
|
|
set_real_ip = mock_real_ip(app)
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
side_effect=FileNotFoundError,
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
set_real_ip("4.3.2.1")
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.NOT_FOUND
|
|
|
|
|
|
async def test_failure_loading_ip_bans_file(
|
|
hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
|
|
) -> None:
|
|
"""Test failure loading ip bans file."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
setup_bans(hass, app, 5)
|
|
set_real_ip = mock_real_ip(app)
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
side_effect=HomeAssistantError,
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
set_real_ip("4.3.2.1")
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.NOT_FOUND
|
|
|
|
|
|
async def test_ip_ban_manager_never_started(
|
|
hass: HomeAssistant,
|
|
aiohttp_client: ClientSessionGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Test we handle the ip ban manager not being started."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
setup_bans(hass, app, 5)
|
|
set_real_ip = mock_real_ip(app)
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
side_effect=FileNotFoundError,
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
# Mock the manager never being started
|
|
del app[KEY_BAN_MANAGER]
|
|
|
|
set_real_ip("4.3.2.1")
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.NOT_FOUND
|
|
assert "IP Ban middleware loaded but banned IPs not loaded" in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("remote_addr", "bans", "status"),
|
|
list(
|
|
zip(
|
|
BANNED_IPS_WITH_SUPERVISOR,
|
|
[1, 1, 0],
|
|
[HTTPStatus.FORBIDDEN, HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED],
|
|
strict=False,
|
|
)
|
|
),
|
|
)
|
|
async def test_access_from_supervisor_ip(
|
|
remote_addr,
|
|
bans,
|
|
status,
|
|
hass: HomeAssistant,
|
|
aiohttp_client: ClientSessionGenerator,
|
|
hassio_env,
|
|
resolution_info: AsyncMock,
|
|
) -> None:
|
|
"""Test accessing to server from supervisor IP."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
|
|
async def unauth_handler(request):
|
|
"""Return a mock web response."""
|
|
raise HTTPUnauthorized
|
|
|
|
app.router.add_get("/", unauth_handler)
|
|
setup_bans(hass, app, 1)
|
|
mock_real_ip(app)(remote_addr)
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
return_value={},
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
manager = app[KEY_BAN_MANAGER]
|
|
|
|
assert await async_setup_component(hass, "hassio", {"hassio": {}})
|
|
|
|
m_open = mock_open()
|
|
|
|
with (
|
|
patch.dict(os.environ, {"SUPERVISOR": SUPERVISOR_IP}),
|
|
patch("homeassistant.components.http.ban.open", m_open, create=True),
|
|
):
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert len(manager.ip_bans_lookup) == bans
|
|
assert m_open.call_count == bans
|
|
|
|
# second request should be forbidden if banned
|
|
resp = await client.get("/")
|
|
assert resp.status == status
|
|
assert len(manager.ip_bans_lookup) == bans
|
|
|
|
|
|
async def test_ban_middleware_not_loaded_by_config(hass: HomeAssistant) -> None:
|
|
"""Test accessing to server from banned IP when feature is off."""
|
|
with patch("homeassistant.components.http.setup_bans") as mock_setup:
|
|
await async_setup_component(
|
|
hass, "http", {"http": {http.CONF_IP_BAN_ENABLED: False}}
|
|
)
|
|
|
|
assert len(mock_setup.mock_calls) == 0
|
|
|
|
|
|
async def test_ban_middleware_loaded_by_default(hass: HomeAssistant) -> None:
|
|
"""Test accessing to server from banned IP when feature is off."""
|
|
with patch("homeassistant.components.http.setup_bans") as mock_setup:
|
|
await async_setup_component(hass, "http", {"http": {}})
|
|
|
|
assert len(mock_setup.mock_calls) == 1
|
|
|
|
|
|
async def test_ip_bans_file_creation(
|
|
hass: HomeAssistant,
|
|
aiohttp_client: ClientSessionGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Testing if banned IP file created."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
|
|
async def unauth_handler(request):
|
|
"""Return a mock web response."""
|
|
raise HTTPUnauthorized
|
|
|
|
app.router.add_get("/example", unauth_handler)
|
|
setup_bans(hass, app, 2)
|
|
mock_real_ip(app)("200.201.202.204")
|
|
|
|
with patch(
|
|
"homeassistant.components.http.ban.load_yaml_config_file",
|
|
return_value={
|
|
banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS
|
|
},
|
|
):
|
|
client = await aiohttp_client(app)
|
|
|
|
manager = app[KEY_BAN_MANAGER]
|
|
m_open = mock_open()
|
|
|
|
with patch("homeassistant.components.http.ban.open", m_open, create=True):
|
|
resp = await client.get("/example")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert len(manager.ip_bans_lookup) == len(BANNED_IPS)
|
|
assert m_open.call_count == 0
|
|
|
|
resp = await client.get("/example")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert len(manager.ip_bans_lookup) == len(BANNED_IPS) + 1
|
|
m_open.assert_called_once_with(
|
|
hass.config.path(IP_BANS_FILE), "a", encoding="utf8"
|
|
)
|
|
|
|
resp = await client.get("/example")
|
|
assert resp.status == HTTPStatus.FORBIDDEN
|
|
assert m_open.call_count == 1
|
|
|
|
notifications = async_get_persistent_notifications(hass)
|
|
assert len(notifications) == 2
|
|
assert (
|
|
notifications["http-login"]["message"]
|
|
== "Login attempt or request with invalid authentication from example.com (200.201.202.204). See the log for details."
|
|
)
|
|
|
|
assert (
|
|
"Login attempt or request with invalid authentication from example.com (200.201.202.204). Requested URL: '/example'."
|
|
in caplog.text
|
|
)
|
|
|
|
|
|
async def test_failed_login_attempts_counter(
|
|
hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
|
|
) -> None:
|
|
"""Testing if failed login attempts counter increased."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
|
|
async def auth_handler(request):
|
|
"""Return 200 status code."""
|
|
return None, 200
|
|
|
|
async def auth_true_handler(request):
|
|
"""Return 200 status code."""
|
|
process_success_login(request)
|
|
return None, 200
|
|
|
|
app.router.add_get(
|
|
"/auth_true",
|
|
request_handler_factory(hass, Mock(requires_auth=True), auth_true_handler),
|
|
)
|
|
app.router.add_get(
|
|
"/auth_false",
|
|
request_handler_factory(hass, Mock(requires_auth=True), auth_handler),
|
|
)
|
|
app.router.add_get(
|
|
"/", request_handler_factory(hass, Mock(requires_auth=False), auth_handler)
|
|
)
|
|
|
|
setup_bans(hass, app, 5)
|
|
remote_ip = ip_address("200.201.202.204")
|
|
mock_real_ip(app)("200.201.202.204")
|
|
|
|
@middleware
|
|
async def mock_auth(request, handler):
|
|
"""Mock auth middleware."""
|
|
if "auth_true" in request.path:
|
|
request[KEY_AUTHENTICATED] = True
|
|
else:
|
|
request[KEY_AUTHENTICATED] = False
|
|
return await handler(request)
|
|
|
|
app.middlewares.append(mock_auth)
|
|
|
|
client = await aiohttp_client(app)
|
|
|
|
resp = await client.get("/auth_false")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1
|
|
|
|
resp = await client.get("/auth_false")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2
|
|
|
|
resp = await client.get("/")
|
|
assert resp.status == HTTPStatus.OK
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2
|
|
|
|
# This used to check that with trusted networks we reset login attempts
|
|
# We no longer support trusted networks.
|
|
resp = await client.get("/auth_true")
|
|
assert resp.status == HTTPStatus.OK
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 0
|
|
|
|
resp = await client.get("/auth_false")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1
|
|
|
|
resp = await client.get("/auth_false")
|
|
assert resp.status == HTTPStatus.UNAUTHORIZED
|
|
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2
|
|
|
|
|
|
async def test_single_ban_file_entry(
|
|
hass: HomeAssistant,
|
|
) -> None:
|
|
"""Test that only one item is added to ban file."""
|
|
app = web.Application()
|
|
app[KEY_HASS] = hass
|
|
|
|
async def unauth_handler(request):
|
|
"""Return a mock web response."""
|
|
raise HTTPUnauthorized
|
|
|
|
app.router.add_get("/example", unauth_handler)
|
|
setup_bans(hass, app, 2)
|
|
mock_real_ip(app)("200.201.202.204")
|
|
|
|
manager = app[KEY_BAN_MANAGER]
|
|
m_open = mock_open()
|
|
|
|
with patch("homeassistant.components.http.ban.open", m_open, create=True):
|
|
remote_ip = ip_address("200.201.202.204")
|
|
await manager.async_add_ban(remote_ip)
|
|
await manager.async_add_ban(remote_ip)
|
|
|
|
assert m_open.call_count == 1
|