core/homeassistant/helpers/intent.py

1441 lines
45 KiB
Python

"""Module to coordinate user intentions."""
from __future__ import annotations
from abc import abstractmethod
import asyncio
from collections.abc import Callable, Collection, Coroutine, Iterable
import dataclasses
from dataclasses import dataclass, field
from enum import Enum, StrEnum, auto
from itertools import groupby
import logging
from typing import Any
from propcache import cached_property
import voluptuous as vol
from homeassistant.components.homeassistant.exposed_entities import async_should_expose
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_SUPPORTED_FEATURES,
)
from homeassistant.core import Context, HomeAssistant, State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass
from homeassistant.util.hass_dict import HassKey
from . import (
area_registry,
config_validation as cv,
device_registry,
entity_registry,
floor_registry,
)
from .typing import VolSchemaType
_LOGGER = logging.getLogger(__name__)
type _SlotsType = dict[str, Any]
type _IntentSlotsType = dict[
str | tuple[str, str], VolSchemaType | Callable[[Any], Any]
]
INTENT_TURN_OFF = "HassTurnOff"
INTENT_TURN_ON = "HassTurnOn"
INTENT_TOGGLE = "HassToggle"
INTENT_GET_STATE = "HassGetState"
INTENT_NEVERMIND = "HassNevermind"
INTENT_SET_POSITION = "HassSetPosition"
INTENT_START_TIMER = "HassStartTimer"
INTENT_CANCEL_TIMER = "HassCancelTimer"
INTENT_INCREASE_TIMER = "HassIncreaseTimer"
INTENT_DECREASE_TIMER = "HassDecreaseTimer"
INTENT_PAUSE_TIMER = "HassPauseTimer"
INTENT_UNPAUSE_TIMER = "HassUnpauseTimer"
INTENT_TIMER_STATUS = "HassTimerStatus"
INTENT_GET_CURRENT_DATE = "HassGetCurrentDate"
INTENT_GET_CURRENT_TIME = "HassGetCurrentTime"
INTENT_RESPOND = "HassRespond"
SLOT_SCHEMA = vol.Schema({}, extra=vol.ALLOW_EXTRA)
DATA_KEY: HassKey[dict[str, IntentHandler]] = HassKey("intent")
SPEECH_TYPE_PLAIN = "plain"
SPEECH_TYPE_SSML = "ssml"
@callback
@bind_hass
def async_register(hass: HomeAssistant, handler: IntentHandler) -> None:
"""Register an intent with Home Assistant."""
if (intents := hass.data.get(DATA_KEY)) is None:
intents = {}
hass.data[DATA_KEY] = intents
assert getattr(handler, "intent_type", None), "intent_type should be set"
if handler.intent_type in intents:
_LOGGER.warning(
"Intent %s is being overwritten by %s", handler.intent_type, handler
)
intents[handler.intent_type] = handler
@callback
@bind_hass
def async_remove(hass: HomeAssistant, intent_type: str) -> None:
"""Remove an intent from Home Assistant."""
if (intents := hass.data.get(DATA_KEY)) is None:
return
intents.pop(intent_type, None)
@callback
def async_get(hass: HomeAssistant) -> Iterable[IntentHandler]:
"""Return registered intents."""
return hass.data.get(DATA_KEY, {}).values()
@bind_hass
async def async_handle(
hass: HomeAssistant,
platform: str,
intent_type: str,
slots: _SlotsType | None = None,
text_input: str | None = None,
context: Context | None = None,
language: str | None = None,
assistant: str | None = None,
device_id: str | None = None,
conversation_agent_id: str | None = None,
) -> IntentResponse:
"""Handle an intent."""
handler = hass.data.get(DATA_KEY, {}).get(intent_type)
if handler is None:
raise UnknownIntent(f"Unknown intent {intent_type}")
if context is None:
context = Context()
if language is None:
language = hass.config.language
intent = Intent(
hass,
platform=platform,
intent_type=intent_type,
slots=slots or {},
text_input=text_input,
context=context,
language=language,
assistant=assistant,
device_id=device_id,
conversation_agent_id=conversation_agent_id,
)
try:
_LOGGER.info("Triggering intent handler %s", handler)
result = await handler.async_handle(intent)
except vol.Invalid as err:
_LOGGER.warning("Received invalid slot info for %s: %s", intent_type, err)
raise InvalidSlotInfo(f"Received invalid slot info for {intent_type}") from err
except IntentError:
raise # bubble up intent related errors
except Exception as err:
_LOGGER.exception("Error handling %s", intent_type)
raise IntentUnexpectedError(f"Error handling {intent_type}") from err
return result
class IntentError(HomeAssistantError):
"""Base class for intent related errors."""
class UnknownIntent(IntentError):
"""When the intent is not registered."""
class InvalidSlotInfo(IntentError):
"""When the slot data is invalid."""
class IntentHandleError(IntentError):
"""Error while handling intent."""
def __init__(self, message: str = "", response_key: str | None = None) -> None:
"""Initialize error."""
super().__init__(message)
self.response_key = response_key
class IntentUnexpectedError(IntentError):
"""Unexpected error while handling intent."""
class MatchFailedReason(Enum):
"""Possible reasons for match failure in async_match_targets."""
NAME = auto()
"""No entities matched name constraint."""
AREA = auto()
"""No entities matched area constraint."""
FLOOR = auto()
"""No entities matched floor constraint."""
DOMAIN = auto()
"""No entities matched domain constraint."""
DEVICE_CLASS = auto()
"""No entities matched device class constraint."""
FEATURE = auto()
"""No entities matched supported features constraint."""
STATE = auto()
"""No entities matched required states constraint."""
ASSISTANT = auto()
"""No entities matched exposed to assistant constraint."""
INVALID_AREA = auto()
"""Area name from constraint does not exist."""
INVALID_FLOOR = auto()
"""Floor name from constraint does not exist."""
DUPLICATE_NAME = auto()
"""Two or more entities matched the same name constraint and could not be disambiguated."""
def is_no_entities_reason(self) -> bool:
"""Return True if the match failed because no entities matched."""
return self not in (
MatchFailedReason.INVALID_AREA,
MatchFailedReason.INVALID_FLOOR,
MatchFailedReason.DUPLICATE_NAME,
)
@dataclass
class MatchTargetsConstraints:
"""Constraints for async_match_targets."""
name: str | None = None
"""Entity name or alias."""
area_name: str | None = None
"""Area name, id, or alias."""
floor_name: str | None = None
"""Floor name, id, or alias."""
domains: Collection[str] | None = None
"""Domain names."""
device_classes: Collection[str] | None = None
"""Device class names."""
features: int | None = None
"""Required supported features."""
states: Collection[str] | None = None
"""Required states for entities."""
assistant: str | None = None
"""Name of assistant that entities should be exposed to."""
allow_duplicate_names: bool = False
"""True if entities with duplicate names are allowed in result."""
@property
def has_constraints(self) -> bool:
"""Returns True if at least one constraint is set (ignores assistant)."""
return bool(
self.name
or self.area_name
or self.floor_name
or self.domains
or self.device_classes
or self.features
or self.states
)
@dataclass
class MatchTargetsPreferences:
"""Preferences used to disambiguate duplicate name matches in async_match_targets."""
area_id: str | None = None
"""Id of area to use when deduplicating names."""
floor_id: str | None = None
"""Id of floor to use when deduplicating names."""
@dataclass
class MatchTargetsResult:
"""Result from async_match_targets."""
is_match: bool
"""True if one or more entities matched."""
no_match_reason: MatchFailedReason | None = None
"""Reason for failed match when is_match = False."""
states: list[State] = field(default_factory=list)
"""List of matched entity states when is_match = True."""
no_match_name: str | None = None
"""Name of invalid area/floor or duplicate name when match fails for those reasons."""
areas: list[area_registry.AreaEntry] = field(default_factory=list)
"""Areas that were targeted."""
floors: list[floor_registry.FloorEntry] = field(default_factory=list)
"""Floors that were targeted."""
class MatchFailedError(IntentError):
"""Error when target matching fails."""
def __init__(
self,
result: MatchTargetsResult,
constraints: MatchTargetsConstraints,
preferences: MatchTargetsPreferences | None = None,
) -> None:
"""Initialize error."""
super().__init__()
self.result = result
self.constraints = constraints
self.preferences = preferences
def __str__(self) -> str:
"""Return string representation."""
return f"<MatchFailedError result={self.result}, constraints={self.constraints}, preferences={self.preferences}>"
class NoStatesMatchedError(MatchFailedError):
"""Error when no states match the intent's constraints."""
def __init__(
self,
reason: MatchFailedReason,
name: str | None = None,
area: str | None = None,
floor: str | None = None,
domains: set[str] | None = None,
device_classes: set[str] | None = None,
) -> None:
"""Initialize error."""
super().__init__(
result=MatchTargetsResult(False, reason),
constraints=MatchTargetsConstraints(
name=name,
area_name=area,
floor_name=floor,
domains=domains,
device_classes=device_classes,
),
)
@dataclass
class MatchTargetsCandidate:
"""Candidate for async_match_targets."""
state: State
is_exposed: bool
entity: entity_registry.RegistryEntry | None = None
area: area_registry.AreaEntry | None = None
floor: floor_registry.FloorEntry | None = None
device: device_registry.DeviceEntry | None = None
matched_name: str | None = None
def find_areas(
name: str, areas: area_registry.AreaRegistry
) -> Iterable[area_registry.AreaEntry]:
"""Find all areas matching a name (including aliases)."""
name_norm = _normalize_name(name)
for area in areas.async_list_areas():
# Accept name or area id
if (area.id == name) or (_normalize_name(area.name) == name_norm):
yield area
continue
if not area.aliases:
continue
for alias in area.aliases:
if _normalize_name(alias) == name_norm:
yield area
break
def find_floors(
name: str, floors: floor_registry.FloorRegistry
) -> Iterable[floor_registry.FloorEntry]:
"""Find all floors matching a name (including aliases)."""
name_norm = _normalize_name(name)
for floor in floors.async_list_floors():
# Accept name or floor id
if (floor.floor_id == name) or (_normalize_name(floor.name) == name_norm):
yield floor
continue
if not floor.aliases:
continue
for alias in floor.aliases:
if _normalize_name(alias) == name_norm:
yield floor
break
def _normalize_name(name: str) -> str:
"""Normalize name for comparison."""
return name.strip().casefold()
def _filter_by_name(
name: str,
candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
"""Filter candidates by name."""
name_norm = _normalize_name(name)
for candidate in candidates:
# Accept name or entity id
if (candidate.state.entity_id == name) or _normalize_name(
candidate.state.name
) == name_norm:
candidate.matched_name = name
yield candidate
continue
if candidate.entity is None:
continue
if candidate.entity.name and (
_normalize_name(candidate.entity.name) == name_norm
):
candidate.matched_name = name
yield candidate
continue
# Check aliases
if candidate.entity.aliases:
for alias in candidate.entity.aliases:
if _normalize_name(alias) == name_norm:
candidate.matched_name = name
yield candidate
break
def _filter_by_features(
features: int,
candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
"""Filter candidates by supported features."""
for candidate in candidates:
if (candidate.entity is not None) and (
(candidate.entity.supported_features & features) == features
):
yield candidate
continue
supported_features = candidate.state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
if (supported_features & features) == features:
yield candidate
def _filter_by_device_classes(
device_classes: Iterable[str],
candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
"""Filter candidates by device classes."""
for candidate in candidates:
if (
(candidate.entity is not None)
and candidate.entity.device_class
and (candidate.entity.device_class in device_classes)
):
yield candidate
continue
device_class = candidate.state.attributes.get(ATTR_DEVICE_CLASS)
if device_class and (device_class in device_classes):
yield candidate
def _add_areas(
areas: area_registry.AreaRegistry,
devices: device_registry.DeviceRegistry,
candidates: Iterable[MatchTargetsCandidate],
) -> None:
"""Add area and device entries to match candidates."""
for candidate in candidates:
if candidate.entity is None:
continue
if candidate.entity.device_id:
candidate.device = devices.async_get(candidate.entity.device_id)
if candidate.entity.area_id:
# Use entity area first
candidate.area = areas.async_get_area(candidate.entity.area_id)
assert candidate.area is not None
elif (candidate.device is not None) and candidate.device.area_id:
# Fall back to device area
candidate.area = areas.async_get_area(candidate.device.area_id)
@callback
def async_match_targets( # noqa: C901
hass: HomeAssistant,
constraints: MatchTargetsConstraints,
preferences: MatchTargetsPreferences | None = None,
states: list[State] | None = None,
) -> MatchTargetsResult:
"""Match entities based on constraints in order to handle an intent."""
preferences = preferences or MatchTargetsPreferences()
filtered_by_domain = False
if not states:
# Get all states and filter by domain
states = hass.states.async_all(constraints.domains)
filtered_by_domain = True
if not states:
return MatchTargetsResult(False, MatchFailedReason.DOMAIN)
candidates = [
MatchTargetsCandidate(
state=state,
is_exposed=(
async_should_expose(hass, constraints.assistant, state.entity_id)
if constraints.assistant
else True
),
)
for state in states
]
if constraints.domains and (not filtered_by_domain):
# Filter by domain (if we didn't already do it)
candidates = [c for c in candidates if c.state.domain in constraints.domains]
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.DOMAIN)
if constraints.states:
# Filter by state
candidates = [c for c in candidates if c.state.state in constraints.states]
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.STATE)
# Try to exit early so we can avoid registry lookups
if not (
constraints.name
or constraints.features
or constraints.device_classes
or constraints.area_name
or constraints.floor_name
):
if constraints.assistant:
# Check exposure
candidates = [c for c in candidates if c.is_exposed]
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.ASSISTANT)
return MatchTargetsResult(True, states=[c.state for c in candidates])
# We need entity registry entries now
er = entity_registry.async_get(hass)
for candidate in candidates:
candidate.entity = er.async_get(candidate.state.entity_id)
if constraints.name:
# Filter by entity name or alias
candidates = list(_filter_by_name(constraints.name, candidates))
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.NAME)
if constraints.features:
# Filter by supported features
candidates = list(_filter_by_features(constraints.features, candidates))
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.FEATURE)
if constraints.device_classes:
# Filter by device class
candidates = list(
_filter_by_device_classes(constraints.device_classes, candidates)
)
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.DEVICE_CLASS)
# Check floor/area constraints
targeted_floors: list[floor_registry.FloorEntry] | None = None
targeted_areas: list[area_registry.AreaEntry] | None = None
# True when area information has been added to candidates
areas_added = False
if constraints.floor_name or constraints.area_name:
ar = area_registry.async_get(hass)
dr = device_registry.async_get(hass)
_add_areas(ar, dr, candidates)
areas_added = True
if constraints.floor_name:
# Filter by areas associated with floor
fr = floor_registry.async_get(hass)
targeted_floors = list(find_floors(constraints.floor_name, fr))
if not targeted_floors:
return MatchTargetsResult(
False,
MatchFailedReason.INVALID_FLOOR,
no_match_name=constraints.floor_name,
)
possible_floor_ids = {floor.floor_id for floor in targeted_floors}
possible_area_ids = {
area.id
for area in ar.async_list_areas()
if area.floor_id in possible_floor_ids
}
candidates = [
c
for c in candidates
if (c.area is not None) and (c.area.id in possible_area_ids)
]
if not candidates:
return MatchTargetsResult(
False, MatchFailedReason.FLOOR, floors=targeted_floors
)
else:
# All areas are possible
possible_area_ids = {area.id for area in ar.async_list_areas()}
if constraints.area_name:
targeted_areas = list(find_areas(constraints.area_name, ar))
if not targeted_areas:
return MatchTargetsResult(
False,
MatchFailedReason.INVALID_AREA,
no_match_name=constraints.area_name,
)
matching_area_ids = {area.id for area in targeted_areas}
# May be constrained by floors above
possible_area_ids.intersection_update(matching_area_ids)
candidates = [
c
for c in candidates
if (c.area is not None) and (c.area.id in possible_area_ids)
]
if not candidates:
return MatchTargetsResult(
False, MatchFailedReason.AREA, areas=targeted_areas
)
if constraints.assistant:
# Check exposure
candidates = [c for c in candidates if c.is_exposed]
if not candidates:
return MatchTargetsResult(False, MatchFailedReason.ASSISTANT)
if constraints.name and (not constraints.allow_duplicate_names):
# Check for duplicates
if not areas_added:
ar = area_registry.async_get(hass)
dr = device_registry.async_get(hass)
_add_areas(ar, dr, candidates)
areas_added = True
sorted_candidates = sorted(
[c for c in candidates if c.matched_name],
key=lambda c: c.matched_name or "",
)
final_candidates: list[MatchTargetsCandidate] = []
for name, group in groupby(sorted_candidates, key=lambda c: c.matched_name):
group_candidates = list(group)
if len(group_candidates) < 2:
# No duplicates for name
final_candidates.extend(group_candidates)
continue
# Try to disambiguate by preferences
if preferences.floor_id:
group_candidates = [
c
for c in group_candidates
if (c.area is not None)
and (c.area.floor_id == preferences.floor_id)
]
if len(group_candidates) < 2:
# Disambiguated by floor
final_candidates.extend(group_candidates)
continue
if preferences.area_id:
group_candidates = [
c
for c in group_candidates
if (c.area is not None) and (c.area.id == preferences.area_id)
]
if len(group_candidates) < 2:
# Disambiguated by area
final_candidates.extend(group_candidates)
continue
# Couldn't disambiguate duplicate names
return MatchTargetsResult(
False,
MatchFailedReason.DUPLICATE_NAME,
no_match_name=name,
areas=targeted_areas or [],
floors=targeted_floors or [],
)
if not final_candidates:
return MatchTargetsResult(
False,
MatchFailedReason.NAME,
areas=targeted_areas or [],
floors=targeted_floors or [],
)
candidates = final_candidates
return MatchTargetsResult(
True,
None,
states=[c.state for c in candidates],
areas=targeted_areas or [],
floors=targeted_floors or [],
)
@callback
@bind_hass
def async_match_states(
hass: HomeAssistant,
name: str | None = None,
area_name: str | None = None,
floor_name: str | None = None,
domains: Collection[str] | None = None,
device_classes: Collection[str] | None = None,
states: list[State] | None = None,
assistant: str | None = None,
) -> Iterable[State]:
"""Simplified interface to async_match_targets that returns states matching the constraints."""
result = async_match_targets(
hass,
constraints=MatchTargetsConstraints(
name=name,
area_name=area_name,
floor_name=floor_name,
domains=domains,
device_classes=device_classes,
assistant=assistant,
),
states=states,
)
return result.states
@callback
def async_test_feature(state: State, feature: int, feature_name: str) -> None:
"""Test if state supports a feature."""
if state.attributes.get(ATTR_SUPPORTED_FEATURES, 0) & feature == 0:
raise IntentHandleError(f"Entity {state.name} does not support {feature_name}")
class IntentHandler:
"""Intent handler registration."""
intent_type: str
platforms: set[str] | None = None
description: str | None = None
@property
def slot_schema(self) -> dict | None:
"""Return a slot schema."""
return None
@callback
def async_can_handle(self, intent_obj: Intent) -> bool:
"""Test if an intent can be handled."""
return self.platforms is None or intent_obj.platform in self.platforms
@callback
def async_validate_slots(self, slots: _SlotsType) -> _SlotsType:
"""Validate slot information."""
if self.slot_schema is None:
return slots
return self._slot_schema(slots) # type: ignore[no-any-return]
@cached_property
def _slot_schema(self) -> vol.Schema:
"""Create validation schema for slots."""
assert self.slot_schema is not None
return vol.Schema(
{
key: SLOT_SCHEMA.extend({"value": validator})
for key, validator in self.slot_schema.items()
},
extra=vol.ALLOW_EXTRA,
)
async def async_handle(self, intent_obj: Intent) -> IntentResponse:
"""Handle the intent."""
raise NotImplementedError
def __repr__(self) -> str:
"""Represent a string of an intent handler."""
return f"<{self.__class__.__name__} - {self.intent_type}>"
def non_empty_string(value: Any) -> str:
"""Coerce value to string and fail if string is empty or whitespace."""
value_str = cv.string(value)
if not value_str.strip():
raise vol.Invalid("string value is empty")
return value_str
class DynamicServiceIntentHandler(IntentHandler):
"""Service Intent handler registration (dynamic).
Service specific intent handler that calls a service by name/entity_id.
"""
# We use a small timeout in service calls to (hopefully) pass validation
# checks, but not try to wait for the call to fully complete.
service_timeout: float = 0.2
def __init__(
self,
intent_type: str,
speech: str | None = None,
required_slots: _IntentSlotsType | None = None,
optional_slots: _IntentSlotsType | None = None,
required_domains: set[str] | None = None,
required_features: int | None = None,
required_states: set[str] | None = None,
description: str | None = None,
platforms: set[str] | None = None,
device_classes: set[type[StrEnum]] | None = None,
) -> None:
"""Create Service Intent Handler."""
self.intent_type = intent_type
self.speech = speech
self.required_domains = required_domains
self.required_features = required_features
self.required_states = required_states
self.description = description
self.platforms = platforms
self.device_classes = device_classes
self.required_slots: _IntentSlotsType = {}
if required_slots:
for key, value_schema in required_slots.items():
if isinstance(key, str):
# Slot name/service data key
key = (key, key)
self.required_slots[key] = value_schema
self.optional_slots: _IntentSlotsType = {}
if optional_slots:
for key, value_schema in optional_slots.items():
if isinstance(key, str):
# Slot name/service data key
key = (key, key)
self.optional_slots[key] = value_schema
@cached_property
def slot_schema(self) -> dict:
"""Return a slot schema."""
domain_validator = (
vol.In(list(self.required_domains)) if self.required_domains else cv.string
)
slot_schema = {
vol.Any("name", "area", "floor"): non_empty_string,
vol.Optional("domain"): vol.All(cv.ensure_list, [domain_validator]),
}
if self.device_classes:
# The typical way to match enums is with vol.Coerce, but we build a
# flat list to make the API simpler to describe programmatically
flattened_device_classes = vol.In(
[
device_class.value
for device_class_enum in self.device_classes
for device_class in device_class_enum
]
)
slot_schema.update(
{
vol.Optional("device_class"): vol.All(
cv.ensure_list,
[flattened_device_classes],
)
}
)
slot_schema.update(
{
vol.Optional("preferred_area_id"): cv.string,
vol.Optional("preferred_floor_id"): cv.string,
}
)
if self.required_slots:
slot_schema.update(
{
vol.Required(key[0]): validator
for key, validator in self.required_slots.items()
}
)
if self.optional_slots:
slot_schema.update(
{
vol.Optional(key[0]): validator
for key, validator in self.optional_slots.items()
}
)
return slot_schema
@abstractmethod
def get_domain_and_service(
self, intent_obj: Intent, state: State
) -> tuple[str, str]:
"""Get the domain and service name to call."""
raise NotImplementedError
async def async_handle(self, intent_obj: Intent) -> IntentResponse:
"""Handle the hass intent."""
hass = intent_obj.hass
slots = self.async_validate_slots(intent_obj.slots)
name_slot = slots.get("name", {})
entity_name: str | None = name_slot.get("value")
entity_text: str | None = name_slot.get("text")
if entity_name == "all":
# Don't match on name if targeting all entities
entity_name = None
# Get area/floor info
area_slot = slots.get("area", {})
area_id = area_slot.get("value")
floor_slot = slots.get("floor", {})
floor_id = floor_slot.get("value")
# Optional domain/device class filters.
# Convert to sets for speed.
domains: set[str] | None = self.required_domains
device_classes: set[str] | None = None
if "domain" in slots:
domains = set(slots["domain"]["value"])
if "device_class" in slots:
device_classes = set(slots["device_class"]["value"])
match_constraints = MatchTargetsConstraints(
name=entity_name,
area_name=area_id,
floor_name=floor_id,
domains=domains,
device_classes=device_classes,
assistant=intent_obj.assistant,
features=self.required_features,
states=self.required_states,
)
if not match_constraints.has_constraints:
# Fail if attempting to target all devices in the house
raise IntentHandleError("Service handler cannot target all devices")
match_preferences = MatchTargetsPreferences(
area_id=slots.get("preferred_area_id", {}).get("value"),
floor_id=slots.get("preferred_floor_id", {}).get("value"),
)
match_result = async_match_targets(hass, match_constraints, match_preferences)
if not match_result.is_match:
raise MatchFailedError(
result=match_result,
constraints=match_constraints,
preferences=match_preferences,
)
# Ensure name is text
if ("name" in slots) and entity_text:
slots["name"]["value"] = entity_text
# Replace area/floor values with the resolved ids for use in templates
if ("area" in slots) and match_result.areas:
slots["area"]["value"] = match_result.areas[0].id
if ("floor" in slots) and match_result.floors:
slots["floor"]["value"] = match_result.floors[0].floor_id
# Update intent slots to include any transformations done by the schemas
intent_obj.slots = slots
response = await self.async_handle_states(
intent_obj, match_result, match_constraints, match_preferences
)
# Make the matched states available in the response
response.async_set_states(
matched_states=match_result.states, unmatched_states=[]
)
return response
async def async_handle_states(
self,
intent_obj: Intent,
match_result: MatchTargetsResult,
match_constraints: MatchTargetsConstraints,
match_preferences: MatchTargetsPreferences | None = None,
) -> IntentResponse:
"""Complete action on matched entity states."""
states = match_result.states
response = intent_obj.create_response()
hass = intent_obj.hass
success_results: list[IntentResponseTarget] = []
if match_result.floors:
success_results.extend(
IntentResponseTarget(
type=IntentResponseTargetType.FLOOR,
name=floor.name,
id=floor.floor_id,
)
for floor in match_result.floors
)
speech_name = match_result.floors[0].name
elif match_result.areas:
success_results.extend(
IntentResponseTarget(
type=IntentResponseTargetType.AREA, name=area.name, id=area.id
)
for area in match_result.areas
)
speech_name = match_result.areas[0].name
else:
speech_name = states[0].name
service_coros: list[Coroutine[Any, Any, None]] = []
for state in states:
domain, service = self.get_domain_and_service(intent_obj, state)
service_coros.append(
self.async_call_service(domain, service, intent_obj, state)
)
# Handle service calls in parallel, noting failures as they occur.
failed_results: list[IntentResponseTarget] = []
for state, service_coro in zip(
states, asyncio.as_completed(service_coros), strict=False
):
target = IntentResponseTarget(
type=IntentResponseTargetType.ENTITY,
name=state.name,
id=state.entity_id,
)
try:
await service_coro
success_results.append(target)
except Exception:
failed_results.append(target)
_LOGGER.exception("Service call failed for %s", state.entity_id)
if not success_results:
# If no entities succeeded, raise an error.
failed_entity_ids = [target.id for target in failed_results]
raise IntentHandleError(
f"Failed to call {service} for: {failed_entity_ids}"
)
response.async_set_results(
success_results=success_results, failed_results=failed_results
)
# Update all states
states = [hass.states.get(state.entity_id) or state for state in states]
response.async_set_states(states)
if self.speech is not None:
response.async_set_speech(self.speech.format(speech_name))
return response
async def async_call_service(
self, domain: str, service: str, intent_obj: Intent, state: State
) -> None:
"""Call service on entity."""
hass = intent_obj.hass
service_data: dict[str, Any] = {ATTR_ENTITY_ID: state.entity_id}
if self.required_slots:
service_data.update(
{
key[1]: intent_obj.slots[key[0]]["value"]
for key in self.required_slots
}
)
if self.optional_slots:
for key in self.optional_slots:
value = intent_obj.slots.get(key[0])
if value:
service_data[key[1]] = value["value"]
await self._run_then_background(
hass.async_create_task_internal(
hass.services.async_call(
domain,
service,
service_data,
context=intent_obj.context,
blocking=True,
),
f"intent_call_service_{domain}_{service}",
)
)
async def _run_then_background(self, task: asyncio.Task[Any]) -> None:
"""Run task with timeout to (hopefully) catch validation errors.
After the timeout the task will continue to run in the background.
"""
try:
await asyncio.wait({task}, timeout=self.service_timeout)
except TimeoutError:
pass
except asyncio.CancelledError:
# Task calling us was cancelled, so cancel service call task, and wait for
# it to be cancelled, within reason, before leaving.
_LOGGER.debug("Service call was cancelled: %s", task.get_name())
task.cancel()
await asyncio.wait({task}, timeout=5)
raise
class ServiceIntentHandler(DynamicServiceIntentHandler):
"""Service Intent handler registration.
Service specific intent handler that calls a service by name/entity_id.
"""
def __init__(
self,
intent_type: str,
domain: str,
service: str,
speech: str | None = None,
required_slots: _IntentSlotsType | None = None,
optional_slots: _IntentSlotsType | None = None,
required_domains: set[str] | None = None,
required_features: int | None = None,
required_states: set[str] | None = None,
description: str | None = None,
platforms: set[str] | None = None,
device_classes: set[type[StrEnum]] | None = None,
) -> None:
"""Create service handler."""
super().__init__(
intent_type,
speech=speech,
required_slots=required_slots,
optional_slots=optional_slots,
required_domains=required_domains,
required_features=required_features,
required_states=required_states,
description=description,
platforms=platforms,
device_classes=device_classes,
)
self.domain = domain
self.service = service
def get_domain_and_service(
self, intent_obj: Intent, state: State
) -> tuple[str, str]:
"""Get the domain and service name to call."""
return (self.domain, self.service)
class IntentCategory(Enum):
"""Category of an intent."""
ACTION = "action"
"""Trigger an action like turning an entity on or off"""
QUERY = "query"
"""Get information about the state of an entity"""
class Intent:
"""Hold the intent."""
__slots__ = [
"hass",
"platform",
"intent_type",
"slots",
"text_input",
"context",
"language",
"category",
"assistant",
"device_id",
"conversation_agent_id",
]
def __init__(
self,
hass: HomeAssistant,
platform: str,
intent_type: str,
slots: _SlotsType,
text_input: str | None,
context: Context,
language: str,
category: IntentCategory | None = None,
assistant: str | None = None,
device_id: str | None = None,
conversation_agent_id: str | None = None,
) -> None:
"""Initialize an intent."""
self.hass = hass
self.platform = platform
self.intent_type = intent_type
self.slots = slots
self.text_input = text_input
self.context = context
self.language = language
self.category = category
self.assistant = assistant
self.device_id = device_id
self.conversation_agent_id = conversation_agent_id
@callback
def create_response(self) -> IntentResponse:
"""Create a response."""
return IntentResponse(language=self.language, intent=self)
class IntentResponseType(Enum):
"""Type of the intent response."""
ACTION_DONE = "action_done"
"""Intent caused an action to occur"""
PARTIAL_ACTION_DONE = "partial_action_done"
"""Intent caused an action, but it could only be partially done"""
QUERY_ANSWER = "query_answer"
"""Response is an answer to a query"""
ERROR = "error"
"""Response is an error"""
class IntentResponseErrorCode(str, Enum):
"""Reason for an intent response error."""
NO_INTENT_MATCH = "no_intent_match"
"""Text could not be matched to an intent"""
NO_VALID_TARGETS = "no_valid_targets"
"""Intent was matched, but no valid areas/devices/entities were targeted"""
FAILED_TO_HANDLE = "failed_to_handle"
"""Unexpected error occurred while handling intent"""
UNKNOWN = "unknown"
"""Error outside the scope of intent processing"""
class IntentResponseTargetType(str, Enum):
"""Type of target for an intent response."""
AREA = "area"
FLOOR = "floor"
DEVICE = "device"
ENTITY = "entity"
DOMAIN = "domain"
DEVICE_CLASS = "device_class"
CUSTOM = "custom"
@dataclass(slots=True)
class IntentResponseTarget:
"""Target of the intent response."""
name: str
type: IntentResponseTargetType
id: str | None = None
class IntentResponse:
"""Response to an intent."""
def __init__(
self,
language: str,
intent: Intent | None = None,
) -> None:
"""Initialize an IntentResponse."""
self.language = language
self.intent = intent
self.speech: dict[str, dict[str, Any]] = {}
self.reprompt: dict[str, dict[str, Any]] = {}
self.card: dict[str, dict[str, str]] = {}
self.error_code: IntentResponseErrorCode | None = None
self.intent_targets: list[IntentResponseTarget] = []
self.success_results: list[IntentResponseTarget] = []
self.failed_results: list[IntentResponseTarget] = []
self.matched_states: list[State] = []
self.unmatched_states: list[State] = []
self.speech_slots: dict[str, Any] = {}
if (self.intent is not None) and (self.intent.category == IntentCategory.QUERY):
# speech will be the answer to the query
self.response_type = IntentResponseType.QUERY_ANSWER
else:
self.response_type = IntentResponseType.ACTION_DONE
@callback
def async_set_speech(
self,
speech: str,
speech_type: str = "plain",
extra_data: Any | None = None,
) -> None:
"""Set speech response."""
self.speech[speech_type] = {
"speech": speech,
"extra_data": extra_data,
}
@callback
def async_set_reprompt(
self,
speech: str,
speech_type: str = "plain",
extra_data: Any | None = None,
) -> None:
"""Set reprompt response."""
self.reprompt[speech_type] = {
"reprompt": speech,
"extra_data": extra_data,
}
@callback
def async_set_card(
self, title: str, content: str, card_type: str = "simple"
) -> None:
"""Set card response."""
self.card[card_type] = {"title": title, "content": content}
@callback
def async_set_error(self, code: IntentResponseErrorCode, message: str) -> None:
"""Set response error."""
self.response_type = IntentResponseType.ERROR
self.error_code = code
# Speak error message
self.async_set_speech(message)
@callback
def async_set_targets(
self,
intent_targets: list[IntentResponseTarget],
) -> None:
"""Set response targets."""
self.intent_targets = intent_targets
@callback
def async_set_results(
self,
success_results: list[IntentResponseTarget],
failed_results: list[IntentResponseTarget] | None = None,
) -> None:
"""Set response results."""
self.success_results = success_results
self.failed_results = failed_results if failed_results is not None else []
@callback
def async_set_states(
self, matched_states: list[State], unmatched_states: list[State] | None = None
) -> None:
"""Set entity states that were matched or not matched during intent handling (query)."""
self.matched_states = matched_states
self.unmatched_states = unmatched_states or []
@callback
def async_set_speech_slots(self, speech_slots: dict[str, Any]) -> None:
"""Set slots that will be used in the response template of the default agent."""
self.speech_slots = speech_slots
@callback
def as_dict(self) -> dict[str, Any]:
"""Return a dictionary representation of an intent response."""
response_dict: dict[str, Any] = {
"speech": self.speech,
"card": self.card,
"language": self.language,
"response_type": self.response_type.value,
}
if self.reprompt:
response_dict["reprompt"] = self.reprompt
if self.speech_slots:
response_dict["speech_slots"] = self.speech_slots
response_data: dict[str, Any] = {}
if self.response_type == IntentResponseType.ERROR:
assert self.error_code is not None, "error code is required"
response_data["code"] = self.error_code.value
else:
# action done or query answer
response_data["targets"] = [
dataclasses.asdict(target) for target in self.intent_targets
]
# Add success/failed targets
response_data["success"] = [
dataclasses.asdict(target) for target in self.success_results
]
response_data["failed"] = [
dataclasses.asdict(target) for target in self.failed_results
]
response_dict["data"] = response_data
return response_dict