core/homeassistant/components/bond/entity.py

205 lines
7.1 KiB
Python

"""An abstract class common to all Bond entities."""
from __future__ import annotations
from abc import abstractmethod
from asyncio import Lock
from datetime import datetime
import logging
from aiohttp import ClientError
from homeassistant.const import (
ATTR_HW_VERSION,
ATTR_MODEL,
ATTR_NAME,
ATTR_SUGGESTED_AREA,
ATTR_SW_VERSION,
ATTR_VIA_DEVICE,
)
from homeassistant.core import CALLBACK_TYPE, HassJob, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_call_later
from .const import DOMAIN
from .models import BondData
from .utils import BondDevice
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Delay handed to async_call_later for the next check while the BPUP
# subscription is NOT reporting alive, i.e. the polling-fallback cadence.
# NOTE(review): presumably seconds, per async_call_later's contract - confirm.
_FALLBACK_SCAN_INTERVAL = 10
# Delay used instead while the BPUP subscription reports alive (same unit).
_BPUP_ALIVE_SCAN_INTERVAL = 60
class BondEntity(Entity):
    """Generic Bond entity encapsulating common features of any Bond controlled device.

    State arrives through two paths:

    * BPUP messages delivered to ``_async_bpup_callback``; and
    * a timer that re-arms itself on every firing and fetches the state
      from the API whenever BPUP is not alive, the entity has not yet
      received a state, or the entity is marked unavailable.

    Subclasses implement ``_apply_state`` to translate ``self._device.state``
    into their own entity attributes.
    """

    # State is refreshed by BPUP callbacks and the entity's own timer rather
    # than by the generic entity polling flag.
    _attr_should_poll = False

    def __init__(
        self,
        data: BondData,
        device: BondDevice,
        sub_device: str | None = None,
        sub_device_id: str | None = None,
    ) -> None:
        """Initialize entity with API and device info.

        ``sub_device`` is used both as a unique-id suffix (unless
        ``sub_device_id`` is given, which then takes precedence) and, title
        cased with underscores replaced by spaces, as a suffix of the name.
        """
        hub = data.hub
        self._hub = hub
        self._bond = hub.bond
        self._device = device
        self._device_id = device.device_id
        self._sub_device = sub_device
        self._attr_available = True
        self._bpup_subs = data.bpup_subs
        # Held while an API fetch started by the timer is in flight.
        self._update_lock = Lock()
        # Becomes True once any state (API or BPUP) has been processed.
        self._initialized = False

        if sub_device_id:
            sub_device_id = f"_{sub_device_id}"
        elif sub_device:
            sub_device_id = f"_{sub_device}"
        else:
            sub_device_id = ""
        self._attr_unique_id = f"{hub.bond_id}_{device.device_id}{sub_device_id}"

        if sub_device:
            sub_device_name = sub_device.replace("_", " ").title()
            self._attr_name = f"{device.name} {sub_device_name}"
        else:
            self._attr_name = device.name

        self._attr_assumed_state = self._hub.is_bridge and not self._device.trust_state
        self._apply_state()
        self._bpup_polling_fallback: CALLBACK_TYPE | None = None
        # Delay used for the currently armed timer; kept so the "took longer
        # than the scheduled update interval" warning reports the real value.
        self._bpup_poll_interval: int = _FALLBACK_SCAN_INTERVAL
        self._async_update_if_bpup_not_alive_job = HassJob(
            self._async_update_if_bpup_not_alive
        )

    @property
    def device_info(self) -> DeviceInfo:
        """Get an HA device representing this Bond controlled device."""
        device_info = DeviceInfo(
            manufacturer=self._hub.make,
            # type ignore: tuple items should not be Optional
            identifiers={(DOMAIN, self._hub.bond_id, self._device_id)},  # type: ignore[arg-type]
            configuration_url=f"http://{self._hub.host}",
        )
        if self.name is not None:
            device_info[ATTR_NAME] = self._device.name
        if self._hub.bond_id is not None:
            device_info[ATTR_VIA_DEVICE] = (DOMAIN, self._hub.bond_id)
        if self._device.location is not None:
            device_info[ATTR_SUGGESTED_AREA] = self._device.location
        if not self._hub.is_bridge:
            # Not a bridge: model and versions come from the hub itself.
            if self._hub.model is not None:
                device_info[ATTR_MODEL] = self._hub.model
            if self._hub.fw_ver is not None:
                device_info[ATTR_SW_VERSION] = self._hub.fw_ver
            if self._hub.mcu_ver is not None:
                device_info[ATTR_HW_VERSION] = self._hub.mcu_ver
        else:
            # Bridge: build the model from whichever of the device's
            # branding profile and template are set.
            model_data = []
            if self._device.branding_profile:
                model_data.append(self._device.branding_profile)
            if self._device.template:
                model_data.append(self._device.template)
            if model_data:
                device_info[ATTR_MODEL] = " ".join(model_data)

        return device_info

    async def async_update(self) -> None:
        """Perform a manual update from API."""
        await self._async_update_from_api()

    @callback
    def _async_update_if_bpup_not_alive(self, now: datetime) -> None:
        """Fetch via the API if BPUP is not alive.

        Always re-arms the timer first. The fetch is skipped when Home
        Assistant is stopping, or when BPUP is alive and the entity is both
        initialized and available.
        """
        # Capture the delay of the timer that just fired before re-arming
        # overwrites it; this is the interval a still-running update exceeded.
        elapsed_interval = self._bpup_poll_interval
        self._async_schedule_bpup_alive_or_poll()
        if self.hass.is_stopping or (
            self._bpup_subs.alive and self._initialized and self.available
        ):
            return
        if self._update_lock.locked():
            _LOGGER.warning(
                "Updating %s took longer than the scheduled update interval %s",
                self.entity_id,
                elapsed_interval,
            )
            return
        self.hass.async_create_background_task(
            self._async_update(), f"{DOMAIN} {self.name} update", eager_start=True
        )

    async def _async_update(self) -> None:
        """Fetch via the API under the update lock and write the new state."""
        async with self._update_lock:
            await self._async_update_from_api()
            self.async_write_ha_state()

    async def _async_update_from_api(self) -> None:
        """Fetch via the API.

        On a client, timeout or OS error the entity is marked unavailable
        (logging only on the available -> unavailable transition); otherwise
        the fetched state is processed.
        """
        try:
            state: dict = await self._bond.device_state(self._device_id)
        except (ClientError, TimeoutError, OSError) as error:
            if self.available:
                _LOGGER.warning(
                    "Entity %s has become unavailable", self.entity_id, exc_info=error
                )
            self._attr_available = False
        else:
            self._async_state_callback(state)

    @abstractmethod
    def _apply_state(self) -> None:
        """Update entity attributes from ``self._device.state``.

        Called once from ``__init__`` and again after every processed state.
        """
        raise NotImplementedError

    @callback
    def _async_state_callback(self, state: dict) -> None:
        """Process a state change."""
        self._initialized = True
        if not self.available:
            _LOGGER.info("Entity %s has come back", self.entity_id)
        self._attr_available = True
        _LOGGER.debug(
            "Device state for %s (%s) is:\n%s", self.name, self.entity_id, state
        )
        self._device.state = state
        self._apply_state()

    @callback
    def _async_bpup_callback(self, json_msg: dict) -> None:
        """Process a state change from BPUP."""
        topic = json_msg["t"]
        # Ignore messages that are not this device's state topic.
        if topic != f"devices/{self._device_id}/state":
            return

        self._async_state_callback(json_msg["b"])
        self.async_write_ha_state()

    async def async_added_to_hass(self) -> None:
        """Subscribe to BPUP and start polling."""
        await super().async_added_to_hass()
        self._bpup_subs.subscribe(self._device_id, self._async_bpup_callback)
        self._async_schedule_bpup_alive_or_poll()

    @callback
    def _async_schedule_bpup_alive_or_poll(self) -> None:
        """Schedule the BPUP alive or poll."""
        interval = (
            _BPUP_ALIVE_SCAN_INTERVAL
            if self._bpup_subs.alive
            else _FALLBACK_SCAN_INTERVAL
        )
        self._bpup_poll_interval = interval
        self._bpup_polling_fallback = async_call_later(
            self.hass,
            interval,
            self._async_update_if_bpup_not_alive_job,
        )

    async def async_will_remove_from_hass(self) -> None:
        """Unsubscribe from BPUP data on remove."""
        await super().async_will_remove_from_hass()
        self._bpup_subs.unsubscribe(self._device_id, self._async_bpup_callback)
        if self._bpup_polling_fallback:
            # Cancel the pending timer so it cannot fire after removal.
            self._bpup_polling_fallback()
            self._bpup_polling_fallback = None