mirror of https://github.com/home-assistant/core
144 lines
5.3 KiB
Python
144 lines
5.3 KiB
Python
"""Data update coordinator for trigger based template entities."""
|
|
|
|
from collections.abc import Callable, Mapping
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_START
|
|
from homeassistant.core import Context, CoreState, Event, HomeAssistant, callback
|
|
from homeassistant.helpers import condition, discovery, trigger as trigger_helper
|
|
from homeassistant.helpers.script import Script
|
|
from homeassistant.helpers.trace import trace_get
|
|
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
|
|
|
from .const import CONF_ACTION, CONF_CONDITION, CONF_TRIGGER, DOMAIN, PLATFORMS
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class TriggerUpdateCoordinator(DataUpdateCoordinator):
|
|
"""Data update coordinator for trigger based template entities."""
|
|
|
|
REMOVE_TRIGGER = object()
|
|
|
|
def __init__(self, hass: HomeAssistant, config: dict[str, Any]) -> None:
|
|
"""Instantiate trigger data."""
|
|
super().__init__(
|
|
hass, _LOGGER, config_entry=None, name="Trigger Update Coordinator"
|
|
)
|
|
self.config = config
|
|
self._cond_func: Callable[[Mapping[str, Any] | None], bool] | None = None
|
|
self._unsub_start: Callable[[], None] | None = None
|
|
self._unsub_trigger: Callable[[], None] | None = None
|
|
self._script: Script | None = None
|
|
|
|
@property
|
|
def unique_id(self) -> str | None:
|
|
"""Return unique ID for the entity."""
|
|
return self.config.get("unique_id")
|
|
|
|
@callback
|
|
def async_remove(self) -> None:
|
|
"""Signal that the entities need to remove themselves."""
|
|
if self._unsub_start:
|
|
self._unsub_start()
|
|
if self._unsub_trigger:
|
|
self._unsub_trigger()
|
|
|
|
async def async_setup(self, hass_config: ConfigType) -> None:
|
|
"""Set up the trigger and create entities."""
|
|
if self.hass.state is CoreState.running:
|
|
await self._attach_triggers()
|
|
else:
|
|
self._unsub_start = self.hass.bus.async_listen_once(
|
|
EVENT_HOMEASSISTANT_START, self._attach_triggers
|
|
)
|
|
|
|
for platform_domain in PLATFORMS:
|
|
if platform_domain in self.config:
|
|
self.hass.async_create_task(
|
|
discovery.async_load_platform(
|
|
self.hass,
|
|
platform_domain,
|
|
DOMAIN,
|
|
{"coordinator": self, "entities": self.config[platform_domain]},
|
|
hass_config,
|
|
),
|
|
eager_start=True,
|
|
)
|
|
|
|
async def _attach_triggers(self, start_event: Event | None = None) -> None:
|
|
"""Attach the triggers."""
|
|
if CONF_ACTION in self.config:
|
|
self._script = Script(
|
|
self.hass,
|
|
self.config[CONF_ACTION],
|
|
self.name,
|
|
DOMAIN,
|
|
)
|
|
|
|
if CONF_CONDITION in self.config:
|
|
self._cond_func = await condition.async_conditions_from_config(
|
|
self.hass, self.config[CONF_CONDITION], _LOGGER, "template entity"
|
|
)
|
|
|
|
if start_event is not None:
|
|
self._unsub_start = None
|
|
|
|
if self._script:
|
|
action: Callable = self._handle_triggered_with_script
|
|
else:
|
|
action = self._handle_triggered
|
|
|
|
self._unsub_trigger = await trigger_helper.async_initialize_triggers(
|
|
self.hass,
|
|
self.config[CONF_TRIGGER],
|
|
action,
|
|
DOMAIN,
|
|
self.name,
|
|
self.logger.log,
|
|
start_event is not None,
|
|
)
|
|
|
|
async def _handle_triggered_with_script(
|
|
self, run_variables: TemplateVarsType, context: Context | None = None
|
|
) -> None:
|
|
if not self._check_condition(run_variables):
|
|
return
|
|
# Create a context referring to the trigger context.
|
|
trigger_context_id = None if context is None else context.id
|
|
script_context = Context(parent_id=trigger_context_id)
|
|
if TYPE_CHECKING:
|
|
# This method is only called if there's a script
|
|
assert self._script is not None
|
|
if script_result := await self._script.async_run(run_variables, script_context):
|
|
run_variables = script_result.variables
|
|
self._execute_update(run_variables, context)
|
|
|
|
async def _handle_triggered(
|
|
self, run_variables: TemplateVarsType, context: Context | None = None
|
|
) -> None:
|
|
if not self._check_condition(run_variables):
|
|
return
|
|
self._execute_update(run_variables, context)
|
|
|
|
def _check_condition(self, run_variables: TemplateVarsType) -> bool:
|
|
if not self._cond_func:
|
|
return True
|
|
condition_result = self._cond_func(run_variables)
|
|
if condition_result is False:
|
|
_LOGGER.debug(
|
|
"Conditions not met, aborting template trigger update. Condition summary: %s",
|
|
trace_get(clear=False),
|
|
)
|
|
return condition_result
|
|
|
|
@callback
|
|
def _execute_update(
|
|
self, run_variables: TemplateVarsType, context: Context | None = None
|
|
) -> None:
|
|
self.async_set_updated_data(
|
|
{"run_variables": run_variables, "context": context}
|
|
)
|