mirror of https://github.com/home-assistant/core
68 lines
2.0 KiB
Python
68 lines
2.0 KiB
Python
"""Coordinator for the Omnilogic Integration."""
|
|
|
|
from datetime import timedelta
|
|
import logging
|
|
from typing import Any
|
|
|
|
from omnilogic import OmniLogic, OmniLogicException
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import ALL_ITEM_KINDS
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class OmniLogicUpdateCoordinator(DataUpdateCoordinator[dict[tuple, dict[str, Any]]]):
|
|
"""Class to manage fetching update data from single endpoint."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
api: OmniLogic,
|
|
name: str,
|
|
config_entry: ConfigEntry,
|
|
polling_interval: int,
|
|
) -> None:
|
|
"""Initialize the global Omnilogic data updater."""
|
|
self.api = api
|
|
self.config_entry = config_entry
|
|
|
|
super().__init__(
|
|
hass=hass,
|
|
logger=_LOGGER,
|
|
name=name,
|
|
update_interval=timedelta(seconds=polling_interval),
|
|
)
|
|
|
|
async def _async_update_data(self):
|
|
"""Fetch data from OmniLogic."""
|
|
try:
|
|
data = await self.api.get_telemetry_data()
|
|
|
|
except OmniLogicException as error:
|
|
raise UpdateFailed(f"Error updating from OmniLogic: {error}") from error
|
|
|
|
parsed_data = {}
|
|
|
|
def get_item_data(item, item_kind, current_id, data):
|
|
"""Get data per kind of Omnilogic API item."""
|
|
if isinstance(item, list):
|
|
for single_item in item:
|
|
data = get_item_data(single_item, item_kind, current_id, data)
|
|
|
|
if "systemId" in item:
|
|
system_id = item["systemId"]
|
|
current_id = (*current_id, item_kind, system_id)
|
|
data[current_id] = item
|
|
|
|
for kind in ALL_ITEM_KINDS:
|
|
if kind in item:
|
|
data = get_item_data(item[kind], kind, current_id, data)
|
|
|
|
return data
|
|
|
|
return get_item_data(data, "Backyard", (), parsed_data)
|