mirror of https://github.com/home-assistant/core
216 lines
7.4 KiB
Python
216 lines
7.4 KiB
Python
"""Data coordinators for the ring integration."""
|
|
|
|
from asyncio import TaskGroup
|
|
from collections.abc import Callable, Coroutine
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from ring_doorbell import (
|
|
AuthenticationError,
|
|
Ring,
|
|
RingDevices,
|
|
RingError,
|
|
RingEvent,
|
|
RingTimeout,
|
|
)
|
|
from ring_doorbell.listen import RingEventListener
|
|
|
|
from homeassistant import config_entries
|
|
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
|
from homeassistant.helpers.update_coordinator import (
|
|
BaseDataUpdateCoordinatorProtocol,
|
|
DataUpdateCoordinator,
|
|
UpdateFailed,
|
|
)
|
|
|
|
from .const import SCAN_INTERVAL
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def _call_api[*_Ts, _R](
|
|
hass: HomeAssistant,
|
|
target: Callable[[*_Ts], Coroutine[Any, Any, _R]],
|
|
*args: *_Ts,
|
|
msg_suffix: str = "",
|
|
) -> _R:
|
|
try:
|
|
return await target(*args)
|
|
except AuthenticationError as err:
|
|
# Raising ConfigEntryAuthFailed will cancel future updates
|
|
# and start a config flow with SOURCE_REAUTH (async_step_reauth)
|
|
raise ConfigEntryAuthFailed from err
|
|
except RingTimeout as err:
|
|
raise UpdateFailed(
|
|
f"Timeout communicating with API{msg_suffix}: {err}"
|
|
) from err
|
|
except RingError as err:
|
|
raise UpdateFailed(f"Error communicating with API{msg_suffix}: {err}") from err
|
|
|
|
|
|
class RingDataCoordinator(DataUpdateCoordinator[RingDevices]):
|
|
"""Base class for device coordinators."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
ring_api: Ring,
|
|
) -> None:
|
|
"""Initialize my coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
name="devices",
|
|
logger=_LOGGER,
|
|
update_interval=SCAN_INTERVAL,
|
|
)
|
|
self.ring_api: Ring = ring_api
|
|
self.first_call: bool = True
|
|
|
|
async def _async_update_data(self) -> RingDevices:
|
|
"""Fetch data from API endpoint."""
|
|
update_method: str = (
|
|
"async_update_data" if self.first_call else "async_update_devices"
|
|
)
|
|
await _call_api(self.hass, getattr(self.ring_api, update_method))
|
|
self.first_call = False
|
|
devices: RingDevices = self.ring_api.devices()
|
|
subscribed_device_ids = set(self.async_contexts())
|
|
for device in devices.all_devices:
|
|
# Don't update all devices in the ring api, only those that set
|
|
# their device id as context when they subscribed.
|
|
if device.id in subscribed_device_ids:
|
|
try:
|
|
async with TaskGroup() as tg:
|
|
if device.has_capability("history"):
|
|
tg.create_task(
|
|
_call_api(
|
|
self.hass,
|
|
lambda device: device.async_history(limit=10),
|
|
device,
|
|
msg_suffix=f" for device {device.name}", # device_id is the mac
|
|
)
|
|
)
|
|
tg.create_task(
|
|
_call_api(
|
|
self.hass,
|
|
device.async_update_health_data,
|
|
msg_suffix=f" for device {device.name}",
|
|
)
|
|
)
|
|
except ExceptionGroup as eg:
|
|
raise eg.exceptions[0] # noqa: B904
|
|
|
|
return devices
|
|
|
|
|
|
class RingListenCoordinator(BaseDataUpdateCoordinatorProtocol):
|
|
"""Global notifications coordinator."""
|
|
|
|
config_entry: config_entries.ConfigEntry
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
ring_api: Ring,
|
|
listen_credentials: dict[str, Any] | None,
|
|
listen_credentials_updater: Callable[[dict[str, Any]], None],
|
|
) -> None:
|
|
"""Initialize my coordinator."""
|
|
self.hass = hass
|
|
self.logger = _LOGGER
|
|
self.ring_api: Ring = ring_api
|
|
self.event_listener = RingEventListener(
|
|
ring_api, listen_credentials, listen_credentials_updater
|
|
)
|
|
self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
|
|
self._listen_callback_id: int | None = None
|
|
|
|
config_entry = config_entries.current_entry.get()
|
|
if TYPE_CHECKING:
|
|
assert config_entry
|
|
self.config_entry = config_entry
|
|
self.start_timeout = 10
|
|
self.config_entry.async_on_unload(self.async_shutdown)
|
|
self.index_alerts()
|
|
|
|
def index_alerts(self) -> None:
|
|
"Index the active alerts."
|
|
self.alerts = {
|
|
(alert.doorbot_id, alert.kind): alert
|
|
for alert in self.ring_api.active_alerts()
|
|
}
|
|
|
|
async def async_shutdown(self) -> None:
|
|
"""Cancel any scheduled call, and ignore new runs."""
|
|
if self.event_listener.started:
|
|
await self._async_stop_listen()
|
|
|
|
async def _async_stop_listen(self) -> None:
|
|
self.logger.debug("Stopped ring listener")
|
|
await self.event_listener.stop()
|
|
self.logger.debug("Stopped ring listener")
|
|
|
|
async def _async_start_listen(self) -> None:
|
|
"""Start listening for realtime events."""
|
|
self.logger.debug("Starting ring listener.")
|
|
await self.event_listener.start(
|
|
timeout=self.start_timeout,
|
|
)
|
|
if self.event_listener.started is True:
|
|
self.logger.debug("Started ring listener")
|
|
else:
|
|
self.logger.warning(
|
|
"Ring event listener failed to start after %s seconds",
|
|
self.start_timeout,
|
|
)
|
|
self._listen_callback_id = self.event_listener.add_notification_callback(
|
|
self._on_event
|
|
)
|
|
self.index_alerts()
|
|
# Update the listeners so they switch from Unavailable to Unknown
|
|
self._async_update_listeners()
|
|
|
|
def _on_event(self, event: RingEvent) -> None:
|
|
self.logger.debug("Ring event received: %s", event)
|
|
self.index_alerts()
|
|
self._async_update_listeners(event.doorbot_id)
|
|
|
|
@callback
|
|
def _async_update_listeners(self, doorbot_id: int | None = None) -> None:
|
|
"""Update all registered listeners."""
|
|
for update_callback, device_api_id in list(self._listeners.values()):
|
|
if not doorbot_id or device_api_id == doorbot_id:
|
|
update_callback()
|
|
|
|
@callback
|
|
def async_add_listener(
|
|
self, update_callback: CALLBACK_TYPE, context: Any = None
|
|
) -> Callable[[], None]:
|
|
"""Listen for data updates."""
|
|
start_listen = not self._listeners
|
|
|
|
@callback
|
|
def remove_listener() -> None:
|
|
"""Remove update listener."""
|
|
self._listeners.pop(remove_listener)
|
|
if not self._listeners:
|
|
self.config_entry.async_create_task(
|
|
self.hass,
|
|
self._async_stop_listen(),
|
|
"Ring event listener stop",
|
|
eager_start=True,
|
|
)
|
|
|
|
self._listeners[remove_listener] = (update_callback, context)
|
|
|
|
# This is the first listener, start the event listener.
|
|
if start_listen:
|
|
self.config_entry.async_create_task(
|
|
self.hass,
|
|
self._async_start_listen(),
|
|
"Ring event listener start",
|
|
eager_start=True,
|
|
)
|
|
return remove_listener
|