mirror of https://github.com/home-assistant/core
2021 lines
69 KiB
Python
2021 lines
69 KiB
Python
"""Common test objects."""
|
|
|
|
from collections.abc import Iterable
|
|
from contextlib import suppress
|
|
import copy
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from unittest.mock import ANY, MagicMock, patch
|
|
|
|
from freezegun import freeze_time
|
|
import pytest
|
|
import voluptuous as vol
|
|
import yaml
|
|
|
|
from homeassistant import config as module_hass_config
|
|
from homeassistant.components import mqtt
|
|
from homeassistant.components.mqtt import debug_info
|
|
from homeassistant.components.mqtt.const import (
|
|
MQTT_CONNECTION_STATE,
|
|
SUPPORTED_COMPONENTS,
|
|
)
|
|
from homeassistant.components.mqtt.entity import MQTT_ATTRIBUTES_BLOCKED
|
|
from homeassistant.components.mqtt.models import PublishPayloadType
|
|
from homeassistant.config_entries import ConfigEntryState
|
|
from homeassistant.const import (
|
|
ATTR_ASSUMED_STATE,
|
|
ATTR_ENTITY_ID,
|
|
SERVICE_RELOAD,
|
|
STATE_UNAVAILABLE,
|
|
EntityCategory,
|
|
)
|
|
from homeassistant.core import HassJobType, HomeAssistant
|
|
from homeassistant.generated.mqtt import MQTT
|
|
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from tests.common import MockConfigEntry, async_fire_mqtt_message
|
|
from tests.typing import MqttMockHAClientGenerator, MqttMockPahoClient
|
|
|
|
DEFAULT_CONFIG_DEVICE_INFO_ID = {
|
|
"identifiers": ["helloworld"],
|
|
"manufacturer": "Whatever",
|
|
"name": "Beer",
|
|
"model": "Glass",
|
|
"model_id": "XYZ001",
|
|
"hw_version": "rev1",
|
|
"serial_number": "1234deadbeef",
|
|
"sw_version": "0.1-beta",
|
|
"suggested_area": "default_area",
|
|
"configuration_url": "http://example.com",
|
|
}
|
|
|
|
DEFAULT_CONFIG_DEVICE_INFO_MAC = {
|
|
"connections": [[dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12"]],
|
|
"manufacturer": "Whatever",
|
|
"name": "Beer",
|
|
"model": "Glass",
|
|
"model_id": "XYZ001",
|
|
"hw_version": "rev1",
|
|
"serial_number": "1234deadbeef",
|
|
"sw_version": "0.1-beta",
|
|
"suggested_area": "default_area",
|
|
"configuration_url": "http://example.com",
|
|
}
|
|
|
|
_SENTINEL = object()
|
|
|
|
DISCOVERY_COUNT = len(MQTT)
|
|
DEVICE_DISCOVERY_COUNT = 2
|
|
|
|
type _MqttMessageType = list[tuple[str, str]]
|
|
type _AttributesType = list[tuple[str, Any]]
|
|
type _StateDataType = (
|
|
list[tuple[_MqttMessageType, str, _AttributesType | None]]
|
|
| list[tuple[_MqttMessageType, str, None]]
|
|
)
|
|
|
|
|
|
def help_all_subscribe_calls(mqtt_client_mock: MqttMockPahoClient) -> list[Any]:
|
|
"""Test of a call."""
|
|
all_calls = []
|
|
for call_l1 in mqtt_client_mock.subscribe.mock_calls:
|
|
if isinstance(call_l1[1][0], list):
|
|
for call_l2 in call_l1[1]:
|
|
all_calls.extend(call_l2)
|
|
else:
|
|
all_calls.append(call_l1[1])
|
|
return all_calls
|
|
|
|
|
|
def help_custom_config(
|
|
mqtt_entity_domain: str,
|
|
mqtt_base_config: ConfigType,
|
|
mqtt_entity_configs: Iterable[ConfigType],
|
|
) -> ConfigType:
|
|
"""Tweak a default config for parametrization.
|
|
|
|
Returns a custom config to be used as parametrization for with hass_config,
|
|
based on the supplied mqtt_base_config and updated with mqtt_entity_configs.
|
|
For each item in mqtt_entity_configs an entity instance is added to the config.
|
|
"""
|
|
config: ConfigType = copy.deepcopy(mqtt_base_config)
|
|
entity_instances: list[ConfigType] = []
|
|
for instance in mqtt_entity_configs:
|
|
base: ConfigType = copy.deepcopy(
|
|
mqtt_base_config[mqtt.DOMAIN][mqtt_entity_domain]
|
|
)
|
|
base.update(instance)
|
|
entity_instances.append(base)
|
|
config[mqtt.DOMAIN][mqtt_entity_domain] = entity_instances
|
|
return config
|
|
|
|
|
|
async def help_test_availability_when_connection_lost(
|
|
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, domain: str
|
|
) -> None:
|
|
"""Test availability after MQTT disconnection."""
|
|
mqtt_mock = await mqtt_mock_entry()
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
mqtt_mock.connected = False
|
|
async_dispatcher_send(hass, MQTT_CONNECTION_STATE, False)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_availability_without_topic(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test availability without defined availability topic."""
|
|
assert "availability_topic" not in config[mqtt.DOMAIN][domain]
|
|
await mqtt_mock_entry()
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_default_availability_payload(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
no_assumed_state: bool = False,
|
|
state_topic: str | None = None,
|
|
state_message: str | None = None,
|
|
) -> None:
|
|
"""Test availability by default payload with defined topic.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic"
|
|
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
if state_topic is not None and state_message is not None:
|
|
async_fire_mqtt_message(hass, state_topic, state_message)
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_default_availability_list_payload(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
no_assumed_state: bool = False,
|
|
state_topic: str | None = None,
|
|
state_message: str | None = None,
|
|
) -> None:
|
|
"""Test availability by default payload with defined topic.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability"] = [
|
|
{"topic": "availability-topic1"},
|
|
{"topic": "availability-topic2"},
|
|
]
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
if state_topic is not None and state_message is not None:
|
|
async_fire_mqtt_message(hass, state_topic, state_message)
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_default_availability_list_payload_all(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
no_assumed_state: bool = False,
|
|
) -> None:
|
|
"""Test availability by default payload with defined topic.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability_mode"] = "all"
|
|
config[mqtt.DOMAIN][domain]["availability"] = [
|
|
{"topic": "availability-topic1"},
|
|
{"topic": "availability-topic2"},
|
|
]
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_default_availability_list_payload_any(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
no_assumed_state: bool = False,
|
|
) -> None:
|
|
"""Test availability by default payload with defined topic.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability_mode"] = "any"
|
|
config[mqtt.DOMAIN][domain]["availability"] = [
|
|
{"topic": "availability-topic1"},
|
|
{"topic": "availability-topic2"},
|
|
]
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic2", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "offline")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
|
|
async def help_test_default_availability_list_single(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test availability list and availability_topic are mutually exclusive.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability"] = [
|
|
{"topic": "availability-topic1"},
|
|
]
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic"
|
|
|
|
with (
|
|
patch("homeassistant.config.load_yaml_config_file", return_value=config),
|
|
suppress(vol.MultipleInvalid),
|
|
):
|
|
await mqtt_mock_entry()
|
|
|
|
assert (
|
|
"two or more values in the same group of exclusion 'availability'"
|
|
in caplog.text
|
|
)
|
|
|
|
|
|
async def help_test_custom_availability_payload(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
no_assumed_state: bool = False,
|
|
state_topic: str | None = None,
|
|
state_message: str | None = None,
|
|
) -> None:
|
|
"""Test availability by custom payload with defined topic.
|
|
|
|
This is a test helper for the MqttAvailability mixin.
|
|
"""
|
|
# Add availability settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic"
|
|
config[mqtt.DOMAIN][domain]["payload_available"] = "good"
|
|
config[mqtt.DOMAIN][domain]["payload_not_available"] = "nogood"
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "good")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
if no_assumed_state:
|
|
assert not state.attributes.get(ATTR_ASSUMED_STATE)
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "nogood")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
if state_topic is not None and state_message is not None:
|
|
async_fire_mqtt_message(hass, state_topic, state_message)
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic", "good")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_discovery_update_availability(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test update of discovered MQTTAvailability.
|
|
|
|
This is a test helper for the MQTTAvailability mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add availability settings to config
|
|
config1 = copy.deepcopy(config)
|
|
config1[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic1"
|
|
config2 = copy.deepcopy(config)
|
|
config2[mqtt.DOMAIN][domain]["availability"] = [
|
|
{"topic": "availability-topic2"},
|
|
{"topic": "availability-topic3"},
|
|
]
|
|
config3 = copy.deepcopy(config)
|
|
config3[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic4"
|
|
data1 = json.dumps(config1[mqtt.DOMAIN][domain])
|
|
data2 = json.dumps(config2[mqtt.DOMAIN][domain])
|
|
data3 = json.dumps(config3[mqtt.DOMAIN][domain])
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, "availability-topic1", "offline")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
# Change availability_topic
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2)
|
|
await hass.async_block_till_done()
|
|
|
|
# Verify we are no longer subscribing to the old topic
|
|
async_fire_mqtt_message(hass, "availability-topic1", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
# Verify we are subscribing to the new topic
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
# Verify we are subscribing to the new topic
|
|
async_fire_mqtt_message(hass, "availability-topic3", "offline")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
# Change availability_topic
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data3)
|
|
await hass.async_block_till_done()
|
|
|
|
# Verify we are no longer subscribing to the old topic
|
|
async_fire_mqtt_message(hass, "availability-topic2", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
# Verify we are no longer subscribing to the old topic
|
|
async_fire_mqtt_message(hass, "availability-topic3", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
# Verify we are subscribing to the new topic
|
|
async_fire_mqtt_message(hass, "availability-topic4", "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_setting_attribute_via_mqtt_json_message(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test the setting of attribute via MQTT with JSON payload.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
# Add JSON attributes settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic"
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
async_fire_mqtt_message(hass, "attr-topic", '{ "val": "100" }')
|
|
state = hass.states.get(f"{domain}.test")
|
|
|
|
assert state and state.attributes.get("val") == "100"
|
|
|
|
|
|
async def help_test_setting_blocked_attribute_via_mqtt_json_message(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
extra_blocked_attributes: frozenset[str] | None,
|
|
) -> None:
|
|
"""Test the setting of blocked attribute via MQTT with JSON payload.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
extra_blocked_attribute_list = list(extra_blocked_attributes or [])
|
|
|
|
# Add JSON attributes settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic"
|
|
data = json.dumps(config[mqtt.DOMAIN][domain])
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
val = "abc123"
|
|
|
|
for attr in MQTT_ATTRIBUTES_BLOCKED:
|
|
async_fire_mqtt_message(hass, "attr-topic", json.dumps({attr: val}))
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get(attr) != val
|
|
|
|
for attr in extra_blocked_attribute_list:
|
|
async_fire_mqtt_message(hass, "attr-topic", json.dumps({attr: val}))
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get(attr) != val
|
|
|
|
|
|
async def help_test_setting_attribute_with_template(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test the setting of attribute via MQTT with JSON payload.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
# Add JSON attributes settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic"
|
|
config[mqtt.DOMAIN][domain]["json_attributes_template"] = (
|
|
"{{ value_json['Timer1'] | tojson }}"
|
|
)
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
async_fire_mqtt_message(
|
|
hass, "attr-topic", json.dumps({"Timer1": {"Arm": 0, "Time": "22:18"}})
|
|
)
|
|
state = hass.states.get(f"{domain}.test")
|
|
|
|
assert state is not None
|
|
assert state.attributes.get("Arm") == 0
|
|
assert state.attributes.get("Time") == "22:18"
|
|
|
|
|
|
async def help_test_update_with_json_attrs_not_dict(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test attributes get extracted from a JSON result.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
# Add JSON attributes settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic"
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
async_fire_mqtt_message(hass, "attr-topic", '[ "list", "of", "things"]')
|
|
state = hass.states.get(f"{domain}.test")
|
|
|
|
assert state and state.attributes.get("val") is None
|
|
assert "JSON result was not a dictionary" in caplog.text
|
|
|
|
|
|
async def help_test_update_with_json_attrs_bad_json(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test JSON validation of attributes.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
# Add JSON attributes settings to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic"
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
await mqtt_mock_entry()
|
|
|
|
async_fire_mqtt_message(hass, "attr-topic", "This is not JSON")
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get("val") is None
|
|
assert "Erroneous JSON: This is not JSON" in caplog.text
|
|
|
|
|
|
async def help_test_discovery_update_attr(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test update of discovered MQTTAttributes.
|
|
|
|
This is a test helper for the MqttAttributes mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add JSON attributes settings to config
|
|
config1 = copy.deepcopy(config)
|
|
config1[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic1"
|
|
config2 = copy.deepcopy(config)
|
|
config2[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic2"
|
|
data1 = json.dumps(config1[mqtt.DOMAIN][domain])
|
|
data2 = json.dumps(config2[mqtt.DOMAIN][domain])
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
|
|
await hass.async_block_till_done()
|
|
async_fire_mqtt_message(hass, "attr-topic1", '{ "val": "100" }')
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get("val") == "100"
|
|
|
|
# Change json_attributes_topic
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2)
|
|
await hass.async_block_till_done()
|
|
|
|
# Verify we are no longer subscribing to the old topic
|
|
async_fire_mqtt_message(hass, "attr-topic1", '{ "val": "50" }')
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get("val") != "50"
|
|
|
|
# Verify we are subscribing to the new topic
|
|
async_fire_mqtt_message(hass, "attr-topic2", '{ "val": "75" }')
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.attributes.get("val") == "75"
|
|
|
|
|
|
async def help_test_unique_id(
|
|
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, domain: str
|
|
) -> None:
|
|
"""Test unique id option only creates one entity per unique_id."""
|
|
await mqtt_mock_entry()
|
|
await hass.async_block_till_done()
|
|
assert len(hass.states.async_entity_ids(domain)) == 1
|
|
|
|
|
|
async def help_test_discovery_removal(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
data: str,
|
|
) -> None:
|
|
"""Test removal of discovered component.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state is not None
|
|
assert state.name == "test"
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "")
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state is None
|
|
|
|
|
|
async def help_test_discovery_update(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
discovery_config1: DiscoveryInfoType,
|
|
discovery_config2: DiscoveryInfoType,
|
|
state_data1: _StateDataType | None = None,
|
|
state_data2: _StateDataType | None = None,
|
|
) -> None:
|
|
"""Test update of discovered component.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add some future configuration to the configurations
|
|
config1 = copy.deepcopy(discovery_config1)
|
|
config1["some_future_option_1"] = "future_option_1"
|
|
config2 = copy.deepcopy(discovery_config2)
|
|
config2["some_future_option_2"] = "future_option_2"
|
|
discovery_data1 = json.dumps(config1)
|
|
discovery_data2 = json.dumps(config2)
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", discovery_data1)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is not None
|
|
assert state.name == "Beer"
|
|
|
|
if state_data1:
|
|
for mqtt_messages, expected_state, attributes in state_data1:
|
|
for topic, data in mqtt_messages:
|
|
async_fire_mqtt_message(hass, topic, data)
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is not None
|
|
if expected_state:
|
|
assert state.state == expected_state
|
|
if attributes:
|
|
for attr, value in attributes:
|
|
assert state.attributes.get(attr) == value
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", discovery_data2)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is not None
|
|
assert state.name == "Milk"
|
|
|
|
if state_data2:
|
|
for mqtt_messages, expected_state, attributes in state_data2:
|
|
for topic, data in mqtt_messages:
|
|
async_fire_mqtt_message(hass, topic, data)
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is not None
|
|
if expected_state:
|
|
assert state.state == expected_state
|
|
if attributes:
|
|
for attr, value in attributes:
|
|
assert state.attributes.get(attr) == value
|
|
|
|
state = hass.states.get(f"{domain}.milk")
|
|
assert state is None
|
|
|
|
|
|
async def help_test_discovery_update_unchanged(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
data1: str,
|
|
discovery_update: MagicMock,
|
|
) -> None:
|
|
"""Test update of discovered component without changes.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is not None
|
|
assert state.name == "Beer"
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
|
|
await hass.async_block_till_done()
|
|
|
|
assert not discovery_update.called
|
|
|
|
|
|
async def help_test_discovery_broken(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
data1: str,
|
|
data2: str,
|
|
) -> None:
|
|
"""Test handling of bad discovery message."""
|
|
await mqtt_mock_entry()
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is None
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.milk")
|
|
assert state is not None
|
|
assert state.name == "Milk"
|
|
state = hass.states.get(f"{domain}.beer")
|
|
assert state is None
|
|
|
|
|
|
async def help_test_encoding_subscribable_topics(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
topic: str,
|
|
value: Any,
|
|
attribute: str | None = None,
|
|
attribute_value: Any = None,
|
|
init_payload: tuple[str, str] | None = None,
|
|
skip_raw_test: bool = False,
|
|
) -> None:
|
|
"""Test handling of incoming encoded payload."""
|
|
|
|
async def _test_encoding(
|
|
hass: HomeAssistant,
|
|
entity_id,
|
|
topic,
|
|
encoded_value,
|
|
attribute,
|
|
init_payload_topic,
|
|
init_payload_value,
|
|
) -> Any:
|
|
state = hass.states.get(entity_id)
|
|
|
|
if init_payload_value:
|
|
# Sometimes a device needs to have an initialization pay load, e.g. to switch the device on.
|
|
async_fire_mqtt_message(hass, init_payload_topic, init_payload_value)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
|
|
async_fire_mqtt_message(hass, topic, encoded_value)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
|
|
if attribute:
|
|
return state.attributes.get(attribute)
|
|
|
|
return state.state if state else None
|
|
|
|
init_payload_value_utf8 = None
|
|
init_payload_value_utf16 = None
|
|
# setup test1 default encoding
|
|
config1 = copy.deepcopy(config)
|
|
if domain == "device_tracker":
|
|
config1["unique_id"] = "test1"
|
|
else:
|
|
config1["name"] = "test1"
|
|
config1[topic] = "topic/test1"
|
|
# setup test2 alternate encoding
|
|
config2 = copy.deepcopy(config)
|
|
if domain == "device_tracker":
|
|
config2["unique_id"] = "test2"
|
|
else:
|
|
config2["name"] = "test2"
|
|
config2["encoding"] = "utf-16"
|
|
config2[topic] = "topic/test2"
|
|
# setup test3 raw encoding
|
|
config3 = copy.deepcopy(config)
|
|
if domain == "device_tracker":
|
|
config3["unique_id"] = "test3"
|
|
else:
|
|
config3["name"] = "test3"
|
|
config3["encoding"] = ""
|
|
config3[topic] = "topic/test3"
|
|
|
|
if init_payload:
|
|
config1[init_payload[0]] = "topic/init_payload1"
|
|
config2[init_payload[0]] = "topic/init_payload2"
|
|
config3[init_payload[0]] = "topic/init_payload3"
|
|
init_payload_value_utf8 = init_payload[1].encode("utf-8")
|
|
init_payload_value_utf16 = init_payload[1].encode("utf-16")
|
|
|
|
await mqtt_mock_entry()
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/item1/config", json.dumps(config1)
|
|
)
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/item2/config", json.dumps(config2)
|
|
)
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/item3/config", json.dumps(config3)
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
expected_result = attribute_value or value
|
|
|
|
# test1 default encoding
|
|
assert (
|
|
await _test_encoding(
|
|
hass,
|
|
f"{domain}.test1",
|
|
"topic/test1",
|
|
value.encode("utf-8"),
|
|
attribute,
|
|
"topic/init_payload1",
|
|
init_payload_value_utf8,
|
|
)
|
|
== expected_result
|
|
)
|
|
|
|
# test2 alternate encoding
|
|
assert (
|
|
await _test_encoding(
|
|
hass,
|
|
f"{domain}.test2",
|
|
"topic/test2",
|
|
value.encode("utf-16"),
|
|
attribute,
|
|
"topic/init_payload2",
|
|
init_payload_value_utf16,
|
|
)
|
|
== expected_result
|
|
)
|
|
|
|
# test3 raw encoded input
|
|
if skip_raw_test:
|
|
return
|
|
|
|
with suppress(AttributeError, TypeError, ValueError):
|
|
result = await _test_encoding(
|
|
hass,
|
|
f"{domain}.test3",
|
|
"topic/test3",
|
|
value.encode("utf-16"),
|
|
attribute,
|
|
"topic/init_payload3",
|
|
init_payload_value_utf16,
|
|
)
|
|
assert result != expected_result
|
|
|
|
|
|
async def help_test_entity_device_info_with_identifier(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry integration.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
assert device.identifiers == {("mqtt", "helloworld")}
|
|
assert device.manufacturer == "Whatever"
|
|
assert device.name == "Beer"
|
|
assert device.model == "Glass"
|
|
assert device.model_id == "XYZ001"
|
|
assert device.hw_version == "rev1"
|
|
assert device.sw_version == "0.1-beta"
|
|
assert device.suggested_area == "default_area"
|
|
assert device.configuration_url == "http://example.com"
|
|
|
|
|
|
async def help_test_entity_device_info_with_connection(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry integration.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_MAC)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(
|
|
connections={(dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12")}
|
|
)
|
|
assert device is not None
|
|
assert device.connections == {(dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12")}
|
|
assert device.manufacturer == "Whatever"
|
|
assert device.name == "Beer"
|
|
assert device.model == "Glass"
|
|
assert device.model_id == "XYZ001"
|
|
assert device.hw_version == "rev1"
|
|
assert device.sw_version == "0.1-beta"
|
|
assert device.suggested_area == "default_area"
|
|
assert device.configuration_url == "http://example.com"
|
|
|
|
|
|
async def help_test_entity_device_info_remove(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry remove."""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
dev_registry = dr.async_get(hass)
|
|
ent_registry = er.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = dev_registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
assert ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique")
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "")
|
|
await hass.async_block_till_done()
|
|
|
|
device = dev_registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is None
|
|
assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique")
|
|
|
|
|
|
async def help_test_entity_device_info_update(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry update.
|
|
|
|
This is a test helper for the MqttDiscoveryUpdate mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
assert device.name == "Beer"
|
|
|
|
config["device"]["name"] = "Milk"
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
assert device.name == "Milk"
|
|
|
|
|
|
async def help_test_entity_name(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
expected_friendly_name: str | None = None,
|
|
device_class: str | None = None,
|
|
) -> None:
|
|
"""Test device name setup with and without a device_class set.
|
|
|
|
This is a test helper for the _setup_common_attributes_from_config mixin.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
expected_entity_name = "test"
|
|
if device_class is not None:
|
|
config["device_class"] = device_class
|
|
# Do not set a name
|
|
config.pop("name")
|
|
expected_entity_name = device_class
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device({("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
entity_id = f"{domain}.beer_{expected_entity_name}"
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == f"Beer {expected_friendly_name}"
|
|
|
|
|
|
async def help_test_entity_id_update_subscriptions(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
topics: list[str] | None = None,
|
|
) -> None:
|
|
"""Test MQTT subscriptions are managed when entity_id is updated."""
|
|
# Add unique_id to config
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["unique_id"] = "TOTALLY_UNIQUE"
|
|
|
|
if topics is None:
|
|
# Add default topics to config
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = "avty-topic"
|
|
config[mqtt.DOMAIN][domain]["state_topic"] = "test-topic"
|
|
topics = ["avty-topic", "test-topic"]
|
|
assert len(topics) > 0
|
|
entity_registry = er.async_get(hass)
|
|
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config):
|
|
mqtt_mock = await mqtt_mock_entry()
|
|
assert mqtt_mock is not None
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state is not None
|
|
assert (
|
|
mqtt_mock.async_subscribe.call_count
|
|
== len(topics)
|
|
+ 2 * len(SUPPORTED_COMPONENTS)
|
|
+ DISCOVERY_COUNT
|
|
+ DEVICE_DISCOVERY_COUNT
|
|
)
|
|
for topic in topics:
|
|
mqtt_mock.async_subscribe.assert_any_call(
|
|
topic, ANY, ANY, ANY, HassJobType.Callback
|
|
)
|
|
mqtt_mock.async_subscribe.reset_mock()
|
|
|
|
entity_registry.async_update_entity(
|
|
f"{domain}.test", new_entity_id=f"{domain}.milk"
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state is None
|
|
|
|
state = hass.states.get(f"{domain}.milk")
|
|
assert state is not None
|
|
for topic in topics:
|
|
mqtt_mock.async_subscribe.assert_any_call(
|
|
topic, ANY, ANY, ANY, HassJobType.Callback
|
|
)
|
|
|
|
|
|
async def help_test_entity_id_update_discovery_update(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
topic: str | None = None,
|
|
) -> None:
|
|
"""Test MQTT discovery update after entity_id is updated."""
|
|
# Add unique_id to config
|
|
await mqtt_mock_entry()
|
|
config = copy.deepcopy(config)
|
|
config[mqtt.DOMAIN][domain]["unique_id"] = "TOTALLY_UNIQUE"
|
|
|
|
if topic is None:
|
|
# Add default topic to config
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = "avty-topic"
|
|
topic = "avty-topic"
|
|
|
|
entity_registry = er.async_get(hass)
|
|
data = json.dumps(config[mqtt.DOMAIN][domain])
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
async_fire_mqtt_message(hass, topic, "online")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
async_fire_mqtt_message(hass, topic, "offline")
|
|
state = hass.states.get(f"{domain}.test")
|
|
assert state and state.state == STATE_UNAVAILABLE
|
|
|
|
entity_registry.async_update_entity(
|
|
f"{domain}.test", new_entity_id=f"{domain}.milk"
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
config[mqtt.DOMAIN][domain]["availability_topic"] = f"{topic}_2"
|
|
data = json.dumps(config[mqtt.DOMAIN][domain])
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
assert len(hass.states.async_entity_ids(domain)) == 1
|
|
|
|
async_fire_mqtt_message(hass, f"{topic}_2", "online")
|
|
state = hass.states.get(f"{domain}.milk")
|
|
assert state and state.state != STATE_UNAVAILABLE
|
|
|
|
|
|
async def help_test_entity_debug_info(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test debug_info.
|
|
|
|
This is a test helper for MQTT debug_info.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
config["platform"] = "mqtt"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"]) == 1
|
|
assert (
|
|
debug_info_data["entities"][0]["discovery_data"]["topic"]
|
|
== f"homeassistant/{domain}/bla/config"
|
|
)
|
|
assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
assert debug_info_data["entities"][0]["transmitted"] == []
|
|
assert len(debug_info_data["triggers"]) == 0
|
|
|
|
|
|
async def help_test_entity_debug_info_max_messages(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test debug_info message overflow.
|
|
|
|
This is a test helper for MQTT debug_info.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
|
|
with freeze_time(start_dt := dt_util.utcnow()):
|
|
for i in range(debug_info.STORED_MESSAGES + 1):
|
|
async_fire_mqtt_message(hass, "test-topic", f"{i}")
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert (
|
|
len(debug_info_data["entities"][0]["subscriptions"][0]["messages"])
|
|
== debug_info.STORED_MESSAGES
|
|
)
|
|
messages = [
|
|
{
|
|
"payload": f"{i}",
|
|
"qos": 0,
|
|
"retain": False,
|
|
"time": start_dt,
|
|
"topic": "test-topic",
|
|
}
|
|
for i in range(1, debug_info.STORED_MESSAGES + 1)
|
|
]
|
|
assert {"topic": "test-topic", "messages": messages} in debug_info_data["entities"][
|
|
0
|
|
]["subscriptions"]
|
|
|
|
|
|
async def help_test_entity_debug_info_message(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
service: str | None,
|
|
command_topic: str | None = None,
|
|
command_payload: str | None = None,
|
|
state_topic: str | object | None = _SENTINEL,
|
|
state_payload: bytes | str | None = None,
|
|
service_parameters: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Test debug_info.
|
|
|
|
This is a test helper for MQTT debug_info.
|
|
"""
|
|
# Add device settings to config
|
|
await mqtt_mock_entry()
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
|
|
if command_topic is None:
|
|
# Add default topic to config
|
|
config["command_topic"] = "command-topic"
|
|
command_topic = "command-topic"
|
|
|
|
if command_payload is None:
|
|
command_payload = "ON"
|
|
|
|
if state_topic is _SENTINEL:
|
|
# Add default topic to config
|
|
config["state_topic"] = "state-topic"
|
|
state_topic = "state-topic"
|
|
|
|
if state_payload is None:
|
|
state_payload = "ON"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
|
|
if state_topic is not None:
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
|
assert {"topic": state_topic, "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
|
|
with freeze_time(start_dt := dt_util.utcnow()):
|
|
async_fire_mqtt_message(hass, str(state_topic), state_payload)
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
|
assert {
|
|
"topic": state_topic,
|
|
"messages": [
|
|
{
|
|
"payload": str(state_payload),
|
|
"qos": 0,
|
|
"retain": False,
|
|
"time": start_dt,
|
|
"topic": state_topic,
|
|
}
|
|
],
|
|
} in debug_info_data["entities"][0]["subscriptions"]
|
|
|
|
expected_transmissions = []
|
|
|
|
with freeze_time(start_dt := dt_util.utcnow()):
|
|
if service:
|
|
# Trigger an outgoing MQTT message
|
|
if service:
|
|
service_data = {ATTR_ENTITY_ID: f"{domain}.beer_test"}
|
|
if service_parameters:
|
|
service_data.update(service_parameters)
|
|
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data,
|
|
blocking=True,
|
|
)
|
|
|
|
expected_transmissions = [
|
|
{
|
|
"topic": command_topic,
|
|
"messages": [
|
|
{
|
|
"payload": str(command_payload),
|
|
"qos": 0,
|
|
"retain": False,
|
|
"time": start_dt,
|
|
"topic": command_topic,
|
|
}
|
|
],
|
|
}
|
|
]
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert debug_info_data["entities"][0]["transmitted"] == expected_transmissions
|
|
|
|
|
|
async def help_test_entity_debug_info_remove(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test debug_info.
|
|
|
|
This is a test helper for MQTT debug_info.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
config["platform"] = "mqtt"
|
|
|
|
registry = dr.async_get(hass)
|
|
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"]) == 1
|
|
assert (
|
|
debug_info_data["entities"][0]["discovery_data"]["topic"]
|
|
== f"homeassistant/{domain}/bla/config"
|
|
)
|
|
assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
assert len(debug_info_data["triggers"]) == 0
|
|
assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.beer_test"
|
|
entity_id = debug_info_data["entities"][0]["entity_id"]
|
|
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "")
|
|
await hass.async_block_till_done()
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"]) == 0
|
|
assert len(debug_info_data["triggers"]) == 0
|
|
assert entity_id not in hass.data["mqtt"].debug_info_entities
|
|
|
|
|
|
async def help_test_entity_debug_info_update_entity_id(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test debug_info.
|
|
|
|
This is a test helper for MQTT debug_info.
|
|
"""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["unique_id"] = "veryunique"
|
|
config["platform"] = "mqtt"
|
|
|
|
device_registry = dr.async_get(hass)
|
|
entity_registry = er.async_get(hass)
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
|
|
await hass.async_block_till_done()
|
|
|
|
device = device_registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
assert device is not None
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"]) == 1
|
|
assert (
|
|
debug_info_data["entities"][0]["discovery_data"]["topic"]
|
|
== f"homeassistant/{domain}/bla/config"
|
|
)
|
|
assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config
|
|
assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.beer_test"
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
assert len(debug_info_data["triggers"]) == 0
|
|
|
|
entity_registry.async_update_entity(
|
|
f"{domain}.beer_test", new_entity_id=f"{domain}.milk"
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
|
|
debug_info_data = debug_info.info_for_device(hass, device.id)
|
|
assert len(debug_info_data["entities"]) == 1
|
|
assert (
|
|
debug_info_data["entities"][0]["discovery_data"]["topic"]
|
|
== f"homeassistant/{domain}/bla/config"
|
|
)
|
|
assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config
|
|
assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.milk"
|
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
|
assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][
|
|
"subscriptions"
|
|
]
|
|
assert len(debug_info_data["triggers"]) == 0
|
|
assert f"{domain}.beer_test" not in hass.data["mqtt"].debug_info_entities
|
|
|
|
|
|
async def help_test_entity_disabled_by_default(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry remove."""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
config["enabled_by_default"] = False
|
|
config["unique_id"] = "veryunique1"
|
|
|
|
dev_registry = dr.async_get(hass)
|
|
ent_registry = er.async_get(hass)
|
|
|
|
# Discover a disabled entity
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla1/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique1")
|
|
assert entity_id is not None and hass.states.get(entity_id) is None
|
|
assert dev_registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
|
|
# Discover an enabled entity, tied to the same device
|
|
config["enabled_by_default"] = True
|
|
config["unique_id"] = "veryunique2"
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla2/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique2")
|
|
assert entity_id is not None and hass.states.get(entity_id) is not None
|
|
|
|
# Remove the enabled entity, both entities and the device should be removed
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla2/config", "")
|
|
await hass.async_block_till_done()
|
|
assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique1")
|
|
assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique2")
|
|
assert not dev_registry.async_get_device(identifiers={("mqtt", "helloworld")})
|
|
|
|
|
|
async def help_test_entity_category(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test device registry remove."""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
|
|
ent_registry = er.async_get(hass)
|
|
|
|
# Discover an entity without entity category
|
|
unique_id = "veryunique1"
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
assert entity_id is not None and hass.states.get(entity_id)
|
|
entry = ent_registry.async_get(entity_id)
|
|
assert entry is not None and entry.entity_category is None
|
|
|
|
# Discover an entity with entity category set to "diagnostic"
|
|
unique_id = "veryunique2"
|
|
config["entity_category"] = EntityCategory.DIAGNOSTIC
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
assert entity_id is not None and hass.states.get(entity_id)
|
|
entry = ent_registry.async_get(entity_id)
|
|
assert entry is not None and entry.entity_category == EntityCategory.DIAGNOSTIC
|
|
|
|
# Discover an entity with entity category set to "no_such_category"
|
|
unique_id = "veryunique3"
|
|
config["entity_category"] = "no_such_category"
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
|
|
|
|
async def help_test_entity_icon_and_entity_picture(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: ConfigType,
|
|
default_entity_picture: str | None = None,
|
|
) -> None:
|
|
"""Test entity picture and icon."""
|
|
await mqtt_mock_entry()
|
|
# Add device settings to config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
|
|
|
|
ent_registry = er.async_get(hass)
|
|
|
|
# Discover an entity without entity icon or picture
|
|
unique_id = "veryunique1"
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
state = hass.states.get(entity_id)
|
|
assert entity_id is not None and state
|
|
assert state.attributes.get("icon") is None
|
|
assert state.attributes.get("entity_picture") == default_entity_picture
|
|
|
|
# Discover an entity with an entity picture set
|
|
unique_id = "veryunique2"
|
|
config["entity_picture"] = "https://example.com/mypicture.png"
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
state = hass.states.get(entity_id)
|
|
assert entity_id is not None and state
|
|
assert state.attributes.get("icon") is None
|
|
assert state.attributes.get("entity_picture") == "https://example.com/mypicture.png"
|
|
config.pop("entity_picture")
|
|
|
|
# Discover an entity with an entity icon set
|
|
unique_id = "veryunique3"
|
|
config["icon"] = "mdi:emoji-happy-outline"
|
|
config["unique_id"] = unique_id
|
|
data = json.dumps(config)
|
|
async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data)
|
|
await hass.async_block_till_done()
|
|
entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
|
|
state = hass.states.get(entity_id)
|
|
assert entity_id is not None and state
|
|
assert state.attributes.get("icon") == "mdi:emoji-happy-outline"
|
|
assert state.attributes.get("entity_picture") == default_entity_picture
|
|
|
|
|
|
async def help_test_publishing_with_custom_encoding(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
caplog: pytest.LogCaptureFixture,
|
|
domain: str,
|
|
config: ConfigType,
|
|
service: str,
|
|
topic: str,
|
|
parameters: dict[str, Any] | None,
|
|
payload: str,
|
|
template: str | None,
|
|
tpl_par: str = "value",
|
|
tpl_output: PublishPayloadType = None,
|
|
) -> None:
|
|
"""Test a service with publishing MQTT payload with different encoding."""
|
|
# prepare config for tests
|
|
test_config: dict[str, dict[str, Any]] = {
|
|
"test1": {"encoding": None, "cmd_tpl": False},
|
|
"test2": {"encoding": "utf-16", "cmd_tpl": False},
|
|
"test3": {"encoding": "", "cmd_tpl": False},
|
|
"test4": {"encoding": "invalid", "cmd_tpl": False},
|
|
"test5": {"encoding": "", "cmd_tpl": True},
|
|
}
|
|
setup_config = []
|
|
service_data = {}
|
|
for test_id, test_data in test_config.items():
|
|
test_config_setup: dict[str, Any] = copy.copy(config[mqtt.DOMAIN][domain])
|
|
test_config_setup.update(
|
|
{
|
|
topic: f"cmd/{test_id}",
|
|
"name": f"{test_id}",
|
|
}
|
|
)
|
|
if test_data["encoding"] is not None:
|
|
test_config_setup["encoding"] = test_data["encoding"]
|
|
if template and test_data["cmd_tpl"]:
|
|
test_config_setup[template] = (
|
|
f"{{{{ (('%.1f'|format({tpl_par}))[0] if is_number({tpl_par}) else {tpl_par}[0]) | ord | pack('b') }}}}"
|
|
)
|
|
setup_config.append(test_config_setup)
|
|
|
|
# setup service data
|
|
service_data[test_id] = {ATTR_ENTITY_ID: f"{domain}.{test_id}"}
|
|
if parameters:
|
|
service_data[test_id].update(parameters)
|
|
|
|
# setup test entities using discovery
|
|
mqtt_mock = await mqtt_mock_entry()
|
|
for item, component_config in enumerate(setup_config):
|
|
conf = json.dumps(component_config)
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/component_{item}/config", conf
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
# 1) test with default encoding
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data["test1"],
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
mqtt_mock.async_publish.assert_any_call("cmd/test1", str(payload), 0, False)
|
|
mqtt_mock.async_publish.reset_mock()
|
|
|
|
# 2) test with utf-16 encoding
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data["test2"],
|
|
blocking=True,
|
|
)
|
|
mqtt_mock.async_publish.assert_any_call(
|
|
"cmd/test2", str(payload).encode("utf-16"), 0, False
|
|
)
|
|
mqtt_mock.async_publish.reset_mock()
|
|
|
|
# 3) test with no encoding set should fail if payload is a string
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data["test3"],
|
|
blocking=True,
|
|
)
|
|
assert (
|
|
f"Can't pass-through payload for publishing {payload} on cmd/test3 with no encoding set, need 'bytes'"
|
|
in caplog.text
|
|
)
|
|
|
|
# 4) test with invalid encoding set should fail
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data["test4"],
|
|
blocking=True,
|
|
)
|
|
assert (
|
|
f"Can't encode payload for publishing {payload} on cmd/test4 with encoding invalid"
|
|
in caplog.text
|
|
)
|
|
|
|
# 5) test with command template and raw encoding if specified
|
|
if not template:
|
|
return
|
|
|
|
await hass.services.async_call(
|
|
domain,
|
|
service,
|
|
service_data["test5"],
|
|
blocking=True,
|
|
)
|
|
mqtt_mock.async_publish.assert_any_call(
|
|
"cmd/test5", tpl_output or str(payload)[0].encode("utf-8"), 0, False
|
|
)
|
|
mqtt_mock.async_publish.reset_mock()
|
|
|
|
|
|
async def help_test_reload_with_config(
|
|
hass: HomeAssistant,
|
|
caplog: pytest.LogCaptureFixture,
|
|
tmp_path: Path,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test reloading with supplied config."""
|
|
new_yaml_config_file = tmp_path / "configuration.yaml"
|
|
new_yaml_config = yaml.dump(config)
|
|
new_yaml_config_file.write_text(new_yaml_config)
|
|
assert new_yaml_config_file.read_text() == new_yaml_config
|
|
|
|
with patch.object(module_hass_config, "YAML_CONFIG_FILE", new_yaml_config_file):
|
|
await hass.services.async_call(
|
|
"mqtt",
|
|
SERVICE_RELOAD,
|
|
{},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
|
|
async def help_test_reloadable(
|
|
hass: HomeAssistant,
|
|
mqtt_client_mock: MqttMockPahoClient,
|
|
domain: str,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Test reloading an MQTT platform."""
|
|
# Set up with empty config
|
|
config = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
# Create and test an old config of 2 entities based on the config supplied
|
|
old_config_1 = copy.deepcopy(config)
|
|
old_config_1["name"] = "test_old_1"
|
|
old_config_2 = copy.deepcopy(config)
|
|
old_config_2["name"] = "test_old_2"
|
|
|
|
old_config = {
|
|
mqtt.DOMAIN: {domain: [old_config_1, old_config_2]},
|
|
}
|
|
# Start the MQTT entry with the old config
|
|
entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"})
|
|
entry.add_to_hass(hass)
|
|
mqtt_client_mock.connect.return_value = 0
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=old_config):
|
|
await hass.config_entries.async_setup(entry.entry_id)
|
|
|
|
assert hass.states.get(f"{domain}.test_old_1")
|
|
assert hass.states.get(f"{domain}.test_old_2")
|
|
assert len(hass.states.async_all(domain)) == 2
|
|
|
|
# Create temporary fixture for configuration.yaml based on the supplied config and
|
|
# test a reload with this new config
|
|
new_config_1 = copy.deepcopy(config)
|
|
new_config_1["name"] = "test_new_1"
|
|
new_config_2 = copy.deepcopy(config)
|
|
new_config_2["name"] = "test_new_2"
|
|
new_config_extra = copy.deepcopy(config)
|
|
new_config_extra["name"] = "test_new_3"
|
|
|
|
new_config = {
|
|
mqtt.DOMAIN: {domain: [new_config_1, new_config_2, new_config_extra]},
|
|
}
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=new_config):
|
|
# Reload the mqtt entry with the new config
|
|
await hass.services.async_call(
|
|
"mqtt",
|
|
SERVICE_RELOAD,
|
|
{},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(hass.states.async_all(domain)) == 3
|
|
|
|
assert hass.states.get(f"{domain}.test_new_1")
|
|
assert hass.states.get(f"{domain}.test_new_2")
|
|
assert hass.states.get(f"{domain}.test_new_3")
|
|
|
|
|
|
async def help_test_unload_config_entry(hass: HomeAssistant) -> None:
|
|
"""Test unloading the MQTT config entry."""
|
|
mqtt_config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0]
|
|
assert mqtt_config_entry.state is ConfigEntryState.LOADED
|
|
|
|
assert await hass.config_entries.async_unload(mqtt_config_entry.entry_id)
|
|
# work-a-round mypy bug https://github.com/python/mypy/issues/9005#issuecomment-1280985006
|
|
updated_config_entry = mqtt_config_entry
|
|
assert updated_config_entry.state is ConfigEntryState.NOT_LOADED
|
|
await hass.async_block_till_done()
|
|
|
|
|
|
async def help_test_unload_config_entry_with_platform(
|
|
hass: HomeAssistant,
|
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
domain: str,
|
|
config: dict[str, dict[str, Any]],
|
|
) -> None:
|
|
"""Test unloading the MQTT config entry with a specific platform domain."""
|
|
# prepare setup through configuration.yaml
|
|
config_setup: dict[str, dict[str, Any]] = copy.deepcopy(config)
|
|
config_setup[mqtt.DOMAIN][domain]["name"] = "config_setup"
|
|
config_name = config_setup
|
|
|
|
with patch("homeassistant.config.load_yaml_config_file", return_value=config_name):
|
|
await mqtt_mock_entry()
|
|
|
|
# prepare setup through discovery
|
|
discovery_setup = copy.deepcopy(config[mqtt.DOMAIN][domain])
|
|
discovery_setup["name"] = "discovery_setup"
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup)
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
# check if both entities were setup correctly
|
|
config_setup_entity = hass.states.get(f"{domain}.config_setup")
|
|
assert config_setup_entity
|
|
|
|
discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup")
|
|
assert discovery_setup_entity
|
|
|
|
await help_test_unload_config_entry(hass)
|
|
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup)
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
# check if both entities were unloaded correctly
|
|
config_setup_entity = hass.states.get(f"{domain}.{config_name}")
|
|
assert config_setup_entity is None
|
|
|
|
discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup")
|
|
assert discovery_setup_entity is None
|
|
|
|
|
|
async def help_test_discovery_setup(
|
|
hass: HomeAssistant, domain: str, discovery_data_payload: str, name: str
|
|
) -> None:
|
|
"""Test setting up an MQTT entity using discovery."""
|
|
async_fire_mqtt_message(
|
|
hass, f"homeassistant/{domain}/{name}/config", discovery_data_payload
|
|
)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(f"{domain}.{name}")
|
|
assert state and state.state is not None
|
|
|
|
|
|
async def help_test_skipped_async_ha_write_state(
|
|
hass: HomeAssistant, topic: str, payload1: str, payload2: str
|
|
) -> None:
|
|
"""Test entity.async_ha_write_state is only called on changes."""
|
|
with patch(
|
|
"homeassistant.components.mqtt.entity.MqttEntity.async_write_ha_state"
|
|
) as mock_async_ha_write_state:
|
|
assert len(mock_async_ha_write_state.mock_calls) == 0
|
|
async_fire_mqtt_message(hass, topic, payload1)
|
|
await hass.async_block_till_done()
|
|
assert len(mock_async_ha_write_state.mock_calls) == 1
|
|
|
|
async_fire_mqtt_message(hass, topic, payload1)
|
|
await hass.async_block_till_done()
|
|
assert len(mock_async_ha_write_state.mock_calls) == 1
|
|
|
|
async_fire_mqtt_message(hass, topic, payload2)
|
|
await hass.async_block_till_done()
|
|
assert len(mock_async_ha_write_state.mock_calls) == 2
|
|
|
|
async_fire_mqtt_message(hass, topic, payload2)
|
|
await hass.async_block_till_done()
|
|
assert len(mock_async_ha_write_state.mock_calls) == 2
|