core/homeassistant/components/sensoterra/coordinator.py

55 lines
1.8 KiB
Python

"""Polling coordinator for the Sensoterra integration."""
from collections.abc import Callable
from datetime import timedelta
from sensoterra.customerapi import (
CustomerApi,
InvalidAuth as ApiAuthError,
Timeout as ApiTimeout,
)
from sensoterra.probe import Probe, Sensor
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER, SCAN_INTERVAL_MINUTES
class SensoterraCoordinator(DataUpdateCoordinator[list[Probe]]):
"""Sensoterra coordinator."""
def __init__(self, hass: HomeAssistant, api: CustomerApi) -> None:
"""Initialize Sensoterra coordinator."""
super().__init__(
hass,
LOGGER,
name="Sensoterra probe",
update_interval=timedelta(minutes=SCAN_INTERVAL_MINUTES),
)
self.api = api
self.add_devices_callback: Callable[[list[Probe]], None] | None = None
async def _async_update_data(self) -> list[Probe]:
"""Fetch data from Sensoterra Customer API endpoint."""
try:
probes = await self.api.poll()
except ApiAuthError as err:
raise ConfigEntryError(err) from err
except ApiTimeout as err:
raise UpdateFailed("Timeout communicating with Sensotera API") from err
if self.add_devices_callback is not None:
self.add_devices_callback(probes)
return probes
def get_sensor(self, id: str | None) -> Sensor | None:
"""Try to find the sensor in the API result."""
for probe in self.data:
for sensor in probe.sensors():
if sensor.id == id:
return sensor
return None