core/homeassistant/components/risco/__init__.py

184 lines
5.9 KiB
Python

"""The Risco integration."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
import logging
from typing import Any
from pyrisco import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError
from pyrisco.common import Partition, System, Zone
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PIN,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_TYPE,
CONF_USERNAME,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import (
CONF_CONCURRENCY,
DATA_COORDINATOR,
DEFAULT_CONCURRENCY,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
EVENTS_COORDINATOR,
SYSTEM_UPDATE_SIGNAL,
TYPE_LOCAL,
)
from .coordinator import RiscoDataUpdateCoordinator, RiscoEventsDataUpdateCoordinator
PLATFORMS = [
Platform.ALARM_CONTROL_PANEL,
Platform.BINARY_SENSOR,
Platform.SENSOR,
Platform.SWITCH,
]
_LOGGER = logging.getLogger(__name__)
@dataclass
class LocalData:
"""A data class for local data passed to the platforms."""
system: RiscoLocal
partition_updates: dict[int, Callable[[], Any]] = field(default_factory=dict)
def is_local(entry: ConfigEntry) -> bool:
"""Return whether the entry represents an instance with local communication."""
return entry.data.get(CONF_TYPE) == TYPE_LOCAL
def zone_update_signal(zone_id: int) -> str:
"""Return a signal for the dispatch of a zone update."""
return f"risco_zone_update_{zone_id}"
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Risco from a config entry."""
if is_local(entry):
return await _async_setup_local_entry(hass, entry)
return await _async_setup_cloud_entry(hass, entry)
async def _async_setup_local_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = entry.data
concurrency = entry.options.get(CONF_CONCURRENCY, DEFAULT_CONCURRENCY)
risco = RiscoLocal(
data[CONF_HOST], data[CONF_PORT], data[CONF_PIN], concurrency=concurrency
)
try:
await risco.connect()
except CannotConnectError as error:
raise ConfigEntryNotReady from error
except UnauthorizedError:
_LOGGER.exception("Failed to login to Risco cloud")
return False
async def _error(error: Exception) -> None:
_LOGGER.error("Error in Risco library", exc_info=error)
if isinstance(error, ConnectionResetError) and not hass.is_stopping:
_LOGGER.debug("Disconnected from panel. Reloading integration")
hass.async_create_task(hass.config_entries.async_reload(entry.entry_id))
entry.async_on_unload(risco.add_error_handler(_error))
async def _default(command: str, result: str, *params: list[str]) -> None:
_LOGGER.debug(
"Unhandled update from Risco library: %s, %s, %s", command, result, params
)
entry.async_on_unload(risco.add_default_handler(_default))
local_data = LocalData(risco)
async def _zone(zone_id: int, zone: Zone) -> None:
_LOGGER.debug("Risco zone update for %d", zone_id)
async_dispatcher_send(hass, zone_update_signal(zone_id))
entry.async_on_unload(risco.add_zone_handler(_zone))
async def _partition(partition_id: int, partition: Partition) -> None:
_LOGGER.debug("Risco partition update for %d", partition_id)
callback = local_data.partition_updates.get(partition_id)
if callback:
callback()
entry.async_on_unload(risco.add_partition_handler(_partition))
async def _system(system: System) -> None:
_LOGGER.debug("Risco system update")
async_dispatcher_send(hass, SYSTEM_UPDATE_SIGNAL)
entry.async_on_unload(risco.add_system_handler(_system))
entry.async_on_unload(entry.add_update_listener(_update_listener))
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = local_data
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def _async_setup_cloud_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = entry.data
risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN])
try:
await risco.login(async_get_clientsession(hass))
except CannotConnectError as error:
raise ConfigEntryNotReady from error
except UnauthorizedError as error:
raise ConfigEntryAuthFailed from error
scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
coordinator = RiscoDataUpdateCoordinator(hass, risco, scan_interval)
await coordinator.async_config_entry_first_refresh()
events_coordinator = RiscoEventsDataUpdateCoordinator(
hass, risco, entry.entry_id, 60
)
entry.async_on_unload(entry.add_update_listener(_update_listener))
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
EVENTS_COORDINATOR: events_coordinator,
}
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
await events_coordinator.async_refresh()
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
if is_local(entry):
local_data: LocalData = hass.data[DOMAIN][entry.entry_id]
await local_data.system.disconnect()
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)