mirror of https://github.com/home-assistant/core
432 lines
13 KiB
Python
432 lines
13 KiB
Python
"""Config flow for Group integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Coroutine, Mapping
|
|
from functools import partial
|
|
from typing import Any, cast
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.const import CONF_ENTITIES, CONF_TYPE
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import entity_registry as er, selector
|
|
from homeassistant.helpers.schema_config_entry_flow import (
|
|
SchemaCommonFlowHandler,
|
|
SchemaConfigFlowHandler,
|
|
SchemaFlowFormStep,
|
|
SchemaFlowMenuStep,
|
|
SchemaOptionsFlowHandler,
|
|
entity_selector_without_own_entities,
|
|
)
|
|
|
|
from .binary_sensor import CONF_ALL, async_create_preview_binary_sensor
|
|
from .button import async_create_preview_button
|
|
from .const import CONF_HIDE_MEMBERS, CONF_IGNORE_NON_NUMERIC, DOMAIN
|
|
from .cover import async_create_preview_cover
|
|
from .entity import GroupEntity
|
|
from .event import async_create_preview_event
|
|
from .fan import async_create_preview_fan
|
|
from .light import async_create_preview_light
|
|
from .lock import async_create_preview_lock
|
|
from .media_player import MediaPlayerGroup, async_create_preview_media_player
|
|
from .notify import async_create_preview_notify
|
|
from .sensor import async_create_preview_sensor
|
|
from .switch import async_create_preview_switch
|
|
|
|
_STATISTIC_MEASURES = [
|
|
"last",
|
|
"max",
|
|
"mean",
|
|
"median",
|
|
"min",
|
|
"product",
|
|
"range",
|
|
"stdev",
|
|
"sum",
|
|
]
|
|
|
|
|
|
async def basic_group_options_schema(
|
|
domain: str | list[str], handler: SchemaCommonFlowHandler | None
|
|
) -> vol.Schema:
|
|
"""Generate options schema."""
|
|
entity_selector: selector.Selector[Any] | vol.Schema
|
|
if handler is None:
|
|
entity_selector = selector.selector(
|
|
{"entity": {"domain": domain, "multiple": True}}
|
|
)
|
|
else:
|
|
entity_selector = entity_selector_without_own_entities(
|
|
cast(SchemaOptionsFlowHandler, handler.parent_handler),
|
|
selector.EntitySelectorConfig(domain=domain, multiple=True),
|
|
)
|
|
|
|
return vol.Schema(
|
|
{
|
|
vol.Required(CONF_ENTITIES): entity_selector,
|
|
vol.Required(CONF_HIDE_MEMBERS, default=False): selector.BooleanSelector(),
|
|
}
|
|
)
|
|
|
|
|
|
def basic_group_config_schema(domain: str | list[str]) -> vol.Schema:
|
|
"""Generate config schema."""
|
|
return vol.Schema(
|
|
{
|
|
vol.Required("name"): selector.TextSelector(),
|
|
vol.Required(CONF_ENTITIES): selector.EntitySelector(
|
|
selector.EntitySelectorConfig(domain=domain, multiple=True),
|
|
),
|
|
vol.Required(CONF_HIDE_MEMBERS, default=False): selector.BooleanSelector(),
|
|
}
|
|
)
|
|
|
|
|
|
async def binary_sensor_options_schema(
|
|
handler: SchemaCommonFlowHandler | None,
|
|
) -> vol.Schema:
|
|
"""Generate options schema."""
|
|
return (await basic_group_options_schema("binary_sensor", handler)).extend(
|
|
{
|
|
vol.Required(CONF_ALL, default=False): selector.BooleanSelector(),
|
|
}
|
|
)
|
|
|
|
|
|
BINARY_SENSOR_CONFIG_SCHEMA = basic_group_config_schema("binary_sensor").extend(
|
|
{
|
|
vol.Required(CONF_ALL, default=False): selector.BooleanSelector(),
|
|
}
|
|
)
|
|
|
|
SENSOR_CONFIG_EXTENDS = {
|
|
vol.Required(CONF_TYPE): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=_STATISTIC_MEASURES, translation_key=CONF_TYPE
|
|
),
|
|
),
|
|
}
|
|
SENSOR_OPTIONS = {
|
|
vol.Optional(CONF_IGNORE_NON_NUMERIC, default=False): selector.BooleanSelector(),
|
|
vol.Required(CONF_TYPE): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=_STATISTIC_MEASURES, translation_key=CONF_TYPE
|
|
),
|
|
),
|
|
}
|
|
|
|
|
|
async def sensor_options_schema(
|
|
domain: str, handler: SchemaCommonFlowHandler | None
|
|
) -> vol.Schema:
|
|
"""Generate options schema."""
|
|
return (
|
|
await basic_group_options_schema(["sensor", "number", "input_number"], handler)
|
|
).extend(SENSOR_OPTIONS)
|
|
|
|
|
|
SENSOR_CONFIG_SCHEMA = basic_group_config_schema(
|
|
["sensor", "number", "input_number"]
|
|
).extend(SENSOR_CONFIG_EXTENDS)
|
|
|
|
|
|
async def light_switch_options_schema(
|
|
domain: str, handler: SchemaCommonFlowHandler | None
|
|
) -> vol.Schema:
|
|
"""Generate options schema."""
|
|
return (await basic_group_options_schema(domain, handler)).extend(
|
|
{
|
|
vol.Required(
|
|
CONF_ALL, default=False, description={"advanced": True}
|
|
): selector.BooleanSelector(),
|
|
}
|
|
)
|
|
|
|
|
|
GROUP_TYPES = [
|
|
"binary_sensor",
|
|
"button",
|
|
"cover",
|
|
"event",
|
|
"fan",
|
|
"light",
|
|
"lock",
|
|
"media_player",
|
|
"notify",
|
|
"sensor",
|
|
"switch",
|
|
]
|
|
|
|
|
|
async def choose_options_step(options: dict[str, Any]) -> str:
|
|
"""Return next step_id for options flow according to group_type."""
|
|
return cast(str, options["group_type"])
|
|
|
|
|
|
def set_group_type(
|
|
group_type: str,
|
|
) -> Callable[
|
|
[SchemaCommonFlowHandler, dict[str, Any]], Coroutine[Any, Any, dict[str, Any]]
|
|
]:
|
|
"""Set group type."""
|
|
|
|
async def _set_group_type(
|
|
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Add group type to user input."""
|
|
return {"group_type": group_type, **user_input}
|
|
|
|
return _set_group_type
|
|
|
|
|
|
CONFIG_FLOW = {
|
|
"user": SchemaFlowMenuStep(GROUP_TYPES),
|
|
"binary_sensor": SchemaFlowFormStep(
|
|
BINARY_SENSOR_CONFIG_SCHEMA,
|
|
preview="group",
|
|
validate_user_input=set_group_type("binary_sensor"),
|
|
),
|
|
"button": SchemaFlowFormStep(
|
|
basic_group_config_schema("button"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("button"),
|
|
),
|
|
"cover": SchemaFlowFormStep(
|
|
basic_group_config_schema("cover"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("cover"),
|
|
),
|
|
"event": SchemaFlowFormStep(
|
|
basic_group_config_schema("event"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("event"),
|
|
),
|
|
"fan": SchemaFlowFormStep(
|
|
basic_group_config_schema("fan"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("fan"),
|
|
),
|
|
"light": SchemaFlowFormStep(
|
|
basic_group_config_schema("light"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("light"),
|
|
),
|
|
"lock": SchemaFlowFormStep(
|
|
basic_group_config_schema("lock"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("lock"),
|
|
),
|
|
"media_player": SchemaFlowFormStep(
|
|
basic_group_config_schema("media_player"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("media_player"),
|
|
),
|
|
"notify": SchemaFlowFormStep(
|
|
basic_group_config_schema("notify"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("notify"),
|
|
),
|
|
"sensor": SchemaFlowFormStep(
|
|
SENSOR_CONFIG_SCHEMA,
|
|
preview="group",
|
|
validate_user_input=set_group_type("sensor"),
|
|
),
|
|
"switch": SchemaFlowFormStep(
|
|
basic_group_config_schema("switch"),
|
|
preview="group",
|
|
validate_user_input=set_group_type("switch"),
|
|
),
|
|
}
|
|
|
|
|
|
OPTIONS_FLOW = {
|
|
"init": SchemaFlowFormStep(next_step=choose_options_step),
|
|
"binary_sensor": SchemaFlowFormStep(
|
|
binary_sensor_options_schema,
|
|
preview="group",
|
|
),
|
|
"button": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "button"),
|
|
preview="group",
|
|
),
|
|
"cover": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "cover"),
|
|
preview="group",
|
|
),
|
|
"event": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "event"),
|
|
preview="group",
|
|
),
|
|
"fan": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "fan"),
|
|
preview="group",
|
|
),
|
|
"light": SchemaFlowFormStep(
|
|
partial(light_switch_options_schema, "light"),
|
|
preview="group",
|
|
),
|
|
"lock": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "lock"),
|
|
preview="group",
|
|
),
|
|
"media_player": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "media_player"),
|
|
preview="group",
|
|
),
|
|
"notify": SchemaFlowFormStep(
|
|
partial(basic_group_options_schema, "notify"),
|
|
preview="group",
|
|
),
|
|
"sensor": SchemaFlowFormStep(
|
|
partial(sensor_options_schema, "sensor"),
|
|
preview="group",
|
|
),
|
|
"switch": SchemaFlowFormStep(
|
|
partial(light_switch_options_schema, "switch"),
|
|
preview="group",
|
|
),
|
|
}
|
|
|
|
PREVIEW_OPTIONS_SCHEMA: dict[str, vol.Schema] = {}
|
|
|
|
CREATE_PREVIEW_ENTITY: dict[
|
|
str,
|
|
Callable[[HomeAssistant, str, dict[str, Any]], GroupEntity | MediaPlayerGroup],
|
|
] = {
|
|
"binary_sensor": async_create_preview_binary_sensor,
|
|
"button": async_create_preview_button,
|
|
"cover": async_create_preview_cover,
|
|
"event": async_create_preview_event,
|
|
"fan": async_create_preview_fan,
|
|
"light": async_create_preview_light,
|
|
"lock": async_create_preview_lock,
|
|
"media_player": async_create_preview_media_player,
|
|
"notify": async_create_preview_notify,
|
|
"sensor": async_create_preview_sensor,
|
|
"switch": async_create_preview_switch,
|
|
}
|
|
|
|
|
|
class GroupConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
|
|
"""Handle a config or options flow for groups."""
|
|
|
|
config_flow = CONFIG_FLOW
|
|
options_flow = OPTIONS_FLOW
|
|
|
|
@callback
|
|
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
|
|
"""Return config entry title.
|
|
|
|
The options parameter contains config entry options, which is the union of user
|
|
input from the config flow steps.
|
|
"""
|
|
return cast(str, options["name"]) if "name" in options else ""
|
|
|
|
@callback
|
|
def async_config_flow_finished(self, options: Mapping[str, Any]) -> None:
|
|
"""Hide the group members if requested."""
|
|
if options[CONF_HIDE_MEMBERS]:
|
|
_async_hide_members(
|
|
self.hass, options[CONF_ENTITIES], er.RegistryEntryHider.INTEGRATION
|
|
)
|
|
|
|
@callback
|
|
@staticmethod
|
|
def async_options_flow_finished(
|
|
hass: HomeAssistant, options: Mapping[str, Any]
|
|
) -> None:
|
|
"""Hide or unhide the group members as requested."""
|
|
hidden_by = (
|
|
er.RegistryEntryHider.INTEGRATION if options[CONF_HIDE_MEMBERS] else None
|
|
)
|
|
_async_hide_members(hass, options[CONF_ENTITIES], hidden_by)
|
|
|
|
@staticmethod
|
|
async def async_setup_preview(hass: HomeAssistant) -> None:
|
|
"""Set up preview WS API."""
|
|
for group_type, form_step in OPTIONS_FLOW.items():
|
|
if group_type not in GROUP_TYPES:
|
|
continue
|
|
schema = cast(
|
|
Callable[
|
|
[SchemaCommonFlowHandler | None], Coroutine[Any, Any, vol.Schema]
|
|
],
|
|
form_step.schema,
|
|
)
|
|
PREVIEW_OPTIONS_SCHEMA[group_type] = await schema(None)
|
|
websocket_api.async_register_command(hass, ws_start_preview)
|
|
|
|
|
|
def _async_hide_members(
|
|
hass: HomeAssistant, members: list[str], hidden_by: er.RegistryEntryHider | None
|
|
) -> None:
|
|
"""Hide or unhide group members."""
|
|
registry = er.async_get(hass)
|
|
for member in members:
|
|
if not (entity_id := er.async_resolve_entity_id(registry, member)):
|
|
continue
|
|
if entity_id not in registry.entities:
|
|
continue
|
|
registry.async_update_entity(entity_id, hidden_by=hidden_by)
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "group/start_preview",
|
|
vol.Required("flow_id"): str,
|
|
vol.Required("flow_type"): vol.Any("config_flow", "options_flow"),
|
|
vol.Required("user_input"): dict,
|
|
}
|
|
)
|
|
@callback
|
|
def ws_start_preview(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict[str, Any],
|
|
) -> None:
|
|
"""Generate a preview."""
|
|
entity_registry_entry: er.RegistryEntry | None = None
|
|
if msg["flow_type"] == "config_flow":
|
|
flow_status = hass.config_entries.flow.async_get(msg["flow_id"])
|
|
group_type = flow_status["step_id"]
|
|
form_step = cast(SchemaFlowFormStep, CONFIG_FLOW[group_type])
|
|
schema = cast(vol.Schema, form_step.schema)
|
|
validated = schema(msg["user_input"])
|
|
name = validated["name"]
|
|
else:
|
|
flow_status = hass.config_entries.options.async_get(msg["flow_id"])
|
|
config_entry_id = flow_status["handler"]
|
|
config_entry = hass.config_entries.async_get_entry(config_entry_id)
|
|
if not config_entry:
|
|
raise HomeAssistantError
|
|
group_type = config_entry.options["group_type"]
|
|
name = config_entry.options["name"]
|
|
validated = PREVIEW_OPTIONS_SCHEMA[group_type](msg["user_input"])
|
|
entity_registry = er.async_get(hass)
|
|
entries = er.async_entries_for_config_entry(entity_registry, config_entry_id)
|
|
if entries:
|
|
entity_registry_entry = entries[0]
|
|
|
|
@callback
|
|
def async_preview_updated(state: str, attributes: Mapping[str, Any]) -> None:
|
|
"""Forward config entry state events to websocket."""
|
|
connection.send_message(
|
|
websocket_api.event_message(
|
|
msg["id"], {"attributes": attributes, "state": state}
|
|
)
|
|
)
|
|
|
|
preview_entity: GroupEntity | MediaPlayerGroup = CREATE_PREVIEW_ENTITY[group_type](
|
|
hass, name, validated
|
|
)
|
|
preview_entity.hass = hass
|
|
preview_entity.registry_entry = entity_registry_entry
|
|
|
|
connection.send_result(msg["id"])
|
|
connection.subscriptions[msg["id"]] = preview_entity.async_start_preview(
|
|
async_preview_updated
|
|
)
|