mirror of https://github.com/home-assistant/core
203 lines
6.1 KiB
Python
203 lines
6.1 KiB
Python
"""Test config flow."""
|
|
|
|
import json
|
|
import time
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from qbusmqttapi.discovery import QbusDiscovery
|
|
|
|
from homeassistant.components.qbus.const import CONF_SERIAL_NUMBER, DOMAIN
|
|
from homeassistant.components.qbus.coordinator import QbusConfigCoordinator
|
|
from homeassistant.config_entries import SOURCE_MQTT, SOURCE_USER
|
|
from homeassistant.const import CONF_ID
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.data_entry_flow import FlowResultType
|
|
from homeassistant.helpers.service_info.mqtt import MqttServiceInfo
|
|
from homeassistant.util.json import JsonObjectType
|
|
|
|
from .const import TOPIC_CONFIG
|
|
|
|
_PAYLOAD_DEVICE_STATE = '{"id":"UL1","properties":{"connected":true},"type":"event"}'
|
|
|
|
|
|
async def test_step_discovery_confirm_create_entry(
|
|
hass: HomeAssistant, payload_config: JsonObjectType
|
|
) -> None:
|
|
"""Test mqtt confirm step and entry creation."""
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
|
|
topic="cloudapp/QBUSMQTTGW/UL1/state",
|
|
payload=_PAYLOAD_DEVICE_STATE,
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
with (
|
|
patch.object(
|
|
QbusConfigCoordinator,
|
|
"async_get_or_request_config",
|
|
return_value=QbusDiscovery(payload_config),
|
|
),
|
|
):
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert result.get("type") == FlowResultType.FORM
|
|
assert result.get("step_id") == "discovery_confirm"
|
|
|
|
result = await hass.config_entries.flow.async_configure(
|
|
result["flow_id"], user_input={}
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert result.get("type") == FlowResultType.CREATE_ENTRY
|
|
assert result.get("data") == {
|
|
CONF_ID: "UL1",
|
|
CONF_SERIAL_NUMBER: "000001",
|
|
}
|
|
assert result.get("result").unique_id == "000001"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("topic", "payload"),
|
|
[
|
|
("cloudapp/QBUSMQTTGW/state", b""),
|
|
("invalid/topic", b"{}"),
|
|
],
|
|
)
|
|
async def test_step_mqtt_invalid(
|
|
hass: HomeAssistant, topic: str, payload: bytes
|
|
) -> None:
|
|
"""Test mqtt discovery with empty payload."""
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic=topic,
|
|
topic=topic,
|
|
payload=payload,
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "invalid_discovery_info"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("payload", "mqtt_publish"),
|
|
[
|
|
('{ "online": true }', True),
|
|
('{ "online": false }', False),
|
|
],
|
|
)
|
|
async def test_handle_gateway_topic_when_online(
|
|
hass: HomeAssistant, payload: str, mqtt_publish: bool
|
|
) -> None:
|
|
"""Test handling of gateway topic with payload indicating online."""
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic="cloudapp/QBUSMQTTGW/state",
|
|
topic="cloudapp/QBUSMQTTGW/state",
|
|
payload=payload,
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
with (
|
|
patch("homeassistant.components.mqtt.client.async_publish") as mock_publish,
|
|
):
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert mock_publish.called is mqtt_publish
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "discovery_in_progress"
|
|
|
|
|
|
async def test_handle_config_topic(
|
|
hass: HomeAssistant, payload_config: JsonObjectType
|
|
) -> None:
|
|
"""Test handling of config topic."""
|
|
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic=TOPIC_CONFIG,
|
|
topic=TOPIC_CONFIG,
|
|
payload=json.dumps(payload_config),
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
with (
|
|
patch("homeassistant.components.mqtt.client.async_publish") as mock_publish,
|
|
):
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert mock_publish.called
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "discovery_in_progress"
|
|
|
|
|
|
async def test_handle_device_topic_missing_config(hass: HomeAssistant) -> None:
|
|
"""Test handling of device topic when config is missing."""
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
|
|
topic="cloudapp/QBUSMQTTGW/UL1/state",
|
|
payload=_PAYLOAD_DEVICE_STATE,
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "invalid_discovery_info"
|
|
|
|
|
|
async def test_handle_device_topic_device_not_found(
|
|
hass: HomeAssistant, payload_config: JsonObjectType
|
|
) -> None:
|
|
"""Test handling of device topic when device is not found."""
|
|
discovery = MqttServiceInfo(
|
|
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
|
|
topic="cloudapp/QBUSMQTTGW/UL2/state",
|
|
payload='{"id":"UL2","properties":{"connected":true},"type":"event"}',
|
|
qos=0,
|
|
retain=False,
|
|
timestamp=time.time(),
|
|
)
|
|
|
|
with patch.object(
|
|
QbusConfigCoordinator,
|
|
"async_get_or_request_config",
|
|
return_value=QbusDiscovery(payload_config),
|
|
):
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
|
|
)
|
|
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "invalid_discovery_info"
|
|
|
|
|
|
async def test_step_user_not_supported(hass: HomeAssistant) -> None:
|
|
"""Test user step, which should abort."""
|
|
result = await hass.config_entries.flow.async_init(
|
|
DOMAIN, context={"source": SOURCE_USER}
|
|
)
|
|
|
|
assert result.get("type") == FlowResultType.ABORT
|
|
assert result.get("reason") == "not_supported"
|