core/homeassistant/components/eafm/coordinator.py

58 lines
2.0 KiB
Python

"""UK Environment Agency Flood Monitoring Integration."""
import asyncio
from datetime import timedelta
import logging
from typing import Any
from aioeafm import get_station
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
PLATFORMS = [Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)
type EafmConfigEntry = ConfigEntry[EafmCoordinator]
def _get_measures(station_data: dict[str, Any]) -> list[dict[str, Any]]:
"""Force measure key to always be a list."""
if "measures" not in station_data:
return []
if isinstance(station_data["measures"], dict):
return [station_data["measures"]]
return station_data["measures"]
class EafmCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
"""Class to manage fetching UK Flood Monitoring data."""
def __init__(self, hass: HomeAssistant, entry: EafmConfigEntry) -> None:
"""Initialize."""
self._station_key = entry.data["station"]
self._session = async_get_clientsession(hass=hass)
super().__init__(
hass,
_LOGGER,
config_entry=entry,
name="sensor",
update_interval=timedelta(seconds=15 * 60),
)
async def _async_update_data(self) -> dict[str, dict[str, Any]]:
"""Fetch the latest data from the source."""
# DataUpdateCoordinator will handle aiohttp ClientErrors and timeouts
async with asyncio.timeout(30):
data = await get_station(self._session, self._station_key)
measures = _get_measures(data)
# Turn data.measures into a dict rather than a list so easier for entities to
# find themselves.
data["measures"] = {measure["@id"]: measure for measure in measures}
return data