mirror of https://github.com/home-assistant/core
1715 lines
61 KiB
Python
1715 lines
61 KiB
Python
"""An abstract class for entities."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABCMeta
|
|
import asyncio
|
|
from collections import deque
|
|
from collections.abc import Callable, Coroutine, Iterable, Mapping
|
|
import dataclasses
|
|
from enum import Enum, IntFlag, auto
|
|
import functools as ft
|
|
import logging
|
|
import math
|
|
from operator import attrgetter
|
|
import sys
|
|
import threading
|
|
import time
|
|
from types import FunctionType
|
|
from typing import TYPE_CHECKING, Any, Final, Literal, NotRequired, TypedDict, final
|
|
|
|
from propcache import cached_property
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.const import (
|
|
ATTR_ASSUMED_STATE,
|
|
ATTR_ATTRIBUTION,
|
|
ATTR_DEVICE_CLASS,
|
|
ATTR_ENTITY_PICTURE,
|
|
ATTR_FRIENDLY_NAME,
|
|
ATTR_ICON,
|
|
ATTR_SUPPORTED_FEATURES,
|
|
ATTR_UNIT_OF_MEASUREMENT,
|
|
DEVICE_DEFAULT_NAME,
|
|
STATE_OFF,
|
|
STATE_ON,
|
|
STATE_UNAVAILABLE,
|
|
STATE_UNKNOWN,
|
|
EntityCategory,
|
|
)
|
|
from homeassistant.core import (
|
|
CALLBACK_TYPE,
|
|
Context,
|
|
Event,
|
|
HassJobType,
|
|
HomeAssistant,
|
|
ReleaseChannel,
|
|
callback,
|
|
get_hassjob_callable_job_type,
|
|
get_release_channel,
|
|
)
|
|
from homeassistant.core_config import DATA_CUSTOMIZE
|
|
from homeassistant.exceptions import (
|
|
HomeAssistantError,
|
|
InvalidStateError,
|
|
NoEntitySpecifiedError,
|
|
)
|
|
from homeassistant.loader import async_suggest_report_issue, bind_hass
|
|
from homeassistant.util import ensure_unique_string, slugify
|
|
from homeassistant.util.frozen_dataclass_compat import FrozenOrThawed
|
|
|
|
from . import device_registry as dr, entity_registry as er, singleton
|
|
from .device_registry import DeviceInfo, EventDeviceRegistryUpdatedData
|
|
from .event import (
|
|
async_track_device_registry_updated_event,
|
|
async_track_entity_registry_updated_event,
|
|
)
|
|
from .frame import report_non_thread_safe_operation
|
|
from .typing import UNDEFINED, StateType, UndefinedType
|
|
|
|
# Module-level alias for the wall-clock time function.
timer = time.time

if TYPE_CHECKING:
    # Only needed for annotations (see the ``platform`` attribute of Entity).
    from .entity_platform import EntityPlatform

_LOGGER = logging.getLogger(__name__)
# NOTE(review): presumably a threshold in seconds before an update is
# reported as slow -- confirm against the code that uses it.
SLOW_UPDATE_WARNING = 10
# Key passed to singleton.singleton() for the entity_sources() mapping.
DATA_ENTITY_SOURCE = "entity_info"

# Used when converting float states to string: limit precision according to machine
# epsilon to make the string representation readable
# (evaluates to 15 when sys.float_info.epsilon is the IEEE 754 double epsilon).
FLOAT_PRECISION = abs(int(math.floor(math.log10(abs(sys.float_info.epsilon))))) - 1

# How many times per hour we allow capabilities to be updated before logging a warning
CAPABILITIES_UPDATE_LIMIT = 100

CONTEXT_RECENT_TIME_SECONDS = 5  # Time that a context is considered recent
|
|
|
|
|
|
@callback
def async_setup(hass: HomeAssistant) -> None:
    """Initialise the entity source mapping for this Home Assistant instance.

    The mapping returned by entity_sources() is not used here; the call is
    made only for its side effect of creating the singleton.
    """
    entity_sources(hass)
|
|
|
|
|
|
@callback
@bind_hass
@singleton.singleton(DATA_ENTITY_SOURCE)
def entity_sources(hass: HomeAssistant) -> dict[str, EntityInfo]:
    """Return the mapping of entity sources.

    The function body only builds an empty mapping; the singleton decorator
    is registered under DATA_ENTITY_SOURCE.
    """
    sources: dict[str, EntityInfo] = {}
    return sources
|
|
|
|
|
|
def generate_entity_id(
    entity_id_format: str,
    name: str | None,
    current_ids: list[str] | None = None,
    hass: HomeAssistant | None = None,
) -> str:
    """Build a unique entity ID from a format string and a name.

    Thin wrapper that forwards all arguments unchanged to
    async_generate_entity_id.
    """
    return async_generate_entity_id(
        entity_id_format, name, current_ids=current_ids, hass=hass
    )
|
|
|
|
|
|
@callback
def async_generate_entity_id(
    entity_id_format: str,
    name: str | None,
    current_ids: Iterable[str] | None = None,
    hass: HomeAssistant | None = None,
) -> str:
    """Build a unique entity ID from a format string and a name.

    A falsy name is replaced by DEVICE_DEFAULT_NAME. The lower-cased, slugified
    name is inserted into entity_id_format. Uniqueness is then checked against
    current_ids when given, otherwise against the state machine of hass.

    Raises ValueError when neither current_ids nor hass is supplied.
    """
    base_name = name if name else DEVICE_DEFAULT_NAME
    preferred = entity_id_format.format(slugify(base_name.lower()))

    # An explicit collection of taken IDs wins over the state machine.
    if current_ids is not None:
        return ensure_unique_string(preferred, current_ids)

    if hass is None:
        raise ValueError("Missing required parameter current_ids or hass")

    # Append _2, _3, ... until the state machine reports the ID as available.
    candidate = preferred
    suffix = 1
    while not hass.states.async_available(candidate):
        suffix += 1
        candidate = f"{preferred}_{suffix}"
    return candidate
|
|
|
|
|
|
def get_capability(hass: HomeAssistant, entity_id: str, capability: str) -> Any | None:
    """Look up one capability attribute of an entity.

    The state machine is consulted first; if the entity has no state there,
    the entity registry entry's capabilities are used instead.

    Raises HomeAssistantError when the entity is in neither place.
    """
    state = hass.states.get(entity_id)
    if state:
        return state.attributes.get(capability)

    entry = er.async_get(hass).async_get(entity_id)
    if not entry:
        raise HomeAssistantError(f"Unknown entity {entity_id}")

    if entry.capabilities:
        return entry.capabilities.get(capability)
    return None
|
|
|
|
|
|
def get_device_class(hass: HomeAssistant, entity_id: str) -> str | None:
    """Look up the device class of an entity.

    The state machine is consulted first; if the entity has no state there,
    the registry entry is used, preferring device_class over
    original_device_class.

    Raises HomeAssistantError when the entity is in neither place.
    """
    state = hass.states.get(entity_id)
    if state:
        return state.attributes.get(ATTR_DEVICE_CLASS)

    entry = er.async_get(hass).async_get(entity_id)
    if not entry:
        raise HomeAssistantError(f"Unknown entity {entity_id}")

    return entry.device_class or entry.original_device_class
|
|
|
|
|
|
def get_supported_features(hass: HomeAssistant, entity_id: str) -> int:
    """Look up the supported-features value of an entity.

    The state machine is consulted first (defaulting to 0 when the attribute
    is absent); if the entity has no state there, the registry entry is used,
    with a falsy value reported as 0.

    Raises HomeAssistantError when the entity is in neither place.
    """
    state = hass.states.get(entity_id)
    if state:
        features: int = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
        return features

    entry = er.async_get(hass).async_get(entity_id)
    if not entry:
        raise HomeAssistantError(f"Unknown entity {entity_id}")

    return entry.supported_features or 0
|
|
|
|
|
|
def get_unit_of_measurement(hass: HomeAssistant, entity_id: str) -> str | None:
    """Look up the unit of measurement of an entity.

    The state machine is consulted first; if the entity has no state there,
    the entity registry entry is used instead.

    Raises HomeAssistantError when the entity is in neither place.
    """
    state = hass.states.get(entity_id)
    if state:
        return state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)

    entry = er.async_get(hass).async_get(entity_id)
    if not entry:
        raise HomeAssistantError(f"Unknown entity {entity_id}")

    return entry.unit_of_measurement
|
|
|
|
|
|
# Voluptuous validator that coerces a value into an EntityCategory member.
ENTITY_CATEGORIES_SCHEMA: Final = vol.Coerce(EntityCategory)
|
|
|
|
|
|
class EntityInfo(TypedDict):
|
|
"""Entity info."""
|
|
|
|
domain: str
|
|
custom_component: bool
|
|
config_entry: NotRequired[str]
|
|
|
|
|
|
class StateInfo(TypedDict):
    """Typed dictionary holding state-related information for an entity."""

    # Names of attributes excluded from recording.
    unrecorded_attributes: frozenset[str]
|
|
|
|
|
|
class EntityPlatformState(Enum):
    """Where an entity stands relative to its platform."""

    # The entity has not yet been added to a platform;
    # polling updates are written to the state machine.
    NOT_ADDED = auto()

    # The entity has been added to a platform;
    # polling updates are written to the state machine.
    ADDED = auto()

    # The entity has been removed from a platform;
    # polling updates are not written to the state machine.
    REMOVED = auto()
|
|
|
|
|
|
# Unique marker used as a getattr() default by the CachedProperties setter, so
# "attribute not set yet" can be told apart from any real stored value.
_SENTINEL = object()
|
|
|
|
|
|
class EntityDescription(metaclass=FrozenOrThawed, frozen_or_thawed=True):
    """A class that describes Home Assistant entities.

    Several Entity properties in this module (device_class, icon,
    entity_category, force_update, ...) fall back to the same-named field of
    the entity's ``entity_description`` when the matching ``_attr_`` value is
    not set.
    """

    # This is the key identifier for this entity
    key: str

    # Optional descriptive fields; each default matches the "nothing
    # specified" fallback of the corresponding Entity property.
    device_class: str | None = None
    entity_category: EntityCategory | None = None
    entity_registry_enabled_default: bool = True
    entity_registry_visible_default: bool = True
    force_update: bool = False
    icon: str | None = None
    has_entity_name: bool = False
    # UNDEFINED means no name was specified at all (distinct from None).
    name: str | UndefinedType | None = UNDEFINED
    translation_key: str | None = None
    translation_placeholders: Mapping[str, str] | None = None
    unit_of_measurement: str | None = None
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True, slots=True)
|
|
class CalculatedState:
|
|
"""Container with state and attributes.
|
|
|
|
Returned by Entity._async_calculate_state.
|
|
"""
|
|
|
|
state: str
|
|
# The union of all attributes, after overriding with entity registry settings
|
|
attributes: dict[str, Any]
|
|
# Capability attributes returned by the capability_attributes property
|
|
capability_attributes: Mapping[str, Any] | None
|
|
|
|
|
|
class CachedProperties(type):
    """Metaclass which invalidates cached entity properties on write to _attr_.

    A class which has CachedProperties can optionally have a list of cached
    properties, passed as cached_properties, which must be a set of strings.
    - Each item in the cached_property set must be the name of a method decorated
      with @cached_property
    - For each item in the cached_property set, a property function with the
      same name, prefixed with _attr_, will be created
    - The property _attr_-property functions allow setting, getting and deleting
      data, which will be stored in an attribute prefixed with __attr_
    - The _attr_-property setter will invalidate the @cached_property by
      removing the cached value from the instance __dict__
    """

    def __new__(
        mcs,  # noqa: N804  ruff bug, ruff does not understand this is a metaclass
        name: str,
        bases: tuple[type, ...],
        namespace: dict[Any, Any],
        cached_properties: set[str] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Start creating a new CachedProperties.

        Consume the cached_properties keyword and store it (or an empty set)
        in the namespace under "_CachedProperties__cached_properties". The
        keyword itself is not forwarded to type.__new__.
        """
        namespace["_CachedProperties__cached_properties"] = cached_properties or set()
        return super().__new__(mcs, name, bases, namespace, **kwargs)

    def __init__(
        cls,
        name: str,
        bases: tuple[type, ...],
        namespace: dict[Any, Any],
        **kwargs: Any,
    ) -> None:
        """Finish creating a new CachedProperties.

        Wrap _attr_ for cached properties in property objects, first for the
        properties declared on this class, then for properties declared on
        parent classes for which this class defines its own _attr_ class
        attribute.
        """

        def deleter(name: str) -> Callable[[Any], None]:
            """Create a deleter for an _attr_ property."""
            private_attr_name = f"__attr_{name}"

            def _deleter(o: Any) -> None:
                """Delete an _attr_ property.

                Does two things:
                - Invalidate the cache of the cached property
                - Delete the __attr_ attribute

                Raises AttributeError if the __attr_ attribute does not exist
                """
                # Invalidate the cache of the cached property
                o.__dict__.pop(name, None)
                # Delete the __attr_ attribute
                delattr(o, private_attr_name)

            return _deleter

        def setter(name: str) -> Callable[[Any, Any], None]:
            """Create a setter for an _attr_ property."""
            private_attr_name = f"__attr_{name}"

            def _setter(o: Any, val: Any) -> None:
                """Set an _attr_ property to the backing __attr attribute.

                Also invalidates the corresponding cached_property by removing
                its cached value from the instance __dict__.
                """
                # Nothing to do when the stored value is equal to the new one
                # AND of exactly the same type; the cache stays valid.
                if (
                    old_val := getattr(o, private_attr_name, _SENTINEL)
                ) == val and type(old_val) is type(val):
                    return
                setattr(o, private_attr_name, val)
                # Invalidate the cache of the cached property
                o.__dict__.pop(name, None)

            return _setter

        def make_property(name: str) -> property:
            """Help create a property object.

            The getter reads the backing __attr_ attribute directly.
            """
            return property(
                fget=attrgetter(f"__attr_{name}"), fset=setter(name), fdel=deleter(name)
            )

        def wrap_attr(cls: CachedProperties, property_name: str) -> None:
            """Wrap a cached property's corresponding _attr in a property.

            If the class being created has an _attr class attribute, move it, and its
            annotations, to the __attr attribute.

            Raises TypeError if the _attr class attribute is a function or a
            property.
            """
            attr_name = f"_attr_{property_name}"
            private_attr_name = f"__attr_{property_name}"
            # Check if an _attr_ class attribute exists and move it to __attr_. We check
            # __dict__ here because we don't care about _attr_ class attributes in parents.
            if attr_name in cls.__dict__:
                attr = getattr(cls, attr_name)
                if isinstance(attr, (FunctionType, property)):
                    raise TypeError(f"Can't override {attr_name} in subclass")
                setattr(cls, private_attr_name, attr)
                annotations = cls.__annotations__
                if attr_name in annotations:
                    annotations[private_attr_name] = annotations.pop(attr_name)
            # Create the _attr_ property
            setattr(cls, attr_name, make_property(property_name))

        cached_properties: set[str] = namespace["_CachedProperties__cached_properties"]
        seen_props: set[str] = set()  # Keep track of properties which have been handled
        for property_name in cached_properties:
            wrap_attr(cls, property_name)
            seen_props.add(property_name)

        # Look for cached properties of parent classes where this class has
        # corresponding _attr_ class attributes and re-wrap them.
        # The slice walks the MRO backwards and stops before index 0, so the
        # class being created is not visited.
        for parent in cls.__mro__[:0:-1]:
            if "_CachedProperties__cached_properties" not in parent.__dict__:
                continue
            cached_properties = getattr(parent, "_CachedProperties__cached_properties")
            for property_name in cached_properties:
                if property_name in seen_props:
                    continue
                attr_name = f"_attr_{property_name}"
                # Check if an _attr_ class attribute exists. We check __dict__ here because
                # we don't care about _attr_ class attributes in parents.
                if (attr_name) not in cls.__dict__:
                    continue
                wrap_attr(cls, property_name)
                seen_props.add(property_name)
|
|
|
|
|
|
class ABCCachedProperties(CachedProperties, ABCMeta):
    """Metaclass combining CachedProperties with ABCMeta."""
|
|
|
|
|
|
# Names of the Entity cached properties that are backed by an _attr_ value.
# Passed to the metaclass as ``cached_properties`` when Entity is created.
# Kept in alphabetical order; as a set, the order carries no meaning.
CACHED_PROPERTIES_WITH_ATTR_ = {
    "assumed_state",
    "attribution",
    "available",
    "capability_attributes",
    "device_class",
    "device_info",
    "entity_category",
    "entity_picture",
    "entity_registry_enabled_default",
    "entity_registry_visible_default",
    "extra_state_attributes",
    "force_update",
    "has_entity_name",
    "icon",
    "name",
    "should_poll",
    "state",
    "supported_features",
    "translation_key",
    "translation_placeholders",
    "unique_id",
    "unit_of_measurement",
}
|
|
|
|
|
|
class Entity(
|
|
metaclass=ABCCachedProperties, cached_properties=CACHED_PROPERTIES_WITH_ATTR_
|
|
):
|
|
"""An abstract class for Home Assistant entities."""
|
|
|
|
# SAFE TO OVERWRITE
# The properties and methods here are safe to overwrite when inheriting
# this class. These may be used to customize the behavior of the entity.
entity_id: str = None  # type: ignore[assignment]

# Owning hass instance. Set by EntityPlatform by calling add_to_platform_start
# While not purely typed, it makes typehinting more useful for us
# and removes the need for constant None checks or asserts.
hass: HomeAssistant = None  # type: ignore[assignment]

# Owning platform instance. Set by EntityPlatform by calling add_to_platform_start
# While not purely typed, it makes typehinting more useful for us
# and removes the need for constant None checks or asserts.
platform: EntityPlatform = None  # type: ignore[assignment]

# Entity description instance for this Entity
# (annotation only: code checks for it with hasattr before use)
entity_description: EntityDescription

# If we reported if this entity was slow
_slow_reported = False

# If we reported deprecated supported features constants
_deprecated_supported_features_reported = False

# If we reported this entity is updated while disabled
_disabled_reported = False

# If we reported this entity is using async_update_ha_state, while
# it should be using async_write_ha_state.
_async_update_ha_state_reported = False

# If we reported this entity was added without its platform set
_no_platform_reported = False

# If we reported the name translation placeholders do not match the name
_name_translation_placeholders_reported = False

# Protect for multiple updates
_update_staged = False

# _verified_state_writable is set to True if the entity has been verified
# to be writable. This is used to avoid repeated checks.
_verified_state_writable = False

# Process updates in parallel
parallel_updates: asyncio.Semaphore | None = None

# Entry in the entity registry
registry_entry: er.RegistryEntry | None = None

# If the entity is removed from the entity registry
_removed_from_registry: bool = False

# The device entry for this entity
device_entry: dr.DeviceEntry | None = None

# Hold list for functions to call on remove.
_on_remove: list[CALLBACK_TYPE] | None = None

# NOTE(review): presumably the unsubscribe callback for device registry
# update tracking -- confirm against the code that assigns it.
_unsub_device_updates: CALLBACK_TYPE | None = None

# Context
# _context_set holds the time.time() value recorded by async_set_context.
_context: Context | None = None
_context_set: float | None = None

# If entity is added to an entity platform
_platform_state = EntityPlatformState.NOT_ADDED

# Attributes to exclude from recording, only set by base components, e.g. light
_entity_component_unrecorded_attributes: frozenset[str] = frozenset()
# Additional integration specific attributes to exclude from recording, set by
# platforms, e.g. a derived class in hue.light
_unrecorded_attributes: frozenset[str] = frozenset()
# Union of _entity_component_unrecorded_attributes and _unrecorded_attributes,
# set automatically by __init_subclass__
__combined_unrecorded_attributes: frozenset[str] = (
    _entity_component_unrecorded_attributes | _unrecorded_attributes
)
# Job type cache
# (filled lazily by get_hassjob_type, keyed by function name)
_job_types: dict[str, HassJobType] | None = None

# StateInfo. Set by EntityPlatform by calling async_internal_added_to_hass
# While not purely typed, it makes typehinting more useful for us
# and removes the need for constant None checks or asserts.
_state_info: StateInfo = None  # type: ignore[assignment]

# NOTE(review): presumably timestamps of recent capability updates, checked
# against CAPABILITIES_UPDATE_LIMIT -- confirm against the code that uses them.
__capabilities_updated_at: deque[float]
__capabilities_updated_at_reported: bool = False
__remove_future: asyncio.Future[None] | None = None

# Entity Properties
# Backing values for the same-named properties. Entries declared without a
# default are detected with hasattr() by their property, which then falls
# back to entity_description or to a built-in default.
_attr_assumed_state: bool = False
_attr_attribution: str | None = None
_attr_available: bool = True
_attr_capability_attributes: dict[str, Any] | None = None
_attr_device_class: str | None
_attr_device_info: DeviceInfo | None = None
_attr_entity_category: EntityCategory | None
_attr_has_entity_name: bool
_attr_entity_picture: str | None = None
_attr_entity_registry_enabled_default: bool
_attr_entity_registry_visible_default: bool
_attr_extra_state_attributes: dict[str, Any]
_attr_force_update: bool
_attr_icon: str | None
_attr_name: str | None
_attr_should_poll: bool = True
_attr_state: StateType = STATE_UNKNOWN
_attr_supported_features: int | None = None
_attr_translation_key: str | None
_attr_translation_placeholders: Mapping[str, str]
_attr_unique_id: str | None = None
_attr_unit_of_measurement: str | None
|
|
|
|
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Set up a newly defined Entity subclass.

    Recomputes the combined set of unrecorded attributes from the subclass's
    component-level and integration-level sets.
    """
    super().__init_subclass__(**kwargs)
    component_level = cls._entity_component_unrecorded_attributes
    integration_level = cls._unrecorded_attributes
    cls.__combined_unrecorded_attributes = component_level | integration_level
|
|
|
|
def get_hassjob_type(self, function_name: str) -> HassJobType:
    """Return the job type of the method called function_name.

    Results are memoised per instance in ``_job_types`` so that entity
    service calls do not have to work out the job type every time.
    """
    cache = self._job_types
    if not cache:
        # No (or an empty) cache yet: start a fresh one on the instance.
        cache = self._job_types = {}
    if function_name not in cache:
        cache[function_name] = get_hassjob_callable_job_type(
            getattr(self, function_name)
        )
    return cache[function_name]
|
|
|
|
@cached_property
def should_poll(self) -> bool:
    """Tell whether the entity must be polled for its state.

    Returns ``_attr_should_poll``. False means the entity pushes its state
    to HA.
    """
    return self._attr_should_poll
|
|
|
|
@cached_property
def unique_id(self) -> str | None:
    """Return the entity's unique ID, taken from ``_attr_unique_id``."""
    return self._attr_unique_id
|
|
|
|
@cached_property
def use_device_name(self) -> bool:
    """Tell whether the entity has no name of its own.

    Should be True if the entity represents the single main feature of a device.

    Checked in order: ``_attr_name``, a name translation known to the
    platform, the entity description's name, and finally the name property.
    """
    if hasattr(self, "_attr_name"):
        return not self._attr_name

    translation_key = self._name_translation_key
    if translation_key and translation_key in self.platform.platform_translations:
        # A translated name exists, so the entity does have its own name.
        return False

    if hasattr(self, "entity_description"):
        own_name = self.entity_description.name
    else:
        own_name = self.name
    return not own_name
|
|
|
|
@cached_property
def has_entity_name(self) -> bool:
    """Tell whether the entity's name describes only the entity itself.

    Uses ``_attr_has_entity_name`` when set, otherwise the entity
    description, otherwise False.
    """
    if hasattr(self, "_attr_has_entity_name"):
        result = self._attr_has_entity_name
    elif hasattr(self, "entity_description"):
        result = self.entity_description.has_entity_name
    else:
        result = False
    return result
|
|
|
|
def _device_class_name_helper(
    self,
    component_translations: dict[str, str],
) -> str | None:
    """Look up a translated entity name derived from the device class.

    Returns None when the entity does not use has_entity_name or when the
    given translations hold no matching key. An entity without a device
    class is looked up under the "_" key.
    """
    if self.has_entity_name:
        device_class_key = self.device_class or "_"
        platform = self.platform
        return component_translations.get(
            f"component.{platform.domain}.entity_component.{device_class_key}.name"
        )
    return None
|
|
|
|
@cached_property
def _object_id_device_class_name(self) -> str | None:
    """Return the device-class based name from the platform's object ID translations."""
    translations = self.platform.object_id_component_translations
    return self._device_class_name_helper(translations)
|
|
|
|
@cached_property
def _device_class_name(self) -> str | None:
    """Return the device-class based name from the platform's component translations."""
    translations = self.platform.component_translations
    return self._device_class_name_helper(translations)
|
|
|
|
def _default_to_device_class_name(self) -> bool:
    """Tell whether an unnamed entity should take its device class's name.

    The base implementation always answers False.
    """
    return False
|
|
|
|
@cached_property
def _name_translation_key(self) -> str | None:
    """Build the translation key used to look up the entity's name.

    Returns None when the entity has no translation_key.
    """
    if self.translation_key is not None:
        platform = self.platform
        return (
            f"component.{platform.platform_name}.entity.{platform.domain}"
            f".{self.translation_key}.name"
        )
    return None
|
|
|
|
def _substitute_name_placeholders(self, name: str) -> str:
    """Fill the translation placeholders into an entity name.

    When a placeholder is missing, the name is returned unformatted. The
    first time this happens, non-stable release channels raise
    HomeAssistantError, while the stable channel logs a warning once per
    entity.
    """
    try:
        return name.format(**self.translation_placeholders)
    except KeyError as err:
        # Already reported once for this entity: stay quiet.
        if self._name_translation_placeholders_reported:
            return name
        if get_release_channel() is not ReleaseChannel.STABLE:
            raise HomeAssistantError(f"Missing placeholder {err}") from err
        report_issue = self._suggest_report_issue()
        _LOGGER.warning(
            (
                "Entity %s (%s) has translation placeholders '%s' which do not "
                "match the name '%s', please %s"
            ),
            self.entity_id,
            type(self),
            self.translation_placeholders,
            name,
            report_issue,
        )
        self._name_translation_placeholders_reported = True
        return name
|
|
|
|
def _name_internal(
    self,
    device_class_name: str | None,
    platform_translations: dict[str, str],
) -> str | UndefinedType | None:
    """Work out the entity's name.

    Sources are tried in this order: ``_attr_name``, a translated name (only
    for has_entity_name entities), the entity description's name, and finally
    the device class name when _default_to_device_class_name() allows it.
    UNDEFINED is returned when no source yields a name.
    """
    if hasattr(self, "_attr_name"):
        return self._attr_name

    if self.has_entity_name:
        translation_key = self._name_translation_key
        if translation_key:
            translated = platform_translations.get(translation_key)
            if translated:
                return self._substitute_name_placeholders(translated)

    if hasattr(self, "entity_description"):
        description_name = self.entity_description.name
        if description_name is UNDEFINED and self._default_to_device_class_name():
            return device_class_name
        return description_name

    # Neither _attr_name, translation_key nor entity_description gave a name;
    # fall back to the device class name if the entity opts in.
    return device_class_name if self._default_to_device_class_name() else UNDEFINED
|
|
|
|
@property
def suggested_object_id(self) -> str | None:
    """Return the text from which the object ID should be derived.

    An UNDEFINED name is reported as None.
    """
    # type.__getattribute__ fetches the underlying property or
    # cached_property object rather than its value, which lets us detect
    # whether the class has overridden Entity.name.
    name_not_overridden = type.__getattribute__(
        self.__class__, "name"
    ) is type.__getattribute__(Entity, "name")

    # The check for self.platform guards against integrations not using an
    # EntityComponent and can be removed in HA Core 2024.1
    if name_not_overridden and self.platform:
        name = self._name_internal(
            self._object_id_device_class_name,
            self.platform.object_id_platform_translations,
        )
    else:
        name = self.name

    if name is UNDEFINED:
        return None
    return name
|
|
|
|
@cached_property
def name(self) -> str | UndefinedType | None:
    """Return the entity's name.

    Without a platform, the name is resolved with no device class name and
    no translations.
    """
    # The check for self.platform guards against integrations not using an
    # EntityComponent and can be removed in HA Core 2024.1
    if self.platform:
        return self._name_internal(
            self._device_class_name,
            self.platform.platform_translations,
        )
    return self._name_internal(None, {})
|
|
|
|
@cached_property
def state(self) -> StateType:
    """Return the entity's state, taken from ``_attr_state``."""
    return self._attr_state
|
|
|
|
@cached_property
def capability_attributes(self) -> dict[str, Any] | None:
    """Return the attributes that describe what the entity is capable of.

    Implemented by the component base class. By convention, attribute names
    are lowercase snake_case. The base implementation returns
    ``_attr_capability_attributes``.
    """
    return self._attr_capability_attributes
|
|
|
|
def get_initial_entity_options(self) -> er.EntityOptionsType | None:
    """Return the options to store when the entity is first registered.

    The options are written to the entity registry the first time the entity
    is seen and are never updated afterwards.

    Implemented by the component base class; integrations should not extend
    it. The base implementation returns None.

    Note: deliberately a method rather than a property, so the value is only
    calculated when needed.
    """
    return None
|
|
|
|
@cached_property
def state_attributes(self) -> dict[str, Any] | None:
    """Return the state attributes.

    Implemented by the component base class; integrations should not extend
    it. By convention, attribute names are lowercase snake_case. The base
    implementation returns None.
    """
    return None
|
|
|
|
@cached_property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
    """Return the entity-specific state attributes.

    Implemented by platform classes. By convention, attribute names are
    lowercase snake_case. Returns ``_attr_extra_state_attributes`` when it is
    set, otherwise None.
    """
    return (
        self._attr_extra_state_attributes
        if hasattr(self, "_attr_extra_state_attributes")
        else None
    )
|
|
|
|
@cached_property
def device_info(self) -> DeviceInfo | None:
    """Return the device-specific attributes, taken from ``_attr_device_info``.

    Implemented by platform classes.
    """
    return self._attr_device_info
|
|
|
|
@cached_property
def device_class(self) -> str | None:
    """Return the class of this device, from component DEVICE_CLASSES.

    Uses ``_attr_device_class`` when set, otherwise the entity description,
    otherwise None.
    """
    if hasattr(self, "_attr_device_class"):
        result = self._attr_device_class
    elif hasattr(self, "entity_description"):
        result = self.entity_description.device_class
    else:
        result = None
    return result
|
|
|
|
@cached_property
def unit_of_measurement(self) -> str | None:
    """Return the entity's unit of measurement, if it has one.

    Uses ``_attr_unit_of_measurement`` when set, otherwise the entity
    description, otherwise None.
    """
    if hasattr(self, "_attr_unit_of_measurement"):
        result = self._attr_unit_of_measurement
    elif hasattr(self, "entity_description"):
        result = self.entity_description.unit_of_measurement
    else:
        result = None
    return result
|
|
|
|
@cached_property
def icon(self) -> str | None:
    """Return the icon to show in the frontend, if there is one.

    Uses ``_attr_icon`` when set, otherwise the entity description,
    otherwise None.
    """
    if hasattr(self, "_attr_icon"):
        result = self._attr_icon
    elif hasattr(self, "entity_description"):
        result = self.entity_description.icon
    else:
        result = None
    return result
|
|
|
|
@cached_property
def entity_picture(self) -> str | None:
    """Return the picture to show in the frontend, from ``_attr_entity_picture``."""
    return self._attr_entity_picture
|
|
|
|
@cached_property
def available(self) -> bool:
    """Tell whether the entity is available, per ``_attr_available``."""
    return self._attr_available
|
|
|
|
@cached_property
def assumed_state(self) -> bool:
    """Tell whether the entity's real state cannot be accessed.

    Returns ``_attr_assumed_state``.
    """
    return self._attr_assumed_state
|
|
|
|
@cached_property
def force_update(self) -> bool:
    """Tell whether state updates should be forced.

    When True, a state change is triggered every time the state property is
    updated, not only when the value changes.

    Uses ``_attr_force_update`` when set, otherwise the entity description,
    otherwise False.
    """
    if hasattr(self, "_attr_force_update"):
        result = self._attr_force_update
    elif hasattr(self, "entity_description"):
        result = self.entity_description.force_update
    else:
        result = False
    return result
|
|
|
|
@cached_property
def supported_features(self) -> int | None:
    """Return the supported-features flags, from ``_attr_supported_features``."""
    return self._attr_supported_features
|
|
|
|
@cached_property
def entity_registry_enabled_default(self) -> bool:
    """Tell whether the entity should be enabled when first added.

    This only applies when first added to the entity registry.

    Uses ``_attr_entity_registry_enabled_default`` when set, otherwise the
    entity description, otherwise True.
    """
    if hasattr(self, "_attr_entity_registry_enabled_default"):
        result = self._attr_entity_registry_enabled_default
    elif hasattr(self, "entity_description"):
        result = self.entity_description.entity_registry_enabled_default
    else:
        result = True
    return result
|
|
|
|
@cached_property
def entity_registry_visible_default(self) -> bool:
    """Tell whether the entity should be visible when first added.

    This only applies when first added to the entity registry.

    Uses ``_attr_entity_registry_visible_default`` when set, otherwise the
    entity description, otherwise True.
    """
    if hasattr(self, "_attr_entity_registry_visible_default"):
        result = self._attr_entity_registry_visible_default
    elif hasattr(self, "entity_description"):
        result = self.entity_description.entity_registry_visible_default
    else:
        result = True
    return result
|
|
|
|
@cached_property
def attribution(self) -> str | None:
    """Return the attribution text, taken from ``_attr_attribution``."""
    return self._attr_attribution
|
|
|
|
@cached_property
def entity_category(self) -> EntityCategory | None:
    """Return the entity's category, if it has one.

    Uses ``_attr_entity_category`` when set, otherwise the entity
    description, otherwise None.
    """
    if hasattr(self, "_attr_entity_category"):
        result = self._attr_entity_category
    elif hasattr(self, "entity_description"):
        result = self.entity_description.entity_category
    else:
        result = None
    return result
|
|
|
|
@cached_property
def translation_key(self) -> str | None:
    """Return the key used to translate the entity's states.

    Uses ``_attr_translation_key`` when set, otherwise the entity
    description, otherwise None.
    """
    if hasattr(self, "_attr_translation_key"):
        result = self._attr_translation_key
    elif hasattr(self, "entity_description"):
        result = self.entity_description.translation_key
    else:
        result = None
    return result
|
|
|
|
@final
@cached_property
def translation_placeholders(self) -> Mapping[str, str]:
    """Return the placeholders for the entity's translated name.

    Uses ``_attr_translation_placeholders`` when set, otherwise the entity
    description's placeholders. A missing or falsy description value yields
    an empty dict.
    """
    if hasattr(self, "_attr_translation_placeholders"):
        result = self._attr_translation_placeholders
    elif hasattr(self, "entity_description"):
        result = self.entity_description.translation_placeholders or {}
    else:
        result = {}
    return result
|
|
|
|
# DO NOT OVERWRITE
|
|
# These properties and methods are either managed by Home Assistant or they
|
|
# are used to perform a very specific function. Overwriting these may
|
|
# produce undesirable effects in the entity's operation.
|
|
|
|
@property
def enabled(self) -> bool:
    """Tell whether the entity is enabled in the entity registry.

    An entity without a registry entry cannot be disabled, so it always
    counts as enabled.
    """
    if self.registry_entry is None:
        return True
    return not self.registry_entry.disabled
|
|
|
|
@callback
def async_set_context(self, context: Context) -> None:
    """Record the context the entity is now operating under.

    Also stores the current time.time() value in ``_context_set``.
    """
    self._context = context
    self._context_set = time.time()
|
|
|
|
async def async_update_ha_state(self, force_refresh: bool = False) -> None:
    """Update Home Assistant with current state of entity.

    With force_refresh set, the entity is updated through
    `async_device_update` first; if that update raises, the error is logged
    and no state is written. Without force_refresh, a one-time warning asks
    the integration to use `async_write_ha_state` instead.

    This method must be run in the event loop.

    Raises RuntimeError when `hass` is None and NoEntitySpecifiedError when
    `entity_id` is None.
    """
    if self.hass is None:
        raise RuntimeError(f"Attribute hass is None for {self}")

    if self.entity_id is None:
        raise NoEntitySpecifiedError(
            f"No entity id specified for entity {self.name}"
        )

    if not force_refresh:
        # Calling this method without a refresh is discouraged; warn only once.
        if not self._async_update_ha_state_reported:
            report_issue = self._suggest_report_issue()
            _LOGGER.warning(
                "Entity %s (%s) is using self.async_update_ha_state(), without"
                " enabling force_refresh. Instead it should use"
                " self.async_write_ha_state(), please %s",
                self.entity_id,
                type(self),
                report_issue,
            )
            self._async_update_ha_state_reported = True
    else:
        # Refresh the entity data before writing the state.
        try:
            await self.async_device_update()
        except Exception:
            _LOGGER.exception("Update for %s fails", self.entity_id)
            return

    self._async_write_ha_state()
|
|
|
|
@callback
def _async_verify_state_writable(self) -> None:
    """Verify the entity is in a writable state.

    Raises RuntimeError when `hass` is None and NoEntitySpecifiedError when
    `entity_id` is None. A missing platform only produces a one-time warning.
    On success `_verified_state_writable` is set so callers can skip the check.
    """
    if self.hass is None:
        raise RuntimeError(f"Attribute hass is None for {self}")

    # The check for self.platform guards against integrations not using an
    # EntityComponent and can be removed in HA Core 2024.1
    if self.platform is None and not self._no_platform_reported:  # type: ignore[unreachable]
        report_issue = self._suggest_report_issue()  # type: ignore[unreachable]
        message = (
            "Entity %s (%s) does not have a platform, this may be caused by "
            "adding it manually instead of with an EntityComponent helper"
            ", please %s"
        )
        _LOGGER.warning(message, self.entity_id, type(self), report_issue)
        self._no_platform_reported = True

    if self.entity_id is None:
        raise NoEntitySpecifiedError(
            f"No entity id specified for entity {self.name}"
        )

    self._verified_state_writable = True
|
|
|
|
@callback
def _async_write_ha_state_from_call_soon_threadsafe(self) -> None:
    """Write the state to the state machine from the event loop thread.

    Runs the writability verification first unless it already succeeded.
    """
    already_verified = self.hass and self._verified_state_writable
    if not already_verified:
        self._async_verify_state_writable()
    self._async_write_ha_state()
|
|
|
|
@callback
def async_write_ha_state(self) -> None:
    """Write the state to the state machine.

    Runs the writability verification first unless it already succeeded, and
    reports a non-thread-safe operation when the calling thread is not the
    one recorded in `hass.loop_thread_id`.
    """
    if not (self.hass and self._verified_state_writable):
        self._async_verify_state_writable()

    called_from_loop_thread = self.hass.loop_thread_id == threading.get_ident()
    if not called_from_loop_thread:
        report_non_thread_safe_operation("async_write_ha_state")

    self._async_write_ha_state()
|
|
|
|
def _stringify_state(self, available: bool) -> str:
    """Convert the entity state to the string stored in the state machine.

    Unavailable entities map to STATE_UNAVAILABLE and a None state maps to
    STATE_UNKNOWN. Exact `str` values are returned unchanged, floats are
    formatted with FLOAT_PRECISION and everything else goes through `str()`.
    """
    if not available:
        return STATE_UNAVAILABLE

    state = self.state
    if state is None:
        return STATE_UNKNOWN

    # fast path for strings
    if type(state) is str:  # noqa: E721
        return state

    if not isinstance(state, float):
        return str(state)

    # If the entity's state is a float, limit precision according to machine
    # epsilon to make the string representation readable
    return f"{state:.{FLOAT_PRECISION}}"
|
|
|
|
def _friendly_name_internal(self) -> str | None:
    """Return the friendly name.

    If has_entity_name is False, this returns self.name
    If has_entity_name is True, this returns device.name + self.name

    An UNDEFINED name is treated as None. Without a device entry the plain
    entity name is returned. The device name prefers `name_by_user` over
    `name`.
    """
    raw_name = self.name
    entity_name = None if raw_name is UNDEFINED else raw_name

    if not self.has_entity_name:
        return entity_name

    device_entry = self.device_entry
    if not device_entry:
        return entity_name

    device_name = device_entry.name_by_user or device_entry.name
    if entity_name is None and self.use_device_name:
        return device_name
    if device_name:
        return f"{device_name} {entity_name}"
    return entity_name
|
|
|
|
@callback
def _async_calculate_state(self) -> CalculatedState:
    """Calculate state string and attribute mapping.

    Wraps the first three items of the internal calculation (state,
    attributes, capability attributes) in a CalculatedState.
    """
    (
        state,
        attributes,
        capability_attributes,
        _original_device_class,
        _supported_features,
    ) = self.__async_calculate_state()
    return CalculatedState(state, attributes, capability_attributes)
|
|
|
|
def __async_calculate_state(
    self,
) -> tuple[str, dict[str, Any], Mapping[str, Any] | None, str | None, int | None]:
    """Calculate state string and attribute mapping.

    Returns a tuple:
    state - the stringified state
    attr - the attribute dictionary
    capability_attr - a mapping with capability attributes
    original_device_class - the device class which may be overridden
    supported_features - the supported features

    This method is called when writing the state to avoid the overhead of creating
    a dataclass object.
    """
    registry_entry = self.registry_entry

    # Start from a copy of the capability attributes so they are not mutated.
    capability_attr = self.capability_attributes
    if capability_attr:
        attributes = capability_attr.copy()
    else:
        attributes = {}

    # only call self.available once per update cycle
    is_available = self.available
    state = self._stringify_state(is_available)

    # State attributes are only reported while the entity is available.
    if is_available:
        state_attributes = self.state_attributes
        if state_attributes:
            attributes.update(state_attributes)
        extra_state_attributes = self.extra_state_attributes
        if extra_state_attributes:
            attributes.update(extra_state_attributes)

    unit_of_measurement = self.unit_of_measurement
    if unit_of_measurement is not None:
        attributes[ATTR_UNIT_OF_MEASUREMENT] = unit_of_measurement

    assumed_state = self.assumed_state
    if assumed_state:
        attributes[ATTR_ASSUMED_STATE] = assumed_state

    attribution = self.attribution
    if attribution is not None:
        attributes[ATTR_ATTRIBUTION] = attribution

    # Values from the registry entry take precedence for device class, icon and name.
    original_device_class = self.device_class
    device_class = (
        registry_entry and registry_entry.device_class
    ) or original_device_class
    if device_class is not None:
        attributes[ATTR_DEVICE_CLASS] = str(device_class)

    entity_picture = self.entity_picture
    if entity_picture is not None:
        attributes[ATTR_ENTITY_PICTURE] = entity_picture

    icon = (registry_entry and registry_entry.icon) or self.icon
    if icon is not None:
        attributes[ATTR_ICON] = icon

    friendly_name = (
        registry_entry and registry_entry.name
    ) or self._friendly_name_internal()
    if friendly_name is not None:
        attributes[ATTR_FRIENDLY_NAME] = friendly_name

    supported_features = self.supported_features
    if supported_features is not None:
        attributes[ATTR_SUPPORTED_FEATURES] = supported_features

    return (
        state,
        attributes,
        capability_attr,
        original_device_class,
        supported_features,
    )
|
|
|
|
@callback
def _async_write_ha_state(self) -> None:
    """Write the state to the state machine.

    Nothing is written when the entity was already removed or when its
    registry entry is disabled (the latter is logged once as an integration
    bug). Otherwise the state and attributes are calculated, the entity
    registry is brought in line with changed capabilities, customizations
    from `hass.data[DATA_CUSTOMIZE]` are merged into the attributes, an
    outdated context is dropped, and the result is stored in the state
    machine. If the state machine rejects the state with InvalidStateError,
    STATE_UNKNOWN with empty attributes is written instead.
    """
    if self._platform_state is EntityPlatformState.REMOVED:
        # Polling returned after the entity has already been removed
        return

    hass = self.hass
    entity_id = self.entity_id

    if (entry := self.registry_entry) and entry.disabled_by:
        # Warn only once per entity, but skip the write every time.
        if not self._disabled_reported:
            self._disabled_reported = True
            _LOGGER.warning(
                (
                    "Entity %s is incorrectly being triggered for updates while it"
                    " is disabled. This is a bug in the %s integration"
                ),
                entity_id,
                self.platform.platform_name,
            )
        return

    # Time the state calculation so slow entities can be reported below.
    state_calculate_start = timer()
    state, attr, capabilities, original_device_class, supported_features = (
        self.__async_calculate_state()
    )
    time_now = timer()

    if entry:
        # Make sure capabilities in the entity registry are up to date. Capabilities
        # include capability attributes, device class and supported features
        supported_features = supported_features or 0
        if (
            capabilities != entry.capabilities
            or original_device_class != entry.original_device_class
            or supported_features != entry.supported_features
        ):
            if not self.__capabilities_updated_at_reported:
                # _Entity__capabilities_updated_at is because of name mangling
                if not (
                    capabilities_updated_at := getattr(
                        self, "_Entity__capabilities_updated_at", None
                    )
                ):
                    # Created lazily on the first capability change.
                    self.__capabilities_updated_at = deque(
                        maxlen=CAPABILITIES_UPDATE_LIMIT + 1
                    )
                    capabilities_updated_at = self.__capabilities_updated_at
                capabilities_updated_at.append(time_now)
                # Drop timestamps that are more than 3600 seconds old.
                while time_now - capabilities_updated_at[0] > 3600:
                    capabilities_updated_at.popleft()
                # More than CAPABILITIES_UPDATE_LIMIT changes remain in the
                # window: warn once.
                if len(capabilities_updated_at) > CAPABILITIES_UPDATE_LIMIT:
                    self.__capabilities_updated_at_reported = True
                    report_issue = self._suggest_report_issue()
                    _LOGGER.warning(
                        (
                            "Entity %s (%s) is updating its capabilities too often,"
                            " please %s"
                        ),
                        entity_id,
                        type(self),
                        report_issue,
                    )
            # Store the new capabilities and keep the updated entry that is returned.
            entity_registry = er.async_get(self.hass)
            self.registry_entry = entity_registry.async_update_entity(
                self.entity_id,
                capabilities=capabilities,
                original_device_class=original_device_class,
                supported_features=supported_features,
            )

    # Report (once) a state calculation that took longer than 0.4 seconds.
    if time_now - state_calculate_start > 0.4 and not self._slow_reported:
        self._slow_reported = True
        report_issue = self._suggest_report_issue()
        _LOGGER.warning(
            "Updating state for %s (%s) took %.3f seconds. Please %s",
            entity_id,
            type(self),
            time_now - state_calculate_start,
            report_issue,
        )

    try:
        # Most of the time this will already be
        # set and since try is near zero cost
        # on py3.11+ it's faster to assume it is
        # set and catch the exception if it is not.
        customize = hass.data[DATA_CUSTOMIZE]
    except KeyError:
        pass
    else:
        # Overwrite properties that have been set in the config file.
        if custom := customize.get(entity_id):
            attr.update(custom)

    # Forget a context that was set more than CONTEXT_RECENT_TIME_SECONDS ago.
    if (
        self._context_set is not None
        and time_now - self._context_set > CONTEXT_RECENT_TIME_SECONDS
    ):
        self._context = None
        self._context_set = None

    try:
        hass.states.async_set_internal(
            entity_id,
            state,
            attr,
            self.force_update,
            self._context,
            self._state_info,
            time_now,
        )
    except InvalidStateError:
        # Fall back to an unknown state with no attributes.
        _LOGGER.exception(
            "Failed to set state for %s, fall back to %s", entity_id, STATE_UNKNOWN
        )
        hass.states.async_set(
            entity_id, STATE_UNKNOWN, {}, self.force_update, self._context
        )
|
|
|
|
def schedule_update_ha_state(self, force_refresh: bool = False) -> None:
    """Schedule an update ha state change task.

    Scheduling the update avoids executor deadlocks.

    Entity state and attributes are read when the update ha state change
    task is executed.
    If state is changed more than once before the ha state change task has
    been executed, the intermediate state transitions will be missed.

    With force_refresh a task running `async_update_ha_state` is created;
    otherwise the state write is handed to the event loop with
    `call_soon_threadsafe`.
    """
    if not force_refresh:
        self.hass.loop.call_soon_threadsafe(
            self._async_write_ha_state_from_call_soon_threadsafe
        )
        return

    self.hass.create_task(
        self.async_update_ha_state(force_refresh),
        f"Entity {self.entity_id} schedule update ha state",
    )
|
|
|
|
@callback
def async_schedule_update_ha_state(self, force_refresh: bool = False) -> None:
    """Schedule an update ha state change task.

    This method must be run in the event loop.
    Scheduling the update avoids executor deadlocks.

    Entity state and attributes are read when the update ha state change
    task is executed.
    If state is changed more than once before the ha state change task has
    been executed, the intermediate state transitions will be missed.

    Without force_refresh the state is written immediately through
    `async_write_ha_state`.
    """
    if not force_refresh:
        self.async_write_ha_state()
        return

    self.hass.async_create_task(
        self.async_update_ha_state(force_refresh),
        f"Entity schedule update ha state {self.entity_id}",
        eager_start=True,
    )
|
|
|
|
@callback
def _async_slow_update_warning(self) -> None:
    """Log a warning that the entity update exceeds SLOW_UPDATE_WARNING seconds."""
    entity_id = self.entity_id
    _LOGGER.warning(
        "Update of %s is taking over %s seconds", entity_id, SLOW_UPDATE_WARNING
    )
|
|
|
|
async def async_device_update(self, warning: bool = True) -> None:
    """Process 'update' or 'async_update' from entity.

    This method is a coroutine.

    `async_update` is awaited when the entity defines it; otherwise `update`
    is run in the executor. Entities defining neither are left untouched. A
    call made while another update of this entity is still staged returns
    immediately.

    When `warning` is True, a slow-update warning is scheduled on the event
    loop and cancelled again once the update finishes.
    """
    if self._update_staged:
        return

    hass = self.hass
    assert hass is not None

    self._update_staged = True

    # Keep a reference to the semaphore that is actually acquired, so the very
    # same object is released even if self.parallel_updates is replaced or
    # cleared while the update is running.
    parallel_updates = self.parallel_updates
    acquired = False
    update_warn: asyncio.TimerHandle | None = None

    try:
        # Process update sequential. Acquiring inside the try block makes sure
        # the staged flag is reset when waiting for the semaphore is
        # interrupted (for example by cancellation); otherwise every later
        # update of this entity would be skipped.
        if parallel_updates:
            await parallel_updates.acquire()
            acquired = True

        if warning:
            update_warn = hass.loop.call_at(
                hass.loop.time() + SLOW_UPDATE_WARNING, self._async_slow_update_warning
            )

        if hasattr(self, "async_update"):
            await self.async_update()
        elif hasattr(self, "update"):
            await hass.async_add_executor_job(self.update)
    finally:
        self._update_staged = False
        if update_warn is not None:
            update_warn.cancel()
        # Only give back a permit that was really taken.
        if acquired:
            parallel_updates.release()
|
|
|
|
@callback
def async_on_remove(self, func: CALLBACK_TYPE) -> None:
    """Add a function to call when entity is removed or not added.

    The list of callbacks is created on first use.
    """
    callbacks = self._on_remove
    if callbacks is None:
        callbacks = self._on_remove = []
    callbacks.append(func)
|
|
|
|
async def async_removed_from_registry(self) -> None:
    """Run when entity has been removed from entity registry.

    Awaited by the registry update handling for a "remove" action, before the
    cached registry entry is cleared and the entity itself is removed.

    To be extended by integrations.
    """
|
|
|
|
@callback
def add_to_platform_start(
    self,
    hass: HomeAssistant,
    platform: EntityPlatform,
    parallel_updates: asyncio.Semaphore | None,
) -> None:
    """Start adding an entity to a platform.

    Stores hass, the platform and the parallel-updates semaphore on the entity
    and marks it as added. Raises HomeAssistantError when the entity is not
    in the NOT_ADDED state.
    """
    already_handled = self._platform_state is not EntityPlatformState.NOT_ADDED
    if already_handled:
        raise HomeAssistantError(
            f"Entity '{self.entity_id}' cannot be added a second time to an entity"
            " platform"
        )

    self.hass, self.platform = hass, platform
    self.parallel_updates = parallel_updates
    self._platform_state = EntityPlatformState.ADDED
|
|
|
|
def _call_on_remove_callbacks(self) -> None:
    """Call callbacks registered by async_on_remove.

    Callbacks are popped from the end of the list, so the most recently
    registered one runs first, and the list is empty afterwards.
    """
    if self._on_remove is None:
        return
    while pending := self._on_remove:
        unsubscribe = pending.pop()
        unsubscribe()
|
|
|
|
@callback
def add_to_platform_abort(self) -> None:
    """Abort adding an entity to a platform.

    Marks the entity as removed, runs the registered on-remove callbacks and
    then drops the references to hass, the platform and the parallel-updates
    semaphore.
    """
    self._platform_state = EntityPlatformState.REMOVED
    self._call_on_remove_callbacks()

    # Detach only after the callbacks ran.
    self.hass = self.platform = None  # type: ignore[assignment]
    self.parallel_updates = None
|
|
|
|
async def add_to_platform_finish(self) -> None:
    """Finish adding an entity to a platform.

    Awaits the internal and then the integration-facing "added to hass" hooks
    and writes the entity state afterwards.
    """
    # Internal bookkeeping runs before the hook that integrations extend.
    await self.async_internal_added_to_hass()
    await self.async_added_to_hass()
    # Write the state once both hooks have completed.
    self.async_write_ha_state()
|
|
|
|
@final
async def async_remove(self, *, force_remove: bool = False) -> None:
    """Remove entity from Home Assistant.

    If the entity has a non disabled entry in the entity registry,
    the entity's state will be set to unavailable, in the same way
    as when the entity registry is loaded.

    If the entity doesn't have a non disabled entry in the entity registry,
    or if force_remove=True, its state will be removed.

    The removal runs only once: a call made while a removal is in progress,
    or after it completed, awaits the future of the first removal and thereby
    gets the same outcome, including a raised exception.
    """
    if self.__remove_future is not None:
        await self.__remove_future
        return

    self.__remove_future = self.hass.loop.create_future()
    try:
        await self.__async_remove_impl(force_remove)
    except BaseException as ex:
        self.__remove_future.set_exception(ex)
        raise
    else:
        # Resolve the future on success only. Resolving it in a `finally`
        # clause would also run after set_exception() above and raise
        # asyncio.InvalidStateError, hiding the original error from the caller.
        self.__remove_future.set_result(None)
|
|
|
|
@final
async def __async_remove_impl(self, force_remove: bool) -> None:
    """Remove entity from Home Assistant.

    Marks the entity as removed, runs the on-remove callbacks and the
    will-remove hooks, then either writes an unavailable state through the
    registry entry or removes the state from the state machine.
    """
    self._platform_state = EntityPlatformState.REMOVED
    self._call_on_remove_callbacks()

    await self.async_internal_will_remove_from_hass()
    await self.async_will_remove_from_hass()

    # Check if entry still exists in entity registry (e.g. unloading config entry)
    #
    # Whether the entity is still in the entity registry is decided by
    # self._removed_from_registry: self.registry_entry is unset in a task,
    # so it's possible that the entity has been removed but the task has not
    # yet been executed. self._removed_from_registry is set to True in a
    # callback which does not have the same issue.
    if (
        force_remove
        or not self.registry_entry
        or self.registry_entry.disabled
        or self._removed_from_registry
    ):
        self.hass.states.async_remove(self.entity_id, context=self._context)
    else:
        # Set the entity's state to unavailable + ATTR_RESTORED: True
        self.registry_entry.write_unavailable_state(self.hass)
|
|
|
|
async def async_added_to_hass(self) -> None:
    """Run when entity about to be added to hass.

    Awaited while finishing the add to a platform, after the internal
    added-to-hass hook and before the state is written.

    To be extended by integrations.
    """
|
|
|
|
async def async_will_remove_from_hass(self) -> None:
    """Run when entity will be removed from hass.

    Awaited during removal, after the on-remove callbacks and the internal
    will-remove hook have run and before the state is removed or set to
    unavailable.

    To be extended by integrations.
    """
|
|
|
|
@callback
def async_registry_entry_updated(self) -> None:
    """Run when the entity registry entry has been updated.

    Called after the cached registry entry was refreshed for an update that
    kept the entity_id, right before the state is written again.

    To be extended by integrations.
    """
|
|
|
|
async def async_internal_added_to_hass(self) -> None:
    """Run when entity about to be added to hass.

    Records where the entity comes from in the entity sources, prepares the
    state info with the unrecorded attributes and, for entities with a
    registry entry, starts tracking entity and device registry updates.

    Not to be extended by integrations.
    """
    platform = self.platform
    entity_info: EntityInfo = {
        "domain": platform.platform_name,
        "custom_component": "custom_components" in type(self).__module__,
    }
    if config_entry := platform.config_entry:
        entity_info["config_entry"] = config_entry.entry_id

    entity_sources(self.hass)[self.entity_id] = entity_info

    self._state_info = {
        "unrecorded_attributes": self.__combined_unrecorded_attributes
    }

    if self.registry_entry is None:
        return

    # This is an assert as it should never happen, but helps in tests
    assert (
        not self.registry_entry.disabled_by
    ), f"Entity '{self.entity_id}' is being added while it's disabled"

    # Stop tracking registry updates again when the entity is removed.
    unsub_registry_updates = async_track_entity_registry_updated_event(
        self.hass,
        self.entity_id,
        self._async_registry_updated,
        job_type=HassJobType.Callback,
    )
    self.async_on_remove(unsub_registry_updates)
    self._async_subscribe_device_updates()
|
|
|
|
async def async_internal_will_remove_from_hass(self) -> None:
    """Run when entity will be removed from hass.

    Drops the entity from the entity sources when it has a platform.

    Not to be extended by integrations.
    """
    # The check for self.platform guards against integrations not using an
    # EntityComponent and can be removed in HA Core 2024.1
    if not self.platform:
        return
    sources = entity_sources(self.hass)
    del sources[self.entity_id]
|
|
|
|
@callback
def _async_registry_updated(
    self, event: Event[er.EventEntityRegistryUpdatedData]
) -> None:
    """Handle entity registry update.

    Remembers whether the event removed the entity from the registry and, for
    "update" and "remove" actions, starts the task that processes the change.
    """
    action = event.data["action"]
    removed = action == "remove"
    self._removed_from_registry = removed

    if not (action == "update" or removed):
        return

    self.hass.async_create_task_internal(
        self._async_process_registry_update_or_remove(event), eager_start=True
    )
|
|
|
|
async def _async_process_registry_update_or_remove(
    self, event: Event[er.EventEntityRegistryUpdatedData]
) -> None:
    """Handle entity registry update or remove.

    For a "remove" action the removed-from-registry hook is awaited, the
    cached registry entry is cleared and the entity is removed.

    For an "update" action the cached registry entry (and the device entry,
    when the entry has a device) is refreshed. The entity is then removed if
    the entry is disabled, has its state rewritten if the entity_id is
    unchanged, or is removed and added to its platform again under the new
    entity_id.
    """
    data = event.data
    if data["action"] == "remove":
        await self.async_removed_from_registry()
        self.registry_entry = None
        await self.async_remove()

    # Everything below only applies to updates; a removal ends here as well.
    if data["action"] != "update":
        return

    # NOTE(review): _async_subscribe_device_updates() reads
    # self.registry_entry.device_id, and self.registry_entry is only replaced
    # with the updated entry further down - confirm the subscription is meant
    # to be based on the entry as cached at this point.
    if "device_id" in data["changes"]:
        self._async_subscribe_device_updates()

    # Replace the cached entry with the current one, keeping the previous
    # entry for the entity_id comparison below.
    ent_reg = er.async_get(self.hass)
    old = self.registry_entry
    registry_entry = ent_reg.async_get(data["entity_id"])
    assert registry_entry is not None
    self.registry_entry = registry_entry

    # The device entry is only refreshed when the entry references a device.
    if device_id := registry_entry.device_id:
        self.device_entry = dr.async_get(self.hass).async_get(device_id)

    if registry_entry.disabled:
        await self.async_remove()
        return

    assert old is not None
    if registry_entry.entity_id == old.entity_id:
        # Same entity_id: notify the integration hook and rewrite the state.
        self.async_registry_entry_updated()
        self.async_write_ha_state()
        return

    # The entity_id changed: remove the state of the old entity_id and add the
    # entity to its platform again.
    await self.async_remove(force_remove=True)

    self.entity_id = registry_entry.entity_id

    # Clear the remove future to handle entity added again after entity id change
    self.__remove_future = None
    self._platform_state = EntityPlatformState.NOT_ADDED
    await self.platform.async_add_entities([self])
|
|
|
|
@callback
def _async_unsubscribe_device_updates(self) -> None:
    """Unsubscribe from device registry updates.

    Does nothing when there is no active subscription.
    """
    unsubscribe = self._unsub_device_updates
    if not unsubscribe:
        return
    unsubscribe()
    self._unsub_device_updates = None
|
|
|
|
@callback
def _async_device_registry_updated(
    self, event: Event[EventDeviceRegistryUpdatedData]
) -> None:
    """Handle device registry update.

    Only updates that change the device's "name" or "name_by_user" are acted
    on: the device entry is fetched again and the state is rewritten.
    """
    data = event.data
    if data["action"] != "update":
        return

    name_changed = "name" in data["changes"] or "name_by_user" in data["changes"]
    if not name_changed:
        return

    device_registry = dr.async_get(self.hass)
    self.device_entry = device_registry.async_get(data["device_id"])
    self.async_write_ha_state()
|
|
|
|
@callback
def _async_subscribe_device_updates(self) -> None:
    """Subscribe to device registry updates.

    Any existing subscription is dropped first. A new one is only created
    when the registry entry references a device and the entity uses
    `has_entity_name`. The unsubscribe callback is registered for removal
    once.
    """
    assert self.registry_entry

    self._async_unsubscribe_device_updates()

    device_id = self.registry_entry.device_id
    if device_id is None or not self.has_entity_name:
        return

    self._unsub_device_updates = async_track_device_registry_updated_event(
        self.hass,
        device_id,
        self._async_device_registry_updated,
        job_type=HassJobType.Callback,
    )

    # Avoid registering the unsubscribe callback a second time.
    already_registered = (
        self._on_remove
        and self._async_unsubscribe_device_updates in self._on_remove
    )
    if not already_registered:
        self.async_on_remove(self._async_unsubscribe_device_updates)
|
|
|
|
def __repr__(self) -> str:
    """Return the representation.

    If the entity is not added to a platform it's not safe to call
    _stringify_state, so a fixed placeholder is returned instead.
    """
    if self._platform_state is EntityPlatformState.ADDED:
        state = self._stringify_state(self.available)
        return f"<entity {self.entity_id}={state}>"
    return f"<entity unknown.unknown={STATE_UNKNOWN}>"
|
|
|
|
async def async_request_call[_T](self, coro: Coroutine[Any, Any, _T]) -> _T:
|
|
"""Process request batched."""
|
|
if self.parallel_updates:
|
|
await self.parallel_updates.acquire()
|
|
|
|
try:
|
|
return await coro
|
|
finally:
|
|
if self.parallel_updates:
|
|
self.parallel_updates.release()
|
|
|
|
def _suggest_report_issue(self) -> str:
    """Suggest to report an issue.

    Passes the platform name (when the entity has a platform) and the module
    of the entity class to `async_suggest_report_issue`.
    """
    # The check for self.platform guards against integrations not using an
    # EntityComponent and can be removed in HA Core 2024.1
    if self.platform:
        platform_name = self.platform.platform_name
    else:
        platform_name = None

    return async_suggest_report_issue(
        self.hass, integration_domain=platform_name, module=type(self).__module__
    )
|
|
|
|
@callback
def _report_deprecated_supported_features_values(
    self, replacement: IntFlag
) -> None:
    """Report deprecated supported features values.

    The warning is logged only once per entity and names `replacement` as the
    value that should be used instead.
    """
    if self._deprecated_supported_features_reported is True:
        return
    self._deprecated_supported_features_reported = True

    report_issue = (
        f"{self._suggest_report_issue()} and reference "
        "https://developers.home-assistant.io/blog/2023/12/28/support-feature-magic-numbers-deprecation"
    )
    message = (
        "Entity %s (%s) is using deprecated supported features"
        " values which will be removed in HA Core 2025.1. Instead it should use"
        " %s, please %s"
    )
    _LOGGER.warning(
        message, self.entity_id, type(self), repr(replacement), report_issue
    )
|
|
|
|
|
|
class ToggleEntityDescription(EntityDescription, frozen_or_thawed=True):
    """A class that describes toggle entities.

    Adds no fields of its own on top of EntityDescription.
    """
|
|
|
|
|
|
# Property names passed as `cached_properties` when the ToggleEntity class is defined.
TOGGLE_ENTITY_CACHED_PROPERTIES_WITH_ATTR_ = {"is_on"}
|
|
|
|
|
|
class ToggleEntity(
    Entity, cached_properties=TOGGLE_ENTITY_CACHED_PROPERTIES_WITH_ATTR_
):
    """An abstract class for entities that can be turned on and off.

    The state is derived from `is_on`. Integrations implement either the
    synchronous `turn_on`/`turn_off` pair or the `async_` variants.
    """

    entity_description: ToggleEntityDescription
    _attr_is_on: bool | None = None
    _attr_state: None = None

    @property
    @final
    def state(self) -> Literal["on", "off"] | None:
        """Return STATE_ON or STATE_OFF based on is_on, or None when is_on is None."""
        is_on = self.is_on
        if is_on is None:
            return None
        if is_on:
            return STATE_ON
        return STATE_OFF

    @cached_property
    def is_on(self) -> bool | None:
        """Return True if entity is on."""
        return self._attr_is_on

    def turn_on(self, **kwargs: Any) -> None:
        """Turn the entity on."""
        raise NotImplementedError

    async def async_turn_on(self, **kwargs: Any) -> None:
        """Turn the entity on by running turn_on in the executor."""
        job = ft.partial(self.turn_on, **kwargs)
        await self.hass.async_add_executor_job(job)

    def turn_off(self, **kwargs: Any) -> None:
        """Turn the entity off."""
        raise NotImplementedError

    async def async_turn_off(self, **kwargs: Any) -> None:
        """Turn the entity off by running turn_off in the executor."""
        job = ft.partial(self.turn_off, **kwargs)
        await self.hass.async_add_executor_job(job)

    @final
    def toggle(self, **kwargs: Any) -> None:
        """Toggle the entity.

        This method will never be called by Home Assistant and should not be implemented
        by integrations.
        """

    async def async_toggle(self, **kwargs: Any) -> None:
        """Toggle the entity.

        This method should typically not be implemented by integrations, it's enough to
        implement async_turn_on + async_turn_off or turn_on + turn_off.

        Turns the entity off when is_on is truthy and on otherwise.
        """
        if not self.is_on:
            await self.async_turn_on(**kwargs)
            return
        await self.async_turn_off(**kwargs)
|