mirror of https://github.com/home-assistant/core
280 lines
9.6 KiB
Python
280 lines
9.6 KiB
Python
"""Qbus coordinator."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
import logging
|
|
from typing import cast
|
|
|
|
from qbusmqttapi.discovery import QbusDiscovery, QbusMqttDevice, QbusMqttOutput
|
|
from qbusmqttapi.factory import QbusMqttMessageFactory, QbusMqttTopicFactory
|
|
|
|
from homeassistant.components.mqtt import (
|
|
ReceiveMessage,
|
|
async_wait_for_mqtt_client,
|
|
client as mqtt,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
|
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.device_registry import format_mac
|
|
from homeassistant.helpers.event import async_call_later
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
|
from homeassistant.util.hass_dict import HassKey
|
|
|
|
from .const import CONF_SERIAL_NUMBER, DOMAIN, MANUFACTURER
|
|
|
|
# Module-level logger used by both coordinators in this file.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
# Config entry type parameterized with the per-controller coordinator; code in
# this file reaches that coordinator through ``entry.runtime_data``.
type QbusConfigEntry = ConfigEntry[QbusControllerCoordinator]
|
|
# Key under which QbusConfigCoordinator.get_or_create() stores the shared
# config coordinator in ``hass.data``.
QBUS_KEY: HassKey[QbusConfigCoordinator] = HassKey(DOMAIN)
|
|
|
|
|
|
class QbusControllerCoordinator(DataUpdateCoordinator[list[QbusMqttOutput]]):
    """Qbus data coordinator.

    Holds the controller that matches the config entry's serial number,
    exposes the controller's outputs as the coordinator data and exchanges
    MQTT messages with it (state subscription, activation, state requests).
    """

    # Delay handed to ``async_call_later`` before state requests are published.
    _STATE_REQUEST_DELAY = 3

    def __init__(self, hass: HomeAssistant, entry: QbusConfigEntry) -> None:
        """Initialize Qbus coordinator."""

        _LOGGER.debug("%s - Initializing coordinator", entry.unique_id)
        self.config_entry: QbusConfigEntry

        super().__init__(
            hass,
            _LOGGER,
            config_entry=entry,
            name=entry.unique_id or entry.entry_id,
            always_update=False,
        )

        self._message_factory = QbusMqttMessageFactory()
        self._topic_factory = QbusMqttTopicFactory()

        self._controller_activated = False
        self._subscribed_to_controller_state = False
        self._controller: QbusMqttDevice | None = None

        # Clean up when HA stops
        self.config_entry.async_on_unload(
            hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
        )

    async def _async_update_data(self) -> list[QbusMqttOutput]:
        """Return the outputs of the current controller, or an empty list."""
        return self._controller.outputs if self._controller else []

    def shutdown(self, event: Event | None = None) -> None:
        """Shutdown Qbus coordinator.

        Resets the activation/subscription flags and forgets the controller.
        ``event`` is accepted so the method can be used as a bus listener; it
        is not read.
        """
        _LOGGER.debug(
            "%s - Shutting down entry coordinator", self.config_entry.unique_id
        )

        self._controller_activated = False
        self._subscribed_to_controller_state = False
        self._controller = None

    async def async_update_controller_config(self, config: QbusDiscovery) -> None:
        """Update the controller based on the config.

        Looks up the controller by the serial number stored in the config
        entry. When it is not part of ``config`` a warning is logged and
        nothing else happens. Otherwise the controller is stored, its device
        info is updated, the state topic is subscribed to, the coordinator is
        refreshed and delayed state requests are scheduled.
        """
        _LOGGER.debug("%s - Updating config", self.config_entry.unique_id)
        serial = self.config_entry.data.get(CONF_SERIAL_NUMBER, "")
        controller = config.get_device_by_serial(serial)

        if controller is None:
            _LOGGER.warning(
                "%s - Controller with serial %s not found",
                self.config_entry.unique_id,
                serial,
            )
            return

        self._controller = controller

        self._update_device_info()
        await self._async_subscribe_to_controller_state()
        await self.async_refresh()
        self._request_controller_state()
        self._request_entity_states()

    def _update_device_info(self) -> None:
        """Call ``async_get_or_create`` on the device registry for the controller.

        Does nothing while no controller is known.
        """
        if self._controller is None:
            return

        device_registry = dr.async_get(self.hass)
        device_registry.async_get_or_create(
            config_entry_id=self.config_entry.entry_id,
            identifiers={(DOMAIN, format_mac(self._controller.mac))},
            manufacturer=MANUFACTURER,
            model="CTD3.x",
            name=f"CTD {self._controller.serial_number}",
            serial_number=self._controller.serial_number,
            sw_version=self._controller.version,
        )

    async def _async_subscribe_to_controller_state(self) -> None:
        """Subscribe once to the controller's state topic.

        Skipped when no controller is known or a subscription already exists.
        The unsubscribe callback is handed to the config entry so it runs on
        unload.
        """
        if self._controller is None or self._subscribed_to_controller_state:
            return

        controller_state_topic = self._topic_factory.get_device_state_topic(
            self._controller.id
        )
        _LOGGER.debug(
            "%s - Subscribing to %s",
            self.config_entry.unique_id,
            controller_state_topic,
        )

        # Set the flag before awaiting so a concurrent call does not subscribe
        # a second time; roll it back if subscribing fails so that a later
        # config update can retry instead of being skipped forever.
        self._subscribed_to_controller_state = True
        try:
            unsubscribe = await mqtt.async_subscribe(
                self.hass,
                controller_state_topic,
                self._controller_state_received,
            )
        except Exception:
            self._subscribed_to_controller_state = False
            raise

        self.config_entry.async_on_unload(unsubscribe)

    async def _controller_state_received(self, msg: ReceiveMessage) -> None:
        """Handle a controller state message and activate the controller if needed.

        An activate request is published when the parsed state reports
        ``connectable`` as ``False`` and the controller was not activated yet.
        """
        _LOGGER.debug(
            "%s - Receiving controller state %s", self.config_entry.unique_id, msg.topic
        )

        if self._controller is None or self._controller_activated:
            return

        state = self._message_factory.parse_device_state(msg.payload)

        if state and state.properties and state.properties.connectable is False:
            _LOGGER.debug(
                "%s - Activating controller %s", self.config_entry.unique_id, state.id
            )

            # Mark as activated before awaiting so overlapping state messages
            # do not send duplicate requests; undo it when the request could
            # not be sent so the next state message triggers another attempt.
            self._controller_activated = True
            try:
                request = self._message_factory.create_device_activate_request(
                    self._controller
                )
                await mqtt.async_publish(self.hass, request.topic, request.payload)
            except Exception:
                self._controller_activated = False
                raise

    def _request_entity_states(self) -> None:
        """Schedule a delayed state request for all outputs of the controller.

        Nothing is scheduled when there is no controller or it has no outputs.
        """

        async def request_state(_: datetime) -> None:
            # The controller may have been cleared (shutdown) while waiting.
            if self._controller is None:
                return

            _LOGGER.debug(
                "%s - Requesting %s entity states",
                self.config_entry.unique_id,
                len(self._controller.outputs),
            )

            request = self._message_factory.create_state_request(
                [item.id for item in self._controller.outputs]
            )

            await mqtt.async_publish(self.hass, request.topic, request.payload)

        if self._controller and self._controller.outputs:
            async_call_later(self.hass, self._STATE_REQUEST_DELAY, request_state)

    def _request_controller_state(self) -> None:
        """Schedule a delayed request for the controller's own state.

        Nothing is scheduled when there is no controller.
        """

        async def request_controller_state(_: datetime) -> None:
            # The controller may have been cleared (shutdown) while waiting.
            if self._controller is None:
                return

            _LOGGER.debug(
                "%s - Requesting controller state", self.config_entry.unique_id
            )
            request = self._message_factory.create_device_state_request(
                self._controller
            )
            await mqtt.async_publish(self.hass, request.topic, request.payload)

        if self._controller:
            async_call_later(
                self.hass, self._STATE_REQUEST_DELAY, request_controller_state
            )
|
|
|
|
|
|
class QbusConfigCoordinator:
    """Receive, cache and distribute the Qbus config published over MQTT.

    A single instance is kept in ``hass.data`` (see ``get_or_create``). Every
    received config is stored and forwarded to the coordinators of all loaded
    Qbus config entries.
    """

    # Most recently stored config; stays None until store_config() is called.
    _qbus_config: QbusDiscovery | None = None

    def __init__(self, hass: HomeAssistant) -> None:
        """Create the message/topic factories and register the stop listener."""

        self._hass = hass
        self._message_factory = QbusMqttMessageFactory()
        self._topic_factory = QbusMqttTopicFactory()

        # Callbacks invoked by shutdown(); the remover of the stop listener is
        # the first entry.
        self._cleanup_callbacks: list[CALLBACK_TYPE] = [
            hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
        ]

    @classmethod
    def get_or_create(cls, hass: HomeAssistant) -> QbusConfigCoordinator:
        """Return the coordinator kept in ``hass.data``, creating it if absent."""
        existing = hass.data.get(QBUS_KEY)
        if existing is not None:
            return existing

        created = cls(hass)
        hass.data[QBUS_KEY] = created
        return created

    def shutdown(self, event: Event | None = None) -> None:
        """Run every registered cleanup callback, newest first.

        ``event`` is accepted so the method can serve as a bus listener; it is
        not read.
        """
        # NOTE(review): this is registered as a stop listener and also calls
        # the remover of that same listener - confirm that is harmless.
        _LOGGER.debug("Shutting down Qbus config coordinator")

        pending = self._cleanup_callbacks
        while pending:
            pending.pop()()

    async def async_subscribe_to_config(self) -> None:
        """Subscribe to the config topic and remember how to unsubscribe."""
        topic = self._topic_factory.get_config_topic()
        _LOGGER.debug("Subscribing to %s", topic)

        unsubscribe = await mqtt.async_subscribe(
            self._hass, topic, self._config_received
        )
        self._cleanup_callbacks.append(unsubscribe)

    async def async_get_or_request_config(self) -> QbusDiscovery | None:
        """Return the stored config, publishing a config request when there is none.

        Returns ``None`` when the MQTT client is not ready. After publishing a
        request, whatever is stored at that moment is returned.
        """
        _LOGGER.debug("Requesting Qbus config")

        # Nothing to request when a config was stored earlier.
        stored = self._qbus_config
        if stored:
            _LOGGER.debug("Qbus config already available")
            return stored

        mqtt_ready = await async_wait_for_mqtt_client(self._hass)
        if not mqtt_ready:
            _LOGGER.debug("MQTT client not ready yet")
            return None

        # Ask for the config with an empty payload.
        _LOGGER.debug("Publishing config request")
        request_topic = self._topic_factory.get_get_config_topic()
        await mqtt.async_publish(self._hass, request_topic, b"")

        return self._qbus_config

    def store_config(self, config: QbusDiscovery) -> None:
        """Store the Qbus config."""
        _LOGGER.debug("Storing config")
        self._qbus_config = config

    async def _config_received(self, msg: ReceiveMessage) -> None:
        """Parse an incoming config message, store it and update all entries."""
        _LOGGER.debug("Receiving Qbus config")

        discovery = self._message_factory.parse_discovery(msg.payload)
        if discovery is None:
            _LOGGER.debug("Incomplete Qbus config")
            return

        self.store_config(discovery)

        # Hand the new config to the coordinator of every loaded Qbus entry.
        for loaded_entry in self._hass.config_entries.async_loaded_entries(DOMAIN):
            qbus_entry = cast(QbusConfigEntry, loaded_entry)
            await qbus_entry.runtime_data.async_update_controller_config(discovery)
|