mirror of https://github.com/home-assistant/core
528 lines
18 KiB
Python
528 lines
18 KiB
Python
"""Config flow for the Template integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Coroutine, Mapping
|
|
from functools import partial
|
|
from typing import Any, cast
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.components.binary_sensor import BinarySensorDeviceClass
|
|
from homeassistant.components.button import ButtonDeviceClass
|
|
from homeassistant.components.sensor import (
|
|
CONF_STATE_CLASS,
|
|
DEVICE_CLASS_STATE_CLASSES,
|
|
DEVICE_CLASS_UNITS,
|
|
SensorDeviceClass,
|
|
SensorStateClass,
|
|
)
|
|
from homeassistant.const import (
|
|
CONF_DEVICE_CLASS,
|
|
CONF_DEVICE_ID,
|
|
CONF_NAME,
|
|
CONF_STATE,
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
CONF_URL,
|
|
CONF_VALUE_TEMPLATE,
|
|
CONF_VERIFY_SSL,
|
|
Platform,
|
|
)
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import entity_registry as er, selector
|
|
from homeassistant.helpers.schema_config_entry_flow import (
|
|
SchemaCommonFlowHandler,
|
|
SchemaConfigFlowHandler,
|
|
SchemaFlowFormStep,
|
|
SchemaFlowMenuStep,
|
|
)
|
|
|
|
from .alarm_control_panel import (
|
|
CONF_ARM_AWAY_ACTION,
|
|
CONF_ARM_CUSTOM_BYPASS_ACTION,
|
|
CONF_ARM_HOME_ACTION,
|
|
CONF_ARM_NIGHT_ACTION,
|
|
CONF_ARM_VACATION_ACTION,
|
|
CONF_CODE_ARM_REQUIRED,
|
|
CONF_CODE_FORMAT,
|
|
CONF_DISARM_ACTION,
|
|
CONF_TRIGGER_ACTION,
|
|
TemplateCodeFormat,
|
|
)
|
|
from .binary_sensor import async_create_preview_binary_sensor
|
|
from .const import CONF_PRESS, CONF_TURN_OFF, CONF_TURN_ON, DOMAIN
|
|
from .number import (
|
|
CONF_MAX,
|
|
CONF_MIN,
|
|
CONF_SET_VALUE,
|
|
CONF_STEP,
|
|
DEFAULT_MAX_VALUE,
|
|
DEFAULT_MIN_VALUE,
|
|
DEFAULT_STEP,
|
|
async_create_preview_number,
|
|
)
|
|
from .select import CONF_OPTIONS, CONF_SELECT_OPTION
|
|
from .sensor import async_create_preview_sensor
|
|
from .switch import async_create_preview_switch
|
|
from .template_entity import TemplateEntity
|
|
|
|
_SCHEMA_STATE: dict[vol.Marker, Any] = {
|
|
vol.Required(CONF_STATE): selector.TemplateSelector(),
|
|
}
|
|
|
|
|
|
def generate_schema(domain: str, flow_type: str) -> vol.Schema:
|
|
"""Generate schema."""
|
|
schema: dict[vol.Marker, Any] = {}
|
|
|
|
if flow_type == "config":
|
|
schema = {vol.Required(CONF_NAME): selector.TextSelector()}
|
|
|
|
if domain == Platform.ALARM_CONTROL_PANEL:
|
|
schema |= {
|
|
vol.Optional(CONF_VALUE_TEMPLATE): selector.TemplateSelector(),
|
|
vol.Optional(CONF_DISARM_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_ARM_AWAY_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_ARM_CUSTOM_BYPASS_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_ARM_HOME_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_ARM_NIGHT_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_ARM_VACATION_ACTION): selector.ActionSelector(),
|
|
vol.Optional(CONF_TRIGGER_ACTION): selector.ActionSelector(),
|
|
vol.Optional(
|
|
CONF_CODE_ARM_REQUIRED, default=True
|
|
): selector.BooleanSelector(),
|
|
vol.Optional(
|
|
CONF_CODE_FORMAT, default=TemplateCodeFormat.number.name
|
|
): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[e.name for e in TemplateCodeFormat],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="alarm_control_panel_code_format",
|
|
)
|
|
),
|
|
}
|
|
|
|
if domain == Platform.BINARY_SENSOR:
|
|
schema |= _SCHEMA_STATE
|
|
if flow_type == "config":
|
|
schema |= {
|
|
vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[cls.value for cls in BinarySensorDeviceClass],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="binary_sensor_device_class",
|
|
sort=True,
|
|
),
|
|
),
|
|
}
|
|
|
|
if domain == Platform.BUTTON:
|
|
schema |= {
|
|
vol.Optional(CONF_PRESS): selector.ActionSelector(),
|
|
}
|
|
if flow_type == "config":
|
|
schema |= {
|
|
vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[cls.value for cls in ButtonDeviceClass],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="button_device_class",
|
|
sort=True,
|
|
),
|
|
)
|
|
}
|
|
|
|
if domain == Platform.IMAGE:
|
|
schema |= {
|
|
vol.Required(CONF_URL): selector.TemplateSelector(),
|
|
vol.Optional(CONF_VERIFY_SSL, default=True): selector.BooleanSelector(),
|
|
}
|
|
|
|
if domain == Platform.NUMBER:
|
|
schema |= {
|
|
vol.Required(CONF_STATE): selector.TemplateSelector(),
|
|
vol.Required(CONF_MIN, default=DEFAULT_MIN_VALUE): selector.NumberSelector(
|
|
selector.NumberSelectorConfig(mode=selector.NumberSelectorMode.BOX),
|
|
),
|
|
vol.Required(CONF_MAX, default=DEFAULT_MAX_VALUE): selector.NumberSelector(
|
|
selector.NumberSelectorConfig(mode=selector.NumberSelectorMode.BOX),
|
|
),
|
|
vol.Required(CONF_STEP, default=DEFAULT_STEP): selector.NumberSelector(
|
|
selector.NumberSelectorConfig(mode=selector.NumberSelectorMode.BOX),
|
|
),
|
|
vol.Optional(CONF_UNIT_OF_MEASUREMENT): selector.TextSelector(
|
|
selector.TextSelectorConfig(
|
|
type=selector.TextSelectorType.TEXT, multiline=False
|
|
)
|
|
),
|
|
vol.Optional(CONF_SET_VALUE): selector.ActionSelector(),
|
|
}
|
|
|
|
if domain == Platform.SELECT:
|
|
schema |= _SCHEMA_STATE | {
|
|
vol.Required(CONF_OPTIONS): selector.TemplateSelector(),
|
|
vol.Optional(CONF_SELECT_OPTION): selector.ActionSelector(),
|
|
}
|
|
|
|
if domain == Platform.SENSOR:
|
|
schema |= _SCHEMA_STATE | {
|
|
vol.Optional(CONF_UNIT_OF_MEASUREMENT): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=list(
|
|
{
|
|
str(unit)
|
|
for units in DEVICE_CLASS_UNITS.values()
|
|
for unit in units
|
|
if unit is not None
|
|
}
|
|
),
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="sensor_unit_of_measurement",
|
|
custom_value=True,
|
|
sort=True,
|
|
),
|
|
),
|
|
vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[
|
|
cls.value
|
|
for cls in SensorDeviceClass
|
|
if cls != SensorDeviceClass.ENUM
|
|
],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="sensor_device_class",
|
|
sort=True,
|
|
),
|
|
),
|
|
vol.Optional(CONF_STATE_CLASS): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[cls.value for cls in SensorStateClass],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="sensor_state_class",
|
|
sort=True,
|
|
),
|
|
),
|
|
}
|
|
|
|
if domain == Platform.SWITCH:
|
|
schema |= {
|
|
vol.Optional(CONF_VALUE_TEMPLATE): selector.TemplateSelector(),
|
|
vol.Optional(CONF_TURN_ON): selector.ActionSelector(),
|
|
vol.Optional(CONF_TURN_OFF): selector.ActionSelector(),
|
|
}
|
|
|
|
schema[vol.Optional(CONF_DEVICE_ID)] = selector.DeviceSelector()
|
|
|
|
return vol.Schema(schema)
|
|
|
|
|
|
options_schema = partial(generate_schema, flow_type="options")
|
|
|
|
config_schema = partial(generate_schema, flow_type="config")
|
|
|
|
|
|
async def choose_options_step(options: dict[str, Any]) -> str:
|
|
"""Return next step_id for options flow according to template_type."""
|
|
return cast(str, options["template_type"])
|
|
|
|
|
|
def _validate_unit(options: dict[str, Any]) -> None:
|
|
"""Validate unit of measurement."""
|
|
if (
|
|
(device_class := options.get(CONF_DEVICE_CLASS))
|
|
and (units := DEVICE_CLASS_UNITS.get(device_class)) is not None
|
|
and (unit := options.get(CONF_UNIT_OF_MEASUREMENT)) not in units
|
|
):
|
|
sorted_units = sorted(
|
|
[f"'{unit!s}'" if unit else "no unit of measurement" for unit in units],
|
|
key=str.casefold,
|
|
)
|
|
if len(sorted_units) == 1:
|
|
units_string = sorted_units[0]
|
|
else:
|
|
units_string = f"one of {', '.join(sorted_units)}"
|
|
|
|
raise vol.Invalid(
|
|
f"'{unit}' is not a valid unit for device class '{device_class}'; "
|
|
f"expected {units_string}"
|
|
)
|
|
|
|
|
|
def _validate_state_class(options: dict[str, Any]) -> None:
|
|
"""Validate state class."""
|
|
if (
|
|
(state_class := options.get(CONF_STATE_CLASS))
|
|
and (device_class := options.get(CONF_DEVICE_CLASS))
|
|
and (state_classes := DEVICE_CLASS_STATE_CLASSES.get(device_class)) is not None
|
|
and state_class not in state_classes
|
|
):
|
|
sorted_state_classes = sorted(
|
|
[f"'{state_class!s}'" for state_class in state_classes],
|
|
key=str.casefold,
|
|
)
|
|
if len(sorted_state_classes) == 0:
|
|
state_classes_string = "no state class"
|
|
elif len(sorted_state_classes) == 1:
|
|
state_classes_string = sorted_state_classes[0]
|
|
else:
|
|
state_classes_string = f"one of {', '.join(sorted_state_classes)}"
|
|
|
|
raise vol.Invalid(
|
|
f"'{state_class}' is not a valid state class for device class "
|
|
f"'{device_class}'; expected {state_classes_string}"
|
|
)
|
|
|
|
|
|
def validate_user_input(
|
|
template_type: str,
|
|
) -> Callable[
|
|
[SchemaCommonFlowHandler, dict[str, Any]],
|
|
Coroutine[Any, Any, dict[str, Any]],
|
|
]:
|
|
"""Do post validation of user input.
|
|
|
|
For sensors: Validate unit of measurement.
|
|
For all domaines: Set template type.
|
|
"""
|
|
|
|
async def _validate_user_input(
|
|
_: SchemaCommonFlowHandler,
|
|
user_input: dict[str, Any],
|
|
) -> dict[str, Any]:
|
|
"""Add template type to user input."""
|
|
if template_type == Platform.SENSOR:
|
|
_validate_unit(user_input)
|
|
_validate_state_class(user_input)
|
|
return {"template_type": template_type} | user_input
|
|
|
|
return _validate_user_input
|
|
|
|
|
|
TEMPLATE_TYPES = [
|
|
"alarm_control_panel",
|
|
"binary_sensor",
|
|
"button",
|
|
"image",
|
|
"number",
|
|
"select",
|
|
"sensor",
|
|
"switch",
|
|
]
|
|
|
|
CONFIG_FLOW = {
|
|
"user": SchemaFlowMenuStep(TEMPLATE_TYPES),
|
|
Platform.ALARM_CONTROL_PANEL: SchemaFlowFormStep(
|
|
config_schema(Platform.ALARM_CONTROL_PANEL),
|
|
validate_user_input=validate_user_input(Platform.ALARM_CONTROL_PANEL),
|
|
),
|
|
Platform.BINARY_SENSOR: SchemaFlowFormStep(
|
|
config_schema(Platform.BINARY_SENSOR),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.BINARY_SENSOR),
|
|
),
|
|
Platform.BUTTON: SchemaFlowFormStep(
|
|
config_schema(Platform.BUTTON),
|
|
validate_user_input=validate_user_input(Platform.BUTTON),
|
|
),
|
|
Platform.IMAGE: SchemaFlowFormStep(
|
|
config_schema(Platform.IMAGE),
|
|
validate_user_input=validate_user_input(Platform.IMAGE),
|
|
),
|
|
Platform.NUMBER: SchemaFlowFormStep(
|
|
config_schema(Platform.NUMBER),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.NUMBER),
|
|
),
|
|
Platform.SELECT: SchemaFlowFormStep(
|
|
config_schema(Platform.SELECT),
|
|
validate_user_input=validate_user_input(Platform.SELECT),
|
|
),
|
|
Platform.SENSOR: SchemaFlowFormStep(
|
|
config_schema(Platform.SENSOR),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.SENSOR),
|
|
),
|
|
Platform.SWITCH: SchemaFlowFormStep(
|
|
config_schema(Platform.SWITCH),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.SWITCH),
|
|
),
|
|
}
|
|
|
|
|
|
OPTIONS_FLOW = {
|
|
"init": SchemaFlowFormStep(next_step=choose_options_step),
|
|
Platform.ALARM_CONTROL_PANEL: SchemaFlowFormStep(
|
|
options_schema(Platform.ALARM_CONTROL_PANEL),
|
|
validate_user_input=validate_user_input(Platform.ALARM_CONTROL_PANEL),
|
|
),
|
|
Platform.BINARY_SENSOR: SchemaFlowFormStep(
|
|
options_schema(Platform.BINARY_SENSOR),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.BINARY_SENSOR),
|
|
),
|
|
Platform.BUTTON: SchemaFlowFormStep(
|
|
options_schema(Platform.BUTTON),
|
|
validate_user_input=validate_user_input(Platform.BUTTON),
|
|
),
|
|
Platform.IMAGE: SchemaFlowFormStep(
|
|
options_schema(Platform.IMAGE),
|
|
validate_user_input=validate_user_input(Platform.IMAGE),
|
|
),
|
|
Platform.NUMBER: SchemaFlowFormStep(
|
|
options_schema(Platform.NUMBER),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.NUMBER),
|
|
),
|
|
Platform.SELECT: SchemaFlowFormStep(
|
|
options_schema(Platform.SELECT),
|
|
validate_user_input=validate_user_input(Platform.SELECT),
|
|
),
|
|
Platform.SENSOR: SchemaFlowFormStep(
|
|
options_schema(Platform.SENSOR),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.SENSOR),
|
|
),
|
|
Platform.SWITCH: SchemaFlowFormStep(
|
|
options_schema(Platform.SWITCH),
|
|
preview="template",
|
|
validate_user_input=validate_user_input(Platform.SWITCH),
|
|
),
|
|
}
|
|
|
|
CREATE_PREVIEW_ENTITY: dict[
|
|
str,
|
|
Callable[[HomeAssistant, str, dict[str, Any]], TemplateEntity],
|
|
] = {
|
|
"binary_sensor": async_create_preview_binary_sensor,
|
|
"number": async_create_preview_number,
|
|
"sensor": async_create_preview_sensor,
|
|
"switch": async_create_preview_switch,
|
|
}
|
|
|
|
|
|
class TemplateConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
|
|
"""Handle config flow for template helper."""
|
|
|
|
config_flow = CONFIG_FLOW
|
|
options_flow = OPTIONS_FLOW
|
|
|
|
@callback
|
|
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
|
|
"""Return config entry title."""
|
|
return cast(str, options["name"])
|
|
|
|
@staticmethod
|
|
async def async_setup_preview(hass: HomeAssistant) -> None:
|
|
"""Set up preview WS API."""
|
|
websocket_api.async_register_command(hass, ws_start_preview)
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "template/start_preview",
|
|
vol.Required("flow_id"): str,
|
|
vol.Required("flow_type"): vol.Any("config_flow", "options_flow"),
|
|
vol.Required("user_input"): dict,
|
|
}
|
|
)
|
|
@callback
|
|
def ws_start_preview(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict[str, Any],
|
|
) -> None:
|
|
"""Generate a preview."""
|
|
|
|
def _validate(schema: vol.Schema, domain: str, user_input: dict[str, Any]) -> Any:
|
|
errors = {}
|
|
key: vol.Marker
|
|
for key, validator in schema.schema.items():
|
|
if key.schema not in user_input:
|
|
continue
|
|
try:
|
|
validator(user_input[key.schema])
|
|
except vol.Invalid as ex:
|
|
errors[key.schema] = str(ex.msg)
|
|
|
|
if domain == Platform.SENSOR:
|
|
try:
|
|
_validate_unit(user_input)
|
|
except vol.Invalid as ex:
|
|
errors[CONF_UNIT_OF_MEASUREMENT] = str(ex.msg)
|
|
try:
|
|
_validate_state_class(user_input)
|
|
except vol.Invalid as ex:
|
|
errors[CONF_STATE_CLASS] = str(ex.msg)
|
|
|
|
return errors
|
|
|
|
entity_registry_entry: er.RegistryEntry | None = None
|
|
if msg["flow_type"] == "config_flow":
|
|
flow_status = hass.config_entries.flow.async_get(msg["flow_id"])
|
|
template_type = flow_status["step_id"]
|
|
form_step = cast(SchemaFlowFormStep, CONFIG_FLOW[template_type])
|
|
schema = cast(vol.Schema, form_step.schema)
|
|
name = msg["user_input"]["name"]
|
|
else:
|
|
flow_status = hass.config_entries.options.async_get(msg["flow_id"])
|
|
config_entry = hass.config_entries.async_get_entry(flow_status["handler"])
|
|
if not config_entry:
|
|
raise HomeAssistantError
|
|
template_type = config_entry.options["template_type"]
|
|
name = config_entry.options["name"]
|
|
schema = cast(vol.Schema, OPTIONS_FLOW[template_type].schema)
|
|
entity_registry = er.async_get(hass)
|
|
entries = er.async_entries_for_config_entry(
|
|
entity_registry, flow_status["handler"]
|
|
)
|
|
if entries:
|
|
entity_registry_entry = entries[0]
|
|
|
|
errors = _validate(schema, template_type, msg["user_input"])
|
|
|
|
@callback
|
|
def async_preview_updated(
|
|
state: str | None,
|
|
attributes: Mapping[str, Any] | None,
|
|
listeners: dict[str, bool | set[str]] | None,
|
|
error: str | None,
|
|
) -> None:
|
|
"""Forward config entry state events to websocket."""
|
|
if error is not None:
|
|
connection.send_message(
|
|
websocket_api.event_message(
|
|
msg["id"],
|
|
{"error": error},
|
|
)
|
|
)
|
|
return
|
|
connection.send_message(
|
|
websocket_api.event_message(
|
|
msg["id"],
|
|
{"attributes": attributes, "listeners": listeners, "state": state},
|
|
)
|
|
)
|
|
|
|
if errors:
|
|
connection.send_message(
|
|
{
|
|
"id": msg["id"],
|
|
"type": websocket_api.TYPE_RESULT,
|
|
"success": False,
|
|
"error": {"code": "invalid_user_input", "message": errors},
|
|
}
|
|
)
|
|
return
|
|
|
|
preview_entity = CREATE_PREVIEW_ENTITY[template_type](hass, name, msg["user_input"])
|
|
preview_entity.hass = hass
|
|
preview_entity.registry_entry = entity_registry_entry
|
|
|
|
connection.send_result(msg["id"])
|
|
connection.subscriptions[msg["id"]] = preview_entity.async_start_preview(
|
|
async_preview_updated
|
|
)
|