core/homeassistant/components/wemo/coordinator.py

321 lines
12 KiB
Python

"""Home Assistant wrapper for a pyWeMo device."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, fields
from datetime import timedelta
from functools import partial
import logging
from typing import TYPE_CHECKING, Literal
from pywemo import Insight, LongPressMixin, WeMoDevice
from pywemo.exceptions import ActionException, PyWeMoException
from pywemo.subscribe import EVENT_TYPE_LONG_PRESS, SubscriptionRegistry
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_CONFIGURATION_URL,
ATTR_IDENTIFIERS,
CONF_DEVICE_ID,
CONF_NAME,
CONF_PARAMS,
CONF_TYPE,
CONF_UNIQUE_ID,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import CONNECTION_UPNP, DeviceInfo
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, WEMO_SUBSCRIPTION_EVENT
from .models import async_wemo_data
# Module-level logger; DeviceCoordinator also hands it to DataUpdateCoordinator.
_LOGGER = logging.getLogger(__name__)

# Type of the ``error_key`` argument of OptionsValidationError.
# Literal values must match options.error keys from strings.json.
type ErrorStringKey = Literal["long_press_requires_subscription"]

# Type of the ``field_key`` argument of OptionsValidationError. The values are
# also the field names of the Options dataclass.
# Literal values must match options.step.init.data keys from strings.json.
type OptionsFieldKey = Literal["enable_subscription", "enable_long_press"]
class OptionsValidationError(Exception):
    """Raised when a combination of option values is not valid."""

    def __init__(
        self, field_key: OptionsFieldKey, error_key: ErrorStringKey, message: str
    ) -> None:
        """Remember which field failed and why, so a handler can use them.

        Both keys are looked up in strings.json and must match it exactly.

        Args:
            field_key: The options.step.init.data key the error belongs to.
                It must also be the name of a field of the Options class.
            error_key: The options.error key describing the problem.
            message: Human readable text passed on to the Exception base class.
        """
        self.field_key = field_key
        self.error_key = error_key
        super().__init__(message)
@dataclass(frozen=True)
class Options:
    """Immutable set of configuration options for a DeviceCoordinator.

    Note: every field name has to be one of the OptionsFieldKey values, i.e. a
    key of options.step.init.data in strings.json.
    """

    # Receive local push updates from the device.
    enable_subscription: bool = True

    # Receive long-press events from the device.
    enable_long_press: bool = True

    def __post_init__(self) -> None:
        """Reject long-press events without push subscriptions.

        Raises:
            OptionsValidationError: enable_long_press is set while
                enable_subscription is not.
        """
        if self.enable_long_press and not self.enable_subscription:
            message = (
                "Local push update subscriptions must be enabled to use long-press events"
            )
            raise OptionsValidationError(
                "enable_subscription",
                "long_press_requires_subscription",
                message,
            )
class DeviceCoordinator(DataUpdateCoordinator[None]):
    """Home Assistant wrapper for a pyWeMo device.

    The coordinator carries no data of its own (``DataUpdateCoordinator[None]``).
    It refreshes the state cached on the wrapped ``wemo`` object, either by
    polling every 30 seconds or in response to push notifications delivered
    through ``subscription_callback``.
    """

    # Options applied by the most recent async_set_options call. None until the
    # first call has completed.
    options: Options | None = None

    def __init__(self, hass: HomeAssistant, wemo: WeMoDevice) -> None:
        """Initialize DeviceCoordinator.

        Args:
            hass: The Home Assistant instance.
            wemo: The pyWeMo device wrapped by this coordinator.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=wemo.name,
            update_interval=timedelta(seconds=30),
        )
        self.hass = hass
        self.wemo = wemo
        # Assigned later by async_setup.
        self.device_id: str | None = None
        self.device_info = _create_device_info(wemo)
        # Cleared by _async_set_enable_long_press when pyWeMo reports a failure.
        self.supports_long_press = isinstance(wemo, LongPressMixin)
        # Held by _async_locked_update while the device state is being fetched.
        self.update_lock = asyncio.Lock()

    @callback
    def async_setup(self, device_id: str) -> None:
        """Set up the device coordinator.

        Args:
            device_id: Device registry id this coordinator is stored under.
        """
        self.device_id = device_id

    def subscription_callback(
        self, _device: WeMoDevice, event_type: str, params: str
    ) -> None:
        """Receive push notifications from WeMo devices.

        Long-press events are fired on the Home Assistant event bus as
        WEMO_SUBSCRIPTION_EVENT. Every other event is handed to
        ``wemo.subscription_update`` and followed by a state refresh.

        Args:
            _device: Unused; the coordinator always acts on ``self.wemo``.
            event_type: The kind of event reported by the device.
            params: The event payload, forwarded unchanged.
        """
        _LOGGER.debug("Subscription event (%s) for %s", event_type, self.wemo.name)
        if event_type == EVENT_TYPE_LONG_PRESS:
            self.hass.bus.fire(
                WEMO_SUBSCRIPTION_EVENT,
                {
                    CONF_DEVICE_ID: self.device_id,
                    CONF_NAME: self.wemo.name,
                    CONF_TYPE: event_type,
                    CONF_PARAMS: params,
                    CONF_UNIQUE_ID: self.wemo.serial_number,
                },
            )
        else:
            updated = self.wemo.subscription_update(event_type, params)
            # The refresh is scheduled with call_soon_threadsafe, so this method
            # is presumably invoked outside the event loop thread - confirm
            # against pyWeMo's SubscriptionRegistry.
            self.hass.loop.call_soon_threadsafe(
                partial(
                    self.hass.async_create_background_task,
                    self._async_subscription_callback(updated),
                    f"{self.name} subscription_callback",
                    eager_start=True,
                )
            )

    async def async_shutdown(self) -> None:
        """Unregister push subscriptions and remove from coordinators dict."""
        if self._shutdown_requested:
            return
        await super().async_shutdown()
        if TYPE_CHECKING:
            # mypy doesn't know that the device_id is set in async_setup.
            assert self.device_id is not None
        del _async_coordinators(self.hass)[self.device_id]
        assert self.options  # Always set by async_register_device.
        if self.options.enable_subscription:
            await self._async_set_enable_subscription(False)
        # Check that the device is available (last_update_success) before disabling long
        # press. That avoids long shutdown times for devices that are no longer connected.
        if self.options.enable_long_press and self.last_update_success:
            await self._async_set_enable_long_press(False)

    async def _async_set_enable_subscription(self, enable_subscription: bool) -> None:
        """Turn on/off push updates from the device.

        Args:
            enable_subscription: True to register the device with the
                subscription registry, False to unregister it.
        """
        registry = _async_registry(self.hass)
        if enable_subscription:
            registry.on(self.wemo, None, self.subscription_callback)
            await self.hass.async_add_executor_job(registry.register, self.wemo)
        elif self.options is not None:
            # Only unregister when options were applied before. On the very
            # first async_set_options call self.options is still None.
            await self.hass.async_add_executor_job(registry.unregister, self.wemo)

    async def _async_set_enable_long_press(self, enable_long_press: bool) -> None:
        """Turn on/off long-press events from the device.

        Does nothing for devices without long-press support. A PyWeMoException
        is logged and clears ``supports_long_press``, so later calls become
        no-ops.

        Args:
            enable_long_press: True to ensure the long-press virtual device
                exists, False to remove it.
        """
        if not (isinstance(self.wemo, LongPressMixin) and self.supports_long_press):
            return
        try:
            if enable_long_press:
                await self.hass.async_add_executor_job(
                    self.wemo.ensure_long_press_virtual_device
                )
            elif self.options is not None:
                # As with subscriptions, nothing to remove before options were
                # applied for the first time.
                await self.hass.async_add_executor_job(
                    self.wemo.remove_long_press_virtual_device
                )
        except PyWeMoException:
            # Name the action that actually failed instead of always claiming
            # that enabling failed.
            _LOGGER.exception(
                "Failed to %s long press support for device: %s",
                "enable" if enable_long_press else "disable",
                self.wemo.name,
            )
            self.supports_long_press = False

    async def async_set_options(
        self, hass: HomeAssistant, config_entry: ConfigEntry
    ) -> None:
        """Update the configuration options for the device.

        Builds an Options instance from ``config_entry.options`` and calls the
        matching ``_async_set_<field name>`` method for every field whose value
        differs from the currently applied options (or for every field when no
        options were applied yet).

        Args:
            hass: Unused by this method; part of the signature under which the
                method is registered as a config entry update listener.
            config_entry: The config entry whose options are applied.

        Raises:
            OptionsValidationError: The option values fail Options validation.
        """
        options = Options(**config_entry.options)
        _LOGGER.debug(
            "async_set_options old(%s) new(%s)", repr(self.options), repr(options)
        )
        for field in fields(options):
            new_value = getattr(options, field.name)
            if self.options is None or getattr(self.options, field.name) != new_value:
                # The value changed, call the _async_set_* method for the option.
                await getattr(self, f"_async_set_{field.name}")(new_value)
        # Stored last: the _async_set_* methods check whether self.options is
        # None to recognise the first call.
        self.options = options

    async def _async_subscription_callback(self, updated: bool) -> None:
        """Update the state by the Wemo device.

        Args:
            updated: Result of ``wemo.subscription_update``. When it is falsy
                the device state is fetched with ``force_update=True``.
        """
        # If an update is in progress, we don't do anything.
        if self.update_lock.locked():
            return
        try:
            await self._async_locked_update(not updated)
        except UpdateFailed as err:
            self.last_exception = err
            # Log only on the transition from success to failure.
            if self.last_update_success:
                _LOGGER.exception("Subscription callback failed")
                self.last_update_success = False
        except Exception as err:
            self.last_exception = err
            self.last_update_success = False
            _LOGGER.exception("Unexpected error fetching %s data", self.name)
        else:
            self.async_set_updated_data(None)

    @property
    def should_poll(self) -> bool:
        """Return True if polling is needed to update the state for the device.

        The alternative, when this returns False, is to rely on the subscription
        "push updates" to update the device state in Home Assistant.
        """
        if isinstance(self.wemo, Insight) and self.wemo.get_state() == 0:
            # The WeMo Insight device does not send subscription updates for the
            # insight_params values when the device is off. Polling is required in
            # this case so the Sensor entities are properly populated.
            return True
        return not (
            _async_registry(self.hass).is_subscribed(self.wemo)
            and self.last_update_success
        )

    async def _async_update_data(self) -> None:
        """Update WeMo state."""
        # No need to poll if the device will push updates.
        # The device_id will not be set until after the first
        # update so we should not check should_poll until after
        # the device_id is set.
        if self.device_id and not self.should_poll:
            return
        # If an update is in progress, we don't do anything.
        if self.update_lock.locked():
            return
        await self._async_locked_update(True)

    async def _async_locked_update(self, force_update: bool) -> None:
        """Try updating within an async lock.

        Args:
            force_update: Passed on to ``wemo.get_state``, which runs in the
                executor.

        Raises:
            UpdateFailed: pyWeMo raised an ActionException.
        """
        async with self.update_lock:
            try:
                await self.hass.async_add_executor_job(
                    self.wemo.get_state, force_update
                )
            except ActionException as err:
                raise UpdateFailed("WeMo update failed") from err
def _create_device_info(wemo: WeMoDevice) -> DeviceInfo:
    """Return the device information for *wemo*, adjusted for special devices.

    For the model named "dli emulated belkin socket" (compared without regard
    to case) a configuration URL pointing at the device host is added, and the
    identifiers are replaced by the serial number without its last character.
    """
    info = _device_info(wemo)
    if wemo.model_name.lower() != "dli emulated belkin socket":
        return info
    info[ATTR_CONFIGURATION_URL] = f"http://{wemo.host}"
    # NOTE(review): dropping the last serial character presumably groups the
    # sockets of one DLI unit under a single device - confirm.
    info[ATTR_IDENTIFIERS] = {(DOMAIN, wemo.serial_number[:-1])}
    return info
def _device_info(wemo: WeMoDevice) -> DeviceInfo:
    """Return the default DeviceInfo built from the attributes of *wemo*."""
    upnp_connection = (CONNECTION_UPNP, wemo.udn)
    domain_identifier = (DOMAIN, wemo.serial_number)
    return DeviceInfo(
        connections={upnp_connection},
        identifiers={domain_identifier},
        manufacturer="Belkin",
        model=wemo.model_name,
        model_id=wemo.model,
        name=wemo.name,
        sw_version=wemo.firmware_version,
    )
async def async_register_device(
    hass: HomeAssistant, config_entry: ConfigEntry, wemo: WeMoDevice
) -> DeviceCoordinator:
    """Register a device with home assistant and enable pywemo event callbacks.

    Raises:
        Exception: Whatever the initial refresh stored in ``last_exception``
            when that refresh did not succeed.
    """
    coordinator = DeviceCoordinator(hass, wemo)

    # Initial refresh; a failure aborts the registration.
    await coordinator.async_refresh()
    if not coordinator.last_update_success and coordinator.last_exception:
        raise coordinator.last_exception

    # Create (or look up) the device registry entry and store the coordinator
    # under its id.
    device_entry = dr.async_get(hass).async_get_or_create(
        config_entry_id=config_entry.entry_id, **_create_device_info(wemo)
    )
    coordinator.async_setup(device_id=device_entry.id)
    _async_coordinators(hass)[device_entry.id] = coordinator

    # Re-apply the options whenever the config entry is updated; the listener
    # is removed again when the entry unloads.
    remove_listener = config_entry.add_update_listener(coordinator.async_set_options)
    config_entry.async_on_unload(remove_listener)

    # Apply the current options once.
    await coordinator.async_set_options(hass, config_entry)
    return coordinator
@callback
def async_get_coordinator(hass: HomeAssistant, device_id: str) -> DeviceCoordinator:
    """Look up the DeviceCoordinator stored under *device_id*.

    Raises:
        KeyError: No coordinator is stored under *device_id*.
    """
    coordinators = _async_coordinators(hass)
    return coordinators[device_id]
@callback
def _async_coordinators(hass: HomeAssistant) -> dict[str, DeviceCoordinator]:
    """Return the mapping of device id to DeviceCoordinator kept in the wemo data."""
    entry_data = async_wemo_data(hass).config_entry_data
    return entry_data.device_coordinators
@callback
def _async_registry(hass: HomeAssistant) -> SubscriptionRegistry:
    """Return the pyWeMo SubscriptionRegistry kept in the wemo data."""
    wemo_data = async_wemo_data(hass)
    return wemo_data.registry