mirror of https://github.com/home-assistant/core
444 lines
15 KiB
Python
444 lines
15 KiB
Python
"""Common code for tplink."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from collections.abc import Awaitable, Callable, Coroutine, Mapping
|
|
from dataclasses import dataclass, replace
|
|
import logging
|
|
from typing import Any, Concatenate
|
|
|
|
from kasa import (
|
|
AuthenticationError,
|
|
Device,
|
|
DeviceType,
|
|
Feature,
|
|
KasaException,
|
|
TimeoutError,
|
|
)
|
|
|
|
from homeassistant.const import EntityCategory
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.entity import EntityDescription
|
|
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
|
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
|
|
|
from . import get_device_name, legacy_device_id
|
|
from .const import (
|
|
ATTR_CURRENT_A,
|
|
ATTR_CURRENT_POWER_W,
|
|
ATTR_TODAY_ENERGY_KWH,
|
|
ATTR_TOTAL_ENERGY_KWH,
|
|
DOMAIN,
|
|
PRIMARY_STATE_ID,
|
|
)
|
|
from .coordinator import TPLinkDataUpdateCoordinator
|
|
from .deprecate import DeprecatedInfo, async_check_create_deprecated
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Mapping from upstream category to homeassistant category
|
|
FEATURE_CATEGORY_TO_ENTITY_CATEGORY = {
|
|
Feature.Category.Config: EntityCategory.CONFIG,
|
|
Feature.Category.Info: EntityCategory.DIAGNOSTIC,
|
|
Feature.Category.Debug: EntityCategory.DIAGNOSTIC,
|
|
}
|
|
|
|
# Skips creating entities for primary features supported by a specialized platform.
|
|
# For example, we do not need a separate "state" switch for light bulbs.
|
|
DEVICETYPES_WITH_SPECIALIZED_PLATFORMS = {
|
|
DeviceType.Bulb,
|
|
DeviceType.LightStrip,
|
|
DeviceType.Dimmer,
|
|
DeviceType.Fan,
|
|
DeviceType.Thermostat,
|
|
}
|
|
|
|
# Primary features to always include even when the device type has its own platform
|
|
FEATURES_ALLOW_LIST = {
|
|
# lights have current_consumption and a specialized platform
|
|
"current_consumption"
|
|
}
|
|
|
|
|
|
# Features excluded due to future platform additions
|
|
EXCLUDED_FEATURES = {
|
|
# update
|
|
"current_firmware_version",
|
|
"available_firmware_version",
|
|
"update_available",
|
|
"check_latest_firmware",
|
|
# siren
|
|
"alarm",
|
|
}
|
|
|
|
|
|
LEGACY_KEY_MAPPING = {
|
|
"current": ATTR_CURRENT_A,
|
|
"current_consumption": ATTR_CURRENT_POWER_W,
|
|
"consumption_today": ATTR_TODAY_ENERGY_KWH,
|
|
"consumption_total": ATTR_TOTAL_ENERGY_KWH,
|
|
}
|
|
|
|
|
|
@dataclass(frozen=True, kw_only=True)
|
|
class TPLinkFeatureEntityDescription(EntityDescription):
|
|
"""Base class for a TPLink feature based entity description."""
|
|
|
|
deprecated_info: DeprecatedInfo | None = None
|
|
|
|
|
|
def async_refresh_after[_T: CoordinatedTPLinkEntity, **_P](
|
|
func: Callable[Concatenate[_T, _P], Awaitable[None]],
|
|
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
|
|
"""Define a wrapper to raise HA errors and refresh after."""
|
|
|
|
async def _async_wrap(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
|
|
try:
|
|
await func(self, *args, **kwargs)
|
|
except AuthenticationError as ex:
|
|
self.coordinator.config_entry.async_start_reauth(self.hass)
|
|
raise HomeAssistantError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="device_authentication",
|
|
translation_placeholders={
|
|
"func": func.__name__,
|
|
"exc": str(ex),
|
|
},
|
|
) from ex
|
|
except TimeoutError as ex:
|
|
raise HomeAssistantError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="device_timeout",
|
|
translation_placeholders={
|
|
"func": func.__name__,
|
|
"exc": str(ex),
|
|
},
|
|
) from ex
|
|
except KasaException as ex:
|
|
raise HomeAssistantError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="device_error",
|
|
translation_placeholders={
|
|
"func": func.__name__,
|
|
"exc": str(ex),
|
|
},
|
|
) from ex
|
|
await self.coordinator.async_request_refresh()
|
|
|
|
return _async_wrap
|
|
|
|
|
|
class CoordinatedTPLinkEntity(CoordinatorEntity[TPLinkDataUpdateCoordinator], ABC):
|
|
"""Common base class for all coordinated tplink entities."""
|
|
|
|
_attr_has_entity_name = True
|
|
_device: Device
|
|
|
|
def __init__(
|
|
self,
|
|
device: Device,
|
|
coordinator: TPLinkDataUpdateCoordinator,
|
|
*,
|
|
feature: Feature | None = None,
|
|
parent: Device | None = None,
|
|
) -> None:
|
|
"""Initialize the entity."""
|
|
super().__init__(coordinator)
|
|
self._device: Device = device
|
|
self._feature = feature
|
|
|
|
registry_device = device
|
|
device_name = get_device_name(device, parent=parent)
|
|
if parent and parent.device_type is not Device.Type.Hub:
|
|
if not feature or feature.id == PRIMARY_STATE_ID:
|
|
# Entity will be added to parent if not a hub and no feature
|
|
# parameter (i.e. core platform like Light, Fan) or the feature
|
|
# is the primary state
|
|
registry_device = parent
|
|
device_name = get_device_name(registry_device)
|
|
else:
|
|
# Prefix the device name with the parent name unless it is a
|
|
# hub attached device. Sensible default for child devices like
|
|
# strip plugs or the ks240 where the child alias makes more
|
|
# sense in the context of the parent. i.e. Hall Ceiling Fan &
|
|
# Bedroom Ceiling Fan; Child device aliases will be Ceiling Fan
|
|
# and Dimmer Switch for both so should be distinguished by the
|
|
# parent name.
|
|
device_name = f"{get_device_name(parent)} {get_device_name(device, parent=parent)}"
|
|
|
|
self._attr_device_info = DeviceInfo(
|
|
identifiers={(DOMAIN, str(registry_device.device_id))},
|
|
manufacturer="TP-Link",
|
|
model=registry_device.model,
|
|
name=device_name,
|
|
sw_version=registry_device.hw_info["sw_ver"],
|
|
hw_version=registry_device.hw_info["hw_ver"],
|
|
)
|
|
|
|
if (
|
|
parent is not None
|
|
and parent != registry_device
|
|
and parent.device_type is not Device.Type.WallSwitch
|
|
):
|
|
self._attr_device_info["via_device"] = (DOMAIN, parent.device_id)
|
|
else:
|
|
self._attr_device_info["connections"] = {
|
|
(dr.CONNECTION_NETWORK_MAC, device.mac)
|
|
}
|
|
|
|
self._attr_unique_id = self._get_unique_id()
|
|
|
|
self._async_call_update_attrs()
|
|
|
|
def _get_unique_id(self) -> str:
|
|
"""Return unique ID for the entity."""
|
|
return legacy_device_id(self._device)
|
|
|
|
@abstractmethod
|
|
@callback
|
|
def _async_update_attrs(self) -> None:
|
|
"""Platforms implement this to update the entity internals."""
|
|
raise NotImplementedError
|
|
|
|
@callback
|
|
def _async_call_update_attrs(self) -> None:
|
|
"""Call update_attrs and make entity unavailable on errors."""
|
|
try:
|
|
self._async_update_attrs()
|
|
except Exception as ex: # noqa: BLE001
|
|
if self._attr_available:
|
|
_LOGGER.warning(
|
|
"Unable to read data for %s %s: %s",
|
|
self._device,
|
|
self.entity_id,
|
|
ex,
|
|
)
|
|
self._attr_available = False
|
|
else:
|
|
self._attr_available = True
|
|
|
|
@callback
|
|
def _handle_coordinator_update(self) -> None:
|
|
"""Handle updated data from the coordinator."""
|
|
self._async_call_update_attrs()
|
|
super()._handle_coordinator_update()
|
|
|
|
@property
|
|
def available(self) -> bool:
|
|
"""Return if entity is available."""
|
|
return self.coordinator.last_update_success and self._attr_available
|
|
|
|
|
|
class CoordinatedTPLinkFeatureEntity(CoordinatedTPLinkEntity, ABC):
|
|
"""Common base class for all coordinated tplink feature entities."""
|
|
|
|
entity_description: TPLinkFeatureEntityDescription
|
|
_feature: Feature
|
|
|
|
def __init__(
|
|
self,
|
|
device: Device,
|
|
coordinator: TPLinkDataUpdateCoordinator,
|
|
*,
|
|
feature: Feature,
|
|
description: TPLinkFeatureEntityDescription,
|
|
parent: Device | None = None,
|
|
) -> None:
|
|
"""Initialize the entity."""
|
|
self.entity_description = description
|
|
super().__init__(device, coordinator, parent=parent, feature=feature)
|
|
|
|
def _get_unique_id(self) -> str:
|
|
"""Return unique ID for the entity."""
|
|
return self._get_feature_unique_id(self._device, self.entity_description)
|
|
|
|
@staticmethod
|
|
def _get_feature_unique_id(
|
|
device: Device, entity_description: TPLinkFeatureEntityDescription
|
|
) -> str:
|
|
"""Return unique ID for the entity."""
|
|
key = entity_description.key
|
|
# The unique id for the state feature in the switch platform is the
|
|
# device_id
|
|
if key == PRIMARY_STATE_ID:
|
|
return legacy_device_id(device)
|
|
|
|
# Historically the legacy device emeter attributes which are now
|
|
# replaced with features used slightly different keys. This ensures
|
|
# that those entities are not orphaned. Returns the mapped key or the
|
|
# provided key if not mapped.
|
|
key = LEGACY_KEY_MAPPING.get(key, key)
|
|
return f"{legacy_device_id(device)}_{key}"
|
|
|
|
@classmethod
|
|
def _category_for_feature(cls, feature: Feature | None) -> EntityCategory | None:
|
|
"""Return entity category for a feature."""
|
|
# Main controls have no category
|
|
if feature is None or feature.category is Feature.Category.Primary:
|
|
return None
|
|
|
|
if (
|
|
entity_category := FEATURE_CATEGORY_TO_ENTITY_CATEGORY.get(feature.category)
|
|
) is None:
|
|
_LOGGER.error(
|
|
"Unhandled category %s, fallback to DIAGNOSTIC", feature.category
|
|
)
|
|
entity_category = EntityCategory.DIAGNOSTIC
|
|
|
|
return entity_category
|
|
|
|
@classmethod
|
|
def _description_for_feature[_D: EntityDescription](
|
|
cls,
|
|
feature: Feature,
|
|
descriptions: Mapping[str, _D],
|
|
*,
|
|
device: Device,
|
|
parent: Device | None = None,
|
|
) -> _D | None:
|
|
"""Return description object for the given feature.
|
|
|
|
This is responsible for setting the common parameters & deciding
|
|
based on feature id which additional parameters are passed.
|
|
"""
|
|
|
|
if descriptions and (desc := descriptions.get(feature.id)):
|
|
translation_key: str | None = feature.id
|
|
# HA logic is to name entities based on the following logic:
|
|
# _attr_name > translation.name > description.name
|
|
# > device_class (if base platform supports).
|
|
name: str | None | UndefinedType = UNDEFINED
|
|
|
|
# The state feature gets the device name or the child device
|
|
# name if it's a child device
|
|
if feature.id == PRIMARY_STATE_ID:
|
|
translation_key = None
|
|
# if None will use device name
|
|
name = get_device_name(device, parent=parent) if parent else None
|
|
|
|
return replace(
|
|
desc,
|
|
translation_key=translation_key,
|
|
name=name, # if undefined will use translation key
|
|
entity_category=cls._category_for_feature(feature),
|
|
# enabled_default can be overridden to False in the description
|
|
entity_registry_enabled_default=feature.category
|
|
is not Feature.Category.Debug
|
|
and desc.entity_registry_enabled_default,
|
|
)
|
|
|
|
_LOGGER.debug(
|
|
"Device feature: %s (%s) needs an entity description defined in HA",
|
|
feature.name,
|
|
feature.id,
|
|
)
|
|
return None
|
|
|
|
@classmethod
|
|
def _entities_for_device[
|
|
_E: CoordinatedTPLinkFeatureEntity,
|
|
_D: TPLinkFeatureEntityDescription,
|
|
](
|
|
cls,
|
|
hass: HomeAssistant,
|
|
device: Device,
|
|
coordinator: TPLinkDataUpdateCoordinator,
|
|
*,
|
|
feature_type: Feature.Type,
|
|
entity_class: type[_E],
|
|
descriptions: Mapping[str, _D],
|
|
parent: Device | None = None,
|
|
) -> list[_E]:
|
|
"""Return a list of entities to add.
|
|
|
|
This filters out unwanted features to avoid creating unnecessary entities
|
|
for device features that are implemented by specialized platforms like light.
|
|
"""
|
|
entities: list[_E] = [
|
|
entity_class(
|
|
device,
|
|
coordinator,
|
|
feature=feat,
|
|
description=desc,
|
|
parent=parent,
|
|
)
|
|
for feat in device.features.values()
|
|
if feat.type == feature_type
|
|
and feat.id not in EXCLUDED_FEATURES
|
|
and (
|
|
feat.category is not Feature.Category.Primary
|
|
or device.device_type not in DEVICETYPES_WITH_SPECIALIZED_PLATFORMS
|
|
or feat.id in FEATURES_ALLOW_LIST
|
|
)
|
|
and (
|
|
desc := cls._description_for_feature(
|
|
feat, descriptions, device=device, parent=parent
|
|
)
|
|
)
|
|
and async_check_create_deprecated(
|
|
hass,
|
|
cls._get_feature_unique_id(device, desc),
|
|
desc,
|
|
)
|
|
]
|
|
return entities
|
|
|
|
@classmethod
|
|
def entities_for_device_and_its_children[
|
|
_E: CoordinatedTPLinkFeatureEntity,
|
|
_D: TPLinkFeatureEntityDescription,
|
|
](
|
|
cls,
|
|
hass: HomeAssistant,
|
|
device: Device,
|
|
coordinator: TPLinkDataUpdateCoordinator,
|
|
*,
|
|
feature_type: Feature.Type,
|
|
entity_class: type[_E],
|
|
descriptions: Mapping[str, _D],
|
|
child_coordinators: list[TPLinkDataUpdateCoordinator] | None = None,
|
|
) -> list[_E]:
|
|
"""Create entities for device and its children.
|
|
|
|
This is a helper that calls *_entities_for_device* for the device and its children.
|
|
"""
|
|
entities: list[_E] = []
|
|
# Add parent entities before children so via_device id works.
|
|
entities.extend(
|
|
cls._entities_for_device(
|
|
hass,
|
|
device,
|
|
coordinator=coordinator,
|
|
feature_type=feature_type,
|
|
entity_class=entity_class,
|
|
descriptions=descriptions,
|
|
)
|
|
)
|
|
if device.children:
|
|
_LOGGER.debug("Initializing device with %s children", len(device.children))
|
|
for idx, child in enumerate(device.children):
|
|
# HS300 does not like too many concurrent requests and its
|
|
# emeter data requires a request for each socket, so we receive
|
|
# separate coordinators.
|
|
if child_coordinators:
|
|
child_coordinator = child_coordinators[idx]
|
|
else:
|
|
child_coordinator = coordinator
|
|
entities.extend(
|
|
cls._entities_for_device(
|
|
hass,
|
|
child,
|
|
coordinator=child_coordinator,
|
|
feature_type=feature_type,
|
|
entity_class=entity_class,
|
|
descriptions=descriptions,
|
|
parent=device,
|
|
)
|
|
)
|
|
|
|
return entities
|