# core/tests/components/zha/test_websocket_api.py
#
# 1233 lines, 38 KiB, Python

"""Test ZHA WebSocket API."""
from __future__ import annotations
from binascii import unhexlify
from copy import deepcopy
from typing import TYPE_CHECKING
from unittest.mock import ANY, AsyncMock, MagicMock, call, patch
from freezegun import freeze_time
import pytest
import voluptuous as vol
from zha.application.const import (
ATTR_CLUSTER_ID,
ATTR_CLUSTER_TYPE,
ATTR_ENDPOINT_ID,
ATTR_ENDPOINT_NAMES,
ATTR_IEEE,
ATTR_MANUFACTURER,
ATTR_NEIGHBORS,
ATTR_QUIRK_APPLIED,
ATTR_TYPE,
CLUSTER_TYPE_IN,
)
from zha.zigbee.cluster_handlers import ClusterBindEvent, ClusterConfigureReportingEvent
from zha.zigbee.device import ClusterHandlerConfigurationComplete
import zigpy.backups
from zigpy.const import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
import zigpy.profiles.zha
import zigpy.types
from zigpy.types.named import EUI64
import zigpy.util
from zigpy.zcl.clusters import closures, general, security
from zigpy.zcl.clusters.general import Groups
import zigpy.zdo.types as zdo_types
from homeassistant.components.websocket_api import (
ERR_INVALID_FORMAT,
ERR_NOT_FOUND,
TYPE_RESULT,
)
from homeassistant.components.zha import DOMAIN
from homeassistant.components.zha.const import EZSP_OVERWRITE_EUI64
from homeassistant.components.zha.helpers import (
ZHADeviceProxy,
ZHAGatewayProxy,
get_zha_gateway,
get_zha_gateway_proxy,
)
from homeassistant.components.zha.websocket_api import (
ATTR_DURATION,
ATTR_INSTALL_CODE,
ATTR_QR_CODE,
ATTR_SOURCE_IEEE,
ATTR_TARGET_IEEE,
BINDINGS,
GROUP_ID,
GROUP_IDS,
GROUP_NAME,
ID,
SERVICE_PERMIT,
TYPE,
async_load_api,
)
from homeassistant.const import ATTR_MODEL, ATTR_NAME, Platform
from homeassistant.core import Context, HomeAssistant
from .conftest import FIXTURE_GRP_ID, FIXTURE_GRP_NAME
from .data import BASE_CUSTOM_CONFIGURATION, CONFIG_WITH_ALARM_OPTIONS
from tests.common import MockConfigEntry, MockUser
from tests.typing import MockHAClientWebSocket, WebSocketGenerator
# IEEE address given to the plain on/off switch created by the ``zha_client`` fixture.
IEEE_SWITCH_DEVICE = "01:2d:6f:00:0a:90:69:e7"
# IEEE address given to the switch that also exposes the Groups cluster.
IEEE_GROUPABLE_DEVICE = "01:2d:6f:00:0a:90:69:e8"

if TYPE_CHECKING:
    # Only needed for annotations; not imported at runtime.
    from zigpy.application import ControllerApplication
@pytest.fixture(autouse=True)
def required_platform_only():
    """Limit ZHA to the platforms these tests need, to speed up setup."""
    platforms = (
        Platform.ALARM_CONTROL_PANEL,
        Platform.SELECT,
        Platform.SENSOR,
        Platform.SWITCH,
    )
    # The patch stays active for the whole test because we yield inside it.
    with patch("homeassistant.components.zha.PLATFORMS", platforms):
        yield
@pytest.fixture
async def zha_client(
    hass: HomeAssistant,
    hass_ws_client: WebSocketGenerator,
    setup_zha,
    zigpy_device_mock,
) -> MockHAClientWebSocket:
    """Set up ZHA with two mocked switches and return a WebSocket client.

    One switch (``IEEE_SWITCH_DEVICE``) has OnOff and Basic input clusters;
    the other (``IEEE_GROUPABLE_DEVICE``) additionally has the Groups cluster.
    """
    await setup_zha()
    gateway = get_zha_gateway(hass)

    def make_switch(input_clusters, ieee):
        """Create a mocked single-endpoint on/off switch."""
        return zigpy_device_mock(
            {
                1: {
                    SIG_EP_INPUT: input_clusters,
                    SIG_EP_OUTPUT: [],
                    SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH,
                    SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
                }
            },
            ieee=ieee,
        )

    # Both mocks are created before either one is joined.
    switch = make_switch(
        [general.OnOff.cluster_id, general.Basic.cluster_id],
        IEEE_SWITCH_DEVICE,
    )
    groupable = make_switch(
        [
            general.OnOff.cluster_id,
            general.Basic.cluster_id,
            general.Groups.cluster_id,
        ],
        IEEE_GROUPABLE_DEVICE,
    )

    for zigpy_device in (switch, groupable):
        gateway.get_or_create_device(zigpy_device)
        await gateway.async_device_initialized(zigpy_device)
        await hass.async_block_till_done(wait_background_tasks=True)

    # load the ZHA API
    async_load_api(hass)
    return await hass_ws_client(hass)
async def test_device_clusters(hass: HomeAssistant, zha_client) -> None:
    """Test getting device cluster info."""
    await zha_client.send_json(
        {ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: IEEE_SWITCH_DEVICE}
    )
    response = await zha_client.receive_json()

    assert len(response["result"]) == 2

    # Sort by cluster id so the expectations below are order independent.
    cluster_infos = sorted(response["result"], key=lambda info: info[ID])
    expectations = ((0, "Basic"), (6, "OnOff"))

    for cluster_info, (cluster_id, cluster_name) in zip(cluster_infos, expectations):
        assert cluster_info[TYPE] == CLUSTER_TYPE_IN
        assert cluster_info[ID] == cluster_id
        assert cluster_info[ATTR_NAME] == cluster_name
async def test_device_cluster_attributes(zha_client) -> None:
    """Test getting device cluster attributes."""
    request = {
        ID: 5,
        TYPE: "zha/devices/clusters/attributes",
        ATTR_ENDPOINT_ID: 1,
        ATTR_IEEE: IEEE_SWITCH_DEVICE,
        ATTR_CLUSTER_ID: 6,
        ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN,
    }
    await zha_client.send_json(request)
    response = await zha_client.receive_json()

    attributes = response["result"]
    assert len(attributes) == 7

    # Every returned attribute must carry both an id and a name.
    for attribute in attributes:
        assert attribute[ID] is not None
        assert attribute[ATTR_NAME] is not None
async def test_device_cluster_commands(zha_client) -> None:
    """Test getting device cluster commands."""
    request = {
        ID: 5,
        TYPE: "zha/devices/clusters/commands",
        ATTR_ENDPOINT_ID: 1,
        ATTR_IEEE: IEEE_SWITCH_DEVICE,
        ATTR_CLUSTER_ID: 6,
        ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN,
    }
    await zha_client.send_json(request)
    response = await zha_client.receive_json()

    commands = response["result"]
    assert len(commands) == 6

    # Every returned command must carry an id, a name and a type.
    for command in commands:
        assert command[ID] is not None
        assert command[ATTR_NAME] is not None
        assert command[TYPE] is not None
@freeze_time("2023-09-23 20:16:00+00:00")
async def test_list_devices(zha_client) -> None:
    """Test getting ZHA devices."""
    await zha_client.send_json({ID: 5, TYPE: "zha/devices"})
    listing = await zha_client.receive_json()

    devices = listing["result"]
    assert len(devices) == 3  # the coordinator is included as well

    required_fields = (
        ATTR_IEEE,
        ATTR_MANUFACTURER,
        ATTR_MODEL,
        ATTR_NAME,
        ATTR_QUIRK_APPLIED,
        "entities",
        ATTR_NEIGHBORS,
        ATTR_ENDPOINT_NAMES,
    )

    # Message ids start at 101 so each per-device lookup uses a fresh id.
    for msg_id, device in enumerate(devices, start=101):
        for field in required_fields:
            assert device[field] is not None

        for entity_reference in device["entities"]:
            assert entity_reference[ATTR_NAME] is not None
            assert entity_reference["entity_id"] is not None

        # Fetching the device on its own must return the same payload as the list.
        await zha_client.send_json(
            {ID: msg_id, TYPE: "zha/device", ATTR_IEEE: device[ATTR_IEEE]}
        )
        single = await zha_client.receive_json()
        assert device == single["result"]
async def test_get_zha_config(zha_client) -> None:
    """Test getting ZHA custom configuration."""
    await zha_client.send_json({ID: 5, TYPE: "zha/configuration"})
    response = await zha_client.receive_json()

    assert response["result"] == BASE_CUSTOM_CONFIGURATION
async def test_get_zha_config_with_alarm(
    hass: HomeAssistant, zha_client, zigpy_device_mock
) -> None:
    """Test the configuration includes alarm options while an IAS ACE device exists."""
    gateway = get_zha_gateway(hass)
    gateway_proxy: ZHAGatewayProxy = get_zha_gateway_proxy(hass)

    ias_endpoint = {
        SIG_EP_INPUT: [security.IasAce.cluster_id],
        SIG_EP_OUTPUT: [],
        SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.IAS_ANCILLARY_CONTROL,
        SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
    }
    zigpy_device_ias = zigpy_device_mock({1: ias_endpoint})

    gateway.get_or_create_device(zigpy_device_ias)
    await gateway.async_device_initialized(zigpy_device_ias)
    await hass.async_block_till_done(wait_background_tasks=True)
    zha_device_proxy: ZHADeviceProxy = gateway_proxy.get_device_proxy(
        zigpy_device_ias.ieee
    )

    await zha_client.send_json({ID: 5, TYPE: "zha/configuration"})
    response = await zha_client.receive_json()
    assert response["result"] == CONFIG_WITH_ALARM_OPTIONS

    # test that the alarm options are not in the config when we remove the device
    zha_device_proxy.gateway_proxy.gateway.device_removed(zha_device_proxy.device)
    await hass.async_block_till_done()

    await zha_client.send_json({ID: 6, TYPE: "zha/configuration"})
    response = await zha_client.receive_json()
    assert response["result"] == BASE_CUSTOM_CONFIGURATION
async def test_update_zha_config(
    hass: HomeAssistant,
    config_entry: MockConfigEntry,
    zha_client,
    app_controller: ControllerApplication,
) -> None:
    """Test updating ZHA custom configuration."""
    # Work on a copy so the shared baseline is not mutated for other tests.
    configuration: dict = deepcopy(BASE_CUSTOM_CONFIGURATION)
    configuration["data"]["zha_options"]["default_light_transition"] = 10

    controller_patch = patch(
        "bellows.zigbee.application.ControllerApplication.new",
        return_value=app_controller,
    )
    with controller_patch:
        update_request = {
            ID: 5,
            TYPE: "zha/configuration/update",
            "data": configuration["data"],
        }
        await zha_client.send_json(update_request)
        update_response = await zha_client.receive_json()
        assert update_response["success"]

        # Reading the configuration back must reflect the update.
        await zha_client.send_json({ID: 6, TYPE: "zha/configuration"})
        read_response = await zha_client.receive_json()
        test_configuration = read_response["result"]
        assert test_configuration == configuration

        await hass.config_entries.async_unload(config_entry.entry_id)
async def test_device_not_found(zha_client) -> None:
    """Test not found response from get device API."""
    unknown_ieee = "28:6d:97:00:01:04:11:8c"
    await zha_client.send_json({ID: 6, TYPE: "zha/device", ATTR_IEEE: unknown_ieee})
    response = await zha_client.receive_json()

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert not response["success"]
    assert response["error"]["code"] == ERR_NOT_FOUND
async def test_list_groups(zha_client) -> None:
    """Test getting ZHA zigbee groups."""
    await zha_client.send_json({ID: 7, TYPE: "zha/groups"})
    response = await zha_client.receive_json()

    assert response["id"] == 7
    assert response["type"] == TYPE_RESULT

    groups = response["result"]
    assert len(groups) == 1

    # Only the fixture group is expected, and it has no members.
    for group in groups:
        assert group["group_id"] == FIXTURE_GRP_ID
        assert group["name"] == FIXTURE_GRP_NAME
        assert group["members"] == []
async def test_get_group(zha_client) -> None:
    """Test getting a specific ZHA zigbee group."""
    request = {ID: 8, TYPE: "zha/group", GROUP_ID: FIXTURE_GRP_ID}
    await zha_client.send_json(request)
    response = await zha_client.receive_json()

    assert response["id"] == 8
    assert response["type"] == TYPE_RESULT

    group = response["result"]
    assert group is not None
    assert group["group_id"] == FIXTURE_GRP_ID
    assert group["name"] == FIXTURE_GRP_NAME
    assert group["members"] == []
async def test_get_group_not_found(zha_client) -> None:
    """Test not found response from get group API."""
    request = {ID: 9, TYPE: "zha/group", GROUP_ID: 1_234_567}
    await zha_client.send_json(request)
    response = await zha_client.receive_json()

    assert response["id"] == 9
    assert response["type"] == TYPE_RESULT
    assert not response["success"]
    assert response["error"]["code"] == ERR_NOT_FOUND
async def test_list_groupable_devices(
    hass: HomeAssistant, zha_client, zigpy_app_controller
) -> None:
    """Test getting ZHA devices that have a group cluster.

    Only the groupable fixture device should be listed while it is
    available; once it is marked unavailable the list must be empty.
    """
    # Ensure the coordinator doesn't have a group cluster
    coordinator = zigpy_app_controller.get_device(nwk=0x0000)
    del coordinator.endpoints[1].in_clusters[Groups.cluster_id]

    await zha_client.send_json({ID: 10, TYPE: "zha/devices/groupable"})
    msg = await zha_client.receive_json()

    assert msg["id"] == 10
    assert msg["type"] == TYPE_RESULT

    device_endpoints = msg["result"]
    assert len(device_endpoints) == 1

    for endpoint in device_endpoints:
        # Compare against the shared constant rather than a duplicated literal.
        assert endpoint["device"][ATTR_IEEE] == IEEE_GROUPABLE_DEVICE
        assert endpoint["device"][ATTR_MANUFACTURER] is not None
        assert endpoint["device"][ATTR_MODEL] is not None
        assert endpoint["device"][ATTR_NAME] is not None
        assert endpoint["device"][ATTR_QUIRK_APPLIED] is not None
        assert endpoint["device"]["entities"] is not None
        assert endpoint["endpoint_id"] is not None
        assert endpoint["entities"] is not None

        for entity_reference in endpoint["device"]["entities"]:
            assert entity_reference[ATTR_NAME] is not None
            assert entity_reference["entity_id"] is not None

        for entity_reference in endpoint["entities"]:
            assert entity_reference["original_name"] is not None

    # Make sure there are no groupable devices when the device is unavailable
    # Make device unavailable
    get_zha_gateway_proxy(hass).device_proxies[
        EUI64.convert(IEEE_GROUPABLE_DEVICE)
    ].device.available = False
    await hass.async_block_till_done(wait_background_tasks=True)

    await zha_client.send_json({ID: 11, TYPE: "zha/devices/groupable"})
    msg = await zha_client.receive_json()

    assert msg["id"] == 11
    assert msg["type"] == TYPE_RESULT

    device_endpoints = msg["result"]
    assert len(device_endpoints) == 0
async def test_add_group(hass: HomeAssistant, zha_client) -> None:
    """Test adding and getting a new ZHA zigbee group.

    Creates a group with the groupable device as its only member, checks
    the returned payload, then checks the group list now holds both the
    fixture group and the new one.
    """
    await zha_client.send_json(
        {
            ID: 12,
            TYPE: "zha/group/add",
            GROUP_NAME: "new_group",
            "members": [{"ieee": IEEE_GROUPABLE_DEVICE, "endpoint_id": 1}],
        }
    )
    msg = await zha_client.receive_json()

    assert msg["id"] == 12
    assert msg["type"] == TYPE_RESULT

    added_group = msg["result"]
    groupable_device = get_zha_gateway_proxy(hass).device_proxies[
        EUI64.convert(IEEE_GROUPABLE_DEVICE)
    ]

    assert added_group["name"] == "new_group"
    assert len(added_group["members"]) == 1
    assert added_group["members"][0]["device"]["ieee"] == IEEE_GROUPABLE_DEVICE
    assert (
        added_group["members"][0]["device"]["device_reg_id"]
        == groupable_device.device_id
    )

    await zha_client.send_json({ID: 13, TYPE: "zha/groups"})
    msg = await zha_client.receive_json()

    assert msg["id"] == 13
    assert msg["type"] == TYPE_RESULT

    groups = msg["result"]
    assert len(groups) == 2
    # Comparing the set of names requires BOTH groups to be present; a
    # per-group "is one of" check would also pass for two same-named groups.
    assert {group["name"] for group in groups} == {FIXTURE_GRP_NAME, "new_group"}
async def test_remove_group(zha_client) -> None:
    """Test removing a new ZHA zigbee group."""
    # Before removal: only the fixture group exists.
    await zha_client.send_json({ID: 14, TYPE: "zha/groups"})
    response = await zha_client.receive_json()
    assert response["id"] == 14
    assert response["type"] == TYPE_RESULT
    assert len(response["result"]) == 1

    # The remove call answers with the groups that remain.
    removal = {ID: 15, TYPE: "zha/group/remove", GROUP_IDS: [FIXTURE_GRP_ID]}
    await zha_client.send_json(removal)
    response = await zha_client.receive_json()
    assert response["id"] == 15
    assert response["type"] == TYPE_RESULT
    groups_remaining = response["result"]
    assert len(groups_remaining) == 0

    # After removal: listing the groups returns nothing.
    await zha_client.send_json({ID: 16, TYPE: "zha/groups"})
    response = await zha_client.receive_json()
    assert response["id"] == 16
    assert response["type"] == TYPE_RESULT
    assert len(response["result"]) == 0
async def test_add_group_member(hass: HomeAssistant, zha_client) -> None:
    """Test adding a ZHA zigbee group member."""
    # Start with an empty group.
    create_request = {
        ID: 12,
        TYPE: "zha/group/add",
        GROUP_NAME: "new_group",
    }
    await zha_client.send_json(create_request)
    response = await zha_client.receive_json()

    assert response["id"] == 12
    assert response["type"] == TYPE_RESULT

    empty_group = response["result"]
    assert len(empty_group["members"]) == 0

    # Add the groupable device to the group that was just created.
    add_member_request = {
        ID: 13,
        TYPE: "zha/group/members/add",
        GROUP_ID: empty_group["group_id"],
        "members": [{"ieee": IEEE_GROUPABLE_DEVICE, "endpoint_id": 1}],
    }
    await zha_client.send_json(add_member_request)
    response = await zha_client.receive_json()

    assert response["id"] == 13
    assert response["type"] == TYPE_RESULT

    populated_group = response["result"]
    assert len(populated_group["members"]) == 1
    assert populated_group["name"] == "new_group"
    assert populated_group["members"][0]["device"]["ieee"] == IEEE_GROUPABLE_DEVICE
async def test_remove_group_member(hass: HomeAssistant, zha_client) -> None:
    """Test removing a ZHA zigbee group member."""
    member = {"ieee": IEEE_GROUPABLE_DEVICE, "endpoint_id": 1}

    # Create a group that already contains the groupable device.
    create_request = {
        ID: 12,
        TYPE: "zha/group/add",
        GROUP_NAME: "new_group",
        "members": [member],
    }
    await zha_client.send_json(create_request)
    response = await zha_client.receive_json()

    assert response["id"] == 12
    assert response["type"] == TYPE_RESULT

    populated_group = response["result"]
    assert populated_group["name"] == "new_group"
    assert len(populated_group["members"]) == 1
    assert populated_group["members"][0]["device"]["ieee"] == IEEE_GROUPABLE_DEVICE

    # Remove that member again; the group must end up empty.
    remove_request = {
        ID: 13,
        TYPE: "zha/group/members/remove",
        GROUP_ID: populated_group["group_id"],
        "members": [member],
    }
    await zha_client.send_json(remove_request)
    response = await zha_client.receive_json()

    assert response["id"] == 13
    assert response["type"] == TYPE_RESULT

    emptied_group = response["result"]
    assert len(emptied_group["members"]) == 0
@pytest.fixture
async def app_controller(
    hass: HomeAssistant, setup_zha, zigpy_app_controller: ControllerApplication
) -> ControllerApplication:
    """Set up ZHA and return the zigpy application controller."""
    await setup_zha()

    # Clear calls recorded so far so tests can assert on exact permit counts.
    controller = zigpy_app_controller
    controller.permit.reset_mock()
    return controller
@pytest.mark.parametrize(
    ("params", "duration", "node"),
    [
        ({}, 60, None),
        ({ATTR_DURATION: 30}, 30, None),
        (
            {ATTR_DURATION: 33, ATTR_IEEE: "aa:bb:cc:dd:aa:bb:cc:dd"},
            33,
            zigpy.types.EUI64.convert("aa:bb:cc:dd:aa:bb:cc:dd"),
        ),
        (
            {ATTR_IEEE: "aa:bb:cc:dd:aa:bb:cc:d1"},
            60,
            zigpy.types.EUI64.convert("aa:bb:cc:dd:aa:bb:cc:d1"),
        ),
    ],
)
async def test_permit_ha12(
    hass: HomeAssistant,
    app_controller: ControllerApplication,
    hass_admin_user: MockUser,
    params,
    duration,
    node,
) -> None:
    """Test permit service."""
    admin_context = Context(user_id=hass_admin_user.id)
    await hass.services.async_call(
        DOMAIN, SERVICE_PERMIT, params, True, admin_context
    )

    assert app_controller.permit.await_count == 1

    # Keyword arguments of the single awaited permit() call.
    permit_kwargs = app_controller.permit.await_args.kwargs
    assert permit_kwargs["time_s"] == duration
    assert permit_kwargs["node"] == node

    assert app_controller.permit_with_link_key.call_count == 0
# Valid install-code cases as (service params, expected source IEEE, expected link key).
# The two entries carry the same code, written with and without dashes.
IC_TEST_PARAMS = (
    (
        {
            ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
            ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051",
        },
        zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
        zigpy.util.convert_install_code(
            unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
        ),
    ),
    (
        {
            ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
            ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051",
        },
        zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
        zigpy.util.convert_install_code(
            unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
        ),
    ),
)
@pytest.mark.parametrize(("params", "src_ieee", "code"), IC_TEST_PARAMS)
async def test_permit_with_install_code(
    hass: HomeAssistant,
    app_controller: ControllerApplication,
    hass_admin_user: MockUser,
    params,
    src_ieee,
    code,
) -> None:
    """Test permit service with install code."""
    admin_context = Context(user_id=hass_admin_user.id)
    await hass.services.async_call(
        DOMAIN, SERVICE_PERMIT, params, True, admin_context
    )

    # An install code must go through permit_with_link_key, not plain permit.
    assert app_controller.permit.await_count == 0
    assert app_controller.permit_with_link_key.call_count == 1

    link_key_kwargs = app_controller.permit_with_link_key.await_args.kwargs
    assert link_key_kwargs["time_s"] == 60
    assert link_key_kwargs["node"] == src_ieee
    assert link_key_kwargs["link_key"] == code
# Parameter sets that the permit service / websocket command must reject.
IC_FAIL_PARAMS = (
    {
        # wrong install code
        ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
        ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4052",
    },
    # incorrect service params
    {ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051"},
    {ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE},
    {
        # incorrect service params
        ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051",
        ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051",
    },
    {
        # incorrect service params
        ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
        ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051",
    },
    {
        # good regex match, but bad code
        ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024052"
    },
    {
        # good aqara regex match, but bad code
        ATTR_QR_CODE: (
            "G$M:751$S:357S00001579$D:000000000F350FFD%Z$A:04CF8CDF"
            "3C3C3C3C$I:52797BF4A5084DAA8E1712B61741CA024052"
        )
    },
    # good consciot regex match, but bad code
    {ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024052"},
)
@pytest.mark.parametrize("params", IC_FAIL_PARAMS)
async def test_permit_with_install_code_fail(
    hass: HomeAssistant,
    app_controller: ControllerApplication,
    hass_admin_user: MockUser,
    params,
) -> None:
    """Test permit service rejects invalid install code parameters."""
    admin_context = Context(user_id=hass_admin_user.id)

    with pytest.raises(vol.Invalid):
        await hass.services.async_call(
            DOMAIN, SERVICE_PERMIT, params, True, admin_context
        )

    # Neither permit variant may have been invoked.
    assert app_controller.permit.await_count == 0
    assert app_controller.permit_with_link_key.call_count == 0
# Valid QR-code cases as (service params, expected source IEEE, expected link key).
IC_QR_CODE_TEST_PARAMS = (
    (
        {ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"},
        zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
        zigpy.util.convert_install_code(
            unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
        ),
    ),
    (
        {ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"},
        zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
        zigpy.util.convert_install_code(
            unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
        ),
    ),
    (
        {
            ATTR_QR_CODE: (
                "G$M:751$S:357S00001579$D:000000000F350FFD%Z$A:04CF8CDF"
                "3C3C3C3C$I:52797BF4A5084DAA8E1712B61741CA024051"
            )
        },
        zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"),
        zigpy.util.convert_install_code(
            unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
        ),
    ),
    (
        # Here the expected key is given directly as KeyData instead of being
        # derived with convert_install_code.
        {
            ATTR_QR_CODE: (
                "RB01SG"
                "0D836591B3CC0010000000000000000000"
                "000D6F0019107BB1"
                "DLK"
                "E4636CB6C41617C3E08F7325FFBFE1F9"
            )
        },
        zigpy.types.EUI64.convert("00:0D:6F:00:19:10:7B:B1"),
        zigpy.types.KeyData.convert("E4:63:6C:B6:C4:16:17:C3:E0:8F:73:25:FF:BF:E1:F9"),
    ),
)
@pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS)
async def test_permit_with_qr_code(
    hass: HomeAssistant,
    app_controller: ControllerApplication,
    hass_admin_user: MockUser,
    params,
    src_ieee,
    code,
) -> None:
    """Test permit service with install code from qr code."""
    admin_context = Context(user_id=hass_admin_user.id)
    await hass.services.async_call(
        DOMAIN, SERVICE_PERMIT, params, True, admin_context
    )

    # A QR code must go through permit_with_link_key, not plain permit.
    assert app_controller.permit.await_count == 0
    assert app_controller.permit_with_link_key.call_count == 1

    link_key_kwargs = app_controller.permit_with_link_key.await_args.kwargs
    assert link_key_kwargs["time_s"] == 60
    assert link_key_kwargs["node"] == src_ieee
    assert link_key_kwargs["link_key"] == code
@pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS)
async def test_ws_permit_with_qr_code(
    app_controller: ControllerApplication, zha_client, params, src_ieee, code
) -> None:
    """Test permit ws service with install code from qr code."""
    await zha_client.send_json(
        {ID: 14, TYPE: f"{DOMAIN}/devices/{SERVICE_PERMIT}", **params}
    )

    while True:
        # There will be logging events coming over the websocket
        # as well so we want to ignore those
        msg = await zha_client.receive_json()
        if msg["type"] == TYPE_RESULT:
            break

    assert msg["id"] == 14
    assert msg["type"] == TYPE_RESULT
    assert msg["success"]

    assert app_controller.permit.await_count == 0
    assert app_controller.permit_with_link_key.call_count == 1

    link_key_kwargs = app_controller.permit_with_link_key.await_args.kwargs
    assert link_key_kwargs["time_s"] == 60
    assert link_key_kwargs["node"] == src_ieee
    assert link_key_kwargs["link_key"] == code
@pytest.mark.parametrize("params", IC_FAIL_PARAMS)
async def test_ws_permit_with_install_code_fail(
    app_controller: ControllerApplication, zha_client, params
) -> None:
    """Test permit ws service rejects invalid install code parameters."""
    request = {ID: 14, TYPE: f"{DOMAIN}/devices/{SERVICE_PERMIT}", **params}
    await zha_client.send_json(request)
    response = await zha_client.receive_json()

    assert response["id"] == 14
    assert response["type"] == TYPE_RESULT
    assert response["success"] is False

    # Neither permit variant may have been invoked.
    assert app_controller.permit.await_count == 0
    assert app_controller.permit_with_link_key.call_count == 0
@pytest.mark.parametrize(
    ("params", "duration", "node"),
    [
        ({}, 60, None),
        ({ATTR_DURATION: 30}, 30, None),
        (
            {ATTR_DURATION: 33, ATTR_IEEE: "aa:bb:cc:dd:aa:bb:cc:dd"},
            33,
            zigpy.types.EUI64.convert("aa:bb:cc:dd:aa:bb:cc:dd"),
        ),
        (
            {ATTR_IEEE: "aa:bb:cc:dd:aa:bb:cc:d1"},
            60,
            zigpy.types.EUI64.convert("aa:bb:cc:dd:aa:bb:cc:d1"),
        ),
    ],
)
async def test_ws_permit_ha12(
    app_controller: ControllerApplication, zha_client, params, duration, node
) -> None:
    """Test permit ws service."""
    await zha_client.send_json(
        {ID: 14, TYPE: f"{DOMAIN}/devices/{SERVICE_PERMIT}", **params}
    )

    while True:
        # There will be logging events coming over the websocket
        # as well so we want to ignore those
        msg = await zha_client.receive_json()
        if msg["type"] == TYPE_RESULT:
            break

    assert msg["id"] == 14
    assert msg["type"] == TYPE_RESULT
    assert msg["success"]

    assert app_controller.permit.await_count == 1

    # Keyword arguments of the single awaited permit() call.
    permit_kwargs = app_controller.permit.await_args.kwargs
    assert permit_kwargs["time_s"] == duration
    assert permit_kwargs["node"] == node

    assert app_controller.permit_with_link_key.call_count == 0
async def test_get_network_settings(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test current network settings are returned."""
    await app_controller.backups.create_backup()

    await zha_client.send_json({ID: 6, TYPE: f"{DOMAIN}/network/settings"})
    response = await zha_client.receive_json()

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    result = response["result"]
    assert "radio_type" in result
    assert "network_info" in result["settings"]
    assert "path" in result["device"]
async def test_list_network_backups(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test backups are serialized."""
    await app_controller.backups.create_backup()

    await zha_client.send_json({ID: 6, TYPE: f"{DOMAIN}/network/backups/list"})
    response = await zha_client.receive_json()

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    first_backup = response["result"][0]
    assert "network_info" in first_backup
async def test_create_network_backup(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test creating backup."""
    # No backups exist before the command is issued.
    assert not app_controller.backups.backups

    await zha_client.send_json({ID: 6, TYPE: f"{DOMAIN}/network/backups/create"})
    response = await zha_client.receive_json()

    assert len(app_controller.backups.backups) == 1

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    result = response["result"]
    assert "backup" in result
    assert "is_complete" in result
async def test_restore_network_backup_success(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test successfully restoring a backup."""
    backup = zigpy.backups.NetworkBackup()
    request = {
        ID: 6,
        TYPE: f"{DOMAIN}/network/backups/restore",
        "backup": backup.as_dict(),
    }

    with patch.object(
        app_controller.backups, "restore_backup", new=AsyncMock()
    ) as restore_mock:
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

        # The backup is passed through unchanged, without any "ezsp" entry.
        restore_mock.assert_called_once_with(backup)
        assert "ezsp" not in backup.network_info.stack_specific

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]
async def test_restore_network_backup_force_write_eui64(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test restoring a backup with the EZSP force-write-EUI64 flag set."""
    backup = zigpy.backups.NetworkBackup()
    request = {
        ID: 6,
        TYPE: f"{DOMAIN}/network/backups/restore",
        "backup": backup.as_dict(),
        "ezsp_force_write_eui64": True,
    }

    with patch.object(
        app_controller.backups, "restore_backup", new=AsyncMock()
    ) as restore_mock:
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

        # EUI64 will be overwritten
        overwrite_info = backup.network_info.replace(
            stack_specific={"ezsp": {EZSP_OVERWRITE_EUI64: True}}
        )
        restore_mock.assert_called_once_with(
            backup.replace(network_info=overwrite_info)
        )

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]
@patch("zigpy.backups.NetworkBackup.from_dict", new=lambda v: v)
async def test_restore_network_backup_failure(
    app_controller: ControllerApplication, zha_client
) -> None:
    """Test a failing backup restore is reported as an invalid-format error."""
    # from_dict is patched to the identity, so the raw payload reaches restore_backup.
    failing_restore = AsyncMock(side_effect=ValueError("Restore failed"))
    request = {ID: 6, TYPE: f"{DOMAIN}/network/backups/restore", "backup": "a backup"}

    with patch.object(
        app_controller.backups,
        "restore_backup",
        new=failing_restore,
    ) as restore_mock:
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

        restore_mock.assert_called_once_with("a backup")

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert not response["success"]
    assert response["error"]["code"] == ERR_INVALID_FORMAT
@pytest.mark.parametrize("new_channel", ["auto", 15])
async def test_websocket_change_channel(
    new_channel: int | str, app_controller: ControllerApplication, zha_client
) -> None:
    """Test websocket API to migrate the network to a new channel."""
    request = {
        ID: 6,
        TYPE: f"{DOMAIN}/network/change_channel",
        "new_channel": new_channel,
    }

    with patch(
        "homeassistant.components.zha.websocket_api.async_change_channel",
        autospec=True,
    ) as change_channel_mock:
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

    assert response["id"] == 6
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    # The first positional argument is not checked (ANY).
    change_channel_mock.assert_has_calls([call(ANY, new_channel)])
@pytest.mark.parametrize(
    "operation",
    [("bind", zdo_types.ZDOCmd.Bind_req), ("unbind", zdo_types.ZDOCmd.Unbind_req)],
)
async def test_websocket_bind_unbind_devices(
    operation: tuple[str, zdo_types.ZDOCmd],
    app_controller: ControllerApplication,
    zha_client,
) -> None:
    """Test websocket API for binding and unbinding devices to devices."""
    command_type, req = operation
    request = {
        ID: 27,
        TYPE: f"zha/devices/{command_type}",
        ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
        ATTR_TARGET_IEEE: IEEE_GROUPABLE_DEVICE,
    }

    with patch(
        "homeassistant.components.zha.websocket_api.async_binding_operation",
        autospec=True,
    ) as binding_operation_mock:
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

    assert response["id"] == 27
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    # Exactly one call: (anything, source IEEE, target IEEE, ZDO command).
    expected_call = call(
        ANY,
        EUI64.convert(IEEE_SWITCH_DEVICE),
        EUI64.convert(IEEE_GROUPABLE_DEVICE),
        req,
    )
    assert binding_operation_mock.mock_calls == [expected_call]
@pytest.mark.parametrize("command_type", ["bind", "unbind"])
async def test_websocket_bind_unbind_group(
    command_type: str,
    hass: HomeAssistant,
    app_controller: ControllerApplication,
    zha_client,
) -> None:
    """Test websocket API for binding and unbinding devices to groups."""
    test_group_id = 0x0001

    # Mocked gateway whose get_device() returns a device with async bind/unbind mocks.
    bind_mock = AsyncMock()
    unbind_mock = AsyncMock()
    device_mock = MagicMock(
        async_bind_to_group=bind_mock,
        async_unbind_from_group=unbind_mock,
    )
    gateway_mock = MagicMock()
    gateway_mock.get_device = MagicMock(return_value=device_mock)

    request = {
        ID: 27,
        TYPE: f"zha/groups/{command_type}",
        ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
        GROUP_ID: test_group_id,
        BINDINGS: [
            {
                ATTR_ENDPOINT_ID: 1,
                ID: 6,
                ATTR_NAME: "OnOff",
                ATTR_TYPE: "out",
            },
        ],
    }

    with patch(
        "homeassistant.components.zha.websocket_api.get_zha_gateway",
        return_value=gateway_mock,
    ):
        await zha_client.send_json(request)
        response = await zha_client.receive_json()

    assert response["id"] == 27
    assert response["type"] == TYPE_RESULT
    assert response["success"]

    expected_calls = [call(test_group_id, ANY)]
    if command_type == "bind":
        assert bind_mock.mock_calls == expected_calls
    elif command_type == "unbind":
        assert unbind_mock.mock_calls == expected_calls
async def test_websocket_reconfigure(
    hass: HomeAssistant, zha_client: MockHAClientWebSocket, zigpy_device_mock
) -> None:
    """Test websocket API to reconfigure a device."""
    gateway = get_zha_gateway(hass)

    shade_endpoint = {
        SIG_EP_INPUT: [closures.WindowCovering.cluster_id],
        SIG_EP_OUTPUT: [],
        SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.SHADE,
        SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
    }
    zigpy_device = zigpy_device_mock({1: shade_endpoint})

    zha_device = gateway.get_or_create_device(zigpy_device)
    await gateway.async_device_initialized(zigpy_device)
    await hass.async_block_till_done(wait_background_tasks=True)

    zha_device_proxy = get_zha_gateway_proxy(hass).get_device_proxy(zha_device.ieee)

    def mock_reconfigure() -> None:
        """Stand in for async_configure by feeding three events to the proxy."""
        reported_attributes = {
            attribute: {
                "min": 0,
                "max": 900,
                "id": attribute,
                "name": attribute,
                "change": 1,
                "status": "SUCCESS",
            }
            for attribute in (
                "current_position_lift_percentage",
                "current_position_tilt_percentage",
            )
        }
        reporting_event = ClusterConfigureReportingEvent(
            cluster_name="Window Covering",
            cluster_id=258,
            attributes=reported_attributes,
            cluster_handler_unique_id="28:2c:02:bf:ff:ea:05:68:1:0x0102",
            event_type="zha_channel_message",
            event="zha_channel_configure_reporting",
        )
        zha_device_proxy.handle_zha_channel_configure_reporting(reporting_event)

        bind_event = ClusterBindEvent(
            cluster_name="Window Covering",
            cluster_id=1,
            success=True,
            cluster_handler_unique_id="28:2c:02:bf:ff:ea:05:68:1:0x0012",
            event_type="zha_channel_message",
            event="zha_channel_bind",
        )
        zha_device_proxy.handle_zha_channel_bind(bind_event)

        done_event = ClusterHandlerConfigurationComplete(
            device_ieee="28:2c:02:bf:ff:ea:05:68",
            unique_id="28:2c:02:bf:ff:ea:05:68",
            event_type="zha_channel_message",
            event="zha_channel_cfg_done",
        )
        zha_device_proxy.handle_zha_channel_cfg_done(done_event)

    with patch.object(
        zha_device_proxy.device, "async_configure", side_effect=mock_reconfigure
    ):
        await zha_client.send_json(
            {
                ID: 6,
                TYPE: "zha/devices/reconfigure",
                ATTR_IEEE: str(zha_device_proxy.device.ieee),
            }
        )

        # Collect only the messages that belong to our request id.
        messages = []
        while len(messages) != 3:
            incoming = await zha_client.receive_json()
            if incoming[ID] == 6:
                messages.append(incoming)

    # Ensure the frontend receives progress events
    received_event_types = {message["event"]["type"] for message in messages}
    assert received_event_types == {
        "zha_channel_configure_reporting",
        "zha_channel_bind",
        "zha_channel_cfg_done",
    }