mirror of https://github.com/home-assistant/core
169 lines
6.4 KiB
Python
169 lines
6.4 KiB
Python
"""Coordinator for the elmax-cloud integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from asyncio import timeout
|
|
from datetime import timedelta
|
|
from logging import Logger
|
|
|
|
from elmax_api.exceptions import (
|
|
ElmaxApiError,
|
|
ElmaxBadLoginError,
|
|
ElmaxBadPinError,
|
|
ElmaxNetworkError,
|
|
ElmaxPanelBusyError,
|
|
)
|
|
from elmax_api.http import Elmax, GenericElmax
|
|
from elmax_api.model.actuator import Actuator
|
|
from elmax_api.model.area import Area
|
|
from elmax_api.model.cover import Cover
|
|
from elmax_api.model.endpoint import DeviceEndpoint
|
|
from elmax_api.model.panel import PanelEntry, PanelStatus
|
|
from elmax_api.push.push import PushNotificationHandler
|
|
from httpx import ConnectError, ConnectTimeout
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import DEFAULT_TIMEOUT
|
|
|
|
|
|
class ElmaxCoordinator(DataUpdateCoordinator[PanelStatus]):
|
|
"""Coordinator helper to handle Elmax API polling."""
|
|
|
|
_state_by_endpoint: dict[str, Actuator | Area | Cover | DeviceEndpoint]
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
logger: Logger,
|
|
elmax_api_client: GenericElmax,
|
|
panel: PanelEntry,
|
|
name: str,
|
|
update_interval: timedelta,
|
|
) -> None:
|
|
"""Instantiate the object."""
|
|
self._client = elmax_api_client
|
|
self._panel_entry = panel
|
|
self._state_by_endpoint = {}
|
|
self._push_notification_handler = None
|
|
super().__init__(
|
|
hass=hass, logger=logger, name=name, update_interval=update_interval
|
|
)
|
|
|
|
@property
|
|
def panel_entry(self) -> PanelEntry:
|
|
"""Return the panel entry."""
|
|
return self._panel_entry
|
|
|
|
def get_actuator_state(self, actuator_id: str) -> Actuator:
|
|
"""Return state of a specific actuator."""
|
|
if self._state_by_endpoint is not None:
|
|
return self._state_by_endpoint[actuator_id]
|
|
raise HomeAssistantError("Unknown actuator")
|
|
|
|
def get_zone_state(self, zone_id: str) -> Actuator:
|
|
"""Return state of a specific zone."""
|
|
if self._state_by_endpoint is not None:
|
|
return self._state_by_endpoint[zone_id]
|
|
raise HomeAssistantError("Unknown zone")
|
|
|
|
def get_area_state(self, area_id: str) -> Area:
|
|
"""Return state of a specific area."""
|
|
if self._state_by_endpoint is not None and area_id:
|
|
return self._state_by_endpoint[area_id]
|
|
raise HomeAssistantError("Unknown area")
|
|
|
|
def get_cover_state(self, cover_id: str) -> Cover:
|
|
"""Return state of a specific cover."""
|
|
if self._state_by_endpoint is not None:
|
|
return self._state_by_endpoint[cover_id]
|
|
raise HomeAssistantError("Unknown cover")
|
|
|
|
@property
|
|
def http_client(self):
|
|
"""Return the current http client being used by this instance."""
|
|
return self._client
|
|
|
|
@http_client.setter
|
|
def http_client(self, client: GenericElmax):
|
|
"""Set the client library instance for Elmax API."""
|
|
self._client = client
|
|
|
|
async def _async_update_data(self):
|
|
try:
|
|
async with timeout(DEFAULT_TIMEOUT):
|
|
# The following command might fail in case of the panel is offline.
|
|
# We handle this case in the following exception blocks.
|
|
status = await self._client.get_current_panel_status()
|
|
|
|
except ElmaxBadPinError as err:
|
|
raise ConfigEntryAuthFailed("Control panel pin was refused") from err
|
|
except ElmaxBadLoginError as err:
|
|
raise ConfigEntryAuthFailed("Refused username/password/pin") from err
|
|
except ElmaxApiError as err:
|
|
raise UpdateFailed(f"Error communicating with ELMAX API: {err}") from err
|
|
except ElmaxPanelBusyError as err:
|
|
raise UpdateFailed(
|
|
"Communication with the panel failed, as it is currently busy"
|
|
) from err
|
|
except (ConnectError, ConnectTimeout, ElmaxNetworkError) as err:
|
|
if isinstance(self._client, Elmax):
|
|
raise UpdateFailed(
|
|
"A communication error has occurred. "
|
|
"Make sure HA can reach the internet and that "
|
|
"your firewall allows communication with the Meross Cloud."
|
|
) from err
|
|
|
|
raise UpdateFailed(
|
|
"A communication error has occurred. "
|
|
"Make sure the panel is online and that "
|
|
"your firewall allows communication with it."
|
|
) from err
|
|
|
|
# Store a dictionary for fast endpoint state access
|
|
self._state_by_endpoint = {k.endpoint_id: k for k in status.all_endpoints}
|
|
|
|
# If panel supports it and a it hasn't been registered yet, register the push notification handler
|
|
if status.push_feature and self._push_notification_handler is None:
|
|
self._register_push_notification_handler()
|
|
|
|
self._fire_data_update(status)
|
|
return status
|
|
|
|
def _fire_data_update(self, status: PanelStatus):
|
|
# Store a dictionary for fast endpoint state access
|
|
self._state_by_endpoint = {k.endpoint_id: k for k in status.all_endpoints}
|
|
self.async_set_updated_data(status)
|
|
|
|
def _register_push_notification_handler(self):
|
|
ws_ep = (
|
|
f"{'wss' if self.http_client.base_url.scheme == 'https' else 'ws'}"
|
|
f"://{self.http_client.base_url.host}"
|
|
f":{self.http_client.base_url.port}"
|
|
f"{self.http_client.base_url.path}/push"
|
|
)
|
|
self._push_notification_handler = PushNotificationHandler(
|
|
endpoint=str(ws_ep),
|
|
http_client=self.http_client,
|
|
ssl_context=self.http_client.ssl_context,
|
|
)
|
|
self._push_notification_handler.register_push_notification_handler(
|
|
self._push_handler
|
|
)
|
|
self._push_notification_handler.start(loop=self.hass.loop)
|
|
|
|
async def _push_handler(self, status: PanelStatus) -> None:
|
|
self._fire_data_update(status)
|
|
|
|
async def async_shutdown(self) -> None:
|
|
"""Cancel any scheduled call, and ignore new runs."""
|
|
if self._push_notification_handler is not None:
|
|
self._push_notification_handler.unregister_push_notification_handler(
|
|
self._push_handler
|
|
)
|
|
self._push_notification_handler.stop()
|
|
self._push_notification_handler = None
|
|
return await super().async_shutdown()
|