core/homeassistant/components/guardian/coordinator.py

81 lines
2.4 KiB
Python

"""Define Guardian-specific utilities."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from datetime import timedelta
from typing import Any, cast
from aioguardian import Client
from aioguardian.errors import GuardianError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
DEFAULT_UPDATE_INTERVAL = timedelta(seconds=30)
SIGNAL_REBOOT_REQUESTED = "guardian_reboot_requested_{0}"
class GuardianDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Define an extended DataUpdateCoordinator with some Guardian goodies."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
*,
entry: ConfigEntry,
client: Client,
api_name: str,
api_coro: Callable[[], Coroutine[Any, Any, dict[str, Any]]],
api_lock: asyncio.Lock,
valve_controller_uid: str,
) -> None:
"""Initialize."""
super().__init__(
hass,
LOGGER,
name=f"{valve_controller_uid}_{api_name}",
update_interval=DEFAULT_UPDATE_INTERVAL,
)
self._api_coro = api_coro
self._api_lock = api_lock
self._client = client
self.config_entry = entry
self.signal_reboot_requested = SIGNAL_REBOOT_REQUESTED.format(
self.config_entry.entry_id
)
async def _async_update_data(self) -> dict[str, Any]:
"""Execute a "locked" API request against the valve controller."""
async with self._api_lock, self._client:
try:
resp = await self._api_coro()
except GuardianError as err:
raise UpdateFailed(err) from err
return cast(dict[str, Any], resp["data"])
async def async_initialize(self) -> None:
"""Initialize the coordinator."""
@callback
def async_reboot_requested() -> None:
"""Respond to a reboot request."""
self.last_update_success = False
self.async_update_listeners()
self.config_entry.async_on_unload(
async_dispatcher_connect(
self.hass, self.signal_reboot_requested, async_reboot_requested
)
)