mirror of https://github.com/home-assistant/core
78 lines
2.7 KiB
Python
78 lines
2.7 KiB
Python
"""The Litter-Robot coordinator."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Generator
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
from pylitterbot import Account, FeederRobot, LitterRobot
|
|
from pylitterbot.exceptions import LitterRobotException, LitterRobotLoginException
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import DOMAIN
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
UPDATE_INTERVAL = timedelta(minutes=5)
|
|
|
|
type LitterRobotConfigEntry = ConfigEntry[LitterRobotDataUpdateCoordinator]
|
|
|
|
|
|
class LitterRobotDataUpdateCoordinator(DataUpdateCoordinator[None]):
|
|
"""The Litter-Robot data update coordinator."""
|
|
|
|
config_entry: LitterRobotConfigEntry
|
|
|
|
def __init__(
|
|
self, hass: HomeAssistant, config_entry: LitterRobotConfigEntry
|
|
) -> None:
|
|
"""Initialize the Litter-Robot data update coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
config_entry=config_entry,
|
|
name=DOMAIN,
|
|
update_interval=UPDATE_INTERVAL,
|
|
)
|
|
|
|
self.account = Account(websession=async_get_clientsession(hass))
|
|
|
|
async def _async_update_data(self) -> None:
|
|
"""Update all device states from the Litter-Robot API."""
|
|
await self.account.refresh_robots()
|
|
await self.account.load_pets()
|
|
|
|
async def _async_setup(self) -> None:
|
|
"""Set up the coordinator."""
|
|
try:
|
|
await self.account.connect(
|
|
username=self.config_entry.data[CONF_USERNAME],
|
|
password=self.config_entry.data[CONF_PASSWORD],
|
|
load_robots=True,
|
|
subscribe_for_updates=True,
|
|
load_pets=True,
|
|
)
|
|
except LitterRobotLoginException as ex:
|
|
raise ConfigEntryAuthFailed("Invalid credentials") from ex
|
|
except LitterRobotException as ex:
|
|
raise UpdateFailed("Unable to connect to Litter-Robot API") from ex
|
|
|
|
def litter_robots(self) -> Generator[LitterRobot]:
|
|
"""Get Litter-Robots from the account."""
|
|
return (
|
|
robot for robot in self.account.robots if isinstance(robot, LitterRobot)
|
|
)
|
|
|
|
def feeder_robots(self) -> Generator[FeederRobot]:
|
|
"""Get Feeder-Robots from the account."""
|
|
return (
|
|
robot for robot in self.account.robots if isinstance(robot, FeederRobot)
|
|
)
|