mirror of https://github.com/home-assistant/core
467 lines
14 KiB
Python
467 lines
14 KiB
Python
"""Tests for the floor registry."""
|
|
|
|
from datetime import datetime
|
|
from functools import partial
|
|
import re
|
|
from typing import Any
|
|
|
|
from freezegun.api import FrozenDateTimeFactory
|
|
import pytest
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import area_registry as ar, floor_registry as fr
|
|
from homeassistant.util.dt import utcnow
|
|
|
|
from tests.common import async_capture_events, flush_store
|
|
|
|
|
|
async def test_list_floors(floor_registry: fr.FloorRegistry) -> None:
|
|
"""Make sure that we can read floors."""
|
|
floors = floor_registry.async_list_floors()
|
|
assert len(list(floors)) == len(floor_registry.floors)
|
|
|
|
|
|
@pytest.mark.usefixtures("freezer")
|
|
async def test_create_floor(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure that we can create floors."""
|
|
update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED)
|
|
floor = floor_registry.async_create(
|
|
name="First floor",
|
|
icon="mdi:home-floor-1",
|
|
aliases={"first", "ground", "ground floor"},
|
|
level=1,
|
|
)
|
|
|
|
assert floor == fr.FloorEntry(
|
|
floor_id="first_floor",
|
|
name="First floor",
|
|
icon="mdi:home-floor-1",
|
|
aliases={"first", "ground", "ground floor"},
|
|
level=1,
|
|
created_at=utcnow(),
|
|
modified_at=utcnow(),
|
|
)
|
|
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(update_events) == 1
|
|
assert update_events[0].data == {
|
|
"action": "create",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
|
|
|
|
async def test_create_floor_with_name_already_in_use(
|
|
hass: HomeAssistant, floor_registry: fr.FloorRegistry
|
|
) -> None:
|
|
"""Make sure that we can't create a floor with a name already in use."""
|
|
update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED)
|
|
floor_registry.async_create("First floor")
|
|
|
|
with pytest.raises(
|
|
ValueError,
|
|
match=re.escape("The name First floor (firstfloor) is already in use"),
|
|
):
|
|
floor_registry.async_create("First floor")
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(floor_registry.floors) == 1
|
|
assert len(update_events) == 1
|
|
|
|
|
|
async def test_create_floor_with_id_already_in_use(
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure that we can't create an floor with an id already in use."""
|
|
floor = floor_registry.async_create("First")
|
|
|
|
updated_floor = floor_registry.async_update(floor.floor_id, name="Second")
|
|
assert updated_floor.floor_id == floor.floor_id
|
|
|
|
another_floor = floor_registry.async_create("First")
|
|
assert floor.floor_id != another_floor.floor_id
|
|
assert another_floor.floor_id == "first_2"
|
|
|
|
|
|
async def test_delete_floor(
|
|
hass: HomeAssistant, floor_registry: fr.FloorRegistry
|
|
) -> None:
|
|
"""Make sure that we can delete a floor."""
|
|
update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED)
|
|
floor = floor_registry.async_create("First floor")
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
floor_registry.async_delete(floor.floor_id)
|
|
|
|
assert not floor_registry.floors
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(update_events) == 2
|
|
assert update_events[0].data == {
|
|
"action": "create",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
assert update_events[1].data == {
|
|
"action": "remove",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
|
|
|
|
async def test_delete_non_existing_floor(floor_registry: fr.FloorRegistry) -> None:
|
|
"""Make sure that we can't delete a floor that doesn't exist."""
|
|
floor_registry.async_create("First floor")
|
|
|
|
with pytest.raises(KeyError):
|
|
floor_registry.async_delete("")
|
|
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
|
|
async def test_update_floor(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Make sure that we can update floors."""
|
|
created_at = datetime.fromisoformat("2024-01-01T01:00:00+00:00")
|
|
freezer.move_to(created_at)
|
|
|
|
update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED)
|
|
floor = floor_registry.async_create("First floor")
|
|
|
|
assert floor == fr.FloorEntry(
|
|
floor_id="first_floor",
|
|
name="First floor",
|
|
icon=None,
|
|
aliases=set(),
|
|
level=None,
|
|
created_at=created_at,
|
|
modified_at=created_at,
|
|
)
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
modified_at = datetime.fromisoformat("2024-02-01T01:00:00+00:00")
|
|
freezer.move_to(modified_at)
|
|
|
|
updated_floor = floor_registry.async_update(
|
|
floor.floor_id,
|
|
name="Second floor",
|
|
icon="mdi:home-floor-2",
|
|
aliases={"ground", "downstairs"},
|
|
level=2,
|
|
)
|
|
|
|
assert updated_floor != floor
|
|
assert updated_floor == fr.FloorEntry(
|
|
floor_id="first_floor",
|
|
name="Second floor",
|
|
icon="mdi:home-floor-2",
|
|
aliases={"ground", "downstairs"},
|
|
level=2,
|
|
created_at=created_at,
|
|
modified_at=modified_at,
|
|
)
|
|
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(update_events) == 2
|
|
assert update_events[0].data == {
|
|
"action": "create",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
assert update_events[1].data == {
|
|
"action": "update",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
|
|
|
|
async def test_update_floor_with_same_data(
|
|
hass: HomeAssistant, floor_registry: fr.FloorRegistry
|
|
) -> None:
|
|
"""Make sure that we can reapply the same data to a floor and it won't update."""
|
|
update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED)
|
|
floor = floor_registry.async_create(
|
|
"First floor",
|
|
icon="mdi:home-floor-1",
|
|
)
|
|
|
|
updated_floor = floor_registry.async_update(
|
|
floor_id=floor.floor_id,
|
|
name="First floor",
|
|
icon="mdi:home-floor-1",
|
|
)
|
|
assert floor == updated_floor
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
# No update event
|
|
assert len(update_events) == 1
|
|
assert update_events[0].data == {
|
|
"action": "create",
|
|
"floor_id": floor.floor_id,
|
|
}
|
|
|
|
|
|
async def test_update_floor_with_same_name_change_case(
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure that we can reapply the same name with a different case to a floor."""
|
|
floor = floor_registry.async_create("first floor")
|
|
|
|
updated_floor = floor_registry.async_update(floor.floor_id, name="First floor")
|
|
|
|
assert updated_floor.floor_id == floor.floor_id
|
|
assert updated_floor.name == "First floor"
|
|
assert updated_floor.normalized_name == floor.normalized_name
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
|
|
async def test_update_floor_with_name_already_in_use(
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure that we can't update a floor with a name already in use."""
|
|
floor1 = floor_registry.async_create("First floor")
|
|
floor2 = floor_registry.async_create("Second floor")
|
|
|
|
with pytest.raises(
|
|
ValueError,
|
|
match=re.escape("The name Second floor (secondfloor) is already in use"),
|
|
):
|
|
floor_registry.async_update(floor1.floor_id, name="Second floor")
|
|
|
|
assert floor1.name == "First floor"
|
|
assert floor2.name == "Second floor"
|
|
assert len(floor_registry.floors) == 2
|
|
|
|
|
|
async def test_update_floor_with_normalized_name_already_in_use(
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure that we can't update a floor with a normalized name already in use."""
|
|
floor1 = floor_registry.async_create("first")
|
|
floor2 = floor_registry.async_create("S E C O N D")
|
|
|
|
with pytest.raises(
|
|
ValueError, match=re.escape("The name second (second) is already in use")
|
|
):
|
|
floor_registry.async_update(floor1.floor_id, name="second")
|
|
|
|
assert floor1.name == "first"
|
|
assert floor2.name == "S E C O N D"
|
|
assert len(floor_registry.floors) == 2
|
|
|
|
|
|
async def test_load_floors(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Make sure that we can load/save data correctly."""
|
|
floor1_created = datetime.fromisoformat("2024-01-01T00:00:00+00:00")
|
|
freezer.move_to(floor1_created)
|
|
floor1 = floor_registry.async_create(
|
|
"First floor",
|
|
icon="mdi:home-floor-1",
|
|
aliases={"first", "ground"},
|
|
level=1,
|
|
)
|
|
|
|
floor2_created = datetime.fromisoformat("2024-02-01T00:00:00+00:00")
|
|
freezer.move_to(floor2_created)
|
|
floor2 = floor_registry.async_create(
|
|
"Second floor",
|
|
icon="mdi:home-floor-2",
|
|
aliases={"first", "ground"},
|
|
level=2,
|
|
)
|
|
|
|
assert len(floor_registry.floors) == 2
|
|
|
|
registry2 = fr.FloorRegistry(hass)
|
|
await flush_store(floor_registry._store)
|
|
await registry2.async_load()
|
|
|
|
assert len(registry2.floors) == 2
|
|
assert list(floor_registry.floors) == list(registry2.floors)
|
|
|
|
floor1_registry2 = registry2.async_get_floor_by_name("First floor")
|
|
assert floor1_registry2 == floor1
|
|
|
|
floor2_registry2 = registry2.async_get_floor_by_name("Second floor")
|
|
assert floor2_registry2 == floor2
|
|
|
|
|
|
@pytest.mark.parametrize("load_registries", [False])
|
|
async def test_loading_floors_from_storage(
|
|
hass: HomeAssistant,
|
|
hass_storage: dict[str, Any],
|
|
) -> None:
|
|
"""Test loading stored floors on start."""
|
|
hass_storage[fr.STORAGE_KEY] = {
|
|
"version": fr.STORAGE_VERSION_MAJOR,
|
|
"data": {
|
|
"floors": [
|
|
{
|
|
"icon": "mdi:home-floor-1",
|
|
"floor_id": "first_floor",
|
|
"name": "First floor",
|
|
"aliases": ["first", "ground"],
|
|
"level": 1,
|
|
}
|
|
]
|
|
},
|
|
}
|
|
|
|
await fr.async_load(hass)
|
|
registry = fr.async_get(hass)
|
|
|
|
assert len(registry.floors) == 1
|
|
|
|
|
|
async def test_getting_floor(floor_registry: fr.FloorRegistry) -> None:
|
|
"""Make sure we can get the floors by name."""
|
|
floor = floor_registry.async_create("First floor")
|
|
floor2 = floor_registry.async_get_floor_by_name("first floor")
|
|
floor3 = floor_registry.async_get_floor_by_name("first floor")
|
|
|
|
assert floor == floor2
|
|
assert floor == floor3
|
|
assert floor2 == floor3
|
|
|
|
get_floor = floor_registry.async_get_floor(floor.floor_id)
|
|
assert get_floor == floor
|
|
|
|
|
|
async def test_async_get_floor_by_name_not_found(
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Make sure we return None for non-existent floors."""
|
|
floor_registry.async_create("First floor")
|
|
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
assert floor_registry.async_get_floor_by_name("non_exist") is None
|
|
|
|
|
|
async def test_floor_removed_from_areas(
|
|
hass: HomeAssistant,
|
|
area_registry: ar.AreaRegistry,
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Test if floor gets removed from areas when the floor is removed."""
|
|
|
|
floor = floor_registry.async_create("First floor")
|
|
assert len(floor_registry.floors) == 1
|
|
|
|
entry = area_registry.async_create(name="Kitchen")
|
|
area_registry.async_update(entry.id, floor_id=floor.floor_id)
|
|
|
|
entries = ar.async_entries_for_floor(area_registry, floor.floor_id)
|
|
assert len(entries) == 1
|
|
|
|
floor_registry.async_delete(floor.floor_id)
|
|
await hass.async_block_till_done()
|
|
|
|
entries = ar.async_entries_for_floor(area_registry, floor.floor_id)
|
|
assert len(entries) == 0
|
|
|
|
|
|
async def test_async_create_thread_safety(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Test async_create raises when called from wrong thread."""
|
|
with pytest.raises(
|
|
RuntimeError,
|
|
match="Detected code that calls floor_registry.async_create from a thread.",
|
|
):
|
|
await hass.async_add_executor_job(floor_registry.async_create, "any")
|
|
|
|
|
|
async def test_async_delete_thread_safety(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Test async_delete raises when called from wrong thread."""
|
|
any_floor = floor_registry.async_create("any")
|
|
|
|
with pytest.raises(
|
|
RuntimeError,
|
|
match="Detected code that calls floor_registry.async_delete from a thread.",
|
|
):
|
|
await hass.async_add_executor_job(floor_registry.async_delete, any_floor)
|
|
|
|
|
|
async def test_async_update_thread_safety(
|
|
hass: HomeAssistant,
|
|
floor_registry: fr.FloorRegistry,
|
|
) -> None:
|
|
"""Test async_update raises when called from wrong thread."""
|
|
any_floor = floor_registry.async_create("any")
|
|
|
|
with pytest.raises(
|
|
RuntimeError,
|
|
match="Detected code that calls floor_registry.async_update from a thread.",
|
|
):
|
|
await hass.async_add_executor_job(
|
|
partial(floor_registry.async_update, any_floor.floor_id, name="new name")
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("load_registries", [False])
|
|
async def test_migration_from_1_1(
|
|
hass: HomeAssistant, hass_storage: dict[str, Any]
|
|
) -> None:
|
|
"""Test migration from version 1.1."""
|
|
hass_storage[fr.STORAGE_KEY] = {
|
|
"version": 1,
|
|
"data": {
|
|
"floors": [
|
|
{
|
|
"floor_id": "12345A",
|
|
"name": "mock",
|
|
"aliases": [],
|
|
"icon": None,
|
|
"level": None,
|
|
}
|
|
]
|
|
},
|
|
}
|
|
|
|
await fr.async_load(hass)
|
|
registry = fr.async_get(hass)
|
|
|
|
# Test data was loaded
|
|
entry = registry.async_get_floor_by_name("mock")
|
|
assert entry.floor_id == "12345A"
|
|
|
|
# Check we store migrated data
|
|
await flush_store(registry._store)
|
|
assert hass_storage[fr.STORAGE_KEY] == {
|
|
"version": fr.STORAGE_VERSION_MAJOR,
|
|
"minor_version": fr.STORAGE_VERSION_MINOR,
|
|
"key": fr.STORAGE_KEY,
|
|
"data": {
|
|
"floors": [
|
|
{
|
|
"aliases": [],
|
|
"icon": None,
|
|
"floor_id": "12345A",
|
|
"level": None,
|
|
"name": "mock",
|
|
"created_at": "1970-01-01T00:00:00+00:00",
|
|
"modified_at": "1970-01-01T00:00:00+00:00",
|
|
}
|
|
]
|
|
},
|
|
}
|