mirror of https://github.com/home-assistant/core
292 lines
9.1 KiB
Python
292 lines
9.1 KiB
Python
"""KNX integration services."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from functools import partial
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
import voluptuous as vol
|
|
from xknx.dpt import DPTArray, DPTBase, DPTBinary
|
|
from xknx.exceptions import ConversionError
|
|
from xknx.telegram import Telegram
|
|
from xknx.telegram.address import parse_device_group_address
|
|
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
|
|
|
|
from homeassistant.const import CONF_TYPE, SERVICE_RELOAD
|
|
from homeassistant.core import HomeAssistant, ServiceCall, callback
|
|
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.service import async_register_admin_service
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
KNX_ADDRESS,
|
|
KNX_MODULE_KEY,
|
|
SERVICE_KNX_ATTR_PAYLOAD,
|
|
SERVICE_KNX_ATTR_REMOVE,
|
|
SERVICE_KNX_ATTR_RESPONSE,
|
|
SERVICE_KNX_ATTR_TYPE,
|
|
SERVICE_KNX_EVENT_REGISTER,
|
|
SERVICE_KNX_EXPOSURE_REGISTER,
|
|
SERVICE_KNX_READ,
|
|
SERVICE_KNX_SEND,
|
|
)
|
|
from .expose import create_knx_exposure
|
|
from .schema import ExposeSchema, dpt_base_type_validator, ga_validator
|
|
|
|
if TYPE_CHECKING:
|
|
from . import KNXModule
|
|
|
|
# Module-level logger, named after this module; used by the service handlers below.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@callback
def register_knx_services(hass: HomeAssistant) -> None:
    """Register every KNX service handler with Home Assistant.

    Each handler is bound to ``hass`` with ``functools.partial`` so the
    service registry can invoke it with just the ``ServiceCall``.
    """
    # `send` and `read` go through the regular service registry.
    for service_name, handler, service_schema in (
        (SERVICE_KNX_SEND, service_send_to_knx_bus, SERVICE_KNX_SEND_SCHEMA),
        (SERVICE_KNX_READ, service_read_to_knx_bus, SERVICE_KNX_READ_SCHEMA),
    ):
        hass.services.async_register(
            DOMAIN,
            service_name,
            partial(handler, hass),
            schema=service_schema,
        )

    # The register/modify services are installed via the admin-service helper.
    for service_name, handler, service_schema in (
        (
            SERVICE_KNX_EVENT_REGISTER,
            service_event_register_modify,
            SERVICE_KNX_EVENT_REGISTER_SCHEMA,
        ),
        (
            SERVICE_KNX_EXPOSURE_REGISTER,
            service_exposure_register_modify,
            SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
        ),
    ):
        async_register_admin_service(
            hass,
            DOMAIN,
            service_name,
            partial(handler, hass),
            schema=service_schema,
        )

    # Reload is registered without an explicit schema, so the helper's
    # default applies.
    async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_RELOAD,
        partial(service_reload_integration, hass),
    )
|
|
|
|
|
|
@callback
def get_knx_module(hass: HomeAssistant) -> KNXModule:
    """Look up the KNXModule stored in ``hass.data``.

    Raises:
        HomeAssistantError: if nothing is stored under ``KNX_MODULE_KEY``.
    """
    try:
        knx_module = hass.data[KNX_MODULE_KEY]
    except KeyError as missing:
        raise HomeAssistantError("KNX entry not loaded") from missing
    return knx_module
|
|
|
|
|
|
# Validation schema for the event_register service call data.
SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema(
    {
        # A single group address or a list of them; each entry is
        # checked with `ga_validator`.
        vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [ga_validator]),
        # Optional DPT type; the handler uses it to pick a transcoder.
        vol.Optional(CONF_TYPE): dpt_base_type_validator,
        # When true, the handler removes the addresses instead of adding them.
        vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
    }
)
|
|
|
|
|
|
async def service_event_register_modify(hass: HomeAssistant, call: ServiceCall) -> None:
    """Add group addresses to, or remove them from, the knx_event filter.

    With the remove flag set, each address is dropped from the event
    callback's address list (a warning is logged for addresses that were
    not registered) and any transcoder stored for it is discarded.

    Otherwise, if a type is given and resolves to a transcoder, that
    transcoder is stored for every address; addresses not yet present in
    the event callback's list are then appended.

    Raises:
        HomeAssistantError: if the KNX module is not loaded.
    """
    knx_module = get_knx_module(hass)

    addresses = [
        parse_device_group_address(raw_address)
        for raw_address in call.data[KNX_ADDRESS]
    ]
    registered = knx_module.knx_event_callback.group_addresses
    transcoders = knx_module.group_address_transcoder

    if call.data.get(SERVICE_KNX_ATTR_REMOVE):
        for address in addresses:
            try:
                registered.remove(address)
            except ValueError:
                # Address was never registered - report it but keep going.
                _LOGGER.warning(
                    "Service event_register could not remove event for '%s'",
                    str(address),
                )
            # Discard the transcoder, if one was stored for this address.
            transcoders.pop(address, None)
        return

    dpt = call.data.get(CONF_TYPE)
    if dpt:
        transcoder = DPTBase.parse_transcoder(dpt)
        if transcoder:
            transcoders.update(dict.fromkeys(addresses, transcoder))

    for address in addresses:
        if address not in registered:
            registered.append(address)
            _LOGGER.debug(
                "Service event_register registered event for '%s'",
                str(address),
            )
|
|
|
|
|
|
# Validation schema for the exposure_register service call data.
# Either alternative may match.
SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any(
    # The expose-sensor schema extended with an optional remove flag.
    ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend(
        {vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean}
    ),
    # for removing only `address` is required
    vol.Schema(
        {
            vol.Required(KNX_ADDRESS): ga_validator,
            vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True),
        },
        extra=vol.ALLOW_EXTRA,
    ),
)
|
|
|
|
|
|
async def service_exposure_register_modify(
    hass: HomeAssistant, call: ServiceCall
) -> None:
    """Add, replace or remove a service-created exposure on the KNX bus.

    With the remove flag set, the exposure stored for the address is
    popped and removed. Otherwise a new exposure is created from the call
    data; an exposure already stored for the same address is removed
    first and a warning is logged.

    Raises:
        HomeAssistantError: if the KNX module is not loaded.
        ServiceValidationError: if removal is requested for an address
            that has no stored exposure.
    """
    knx_module = get_knx_module(hass)
    group_address = call.data[KNX_ADDRESS]
    exposures = knx_module.service_exposures

    if call.data.get(SERVICE_KNX_ATTR_REMOVE):
        try:
            stale = exposures.pop(group_address)
        except KeyError as err:
            raise ServiceValidationError(
                f"Could not find exposure for '{group_address}' to remove."
            ) from err
        stale.async_remove()
        return

    if group_address in exposures:
        # An exposure already exists for this address: remove it before
        # creating the replacement.
        previous = exposures.pop(group_address)
        _LOGGER.warning(
            (
                "Service exposure_register replacing already registered exposure"
                " for '%s' - %s"
            ),
            group_address,
            previous.device.name,
        )
        previous.async_remove()

    created = create_knx_exposure(knx_module.hass, knx_module.xknx, call.data)
    exposures[group_address] = created
    _LOGGER.debug(
        "Service exposure_register registered exposure for '%s' - %s",
        group_address,
        created.device.name,
    )
|
|
|
|
|
|
# Validation schema for the send service call data.
# Either alternative may match.
SERVICE_KNX_SEND_SCHEMA = vol.Any(
    # Typed payload: `type` is required and checked with
    # `dpt_base_type_validator`; any payload value is accepted here.
    vol.Schema(
        {
            vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [ga_validator]),
            vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all,
            vol.Required(SERVICE_KNX_ATTR_TYPE): dpt_base_type_validator,
            vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
        }
    ),
    # without type given payload is treated as raw bytes
    vol.Schema(
        {
            vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [ga_validator]),
            vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any(
                cv.positive_int, [cv.positive_int]
            ),
            vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
        }
    ),
)
|
|
|
|
|
|
async def service_send_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None:
    """Send a payload to one or more group addresses on the KNX bus.

    If a type is given, the payload is encoded with the transcoder
    resolved from that type. Without a type, an ``int`` payload is wrapped
    in ``DPTBinary`` and anything else in ``DPTArray``. One telegram per
    address is queued, carrying a ``GroupValueResponse`` when the response
    flag is set and a ``GroupValueWrite`` otherwise.

    Raises:
        HomeAssistantError: if the KNX module is not loaded.
        ServiceValidationError: if the type resolves to no transcoder or
            the payload cannot be converted.
    """
    knx_module = get_knx_module(hass)

    attr_address = call.data[KNX_ADDRESS]
    raw_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD]
    attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE)
    as_response = call.data[SERVICE_KNX_ATTR_RESPONSE]

    payload: DPTBinary | DPTArray
    if attr_type is None:
        # No type given: use the raw payload as-is.
        if isinstance(raw_payload, int):
            payload = DPTBinary(raw_payload)
        else:
            payload = DPTArray(raw_payload)
    else:
        transcoder = DPTBase.parse_transcoder(attr_type)
        if transcoder is None:
            raise ServiceValidationError(
                f"Invalid type for knx.send service: {attr_type}"
            )
        try:
            payload = transcoder.to_knx(raw_payload)
        except ConversionError as err:
            raise ServiceValidationError(
                f"Invalid payload for knx.send service: {err}"
            ) from err

    # The response flag is constant for the call, so pick the APCI class once;
    # a fresh APCI instance is still built for every telegram.
    apci_class = GroupValueResponse if as_response else GroupValueWrite

    for address in attr_address:
        await knx_module.xknx.telegrams.put(
            Telegram(
                destination_address=parse_device_group_address(address),
                payload=apci_class(payload),
                source_address=knx_module.xknx.current_address,
            )
        )
|
|
|
|
|
|
# Validation schema for the read service call data: only the address(es)
# to read, each checked with `ga_validator`.
SERVICE_KNX_READ_SCHEMA = vol.Schema(
    {vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [ga_validator])}
)
|
|
|
|
|
|
async def service_read_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None:
    """Queue a GroupValueRead telegram for every requested group address.

    Raises:
        HomeAssistantError: if the KNX module is not loaded.
    """
    knx_module = get_knx_module(hass)

    for raw_address in call.data[KNX_ADDRESS]:
        await knx_module.xknx.telegrams.put(
            Telegram(
                destination_address=parse_device_group_address(raw_address),
                payload=GroupValueRead(),
                source_address=knx_module.xknx.current_address,
            )
        )
|
|
|
|
|
|
async def service_reload_integration(hass: HomeAssistant, call: ServiceCall) -> None:
    """Reload the KNX config entry, then fire the reloaded event.

    The event is fired on the bus with the context of the service call.

    Raises:
        HomeAssistantError: if the KNX module is not loaded.
    """
    entry_id = get_knx_module(hass).entry.entry_id
    await hass.config_entries.async_reload(entry_id)

    reloaded_event = f"event_{DOMAIN}_reloaded"
    hass.bus.async_fire(reloaded_event, context=call.context)
|