mirror of https://github.com/home-assistant/core
467 lines
14 KiB
Python
467 lines
14 KiB
Python
"""Support for RFXtrx devices."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import binascii
|
|
from collections.abc import Callable, Mapping
|
|
import copy
|
|
import logging
|
|
from typing import Any, NamedTuple, cast
|
|
|
|
import RFXtrx as rfxtrxmod
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
ATTR_DEVICE_ID,
|
|
CONF_DEVICE,
|
|
CONF_DEVICE_ID,
|
|
CONF_DEVICES,
|
|
CONF_HOST,
|
|
CONF_PORT,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
Platform,
|
|
)
|
|
from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers import config_validation as cv, device_registry as dr
|
|
from homeassistant.helpers.device_registry import EventDeviceRegistryUpdatedData
|
|
from homeassistant.helpers.dispatcher import (
|
|
async_dispatcher_connect,
|
|
async_dispatcher_send,
|
|
)
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
|
|
from .const import (
|
|
ATTR_EVENT,
|
|
CONF_AUTOMATIC_ADD,
|
|
CONF_DATA_BITS,
|
|
CONF_PROTOCOLS,
|
|
DATA_RFXOBJECT,
|
|
DEVICE_PACKET_TYPE_LIGHTING4,
|
|
DOMAIN,
|
|
EVENT_RFXTRX_EVENT,
|
|
SERVICE_SEND,
|
|
SIGNAL_EVENT,
|
|
)
|
|
|
|
DEFAULT_OFF_DELAY = 2.0
|
|
|
|
CONNECT_TIMEOUT = 30.0
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class DeviceTuple(NamedTuple):
|
|
"""Representation of a device in rfxtrx."""
|
|
|
|
packettype: str
|
|
subtype: str
|
|
id_string: str
|
|
|
|
|
|
def _bytearray_string(data: Any) -> bytearray:
|
|
val = cv.string(data)
|
|
try:
|
|
return bytearray.fromhex(val)
|
|
except ValueError as err:
|
|
raise vol.Invalid(
|
|
"Data must be a hex string with multiple of two characters"
|
|
) from err
|
|
|
|
|
|
SERVICE_SEND_SCHEMA = vol.Schema({ATTR_EVENT: _bytearray_string})
|
|
|
|
PLATFORMS = [
|
|
Platform.BINARY_SENSOR,
|
|
Platform.COVER,
|
|
Platform.EVENT,
|
|
Platform.LIGHT,
|
|
Platform.SENSOR,
|
|
Platform.SIREN,
|
|
Platform.SWITCH,
|
|
]
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|
"""Set up the RFXtrx component."""
|
|
hass.data.setdefault(DOMAIN, {})
|
|
|
|
await async_setup_internal(hass, entry)
|
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
|
|
|
return True
|
|
|
|
|
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|
"""Unload RFXtrx component."""
|
|
if not await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
|
return False
|
|
|
|
hass.services.async_remove(DOMAIN, SERVICE_SEND)
|
|
|
|
rfx_object = hass.data[DOMAIN][DATA_RFXOBJECT]
|
|
await hass.async_add_executor_job(rfx_object.close_connection)
|
|
|
|
hass.data.pop(DOMAIN)
|
|
|
|
return True
|
|
|
|
|
|
def _create_rfx(
|
|
config: Mapping[str, Any], event_callback: Callable[[rfxtrxmod.RFXtrxEvent], None]
|
|
) -> rfxtrxmod.Connect:
|
|
"""Construct a rfx object based on config."""
|
|
|
|
modes = config.get(CONF_PROTOCOLS)
|
|
|
|
if modes:
|
|
_LOGGER.debug("Using modes: %s", ",".join(modes))
|
|
else:
|
|
_LOGGER.debug("No modes defined, using device configuration")
|
|
|
|
if config[CONF_PORT] is not None:
|
|
# If port is set then we create a TCP connection
|
|
transport = rfxtrxmod.PyNetworkTransport((config[CONF_HOST], config[CONF_PORT]))
|
|
else:
|
|
transport = rfxtrxmod.PySerialTransport(config[CONF_DEVICE])
|
|
|
|
rfx = rfxtrxmod.Connect(
|
|
transport,
|
|
event_callback,
|
|
modes=modes,
|
|
)
|
|
|
|
try:
|
|
rfx.connect(CONNECT_TIMEOUT)
|
|
except TimeoutError as exc:
|
|
raise ConfigEntryNotReady("Timeout on connect") from exc
|
|
except rfxtrxmod.RFXtrxTransportError as exc:
|
|
raise ConfigEntryNotReady(str(exc)) from exc
|
|
|
|
return rfx
|
|
|
|
|
|
def _get_device_lookup(
|
|
devices: dict[str, dict[str, Any]],
|
|
) -> dict[DeviceTuple, dict[str, Any]]:
|
|
"""Get a lookup structure for devices."""
|
|
lookup = {}
|
|
for event_code, event_config in devices.items():
|
|
if (event := get_rfx_object(event_code)) is None:
|
|
continue
|
|
device_id = get_device_id(
|
|
event.device, data_bits=event_config.get(CONF_DATA_BITS)
|
|
)
|
|
lookup[device_id] = event_config
|
|
return lookup
|
|
|
|
|
|
async def async_setup_internal(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
|
"""Set up the RFXtrx component."""
|
|
config = entry.data
|
|
|
|
# Setup some per device config
|
|
devices = _get_device_lookup(config[CONF_DEVICES])
|
|
pt2262_devices: set[str] = set()
|
|
|
|
device_registry = dr.async_get(hass)
|
|
|
|
# Declare the Handle event
|
|
@callback
|
|
def async_handle_receive(event: rfxtrxmod.RFXtrxEvent) -> None:
|
|
"""Handle received messages from RFXtrx gateway."""
|
|
|
|
if isinstance(event, rfxtrxmod.ConnectionLost):
|
|
_LOGGER.warning("Connection was lost, triggering reload")
|
|
hass.async_create_task(
|
|
hass.config_entries.async_reload(entry.entry_id),
|
|
f"config entry reload {entry.title} {entry.domain} {entry.entry_id}",
|
|
)
|
|
return
|
|
|
|
if not event.device or not event.device.id_string:
|
|
return
|
|
|
|
event_data = {
|
|
"packet_type": event.device.packettype,
|
|
"sub_type": event.device.subtype,
|
|
"type_string": event.device.type_string,
|
|
"id_string": event.device.id_string,
|
|
"data": binascii.hexlify(event.data).decode("ASCII"),
|
|
"values": getattr(event, "values", None),
|
|
}
|
|
|
|
_LOGGER.debug("Receive RFXCOM event: %s", event_data)
|
|
|
|
data_bits = get_device_data_bits(event.device, devices)
|
|
device_id = get_device_id(event.device, data_bits=data_bits)
|
|
|
|
if device_id not in devices:
|
|
if config[CONF_AUTOMATIC_ADD]:
|
|
_add_device(event, device_id)
|
|
else:
|
|
return
|
|
|
|
if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
|
|
find_possible_pt2262_device(pt2262_devices, event.device.id_string)
|
|
pt2262_devices.add(event.device.id_string)
|
|
|
|
device_entry = device_registry.async_get_device(
|
|
identifiers={(DOMAIN, *device_id)}, # type: ignore[arg-type]
|
|
)
|
|
if device_entry:
|
|
event_data[ATTR_DEVICE_ID] = device_entry.id
|
|
|
|
# Callback to HA registered components.
|
|
async_dispatcher_send(hass, SIGNAL_EVENT, event, device_id)
|
|
|
|
# Signal event to any other listeners
|
|
hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data)
|
|
|
|
@callback
|
|
def _add_device(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
|
|
"""Add a device to config entry."""
|
|
config = {}
|
|
config[CONF_DEVICE_ID] = device_id
|
|
|
|
_LOGGER.debug(
|
|
"Added device (Device ID: %s Class: %s Sub: %s, Event: %s)",
|
|
event.device.id_string.lower(),
|
|
event.device.__class__.__name__,
|
|
event.device.subtype,
|
|
"".join(f"{x:02x}" for x in event.data),
|
|
)
|
|
|
|
data = entry.data.copy()
|
|
data[CONF_DEVICES] = copy.deepcopy(entry.data[CONF_DEVICES])
|
|
event_code = binascii.hexlify(event.data).decode("ASCII")
|
|
data[CONF_DEVICES][event_code] = config
|
|
hass.config_entries.async_update_entry(entry=entry, data=data)
|
|
devices[device_id] = config
|
|
|
|
@callback
|
|
def _remove_device(device_id: DeviceTuple) -> None:
|
|
data = {
|
|
**entry.data,
|
|
CONF_DEVICES: {
|
|
packet_id: entity_info
|
|
for packet_id, entity_info in entry.data[CONF_DEVICES].items()
|
|
if tuple(entity_info.get(CONF_DEVICE_ID)) != device_id
|
|
},
|
|
}
|
|
hass.config_entries.async_update_entry(entry=entry, data=data)
|
|
devices.pop(device_id)
|
|
|
|
@callback
|
|
def _updated_device(event: Event[EventDeviceRegistryUpdatedData]) -> None:
|
|
if event.data["action"] != "remove":
|
|
return
|
|
device_entry = device_registry.deleted_devices[event.data["device_id"]]
|
|
if entry.entry_id not in device_entry.config_entries:
|
|
return
|
|
device_id = get_device_tuple_from_identifiers(device_entry.identifiers)
|
|
if device_id:
|
|
_remove_device(device_id)
|
|
|
|
# Initialize library
|
|
rfx_object = await hass.async_add_executor_job(
|
|
_create_rfx, config, lambda event: hass.add_job(async_handle_receive, event)
|
|
)
|
|
|
|
hass.data[DOMAIN][DATA_RFXOBJECT] = rfx_object
|
|
|
|
entry.async_on_unload(
|
|
hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device)
|
|
)
|
|
|
|
def _shutdown_rfxtrx(event: Event) -> None:
|
|
"""Close connection with RFXtrx."""
|
|
rfx_object.close_connection()
|
|
|
|
entry.async_on_unload(
|
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown_rfxtrx)
|
|
)
|
|
|
|
def send(call: ServiceCall) -> None:
|
|
event = call.data[ATTR_EVENT]
|
|
rfx_object.transport.send(event)
|
|
|
|
hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
|
|
|
|
|
|
async def async_setup_platform_entry(
|
|
hass: HomeAssistant,
|
|
config_entry: ConfigEntry,
|
|
async_add_entities: AddEntitiesCallback,
|
|
supported: Callable[[rfxtrxmod.RFXtrxEvent], bool],
|
|
constructor: Callable[
|
|
[
|
|
rfxtrxmod.RFXtrxEvent,
|
|
rfxtrxmod.RFXtrxEvent | None,
|
|
DeviceTuple,
|
|
dict[str, Any],
|
|
],
|
|
list[Entity],
|
|
],
|
|
) -> None:
|
|
"""Set up config entry."""
|
|
entry_data = config_entry.data
|
|
device_ids: set[DeviceTuple] = set()
|
|
|
|
# Add entities from config
|
|
entities = []
|
|
for packet_id, entity_info in entry_data[CONF_DEVICES].items():
|
|
if (event := get_rfx_object(packet_id)) is None:
|
|
_LOGGER.error("Invalid device: %s", packet_id)
|
|
continue
|
|
if not supported(event):
|
|
continue
|
|
|
|
device_id = get_device_id(
|
|
event.device, data_bits=entity_info.get(CONF_DATA_BITS)
|
|
)
|
|
if device_id in device_ids:
|
|
continue
|
|
device_ids.add(device_id)
|
|
|
|
entities.extend(constructor(event, None, device_id, entity_info))
|
|
|
|
async_add_entities(entities)
|
|
|
|
# If automatic add is on, hookup listener
|
|
if entry_data[CONF_AUTOMATIC_ADD]:
|
|
|
|
@callback
|
|
def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
|
|
"""Handle light updates from the RFXtrx gateway."""
|
|
if not supported(event):
|
|
return
|
|
|
|
if device_id in device_ids:
|
|
return
|
|
device_ids.add(device_id)
|
|
async_add_entities(constructor(event, event, device_id, {}))
|
|
|
|
config_entry.async_on_unload(
|
|
async_dispatcher_connect(hass, SIGNAL_EVENT, _update)
|
|
)
|
|
|
|
|
|
def get_rfx_object(packetid: str) -> rfxtrxmod.RFXtrxEvent | None:
|
|
"""Return the RFXObject with the packetid."""
|
|
try:
|
|
binarypacket = bytearray.fromhex(packetid)
|
|
except ValueError:
|
|
return None
|
|
return rfxtrxmod.RFXtrxTransport.parse(binarypacket)
|
|
|
|
|
|
def get_pt2262_deviceid(device_id: str, nb_data_bits: int | None) -> bytes | None:
|
|
"""Extract and return the address bits from a Lighting4/PT2262 packet."""
|
|
if nb_data_bits is None:
|
|
return None
|
|
|
|
try:
|
|
data = bytearray.fromhex(device_id)
|
|
except ValueError:
|
|
return None
|
|
mask = 0xFF & ~((1 << nb_data_bits) - 1)
|
|
|
|
data[len(data) - 1] &= mask
|
|
|
|
return binascii.hexlify(data)
|
|
|
|
|
|
def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None:
|
|
"""Extract and return the data bits from a Lighting4/PT2262 packet."""
|
|
try:
|
|
data = bytearray.fromhex(device_id)
|
|
except ValueError:
|
|
return None
|
|
|
|
mask = 0xFF & ((1 << data_bits) - 1)
|
|
|
|
return hex(data[-1] & mask)
|
|
|
|
|
|
def get_device_data_bits(
|
|
device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict[str, Any]]
|
|
) -> int | None:
|
|
"""Deduce data bits for device based on a cache of device bits."""
|
|
data_bits = None
|
|
if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
|
|
for device_id, entity_config in devices.items():
|
|
bits = entity_config.get(CONF_DATA_BITS)
|
|
if get_device_id(device, bits) == device_id:
|
|
data_bits = bits
|
|
break
|
|
return data_bits
|
|
|
|
|
|
def find_possible_pt2262_device(device_ids: set[str], device_id: str) -> str | None:
|
|
"""Look for the device which id matches the given device_id parameter."""
|
|
for dev_id in device_ids:
|
|
if len(dev_id) == len(device_id):
|
|
size = None
|
|
for i, (char1, char2) in enumerate(zip(dev_id, device_id, strict=False)):
|
|
if char1 != char2:
|
|
break
|
|
size = i
|
|
if size is not None:
|
|
size = len(dev_id) - size - 1
|
|
_LOGGER.debug(
|
|
(
|
|
"Found possible device %s for %s "
|
|
"with the following configuration:\n"
|
|
"data_bits=%d\n"
|
|
"command_on=0x%s\n"
|
|
"command_off=0x%s\n"
|
|
),
|
|
device_id,
|
|
dev_id,
|
|
size * 4,
|
|
dev_id[-size:],
|
|
device_id[-size:],
|
|
)
|
|
return dev_id
|
|
return None
|
|
|
|
|
|
def get_device_id(
|
|
device: rfxtrxmod.RFXtrxDevice, data_bits: int | None = None
|
|
) -> DeviceTuple:
|
|
"""Calculate a device id for device."""
|
|
id_string: str = device.id_string
|
|
if (
|
|
data_bits
|
|
and device.packettype == DEVICE_PACKET_TYPE_LIGHTING4
|
|
and (masked_id := get_pt2262_deviceid(id_string, data_bits))
|
|
):
|
|
id_string = masked_id.decode("ASCII")
|
|
|
|
return DeviceTuple(f"{device.packettype:x}", f"{device.subtype:x}", id_string)
|
|
|
|
|
|
def get_device_tuple_from_identifiers(
|
|
identifiers: set[tuple[str, str]],
|
|
) -> DeviceTuple | None:
|
|
"""Calculate the device tuple from a device entry."""
|
|
identifier = next((x for x in identifiers if x[0] == DOMAIN and len(x) == 4), None)
|
|
if not identifier:
|
|
return None
|
|
# work around legacy identifier, being a multi tuple value
|
|
identifier2 = cast(tuple[str, str, str, str], identifier)
|
|
return DeviceTuple(identifier2[1], identifier2[2], identifier2[3])
|
|
|
|
|
|
async def async_remove_config_entry_device(
|
|
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
|
|
) -> bool:
|
|
"""Remove config entry from a device.
|
|
|
|
The actual cleanup is done in the device registry event
|
|
"""
|
|
return True
|