core/homeassistant/components/elmax/coordinator.py

169 lines
6.4 KiB
Python

"""Coordinator for the elmax-cloud integration."""
from __future__ import annotations
from asyncio import timeout
from datetime import timedelta
from logging import Logger
from elmax_api.exceptions import (
ElmaxApiError,
ElmaxBadLoginError,
ElmaxBadPinError,
ElmaxNetworkError,
ElmaxPanelBusyError,
)
from elmax_api.http import Elmax, GenericElmax
from elmax_api.model.actuator import Actuator
from elmax_api.model.area import Area
from elmax_api.model.cover import Cover
from elmax_api.model.endpoint import DeviceEndpoint
from elmax_api.model.panel import PanelEntry, PanelStatus
from elmax_api.push.push import PushNotificationHandler
from httpx import ConnectError, ConnectTimeout
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEFAULT_TIMEOUT
class ElmaxCoordinator(DataUpdateCoordinator[PanelStatus]):
"""Coordinator helper to handle Elmax API polling."""
_state_by_endpoint: dict[str, Actuator | Area | Cover | DeviceEndpoint]
def __init__(
self,
hass: HomeAssistant,
logger: Logger,
elmax_api_client: GenericElmax,
panel: PanelEntry,
name: str,
update_interval: timedelta,
) -> None:
"""Instantiate the object."""
self._client = elmax_api_client
self._panel_entry = panel
self._state_by_endpoint = {}
self._push_notification_handler = None
super().__init__(
hass=hass, logger=logger, name=name, update_interval=update_interval
)
@property
def panel_entry(self) -> PanelEntry:
"""Return the panel entry."""
return self._panel_entry
def get_actuator_state(self, actuator_id: str) -> Actuator:
"""Return state of a specific actuator."""
if self._state_by_endpoint is not None:
return self._state_by_endpoint[actuator_id]
raise HomeAssistantError("Unknown actuator")
def get_zone_state(self, zone_id: str) -> Actuator:
"""Return state of a specific zone."""
if self._state_by_endpoint is not None:
return self._state_by_endpoint[zone_id]
raise HomeAssistantError("Unknown zone")
def get_area_state(self, area_id: str) -> Area:
"""Return state of a specific area."""
if self._state_by_endpoint is not None and area_id:
return self._state_by_endpoint[area_id]
raise HomeAssistantError("Unknown area")
def get_cover_state(self, cover_id: str) -> Cover:
"""Return state of a specific cover."""
if self._state_by_endpoint is not None:
return self._state_by_endpoint[cover_id]
raise HomeAssistantError("Unknown cover")
@property
def http_client(self):
"""Return the current http client being used by this instance."""
return self._client
@http_client.setter
def http_client(self, client: GenericElmax):
"""Set the client library instance for Elmax API."""
self._client = client
async def _async_update_data(self):
try:
async with timeout(DEFAULT_TIMEOUT):
# The following command might fail in case of the panel is offline.
# We handle this case in the following exception blocks.
status = await self._client.get_current_panel_status()
except ElmaxBadPinError as err:
raise ConfigEntryAuthFailed("Control panel pin was refused") from err
except ElmaxBadLoginError as err:
raise ConfigEntryAuthFailed("Refused username/password/pin") from err
except ElmaxApiError as err:
raise UpdateFailed(f"Error communicating with ELMAX API: {err}") from err
except ElmaxPanelBusyError as err:
raise UpdateFailed(
"Communication with the panel failed, as it is currently busy"
) from err
except (ConnectError, ConnectTimeout, ElmaxNetworkError) as err:
if isinstance(self._client, Elmax):
raise UpdateFailed(
"A communication error has occurred. "
"Make sure HA can reach the internet and that "
"your firewall allows communication with the Meross Cloud."
) from err
raise UpdateFailed(
"A communication error has occurred. "
"Make sure the panel is online and that "
"your firewall allows communication with it."
) from err
# Store a dictionary for fast endpoint state access
self._state_by_endpoint = {k.endpoint_id: k for k in status.all_endpoints}
# If panel supports it and a it hasn't been registered yet, register the push notification handler
if status.push_feature and self._push_notification_handler is None:
self._register_push_notification_handler()
self._fire_data_update(status)
return status
def _fire_data_update(self, status: PanelStatus):
# Store a dictionary for fast endpoint state access
self._state_by_endpoint = {k.endpoint_id: k for k in status.all_endpoints}
self.async_set_updated_data(status)
def _register_push_notification_handler(self):
ws_ep = (
f"{'wss' if self.http_client.base_url.scheme == 'https' else 'ws'}"
f"://{self.http_client.base_url.host}"
f":{self.http_client.base_url.port}"
f"{self.http_client.base_url.path}/push"
)
self._push_notification_handler = PushNotificationHandler(
endpoint=str(ws_ep),
http_client=self.http_client,
ssl_context=self.http_client.ssl_context,
)
self._push_notification_handler.register_push_notification_handler(
self._push_handler
)
self._push_notification_handler.start(loop=self.hass.loop)
async def _push_handler(self, status: PanelStatus) -> None:
self._fire_data_update(status)
async def async_shutdown(self) -> None:
"""Cancel any scheduled call, and ignore new runs."""
if self._push_notification_handler is not None:
self._push_notification_handler.unregister_push_notification_handler(
self._push_handler
)
self._push_notification_handler.stop()
self._push_notification_handler = None
return await super().async_shutdown()