mirror of https://github.com/home-assistant/core
482 lines
16 KiB
Python
482 lines
16 KiB
Python
"""Test the Home Assistant local auth provider."""
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
import voluptuous as vol
|
|
|
|
from homeassistant import data_entry_flow
|
|
from homeassistant.auth import auth_manager_from_config, auth_store
|
|
from homeassistant.auth.providers import (
|
|
auth_provider_from_config,
|
|
homeassistant as hass_auth,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import issue_registry as ir
|
|
from homeassistant.setup import async_setup_component
|
|
|
|
|
|
@pytest.fixture
def data(hass: HomeAssistant) -> hass_auth.Data:
    """Provide a hass_auth.Data instance whose storage has been loaded."""
    auth_data = hass_auth.Data(hass)
    # The fixture itself is synchronous, so the async load is driven on
    # the loop exposed by the hass fixture.
    hass.loop.run_until_complete(auth_data.async_load())
    return auth_data
|
|
|
|
|
|
@pytest.fixture
def legacy_data(hass: HomeAssistant) -> hass_auth.Data:
    """Provide a loaded hass_auth.Data instance switched to legacy mode."""
    auth_data = hass_auth.Data(hass)
    hass.loop.run_until_complete(auth_data.async_load())
    # Flip the flag only after loading, exactly as the tests expect.
    auth_data.is_legacy = True
    return auth_data
|
|
|
|
|
|
@pytest.fixture
async def load_auth_component(hass: HomeAssistant) -> None:
    """Set up the auth component so that its translations are loaded."""
    component_config = {}
    await async_setup_component(hass, "auth", component_config)
|
|
|
|
|
|
async def test_validating_password_invalid_user(data: hass_auth.Data) -> None:
    """Check that validating a login for an unknown user raises InvalidAuth."""
    unknown_username = "non-existing"
    with pytest.raises(hass_auth.InvalidAuth):
        data.validate_login(unknown_username, "pw")
|
|
|
|
|
|
async def test_not_allow_set_id() -> None:
    """Check that a provider config containing an ID fails validation."""
    fake_hass = Mock()
    fake_hass.data = {}
    # The "id" key is what is expected to be rejected with vol.Invalid.
    provider_config = {"type": "homeassistant", "id": "invalid"}
    with pytest.raises(vol.Invalid):
        await auth_provider_from_config(fake_hass, None, provider_config)
|
|
|
|
|
|
async def test_new_users_populate_values(
    hass: HomeAssistant, data: hass_auth.Data
) -> None:
    """Check that a user created from credentials has name and active set."""
    username = "hello"
    data.add_auth(username, "test-pass")
    await data.async_save()

    provider_configs = [{"type": "homeassistant"}]
    manager = await auth_manager_from_config(hass, provider_configs, [])
    provider = manager.auth_providers[0]
    creds = await provider.async_get_or_create_credentials({"username": username})
    new_user = await manager.async_get_or_create_user(creds)

    assert new_user.name == "hello"
    assert new_user.is_active
|
|
|
|
|
|
async def test_changing_password_raises_invalid_user(data: hass_auth.Data) -> None:
    """Check that changing the password of an unknown user raises InvalidUser."""
    unknown_username = "non-existing"
    with pytest.raises(hass_auth.InvalidUser):
        data.change_password(unknown_username, "pw")
|
|
|
|
|
|
# Modern mode
|
|
|
|
|
|
async def test_adding_user(data: hass_auth.Data) -> None:
    """Check that an added user can log in, even with a padded username."""
    username, password = "test-user", "test-pass"
    data.add_auth(username, password)
    # The login uses the username wrapped in spaces; it must not raise.
    data.validate_login(f" {username} ", password)
|
|
|
|
|
|
@pytest.mark.parametrize("username", ["test-user ", "TEST-USER"])
@pytest.mark.usefixtures("load_auth_component")
def test_adding_user_not_normalized(data: hass_auth.Data, username: str) -> None:
    """Test that adding a user with a not normalized username is rejected.

    Parametrized with a username carrying trailing whitespace and one in
    upper case; both must raise InvalidUsername with a matching message.
    """
    with pytest.raises(
        hass_auth.InvalidUsername, match=f'Username "{username}" is not normalized'
    ):
        data.add_auth(username, "test-pass")
|
|
|
|
|
|
@pytest.mark.usefixtures("load_auth_component")
def test_adding_user_duplicate_username(data: hass_auth.Data) -> None:
    """Check that adding an already existing username raises InvalidUsername."""
    username = "test-user"
    data.add_auth(username, "test-pass")

    # A second registration under the same name must be refused, even
    # though a different password is supplied.
    expect_duplicate_error = pytest.raises(
        hass_auth.InvalidUsername, match='Username "test-user" already exists'
    )
    with expect_duplicate_error:
        data.add_auth(username, "other-pass")
|
|
|
|
|
|
async def test_validating_password_invalid_password(data: hass_auth.Data) -> None:
    """Check that logins with a non-matching password raise InvalidAuth."""
    data.add_auth("test-user", "test-pass")

    rejected_logins = (
        # Completely different password (username padded with spaces).
        (" test-user ", "invalid-pass"),
        # Correct password followed by a trailing space.
        ("test-user", "test-pass "),
        # Correct password with different letter case.
        ("test-user", "Test-pass"),
    )
    for username, password in rejected_logins:
        with pytest.raises(hass_auth.InvalidAuth):
            data.validate_login(username, password)
|
|
|
|
|
|
async def test_changing_password(data: hass_auth.Data) -> None:
    """Test changing a password.

    The password is changed using an upper-case username with trailing
    whitespace, and the new password is validated with a mixed-case
    username, so both calls must accept not normalized input.
    """
    data.add_auth("test-user", "test-pass")
    data.change_password("TEST-USER ", "new-pass")

    # The previous password must no longer be accepted.
    with pytest.raises(hass_auth.InvalidAuth):
        data.validate_login("test-user", "test-pass")

    # The new password validates without raising.
    data.validate_login("test-UsEr", "new-pass")
|
|
|
|
|
|
async def test_login_flow_validates(data: hass_auth.Data, hass: HomeAssistant) -> None:
    """Walk the login flow through failing attempts and a successful one."""
    data.add_auth("test-user", "test-pass")
    await data.async_save()

    store = auth_store.AuthStore(hass)
    provider = hass_auth.HassAuthProvider(hass, store, {"type": "homeassistant"})
    flow = await provider.async_login_flow({})

    # Called without input, the init step answers with a form.
    step = await flow.async_step_init()
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    # An unknown username and a wrong password both show the form again,
    # carrying the "invalid_auth" base error.
    failing_inputs = (
        {"username": "incorrect-user", "password": "test-pass"},
        {"username": "TEST-user ", "password": "incorrect-pass"},
    )
    for user_input in failing_inputs:
        step = await flow.async_step_init(user_input)
        assert step["type"] == data_entry_flow.FlowResultType.FORM
        assert step["errors"]["base"] == "invalid_auth"

    # Correct credentials create an entry; the username is returned in
    # the same casing it was submitted with.
    valid_input = {"username": "test-USER", "password": "test-pass"}
    step = await flow.async_step_init(valid_input)
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert step["data"]["username"] == "test-USER"
|
|
|
|
|
|
async def test_saving_loading(data: hass_auth.Data, hass: HomeAssistant) -> None:
    """Check that saved users can log in after loading a new Data instance."""
    accounts = (
        ("test-user", "test-pass"),
        ("second-user", "second-pass"),
    )
    for username, password in accounts:
        data.add_auth(username, password)
    await data.async_save()

    reloaded = hass_auth.Data(hass)
    await reloaded.async_load()
    # Each login appends a trailing space to the username and must not raise.
    for username, password in accounts:
        reloaded.validate_login(f"{username} ", password)
|
|
|
|
|
|
async def test_get_or_create_credentials(
    hass: HomeAssistant, data: hass_auth.Data
) -> None:
    """Check that a padded username resolves to the existing credentials."""
    manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], [])
    provider = manager.auth_providers[0]
    provider.data = data

    original = await provider.async_get_or_create_credentials({"username": "hello"})

    # Make the provider report the credentials created above as existing.
    with patch.object(provider, "async_credentials", return_value=[original]):
        looked_up = await provider.async_get_or_create_credentials(
            {"username": "hello "}
        )
    assert original is looked_up
|
|
|
|
|
|
# Legacy mode
|
|
|
|
|
|
async def test_legacy_adding_user(legacy_data: hass_auth.Data) -> None:
    """Check in legacy mode that an added user can log in."""
    username, password = "test-user", "test-pass"
    legacy_data.add_auth(username, password)
    # Must complete without raising.
    legacy_data.validate_login(username, password)
|
|
|
|
|
|
async def test_legacy_validating_password_invalid_password(
    legacy_data: hass_auth.Data,
) -> None:
    """Check in legacy mode that a wrong password raises InvalidAuth."""
    username = "test-user"
    legacy_data.add_auth(username, "test-pass")

    with pytest.raises(hass_auth.InvalidAuth):
        legacy_data.validate_login(username, "invalid-pass")
|
|
|
|
|
|
async def test_legacy_changing_password(legacy_data: hass_auth.Data) -> None:
    """Test in legacy mode changing a password."""
    user = "test-user"
    legacy_data.add_auth(user, "test-pass")
    legacy_data.change_password(user, "new-pass")

    # The previous password must no longer be accepted.
    with pytest.raises(hass_auth.InvalidAuth):
        legacy_data.validate_login(user, "test-pass")

    # The new password validates without raising.
    legacy_data.validate_login(user, "new-pass")
|
|
|
|
|
|
async def test_legacy_changing_password_raises_invalid_user(
    legacy_data: hass_auth.Data,
) -> None:
    """Test in legacy mode that changing password raises invalid user."""
    # No user has been added, so the lookup for this name has to fail.
    with pytest.raises(hass_auth.InvalidUser):
        legacy_data.change_password("non-existing", "pw")
|
|
|
|
|
|
async def test_legacy_login_flow_validates(
    legacy_data: hass_auth.Data, hass: HomeAssistant
) -> None:
    """Walk the login flow in legacy mode through failures and a success."""
    legacy_data.add_auth("test-user", "test-pass")
    await legacy_data.async_save()

    store = auth_store.AuthStore(hass)
    provider = hass_auth.HassAuthProvider(hass, store, {"type": "homeassistant"})
    flow = await provider.async_login_flow({})

    # Called without input, the init step answers with a form.
    step = await flow.async_step_init()
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    # An unknown username and a wrong password both show the form again,
    # carrying the "invalid_auth" base error.
    failing_inputs = (
        {"username": "incorrect-user", "password": "test-pass"},
        {"username": "test-user", "password": "incorrect-pass"},
    )
    for user_input in failing_inputs:
        step = await flow.async_step_init(user_input)
        assert step["type"] == data_entry_flow.FlowResultType.FORM
        assert step["errors"]["base"] == "invalid_auth"

    # Correct credentials create an entry for the submitted username.
    valid_input = {"username": "test-user", "password": "test-pass"}
    step = await flow.async_step_init(valid_input)
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert step["data"]["username"] == "test-user"
|
|
|
|
|
|
async def test_legacy_saving_loading(
    legacy_data: hass_auth.Data, hass: HomeAssistant
) -> None:
    """Check in legacy mode that saved users survive a save/load cycle."""
    accounts = (
        ("test-user", "test-pass"),
        ("second-user", "second-pass"),
    )
    for username, password in accounts:
        legacy_data.add_auth(username, password)
    await legacy_data.async_save()

    reloaded = hass_auth.Data(hass)
    await reloaded.async_load()
    # A fresh instance is put into legacy mode by hand, as the fixture does.
    reloaded.is_legacy = True
    for username, password in accounts:
        reloaded.validate_login(username, password)

    # With the legacy flag set, a username with trailing whitespace is
    # expected to be rejected.
    with pytest.raises(hass_auth.InvalidAuth):
        reloaded.validate_login("test-user ", "test-pass")
|
|
|
|
|
|
async def test_legacy_get_or_create_credentials(
    hass: HomeAssistant, legacy_data: hass_auth.Data
) -> None:
    """Check in legacy mode that only an exact username reuses credentials."""
    manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], [])
    provider = manager.auth_providers[0]
    provider.data = legacy_data

    original = await provider.async_get_or_create_credentials({"username": "hello"})

    # Same username: the already existing credentials object is returned.
    with patch.object(provider, "async_credentials", return_value=[original]):
        same_name = await provider.async_get_or_create_credentials(
            {"username": "hello"}
        )
    assert original is same_name

    # Username with trailing whitespace: a different object is returned.
    with patch.object(provider, "async_credentials", return_value=[original]):
        padded_name = await provider.async_get_or_create_credentials(
            {"username": "hello "}
        )
    assert original is not padded_name
|
|
|
|
|
|
async def test_race_condition_in_data_loading(hass: HomeAssistant) -> None:
    """Check that two concurrent logins load the auth data only once.

    Ref issue: https://github.com/home-assistant/core/issues/21569
    """
    load_calls = 0

    async def mock_load(_):
        """Stand in for homeassistant.helpers.storage.Store.async_load."""
        nonlocal load_calls
        load_calls += 1
        # Suspend once so the other pending login can be scheduled.
        await asyncio.sleep(0)

    store = auth_store.AuthStore(hass)
    provider = hass_auth.HassAuthProvider(hass, store, {"type": "homeassistant"})
    with patch("homeassistant.helpers.storage.Store.async_load", new=mock_load):
        first_login = provider.async_validate_login("user", "pass")
        second_login = provider.async_validate_login("user", "pass")
        # Exceptions are collected instead of raised so both outcomes
        # can be inspected.
        first_outcome, second_outcome = await asyncio.gather(
            first_login, second_login, return_exceptions=True
        )
        assert load_calls == 1
        assert isinstance(first_outcome, hass_auth.InvalidAuth)
        # The second outcome will be a TypeError if the race condition occurred
        assert isinstance(second_outcome, hass_auth.InvalidAuth)
|
|
|
|
|
|
def test_change_username(data: hass_auth.Data) -> None:
    """Check that change_username renames the single stored user."""
    data.add_auth("test-user", "test-pass")
    before = data.users
    assert len(before) == 1
    assert before[0]["username"] == "test-user"

    data.change_username("test-user", "new-user")

    # Still exactly one record, now carrying the new name.
    after = data.users
    assert len(after) == 1
    assert after[0]["username"] == "new-user"
|
|
|
|
|
|
@pytest.mark.parametrize("username", ["test-user ", "TEST-USER"])
def test_change_username_legacy(legacy_data: hass_auth.Data, username: str) -> None:
    """Check in legacy mode that a stored username can be changed."""
    # The record is appended by hand; per the original note, add_auth
    # cannot be used here as it normalizes the username.
    legacy_record = {
        "username": username,
        "password": legacy_data.hash_password("test-pass", True).decode(),
    }
    legacy_data.users.append(legacy_record)

    before = legacy_data.users
    assert len(before) == 1
    assert before[0]["username"] == username

    legacy_data.change_username(username, "test-user")

    after = legacy_data.users
    assert len(after) == 1
    assert after[0]["username"] == "test-user"
|
|
|
|
|
|
def test_change_username_invalid_user(data: hass_auth.Data) -> None:
    """Check that renaming an unknown user raises and changes nothing."""
    data.add_auth("test-user", "test-pass")
    before = data.users
    assert len(before) == 1
    assert before[0]["username"] == "test-user"

    with pytest.raises(hass_auth.InvalidUser):
        data.change_username("non-existing", "new-user")

    # The stored user is untouched by the failed rename.
    after = data.users
    assert len(after) == 1
    assert after[0]["username"] == "test-user"
|
|
|
|
|
|
@pytest.mark.usefixtures("load_auth_component")
async def test_change_username_not_normalized(
    data: hass_auth.Data, hass: HomeAssistant
) -> None:
    """Check that renaming to a not normalized username raises InvalidUsername."""
    current_username = "test-user"
    data.add_auth(current_username, "test-pass")

    # The target name mixes upper case with a trailing space.
    expect_not_normalized = pytest.raises(
        hass_auth.InvalidUsername, match='Username "TEST-user " is not normalized'
    )
    with expect_not_normalized:
        data.change_username(current_username, "TEST-user ")
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("usernames_in_storage", "usernames_in_repair"),
    [
        (["Uppercase"], '- "Uppercase"'),
        ([" leading"], '- " leading"'),
        (["trailing "], '- "trailing "'),
        (["Test", "test", "Fritz "], '- "Fritz "\n- "Test"'),
    ],
)
async def test_create_repair_on_legacy_usernames(
    hass: HomeAssistant,
    hass_storage: dict[str, Any],
    issue_registry: ir.IssueRegistry,
    usernames_in_storage: list[str],
    usernames_in_repair: str,
) -> None:
    """Check that loading not normalized usernames creates a repair issue."""
    issue_key = ("auth", "homeassistant_provider_not_normalized_usernames")
    assert not issue_registry.issues.get(issue_key), "Repair issue already exists"

    # Seed the storage with one record per parametrized username before
    # the Data instance is loaded.
    stored_users = [
        {
            "username": username,
            "password": "onlyherebecauseweneedapasswordstring",
        }
        for username in usernames_in_storage
    ]
    hass_storage[hass_auth.STORAGE_KEY] = {
        "version": 1,
        "minor_version": 1,
        "key": "auth_provider.homeassistant",
        "data": {"users": stored_users},
    }

    auth_data = hass_auth.Data(hass)
    await auth_data.async_load()

    issue = issue_registry.issues.get(issue_key)
    assert issue, "Repair issue not created"
    assert issue.translation_placeholders == {"usernames": usernames_in_repair}
|
|
|
|
|
|
async def test_delete_repair_after_fixing_usernames(
    hass: HomeAssistant,
    hass_storage: dict[str, Any],
    issue_registry: ir.IssueRegistry,
) -> None:
    """Check that the repair issue shrinks and disappears as names are fixed."""
    issue_key = ("auth", "homeassistant_provider_not_normalized_usernames")
    placeholder_password = "onlyherebecauseweneedapasswordstring"

    # Two stored usernames: one with an upper-case letter, one with a
    # trailing space.
    hass_storage[hass_auth.STORAGE_KEY] = {
        "version": 1,
        "minor_version": 1,
        "key": "auth_provider.homeassistant",
        "data": {
            "users": [
                {"username": "Test", "password": placeholder_password},
                {"username": "bla ", "password": placeholder_password},
            ]
        },
    }
    auth_data = hass_auth.Data(hass)
    await auth_data.async_load()

    issue = issue_registry.issues.get(issue_key)
    assert issue, "Repair issue not created"
    assert issue.translation_placeholders == {"usernames": '- "Test"\n- "bla "'}

    # After the first rename only the second username is still listed.
    auth_data.change_username("Test", "test")
    issue = issue_registry.issues.get(issue_key)
    assert issue
    assert issue.translation_placeholders == {"usernames": '- "bla "'}

    # After the last rename the issue is gone.
    auth_data.change_username("bla ", "bla")
    assert not issue_registry.issues.get(issue_key), "Repair issue should be deleted"
|