mirror of https://github.com/home-assistant/core
185 lines
6.4 KiB
Python
185 lines
6.4 KiB
Python
"""KNX Telegram handler."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from typing import Final, TypedDict
|
|
|
|
from xknx import XKNX
|
|
from xknx.dpt import DPTArray, DPTBase, DPTBinary
|
|
from xknx.dpt.dpt import DPTComplexData, DPTEnumData
|
|
from xknx.exceptions import XKNXException
|
|
from xknx.telegram import Telegram, TelegramDirection
|
|
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
|
from homeassistant.helpers.storage import Store
|
|
import homeassistant.util.dt as dt_util
|
|
from homeassistant.util.signal_type import SignalType
|
|
|
|
from .const import DOMAIN
|
|
from .project import KNXProject
|
|
|
|
STORAGE_VERSION: Final = 1
|
|
STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json"
|
|
|
|
# dispatcher signal for KNX interface device triggers
|
|
SIGNAL_KNX_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType("knx_telegram")
|
|
|
|
|
|
class DecodedTelegramPayload(TypedDict):
|
|
"""Decoded payload value and metadata."""
|
|
|
|
dpt_main: int | None
|
|
dpt_sub: int | None
|
|
dpt_name: str | None
|
|
unit: str | None
|
|
value: bool | str | int | float | dict[str, str | int | float | bool] | None
|
|
|
|
|
|
class TelegramDict(DecodedTelegramPayload):
|
|
"""Represent a Telegram as a dict."""
|
|
|
|
# this has to be in sync with the frontend implementation
|
|
destination: str
|
|
destination_name: str
|
|
direction: str
|
|
payload: int | tuple[int, ...] | None
|
|
source: str
|
|
source_name: str
|
|
telegramtype: str
|
|
timestamp: str # ISO format
|
|
|
|
|
|
class Telegrams:
|
|
"""Class to handle KNX telegrams."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
xknx: XKNX,
|
|
project: KNXProject,
|
|
log_size: int,
|
|
) -> None:
|
|
"""Initialize Telegrams class."""
|
|
self.hass = hass
|
|
self.project = project
|
|
self._history_store = Store[list[TelegramDict]](
|
|
hass, STORAGE_VERSION, STORAGE_KEY
|
|
)
|
|
self._xknx_telegram_cb_handle = (
|
|
xknx.telegram_queue.register_telegram_received_cb(
|
|
telegram_received_cb=self._xknx_telegram_cb,
|
|
match_for_outgoing=True,
|
|
)
|
|
)
|
|
self.recent_telegrams: deque[TelegramDict] = deque(maxlen=log_size)
|
|
self.last_ga_telegrams: dict[str, TelegramDict] = {}
|
|
|
|
async def load_history(self) -> None:
|
|
"""Load history from store."""
|
|
if (telegrams := await self._history_store.async_load()) is None:
|
|
return
|
|
if self.recent_telegrams.maxlen == 0:
|
|
await self._history_store.async_remove()
|
|
return
|
|
for telegram in telegrams:
|
|
# tuples are stored as lists in JSON
|
|
if isinstance(telegram["payload"], list):
|
|
telegram["payload"] = tuple(telegram["payload"]) # type: ignore[unreachable]
|
|
self.recent_telegrams.extend(telegrams)
|
|
self.last_ga_telegrams = {
|
|
t["destination"]: t for t in telegrams if t["payload"] is not None
|
|
}
|
|
|
|
async def save_history(self) -> None:
|
|
"""Save history to store."""
|
|
if self.recent_telegrams:
|
|
await self._history_store.async_save(list(self.recent_telegrams))
|
|
|
|
def _xknx_telegram_cb(self, telegram: Telegram) -> None:
|
|
"""Handle incoming and outgoing telegrams from xknx."""
|
|
telegram_dict = self.telegram_to_dict(telegram)
|
|
self.recent_telegrams.append(telegram_dict)
|
|
if telegram_dict["payload"] is not None:
|
|
# exclude GroupValueRead telegrams
|
|
self.last_ga_telegrams[telegram_dict["destination"]] = telegram_dict
|
|
async_dispatcher_send(self.hass, SIGNAL_KNX_TELEGRAM, telegram, telegram_dict)
|
|
|
|
def telegram_to_dict(self, telegram: Telegram) -> TelegramDict:
|
|
"""Convert a Telegram to a dict."""
|
|
dst_name = ""
|
|
payload_data: int | tuple[int, ...] | None = None
|
|
src_name = ""
|
|
transcoder = None
|
|
value = None
|
|
|
|
if (
|
|
ga_info := self.project.group_addresses.get(
|
|
f"{telegram.destination_address}"
|
|
)
|
|
) is not None:
|
|
dst_name = ga_info.name
|
|
|
|
if (
|
|
device := self.project.devices.get(f"{telegram.source_address}")
|
|
) is not None:
|
|
src_name = f"{device['manufacturer_name']} {device['name']}"
|
|
elif telegram.direction is TelegramDirection.OUTGOING:
|
|
src_name = "Home Assistant"
|
|
|
|
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
|
|
payload_data = telegram.payload.value.value
|
|
|
|
if telegram.decoded_data is not None:
|
|
transcoder = telegram.decoded_data.transcoder
|
|
value = _serializable_decoded_data(telegram.decoded_data.value)
|
|
|
|
return TelegramDict(
|
|
destination=f"{telegram.destination_address}",
|
|
destination_name=dst_name,
|
|
direction=telegram.direction.value,
|
|
dpt_main=transcoder.dpt_main_number if transcoder is not None else None,
|
|
dpt_sub=transcoder.dpt_sub_number if transcoder is not None else None,
|
|
dpt_name=transcoder.value_type if transcoder is not None else None,
|
|
payload=payload_data,
|
|
source=f"{telegram.source_address}",
|
|
source_name=src_name,
|
|
telegramtype=telegram.payload.__class__.__name__,
|
|
timestamp=dt_util.now().isoformat(),
|
|
unit=transcoder.unit if transcoder is not None else None,
|
|
value=value,
|
|
)
|
|
|
|
|
|
def _serializable_decoded_data(
|
|
value: bool | float | str | DPTComplexData | DPTEnumData,
|
|
) -> bool | str | int | float | dict[str, str | int | float | bool]:
|
|
"""Return a serializable representation of decoded data."""
|
|
if isinstance(value, DPTComplexData):
|
|
return value.as_dict()
|
|
if isinstance(value, DPTEnumData):
|
|
return value.name.lower()
|
|
return value
|
|
|
|
|
|
def decode_telegram_payload(
|
|
payload: DPTArray | DPTBinary, transcoder: type[DPTBase]
|
|
) -> DecodedTelegramPayload:
|
|
"""Decode the payload of a KNX telegram with custom transcoder."""
|
|
try:
|
|
value = transcoder.from_knx(payload)
|
|
except XKNXException:
|
|
value = "Error decoding value"
|
|
|
|
value = _serializable_decoded_data(value)
|
|
|
|
return DecodedTelegramPayload(
|
|
dpt_main=transcoder.dpt_main_number,
|
|
dpt_sub=transcoder.dpt_sub_number,
|
|
dpt_name=transcoder.value_type,
|
|
unit=transcoder.unit,
|
|
value=value,
|
|
)
|