# core/homeassistant/components/ezviz/coordinator.py

# 51 lines
# 1.5 KiB
# Python

"""Provides the ezviz DataUpdateCoordinator."""
import asyncio
from datetime import timedelta
import logging
from pyezviz.client import EzvizClient
from pyezviz.exceptions import (
EzvizAuthTokenExpired,
EzvizAuthVerificationCode,
HTTPError,
InvalidURL,
PyEzvizError,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class EzvizDataUpdateCoordinator(DataUpdateCoordinator):
    """Coordinator that periodically refreshes EZVIZ camera data.

    Each refresh calls ``load_cameras`` on the supplied EZVIZ client via
    ``hass.async_add_executor_job`` and is bounded by ``api_timeout``.
    """

    def __init__(
        self, hass: HomeAssistant, *, api: EzvizClient, api_timeout: int
    ) -> None:
        """Store the client and timeout, then set up a 30-second refresh.

        Args:
            hass: The Home Assistant instance handed to the base coordinator.
            api: EZVIZ client whose ``load_cameras`` method supplies the data.
            api_timeout: Limit, in seconds, applied to each fetch through
                ``asyncio.timeout``.
        """
        self.ezviz_client = api
        self._api_timeout = api_timeout

        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=30),
        )

    async def _async_update_data(self) -> dict:
        """Load the current camera data from the EZVIZ client.

        Returns:
            Whatever ``load_cameras`` returns (annotated as ``dict``).

        Raises:
            ConfigEntryAuthFailed: The client reported an expired auth token
                or asked for a verification code.
            UpdateFailed: The client raised ``InvalidURL``, ``HTTPError`` or
                another ``PyEzvizError``.
        """
        try:
            # The timeout only bounds how long we await the job; an expiry is
            # not handled here and propagates to the caller.
            async with asyncio.timeout(self._api_timeout):
                cameras = await self.hass.async_add_executor_job(
                    self.ezviz_client.load_cameras
                )
        except (EzvizAuthTokenExpired, EzvizAuthVerificationCode) as auth_error:
            # Auth failures are checked first so they are reported as such
            # rather than as a generic update failure.
            raise ConfigEntryAuthFailed from auth_error
        except (InvalidURL, HTTPError, PyEzvizError) as error:
            raise UpdateFailed(f"Invalid response from API: {error}") from error
        else:
            return cameras