mirror of https://github.com/home-assistant/core
407 lines
14 KiB
Python
407 lines
14 KiB
Python
"""Code to support homekit_controller tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
from datetime import timedelta
|
|
import logging
|
|
import os
|
|
from typing import Any, Final
|
|
from unittest import mock
|
|
|
|
from aiohomekit.controller.abstract import AbstractDescription, AbstractPairing
|
|
from aiohomekit.hkjson import loads as hkloads
|
|
from aiohomekit.model import Accessories, AccessoriesState, Accessory
|
|
from aiohomekit.model.services import Service
|
|
from aiohomekit.testing import FakeController, FakePairing
|
|
|
|
from homeassistant.components.device_automation import DeviceAutomationType
|
|
from homeassistant.components.homekit_controller.const import (
|
|
CONTROLLER,
|
|
DEBOUNCE_COOLDOWN,
|
|
DOMAIN,
|
|
HOMEKIT_ACCESSORY_DISPATCH,
|
|
IDENTIFIER_ACCESSORY_ID,
|
|
SUBSCRIBE_COOLDOWN,
|
|
)
|
|
from homeassistant.components.homekit_controller.utils import async_get_controller
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import EntityCategory
|
|
from homeassistant.core import HomeAssistant, State, callback
|
|
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
|
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
|
|
from homeassistant.setup import async_setup_component
|
|
import homeassistant.util.dt as dt_util
|
|
|
|
from tests.common import (
|
|
MockConfigEntry,
|
|
async_fire_time_changed,
|
|
async_get_device_automations,
|
|
load_fixture,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Root device in test harness always has an accessory id of this
|
|
HUB_TEST_ACCESSORY_ID: Final[str] = "00:00:00:00:00:00:aid:1"
|
|
|
|
TEST_ACCESSORY_ADDRESS = "AA:BB:CC:DD:EE:FF"
|
|
|
|
|
|
TEST_DEVICE_SERVICE_INFO = BluetoothServiceInfo(
|
|
name="test_accessory",
|
|
address=TEST_ACCESSORY_ADDRESS,
|
|
rssi=-56,
|
|
manufacturer_data={},
|
|
service_uuids=["0000ec88-0000-1000-8000-00805f9b34fb"],
|
|
service_data={},
|
|
source="local",
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class EntityTestInfo:
|
|
"""Describes how we expected an entity to be created by homekit_controller."""
|
|
|
|
entity_id: str
|
|
unique_id: str
|
|
friendly_name: str
|
|
state: str
|
|
supported_features: int = 0
|
|
capabilities: dict[str, Any] | None = None
|
|
entity_category: EntityCategory | None = None
|
|
unit_of_measurement: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class DeviceTriggerInfo:
|
|
"""Describe a automation trigger we expect to be created.
|
|
|
|
We only use these for a stateless characteristic like a doorbell.
|
|
"""
|
|
|
|
type: str
|
|
subtype: str
|
|
|
|
|
|
@dataclass
|
|
class DeviceTestInfo:
|
|
"""Describes how we exepced a device to be created by homekit_controlller."""
|
|
|
|
name: str
|
|
manufacturer: str
|
|
model: str
|
|
sw_version: str
|
|
hw_version: str
|
|
|
|
devices: list[DeviceTestInfo]
|
|
entities: list[EntityTestInfo]
|
|
|
|
# At least one of these must be provided
|
|
unique_id: str | None = None
|
|
serial_number: str | None = None
|
|
|
|
# A homekit device can have events but no entity (like a doorbell or remote)
|
|
stateless_triggers: list[DeviceTriggerInfo] | None = None
|
|
|
|
|
|
class Helper:
|
|
"""Helper methods for interacting with HomeKit fakes."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
entity_id: str,
|
|
pairing: FakePairing,
|
|
accessory: Accessory,
|
|
config_entry: ConfigEntry,
|
|
) -> None:
|
|
"""Create a helper for a given accessory/entity."""
|
|
self.hass = hass
|
|
self.entity_id = entity_id
|
|
self.pairing = pairing
|
|
self.accessory = accessory
|
|
self.config_entry = config_entry
|
|
|
|
async def async_update(
|
|
self, service: str, characteristics: dict[str, Any]
|
|
) -> State:
|
|
"""Set the characteristics on this service."""
|
|
changes = []
|
|
|
|
service = self.accessory.services.first(service_type=service)
|
|
aid = service.accessory.aid
|
|
|
|
for ctype, value in characteristics.items():
|
|
char = service.characteristics.first(char_types=[ctype])
|
|
changes.append((aid, char.iid, value))
|
|
|
|
self.pairing.testing.update_aid_iid(changes)
|
|
|
|
if not self.pairing.testing.events_enabled:
|
|
# If events aren't enabled, explicitly do a poll
|
|
# If they are enabled, then HA will pick up the changes next time
|
|
# we yield control
|
|
await time_changed(self.hass, 60)
|
|
await time_changed(self.hass, DEBOUNCE_COOLDOWN)
|
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
state = self.hass.states.get(self.entity_id)
|
|
assert state is not None
|
|
return state
|
|
|
|
async def async_set_aid_iid_status(
|
|
self, aid_iid_status: list[tuple[int, int, int]]
|
|
) -> None:
|
|
"""Set the status of a set of aid/iid pairs."""
|
|
self.pairing.testing.set_aid_iid_status(aid_iid_status)
|
|
await self.hass.async_block_till_done()
|
|
|
|
@callback
|
|
def async_assert_service_values(
|
|
self, service: str, characteristics: dict[str, Any]
|
|
) -> None:
|
|
"""Assert a service has characteristics with these values."""
|
|
service = self.accessory.services.first(service_type=service)
|
|
for ctype, value in characteristics.items():
|
|
assert service.value(ctype) == value
|
|
|
|
async def poll_and_get_state(self) -> State:
|
|
"""Trigger a time based poll and return the current entity state."""
|
|
await time_changed(self.hass, 60)
|
|
await time_changed(self.hass, DEBOUNCE_COOLDOWN)
|
|
|
|
state = self.hass.states.get(self.entity_id)
|
|
assert state is not None
|
|
return state
|
|
|
|
|
|
async def time_changed(hass: HomeAssistant, seconds: int) -> None:
|
|
"""Trigger time changed."""
|
|
next_update = dt_util.utcnow() + timedelta(seconds)
|
|
async_fire_time_changed(hass, next_update)
|
|
await hass.async_block_till_done()
|
|
|
|
|
|
async def setup_accessories_from_file(hass: HomeAssistant, path: str) -> Accessories:
|
|
"""Load an collection of accessory defs from JSON data."""
|
|
accessories_fixture = await hass.async_add_executor_job(
|
|
load_fixture, os.path.join("homekit_controller", path)
|
|
)
|
|
accessories_json = hkloads(accessories_fixture)
|
|
return Accessories.from_list(accessories_json)
|
|
|
|
|
|
async def setup_platform(hass: HomeAssistant) -> FakeController:
|
|
"""Load the platform but with a fake Controller API."""
|
|
config = {"discovery": {}}
|
|
|
|
with mock.patch(
|
|
"homeassistant.components.homekit_controller.utils.Controller", FakeController
|
|
):
|
|
await async_setup_component(hass, DOMAIN, config)
|
|
|
|
return await async_get_controller(hass)
|
|
|
|
|
|
async def setup_test_accessories(
|
|
hass: HomeAssistant, accessories: list[Accessory], connection: str | None = None
|
|
) -> tuple[MockConfigEntry, AbstractPairing]:
|
|
"""Load a fake homekit device based on captured JSON profile."""
|
|
fake_controller = await setup_platform(hass)
|
|
return await setup_test_accessories_with_controller(
|
|
hass, accessories, fake_controller, connection
|
|
)
|
|
|
|
|
|
async def setup_test_accessories_with_controller(
|
|
hass: HomeAssistant,
|
|
accessories: list[Accessory],
|
|
fake_controller: FakeController,
|
|
connection: str | None = None,
|
|
) -> tuple[MockConfigEntry, AbstractPairing]:
|
|
"""Load a fake homekit device based on captured JSON profile."""
|
|
|
|
pairing_id = "00:00:00:00:00:00"
|
|
|
|
accessories_obj = Accessories()
|
|
for accessory in accessories:
|
|
accessories_obj.add_accessory(accessory)
|
|
pairing = await fake_controller.add_paired_device(accessories_obj, pairing_id)
|
|
|
|
data = {"AccessoryPairingID": pairing_id}
|
|
if connection == "BLE":
|
|
data["Connection"] = "BLE"
|
|
data["AccessoryAddress"] = TEST_ACCESSORY_ADDRESS
|
|
|
|
config_entry = MockConfigEntry(
|
|
version=1,
|
|
domain="homekit_controller",
|
|
entry_id="TestData",
|
|
data=data,
|
|
title="test",
|
|
)
|
|
config_entry.add_to_hass(hass)
|
|
|
|
await hass.config_entries.async_setup(config_entry.entry_id)
|
|
await time_changed(hass, SUBSCRIBE_COOLDOWN)
|
|
await hass.async_block_till_done()
|
|
|
|
return config_entry, pairing
|
|
|
|
|
|
async def device_config_changed(hass: HomeAssistant, accessories: Accessories):
|
|
"""Discover new devices added to Home Assistant at runtime."""
|
|
# Update the accessories our FakePairing knows about
|
|
controller = hass.data[CONTROLLER]
|
|
pairing: AbstractPairing = controller.pairings["00:00:00:00:00:00"]
|
|
|
|
accessories_obj = Accessories()
|
|
for accessory in accessories:
|
|
accessories_obj.add_accessory(accessory)
|
|
|
|
new_config_num = pairing.config_num + 1
|
|
pairing._async_description_update(
|
|
AbstractDescription(
|
|
name="testdevice.local.",
|
|
id="00:00:00:00:00:00",
|
|
status_flags=0,
|
|
config_num=new_config_num,
|
|
category=1,
|
|
)
|
|
)
|
|
# Set the accessories state only after calling
|
|
# _async_description_update, otherwise the config_num will be
|
|
# overwritten
|
|
pairing._accessories_state = AccessoriesState(accessories_obj, new_config_num)
|
|
|
|
# Wait for services to reconfigure
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
|
|
|
|
async def setup_test_component(
|
|
hass: HomeAssistant,
|
|
aid: int,
|
|
setup_accessory: Callable[[Accessory], Service | None],
|
|
capitalize: bool = False,
|
|
suffix: str | None = None,
|
|
connection: str | None = None,
|
|
) -> Helper:
|
|
"""Load a fake homekit accessory based on a homekit accessory model.
|
|
|
|
If capitalize is True, property names will be in upper case.
|
|
|
|
If suffix is set, entityId will include the suffix
|
|
"""
|
|
accessory = Accessory.create_with_info(
|
|
aid, "TestDevice", "example.com", "Test", "0001", "0.1"
|
|
)
|
|
setup_accessory(accessory)
|
|
|
|
domain = None
|
|
for service in accessory.services:
|
|
service_name = service.type
|
|
if service_name in HOMEKIT_ACCESSORY_DISPATCH:
|
|
domain = HOMEKIT_ACCESSORY_DISPATCH[service_name]
|
|
break
|
|
|
|
assert domain, "Cannot map test homekit services to Home Assistant domain"
|
|
|
|
config_entry, pairing = await setup_test_accessories(hass, [accessory], connection)
|
|
entity = "testdevice" if suffix is None else f"testdevice_{suffix}"
|
|
return Helper(hass, f"{domain}.{entity}", pairing, accessory, config_entry)
|
|
|
|
|
|
async def assert_devices_and_entities_created(
|
|
hass: HomeAssistant, expected: DeviceTestInfo
|
|
):
|
|
"""Check that all expected devices and entities are loaded and enumerated as expected."""
|
|
entity_registry = er.async_get(hass)
|
|
device_registry = dr.async_get(hass)
|
|
|
|
async def _do_assertions(expected: DeviceTestInfo) -> dr.DeviceEntry:
|
|
# Note: homekit_controller currently uses a 3-tuple for device identifiers
|
|
# The current standard is a 2-tuple (hkc was not migrated when this change was brought in)
|
|
|
|
# There are currently really 3 cases here:
|
|
# - We can match exactly one device by serial number. This won't work for devices like the Ryse.
|
|
# These have nlank or broken serial numbers.
|
|
# - The device unique id is "00:00:00:00:00:00" - this is the pairing id. This is only set for
|
|
# the root (bridge) device.
|
|
# - The device unique id is "00:00:00:00:00:00-X", where X is a HAP aid. This is only set when
|
|
# we have detected broken serial numbers (and serial number is not used as an identifier).
|
|
|
|
device = device_registry.async_get_device(
|
|
identifiers={(IDENTIFIER_ACCESSORY_ID, expected.unique_id)}
|
|
)
|
|
|
|
logger.debug("Comparing device %r to %r", device, expected)
|
|
|
|
assert device
|
|
assert device.name == expected.name
|
|
assert device.model == expected.model
|
|
assert device.manufacturer == expected.manufacturer
|
|
assert device.hw_version == expected.hw_version
|
|
assert device.sw_version == expected.sw_version
|
|
|
|
# We might have matched the device by one identifier only
|
|
# Lets check that the other one is correct. Otherwise the test might silently be wrong.
|
|
accessory_id_set = False
|
|
|
|
for key, value in device.identifiers:
|
|
if key == IDENTIFIER_ACCESSORY_ID:
|
|
assert value == expected.unique_id
|
|
accessory_id_set = True
|
|
|
|
# If unique_id or serial is provided it MUST actually appear in the device registry entry.
|
|
assert (not expected.unique_id) ^ accessory_id_set
|
|
|
|
for entity_info in expected.entities:
|
|
entity = entity_registry.async_get(entity_info.entity_id)
|
|
logger.debug("Comparing entity %r to %r", entity, entity_info)
|
|
|
|
assert entity
|
|
assert entity.device_id == device.id
|
|
assert entity.unique_id == entity_info.unique_id
|
|
assert entity.supported_features == entity_info.supported_features
|
|
assert entity.entity_category == entity_info.entity_category
|
|
assert entity.unit_of_measurement == entity_info.unit_of_measurement
|
|
assert entity.capabilities == entity_info.capabilities
|
|
|
|
state = hass.states.get(entity_info.entity_id)
|
|
logger.debug("Comparing state %r to %r", state, entity_info)
|
|
|
|
assert state is not None
|
|
assert state.state == entity_info.state
|
|
assert state.attributes["friendly_name"] == entity_info.friendly_name
|
|
|
|
all_triggers = await async_get_device_automations(
|
|
hass, DeviceAutomationType.TRIGGER, device.id
|
|
)
|
|
stateless_triggers = []
|
|
for trigger in all_triggers:
|
|
if trigger.get("entity_id"):
|
|
continue
|
|
stateless_triggers.append(
|
|
DeviceTriggerInfo(
|
|
type=trigger.get("type"), subtype=trigger.get("subtype")
|
|
)
|
|
)
|
|
assert stateless_triggers == (expected.stateless_triggers or [])
|
|
|
|
for child in expected.devices:
|
|
child_device = await _do_assertions(child)
|
|
assert child_device.via_device_id == device.id
|
|
assert child_device.id != device.id
|
|
|
|
return device
|
|
|
|
root_device = await _do_assertions(expected)
|
|
|
|
# Root device must not have a via, otherwise its not the device
|
|
assert root_device.via_device_id is None
|