mirror of https://github.com/home-assistant/core
321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""Test the NMBS config flow."""
|
|
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from homeassistant import config_entries
|
|
from homeassistant.components.nmbs.const import (
|
|
CONF_STATION_FROM,
|
|
CONF_STATION_LIVE,
|
|
CONF_STATION_TO,
|
|
DOMAIN,
|
|
)
|
|
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
|
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.data_entry_flow import FlowResultType
|
|
from homeassistant.helpers import entity_registry as er
|
|
|
|
from tests.common import MockConfigEntry
|
|
|
|
# Station names as they are passed to the import flow in the tests below.
DUMMY_DATA_IMPORT: dict[str, Any] = {
    "STAT_BRUSSELS_NORTH": "Brussel-Noord/Bruxelles-Nord",
    "STAT_BRUSSELS_CENTRAL": "Brussel-Centraal/Bruxelles-Central",
    "STAT_BRUSSELS_SOUTH": "Brussel-Zuid/Bruxelles-Midi",
}
|
|
|
|
# Alternative (English) spellings of the same three stations, used by the
# localized-name migration test.
DUMMY_DATA_ALTERNATIVE_IMPORT: dict[str, Any] = {
    "STAT_BRUSSELS_NORTH": "Brussels-North",
    "STAT_BRUSSELS_CENTRAL": "Brussels-Central",
    "STAT_BRUSSELS_SOUTH": "Brussels-South/Brussels-Midi",
}
|
|
|
|
# Station ids used as user-flow input and as the expected stored values.
DUMMY_DATA: dict[str, Any] = {
    "STAT_BRUSSELS_NORTH": "BE.NMBS.008812005",
    "STAT_BRUSSELS_CENTRAL": "BE.NMBS.008813003",
    "STAT_BRUSSELS_SOUTH": "BE.NMBS.008814001",
}
|
|
|
|
|
|
async def test_full_flow(
    hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
    """Test a user flow from the initial form through to entry creation."""
    station_from = DUMMY_DATA["STAT_BRUSSELS_NORTH"]
    station_to = DUMMY_DATA["STAT_BRUSSELS_SOUTH"]

    # Starting the flow without data shows the form, with no errors yet.
    form = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_USER}
    )
    assert form["type"] is FlowResultType.FORM
    assert form["errors"] == {}

    # Submitting two different stations creates the entry.
    created = await hass.config_entries.flow.async_configure(
        form["flow_id"],
        {CONF_STATION_FROM: station_from, CONF_STATION_TO: station_to},
    )
    assert created["type"] is FlowResultType.CREATE_ENTRY

    expected_title = (
        "Train from Brussel-Noord/Bruxelles-Nord to Brussel-Zuid/Bruxelles-Midi"
    )
    assert created["title"] == expected_title
    assert created["data"] == {
        CONF_STATION_FROM: station_from,
        CONF_STATION_TO: station_to,
    }
    assert created["result"].unique_id == f"{station_from}_{station_to}"
|
|
|
|
|
|
async def test_same_station(
    hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
    """Test that identical stations are rejected and the flow can then finish."""
    north = DUMMY_DATA["STAT_BRUSSELS_NORTH"]
    south = DUMMY_DATA["STAT_BRUSSELS_SOUTH"]

    step = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_USER}
    )
    assert step["type"] is FlowResultType.FORM
    assert step["errors"] == {}

    # Departure equals destination: the form is shown again with an error.
    step = await hass.config_entries.flow.async_configure(
        step["flow_id"],
        {CONF_STATION_FROM: north, CONF_STATION_TO: north},
    )
    assert step["type"] is FlowResultType.FORM
    assert step["errors"] == {"base": "same_station"}

    # Resubmitting on the same flow with distinct stations creates the entry.
    step = await hass.config_entries.flow.async_configure(
        step["flow_id"],
        {CONF_STATION_FROM: north, CONF_STATION_TO: south},
    )
    assert step["type"] is FlowResultType.CREATE_ENTRY
|
|
|
|
|
|
async def test_abort_if_exists(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
    mock_config_entry: MockConfigEntry,
) -> None:
    """Test that a user flow aborts when the entry is already configured."""
    mock_config_entry.add_to_hass(hass)

    # The station pair is submitted directly when the flow is started.
    connection = {
        CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
        CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
    }
    outcome = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_USER}, data=connection
    )

    assert outcome["type"] is FlowResultType.ABORT
    assert outcome["reason"] == "already_configured"
|
|
|
|
|
|
async def test_unavailable_api(
    hass: HomeAssistant, mock_nmbs_client: AsyncMock
) -> None:
    """Test that a user flow aborts when the API is unavailable."""
    # A get_stations result of -1 is how this test simulates the unavailable API.
    mock_nmbs_client.get_stations.return_value = -1

    # Use the directly imported SOURCE_USER, like the other user-flow tests.
    result = await hass.config_entries.flow.async_init(
        DOMAIN,
        context={"source": SOURCE_USER},
    )

    assert result["type"] is FlowResultType.ABORT
    assert result["reason"] == "api_unavailable"
|
|
|
|
|
|
async def test_import(
    hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
    """Test that an import flow given station names creates an entry.

    The import data refers to the from, live and to stations by name; the
    created entry is expected to hold the corresponding station ids.
    """
    result = await hass.config_entries.flow.async_init(
        DOMAIN,
        context={"source": SOURCE_IMPORT},
        data={
            CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
            CONF_STATION_LIVE: DUMMY_DATA_IMPORT["STAT_BRUSSELS_CENTRAL"],
            CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
        },
    )

    assert result["type"] is FlowResultType.CREATE_ENTRY
    assert (
        result["title"]
        == "Train from Brussel-Noord/Bruxelles-Nord to Brussel-Zuid/Bruxelles-Midi"
    )
    # Expected ids come from the shared DUMMY_DATA table instead of being
    # repeated as literals, matching test_full_flow.
    assert result["data"] == {
        CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
        CONF_STATION_LIVE: DUMMY_DATA["STAT_BRUSSELS_CENTRAL"],
        CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
    }
    # The expected unique id contains only the from and to station ids.
    assert (
        result["result"].unique_id
        == f"{DUMMY_DATA['STAT_BRUSSELS_NORTH']}_{DUMMY_DATA['STAT_BRUSSELS_SOUTH']}"
    )
|
|
|
|
|
|
async def test_step_import_abort_if_already_setup(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
    mock_config_entry: MockConfigEntry,
) -> None:
    """Test that an import flow aborts for an already configured connection."""
    mock_config_entry.add_to_hass(hass)

    # The import data names the stations; no live station is supplied here.
    imported = {
        CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
        CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
    }
    outcome = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_IMPORT}, data=imported
    )

    assert outcome["type"] is FlowResultType.ABORT
    assert outcome["reason"] == "already_configured"
|
|
|
|
|
|
async def test_unavailable_api_import(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
) -> None:
    """Test that an import flow aborts when the API is unavailable."""
    # A get_stations result of -1 is how this test simulates the unavailable API.
    mock_nmbs_client.get_stations.return_value = -1

    imported = {
        CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
        CONF_STATION_LIVE: DUMMY_DATA_IMPORT["STAT_BRUSSELS_CENTRAL"],
        CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
    }
    outcome = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_IMPORT}, data=imported
    )

    assert outcome["type"] is FlowResultType.ABORT
    assert outcome["reason"] == "api_unavailable"
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("config", "reason"),
    [
        # Destination is not one of the dummy stations.
        (
            {
                CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
                CONF_STATION_TO: "Utrecht Centraal",
            },
            "invalid_station",
        ),
        # Departure is not one of the dummy stations.
        (
            {
                CONF_STATION_FROM: "Utrecht Centraal",
                CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
            },
            "invalid_station",
        ),
        # Departure and destination are the same station.
        (
            {
                CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
                CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
            },
            "same_station",
        ),
    ],
)
async def test_invalid_station_name(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
    config: dict[str, Any],
    reason: str,
) -> None:
    """Test that importing an invalid configuration aborts with the given reason."""
    outcome = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_IMPORT}, data=config
    )

    assert outcome["type"] is FlowResultType.ABORT
    assert outcome["reason"] == reason
|
|
|
|
|
|
async def test_sensor_id_migration_standardname(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
    entity_registry: er.EntityRegistry,
) -> None:
    """Test unique id migration for a sensor registered under standard names."""
    name_north = DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"]
    name_south = DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"]
    id_north = DUMMY_DATA["STAT_BRUSSELS_NORTH"]
    id_south = DUMMY_DATA["STAT_BRUSSELS_SOUTH"]

    # Old format: "live_" prefix followed by station names.
    legacy_unique_id = f"live_{name_north}_{name_north}_{name_south}"
    # New format: "nmbs_live_" prefix followed by station ids.
    migrated_unique_id = f"nmbs_live_{id_north}_{id_north}_{id_south}"

    # Register the sensor under its old unique id before the import runs.
    legacy_entry = entity_registry.async_get_or_create(
        SENSOR_DOMAIN, DOMAIN, legacy_unique_id
    )

    flow_result = await hass.config_entries.flow.async_init(
        DOMAIN,
        context={"source": SOURCE_IMPORT},
        data={
            CONF_STATION_LIVE: name_north,
            CONF_STATION_FROM: name_north,
            CONF_STATION_TO: name_south,
        },
    )
    assert flow_result["type"] is FlowResultType.CREATE_ENTRY

    entry_id = flow_result["result"].entry_id
    await hass.async_block_till_done()

    registered = er.async_entries_for_config_entry(entity_registry, entry_id)
    assert len(registered) == 3

    by_unique_id = {entry.unique_id: entry for entry in registered}
    assert legacy_unique_id not in by_unique_id
    assert migrated_unique_id in by_unique_id
    # Same registry id: the pre-existing entity was migrated, not recreated.
    assert by_unique_id[migrated_unique_id].id == legacy_entry.id
|
|
|
|
|
|
async def test_sensor_id_migration_localized_name(
    hass: HomeAssistant,
    mock_nmbs_client: AsyncMock,
    entity_registry: er.EntityRegistry,
) -> None:
    """Test unique id migration for a sensor registered under localized names."""
    name_north = DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_NORTH"]
    name_south = DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_SOUTH"]
    id_north = DUMMY_DATA["STAT_BRUSSELS_NORTH"]
    id_south = DUMMY_DATA["STAT_BRUSSELS_SOUTH"]

    # Old format: "live_" prefix followed by the alternative station names.
    legacy_unique_id = f"live_{name_north}_{name_north}_{name_south}"
    # New format: "nmbs_live_" prefix followed by station ids.
    migrated_unique_id = f"nmbs_live_{id_north}_{id_north}_{id_south}"

    # Register the sensor under its old unique id before the import runs.
    legacy_entry = entity_registry.async_get_or_create(
        SENSOR_DOMAIN, DOMAIN, legacy_unique_id
    )

    flow_result = await hass.config_entries.flow.async_init(
        DOMAIN,
        context={"source": SOURCE_IMPORT},
        data={
            CONF_STATION_LIVE: name_north,
            CONF_STATION_FROM: name_north,
            CONF_STATION_TO: name_south,
        },
    )
    assert flow_result["type"] is FlowResultType.CREATE_ENTRY

    entry_id = flow_result["result"].entry_id
    await hass.async_block_till_done()

    registered = er.async_entries_for_config_entry(entity_registry, entry_id)
    assert len(registered) == 3

    by_unique_id = {entry.unique_id: entry for entry in registered}
    assert legacy_unique_id not in by_unique_id
    assert migrated_unique_id in by_unique_id
    # Same registry id: the pre-existing entity was migrated, not recreated.
    assert by_unique_id[migrated_unique_id].id == legacy_entry.id
|