mirror of https://github.com/home-assistant/core
453 lines
15 KiB
Python
453 lines
15 KiB
Python
"""Shared Entity definition for UniFi Protect Integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Coroutine, Sequence
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from functools import partial
|
|
import logging
|
|
from operator import attrgetter
|
|
from typing import TYPE_CHECKING, Any, Generic, TypeVar
|
|
|
|
from uiprotect import make_enabled_getter, make_required_getter, make_value_getter
|
|
from uiprotect.data import (
|
|
NVR,
|
|
Event,
|
|
ModelType,
|
|
ProtectAdoptableDeviceModel,
|
|
SmartDetectObjectType,
|
|
StateType,
|
|
)
|
|
|
|
from homeassistant.core import callback
|
|
import homeassistant.helpers.device_registry as dr
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.entity import Entity, EntityDescription
|
|
|
|
from .const import (
|
|
ATTR_EVENT_ID,
|
|
ATTR_EVENT_SCORE,
|
|
DEFAULT_ATTRIBUTION,
|
|
DEFAULT_BRAND,
|
|
DOMAIN,
|
|
)
|
|
from .data import ProtectData, ProtectDeviceType
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
T = TypeVar("T", bound=ProtectAdoptableDeviceModel | NVR)
|
|
|
|
|
|
class PermRequired(int, Enum):
|
|
"""Type of permission level required for entity."""
|
|
|
|
NO_WRITE = 1
|
|
WRITE = 2
|
|
DELETE = 3
|
|
|
|
|
|
@callback
|
|
def _async_device_entities(
|
|
data: ProtectData,
|
|
klass: type[BaseProtectEntity],
|
|
model_type: ModelType,
|
|
descs: Sequence[ProtectEntityDescription],
|
|
unadopted_descs: Sequence[ProtectEntityDescription] | None = None,
|
|
ufp_device: ProtectAdoptableDeviceModel | None = None,
|
|
) -> list[BaseProtectEntity]:
|
|
if not descs and not unadopted_descs:
|
|
return []
|
|
|
|
entities: list[BaseProtectEntity] = []
|
|
devices = (
|
|
[ufp_device]
|
|
if ufp_device is not None
|
|
else data.get_by_types({model_type}, ignore_unadopted=False)
|
|
)
|
|
auth_user = data.api.bootstrap.auth_user
|
|
for device in devices:
|
|
if TYPE_CHECKING:
|
|
assert isinstance(device, ProtectAdoptableDeviceModel)
|
|
if not device.is_adopted_by_us:
|
|
if unadopted_descs:
|
|
for description in unadopted_descs:
|
|
entities.append(
|
|
klass(
|
|
data,
|
|
device=device,
|
|
description=description,
|
|
)
|
|
)
|
|
_LOGGER.debug(
|
|
"Adding %s entity %s for %s",
|
|
klass.__name__,
|
|
description.name,
|
|
device.display_name,
|
|
)
|
|
continue
|
|
|
|
can_write = device.can_write(auth_user)
|
|
for description in descs:
|
|
if (perms := description.ufp_perm) is not None:
|
|
if perms is PermRequired.WRITE and not can_write:
|
|
continue
|
|
if perms is PermRequired.NO_WRITE and can_write:
|
|
continue
|
|
if perms is PermRequired.DELETE and not device.can_delete(auth_user):
|
|
continue
|
|
|
|
if not description.has_required(device):
|
|
continue
|
|
|
|
entities.append(
|
|
klass(
|
|
data,
|
|
device=device,
|
|
description=description,
|
|
)
|
|
)
|
|
_LOGGER.debug(
|
|
"Adding %s entity %s for %s",
|
|
klass.__name__,
|
|
description.name,
|
|
device.display_name,
|
|
)
|
|
|
|
return entities
|
|
|
|
|
|
_ALL_MODEL_TYPES = (
|
|
ModelType.CAMERA,
|
|
ModelType.LIGHT,
|
|
ModelType.SENSOR,
|
|
ModelType.VIEWPORT,
|
|
ModelType.DOORLOCK,
|
|
ModelType.CHIME,
|
|
)
|
|
|
|
|
|
@callback
|
|
def _combine_model_descs(
|
|
model_type: ModelType,
|
|
model_descriptions: dict[ModelType, Sequence[ProtectEntityDescription]] | None,
|
|
all_descs: Sequence[ProtectEntityDescription] | None,
|
|
) -> list[ProtectEntityDescription]:
|
|
"""Combine all the descriptions with descriptions a model type."""
|
|
descs: list[ProtectEntityDescription] = list(all_descs) if all_descs else []
|
|
if model_descriptions and (model_descs := model_descriptions.get(model_type)):
|
|
descs.extend(model_descs)
|
|
return descs
|
|
|
|
|
|
@callback
|
|
def async_all_device_entities(
|
|
data: ProtectData,
|
|
klass: type[BaseProtectEntity],
|
|
model_descriptions: dict[ModelType, Sequence[ProtectEntityDescription]]
|
|
| None = None,
|
|
all_descs: Sequence[ProtectEntityDescription] | None = None,
|
|
unadopted_descs: list[ProtectEntityDescription] | None = None,
|
|
ufp_device: ProtectAdoptableDeviceModel | None = None,
|
|
) -> list[BaseProtectEntity]:
|
|
"""Generate a list of all the device entities."""
|
|
if ufp_device is None:
|
|
entities: list[BaseProtectEntity] = []
|
|
for model_type in _ALL_MODEL_TYPES:
|
|
descs = _combine_model_descs(model_type, model_descriptions, all_descs)
|
|
entities.extend(
|
|
_async_device_entities(data, klass, model_type, descs, unadopted_descs)
|
|
)
|
|
return entities
|
|
|
|
device_model_type = ufp_device.model
|
|
assert device_model_type is not None
|
|
descs = _combine_model_descs(device_model_type, model_descriptions, all_descs)
|
|
return _async_device_entities(
|
|
data, klass, device_model_type, descs, unadopted_descs, ufp_device
|
|
)
|
|
|
|
|
|
class BaseProtectEntity(Entity):
|
|
"""Base class for UniFi protect entities."""
|
|
|
|
device: ProtectDeviceType
|
|
|
|
_attr_should_poll = False
|
|
_attr_attribution = DEFAULT_ATTRIBUTION
|
|
_state_attrs: tuple[str, ...] = ("_attr_available",)
|
|
_attr_has_entity_name = True
|
|
_async_get_ufp_enabled: Callable[[ProtectAdoptableDeviceModel], bool] | None = None
|
|
|
|
def __init__(
|
|
self,
|
|
data: ProtectData,
|
|
device: ProtectDeviceType,
|
|
description: EntityDescription | None = None,
|
|
) -> None:
|
|
"""Initialize the entity."""
|
|
super().__init__()
|
|
self.data = data
|
|
self.device = device
|
|
|
|
if description is None:
|
|
self._attr_unique_id = self.device.mac
|
|
self._attr_name = None
|
|
else:
|
|
self.entity_description = description
|
|
self._attr_unique_id = f"{self.device.mac}_{description.key}"
|
|
if isinstance(description, ProtectEntityDescription):
|
|
self._async_get_ufp_enabled = description.get_ufp_enabled
|
|
|
|
self._async_set_device_info()
|
|
self._state_getters = tuple(
|
|
partial(attrgetter(attr), self) for attr in self._state_attrs
|
|
)
|
|
|
|
async def async_update(self) -> None:
|
|
"""Update the entity.
|
|
|
|
Only used by the generic entity update service.
|
|
"""
|
|
await self.data.async_refresh()
|
|
|
|
@callback
|
|
def _async_set_device_info(self) -> None:
|
|
"""Set device info."""
|
|
|
|
@callback
|
|
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
|
|
"""Update Entity object from Protect device."""
|
|
was_available = self._attr_available
|
|
if last_updated_success := self.data.last_update_success:
|
|
self.device = device
|
|
|
|
if device.model is ModelType.NVR:
|
|
available = last_updated_success
|
|
else:
|
|
if TYPE_CHECKING:
|
|
assert isinstance(device, ProtectAdoptableDeviceModel)
|
|
connected = device.state is StateType.CONNECTED or (
|
|
not device.is_adopted_by_us and device.can_adopt
|
|
)
|
|
async_get_ufp_enabled = self._async_get_ufp_enabled
|
|
enabled = not async_get_ufp_enabled or async_get_ufp_enabled(device)
|
|
available = last_updated_success and connected and enabled
|
|
|
|
if available != was_available:
|
|
self._attr_available = available
|
|
|
|
@callback
|
|
def _async_updated_event(self, device: ProtectDeviceType) -> None:
|
|
"""When device is updated from Protect."""
|
|
previous_attrs = [getter() for getter in self._state_getters]
|
|
self._async_update_device_from_protect(device)
|
|
changed = False
|
|
for idx, getter in enumerate(self._state_getters):
|
|
if previous_attrs[idx] != getter():
|
|
changed = True
|
|
break
|
|
|
|
if changed:
|
|
if _LOGGER.isEnabledFor(logging.DEBUG):
|
|
device_name = device.name or ""
|
|
if hasattr(self, "entity_description") and self.entity_description.name:
|
|
device_name += f" {self.entity_description.name}"
|
|
|
|
_LOGGER.debug(
|
|
"Updating state [%s (%s)] %s -> %s",
|
|
device_name,
|
|
device.mac,
|
|
previous_attrs,
|
|
tuple((getattr(self, attr)) for attr in self._state_attrs),
|
|
)
|
|
self.async_write_ha_state()
|
|
|
|
async def async_added_to_hass(self) -> None:
|
|
"""When entity is added to hass."""
|
|
await super().async_added_to_hass()
|
|
self.async_on_remove(
|
|
self.data.async_subscribe(self.device.mac, self._async_updated_event)
|
|
)
|
|
self._async_update_device_from_protect(self.device)
|
|
|
|
|
|
class ProtectIsOnEntity(BaseProtectEntity):
|
|
"""Base class for entities with is_on property."""
|
|
|
|
_state_attrs: tuple[str, ...] = ("_attr_available", "_attr_is_on")
|
|
_attr_is_on: bool | None
|
|
entity_description: ProtectEntityDescription
|
|
|
|
def _async_update_device_from_protect(
|
|
self, device: ProtectAdoptableDeviceModel | NVR
|
|
) -> None:
|
|
super()._async_update_device_from_protect(device)
|
|
was_on = self._attr_is_on
|
|
if was_on != (is_on := self.entity_description.get_ufp_value(device) is True):
|
|
self._attr_is_on = is_on
|
|
|
|
|
|
class ProtectDeviceEntity(BaseProtectEntity):
|
|
"""Base class for UniFi protect entities."""
|
|
|
|
@callback
|
|
def _async_set_device_info(self) -> None:
|
|
self._attr_device_info = DeviceInfo(
|
|
name=self.device.display_name,
|
|
manufacturer=DEFAULT_BRAND,
|
|
model=self.device.market_name or self.device.type,
|
|
model_id=self.device.type,
|
|
via_device=(DOMAIN, self.data.api.bootstrap.nvr.mac),
|
|
sw_version=self.device.firmware_version,
|
|
connections={(dr.CONNECTION_NETWORK_MAC, self.device.mac)},
|
|
configuration_url=self.device.protect_url,
|
|
)
|
|
|
|
|
|
class ProtectNVREntity(BaseProtectEntity):
|
|
"""Base class for unifi protect entities."""
|
|
|
|
device: NVR
|
|
|
|
@callback
|
|
def _async_set_device_info(self) -> None:
|
|
self._attr_device_info = DeviceInfo(
|
|
connections={(dr.CONNECTION_NETWORK_MAC, self.device.mac)},
|
|
identifiers={(DOMAIN, self.device.mac)},
|
|
manufacturer=DEFAULT_BRAND,
|
|
name=self.device.display_name,
|
|
model=self.device.type,
|
|
sw_version=str(self.device.version),
|
|
configuration_url=self.device.api.base_url,
|
|
)
|
|
|
|
|
|
class EventEntityMixin(ProtectDeviceEntity):
|
|
"""Adds motion event attributes to sensor."""
|
|
|
|
entity_description: ProtectEventMixin
|
|
_unrecorded_attributes = frozenset({ATTR_EVENT_ID, ATTR_EVENT_SCORE})
|
|
_event: Event | None = None
|
|
_event_end: datetime | None = None
|
|
|
|
@callback
|
|
def _set_event_done(self) -> None:
|
|
"""Clear the event and state."""
|
|
|
|
@callback
|
|
def _set_event_attrs(self, event: Event) -> None:
|
|
"""Set event attrs."""
|
|
self._attr_extra_state_attributes = {
|
|
ATTR_EVENT_ID: event.id,
|
|
ATTR_EVENT_SCORE: event.score,
|
|
}
|
|
|
|
@callback
|
|
def _async_event_with_immediate_end(self) -> None:
|
|
# If the event is so short that the detection is received
|
|
# in the same message as the end of the event we need to write
|
|
# state and than clear the event and write state again.
|
|
self.async_write_ha_state()
|
|
self._set_event_done()
|
|
self.async_write_ha_state()
|
|
|
|
@callback
|
|
def _event_already_ended(
|
|
self, prev_event: Event | None, prev_event_end: datetime | None
|
|
) -> bool:
|
|
"""Determine if the event has already ended.
|
|
|
|
The event_end time is passed because the prev_event and event object
|
|
may be the same object, and the uiprotect code will mutate the
|
|
event object so we need to check the datetime object that was
|
|
saved from the last time the entity was updated.
|
|
"""
|
|
return bool(
|
|
(event := self._event)
|
|
and event.end
|
|
and prev_event
|
|
and prev_event_end
|
|
and prev_event.id == event.id
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True, kw_only=True)
|
|
class ProtectEntityDescription(EntityDescription, Generic[T]):
|
|
"""Base class for protect entity descriptions."""
|
|
|
|
ufp_required_field: str | None = None
|
|
ufp_value: str | None = None
|
|
ufp_value_fn: Callable[[T], Any] | None = None
|
|
ufp_enabled: str | None = None
|
|
ufp_perm: PermRequired | None = None
|
|
|
|
# The below are set in __post_init__
|
|
has_required: Callable[[T], bool] = bool
|
|
get_ufp_enabled: Callable[[T], bool] | None = None
|
|
|
|
def get_ufp_value(self, obj: T) -> Any:
|
|
"""Return value from UniFi Protect device; overridden in __post_init__."""
|
|
# ufp_value or ufp_value_fn are required, the
|
|
# RuntimeError is to catch any issues in the code
|
|
# with new descriptions.
|
|
raise RuntimeError( # pragma: no cover
|
|
f"`ufp_value` or `ufp_value_fn` is required for {self}"
|
|
)
|
|
|
|
def __post_init__(self) -> None:
|
|
"""Override get_ufp_value, has_required, and get_ufp_enabled if required."""
|
|
_setter = partial(object.__setattr__, self)
|
|
|
|
if (ufp_value := self.ufp_value) is not None:
|
|
_setter("get_ufp_value", make_value_getter(ufp_value))
|
|
elif (ufp_value_fn := self.ufp_value_fn) is not None:
|
|
_setter("get_ufp_value", ufp_value_fn)
|
|
|
|
if (ufp_enabled := self.ufp_enabled) is not None:
|
|
_setter("get_ufp_enabled", make_enabled_getter(ufp_enabled))
|
|
|
|
if (ufp_required_field := self.ufp_required_field) is not None:
|
|
_setter("has_required", make_required_getter(ufp_required_field))
|
|
|
|
|
|
@dataclass(frozen=True, kw_only=True)
|
|
class ProtectEventMixin(ProtectEntityDescription[T]):
|
|
"""Mixin for events."""
|
|
|
|
ufp_event_obj: str | None = None
|
|
ufp_obj_type: SmartDetectObjectType | None = None
|
|
|
|
def get_event_obj(self, obj: T) -> Event | None:
|
|
"""Return value from UniFi Protect device."""
|
|
return None
|
|
|
|
def has_matching_smart(self, event: Event) -> bool:
|
|
"""Determine if the detection type is a match."""
|
|
return (
|
|
not (obj_type := self.ufp_obj_type) or obj_type in event.smart_detect_types
|
|
)
|
|
|
|
def __post_init__(self) -> None:
|
|
"""Override get_event_obj if ufp_event_obj is set."""
|
|
if (_ufp_event_obj := self.ufp_event_obj) is not None:
|
|
object.__setattr__(self, "get_event_obj", attrgetter(_ufp_event_obj))
|
|
super().__post_init__()
|
|
|
|
|
|
@dataclass(frozen=True, kw_only=True)
|
|
class ProtectSetableKeysMixin(ProtectEntityDescription[T]):
|
|
"""Mixin for settable values."""
|
|
|
|
ufp_set_method: str | None = None
|
|
ufp_set_method_fn: Callable[[T, Any], Coroutine[Any, Any, None]] | None = None
|
|
|
|
async def ufp_set(self, obj: T, value: Any) -> None:
|
|
"""Set value for UniFi Protect device."""
|
|
_LOGGER.debug("Setting %s to %s for %s", self.name, value, obj.display_name)
|
|
if self.ufp_set_method is not None:
|
|
await getattr(obj, self.ufp_set_method)(value)
|
|
elif self.ufp_set_method_fn is not None:
|
|
await self.ufp_set_method_fn(obj, value)
|