mirror of https://github.com/home-assistant/core
380 lines
11 KiB
Python
380 lines
11 KiB
Python
"""Support for Google Assistant Smart Home API."""
|
|
|
|
import asyncio
|
|
from collections.abc import Callable, Coroutine
|
|
from itertools import product
|
|
import logging
|
|
import pprint
|
|
from typing import Any
|
|
|
|
from homeassistant.const import ATTR_ENTITY_ID, __version__
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import instance_id
|
|
from homeassistant.util.decorator import Registry
|
|
|
|
from .const import (
|
|
ERR_DEVICE_OFFLINE,
|
|
ERR_PROTOCOL_ERROR,
|
|
ERR_UNKNOWN_ERROR,
|
|
EVENT_COMMAND_RECEIVED,
|
|
EVENT_QUERY_RECEIVED,
|
|
EVENT_SYNC_RECEIVED,
|
|
)
|
|
from .data_redaction import async_redact_msg
|
|
from .error import SmartHomeError
|
|
from .helpers import GoogleEntity, RequestData, async_get_entities
|
|
|
|
EXECUTE_LIMIT = 2 # Wait 2 seconds for execute to finish
|
|
|
|
HANDLERS: Registry[
|
|
str,
|
|
Callable[
|
|
[HomeAssistant, RequestData, dict[str, Any]],
|
|
Coroutine[Any, Any, dict[str, Any] | None],
|
|
],
|
|
] = Registry()
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_handle_message(
|
|
hass, config, agent_user_id, local_user_id, message, source
|
|
):
|
|
"""Handle incoming API messages."""
|
|
if _LOGGER.isEnabledFor(logging.DEBUG):
|
|
_LOGGER.debug(
|
|
"Processing message:\n%s",
|
|
pprint.pformat(async_redact_msg(message, agent_user_id)),
|
|
)
|
|
|
|
data = RequestData(
|
|
config, local_user_id, source, message["requestId"], message.get("devices")
|
|
)
|
|
|
|
response = await _process(hass, data, message)
|
|
if _LOGGER.isEnabledFor(logging.DEBUG):
|
|
if response:
|
|
_LOGGER.debug(
|
|
"Response:\n%s",
|
|
pprint.pformat(async_redact_msg(response["payload"], agent_user_id)),
|
|
)
|
|
else:
|
|
_LOGGER.debug("Empty response")
|
|
|
|
if response and "errorCode" in response["payload"]:
|
|
_LOGGER.error(
|
|
"Error handling message\n:%s\nResponse:\n%s",
|
|
pprint.pformat(async_redact_msg(message, agent_user_id)),
|
|
pprint.pformat(async_redact_msg(response["payload"], agent_user_id)),
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
async def _process(hass, data, message):
|
|
"""Process a message."""
|
|
inputs: list = message.get("inputs")
|
|
|
|
if len(inputs) != 1:
|
|
return {
|
|
"requestId": data.request_id,
|
|
"payload": {"errorCode": ERR_PROTOCOL_ERROR},
|
|
}
|
|
|
|
if (handler := HANDLERS.get(inputs[0].get("intent"))) is None:
|
|
return {
|
|
"requestId": data.request_id,
|
|
"payload": {"errorCode": ERR_PROTOCOL_ERROR},
|
|
}
|
|
|
|
try:
|
|
result = await handler(hass, data, inputs[0].get("payload"))
|
|
except SmartHomeError as err:
|
|
return {"requestId": data.request_id, "payload": {"errorCode": err.code}}
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected error")
|
|
return {
|
|
"requestId": data.request_id,
|
|
"payload": {"errorCode": ERR_UNKNOWN_ERROR},
|
|
}
|
|
|
|
if result is None:
|
|
return None
|
|
|
|
return {"requestId": data.request_id, "payload": result}
|
|
|
|
|
|
async def async_devices_sync_response(hass, config, agent_user_id):
|
|
"""Generate the device serialization."""
|
|
entities = async_get_entities(hass, config)
|
|
instance_uuid = await instance_id.async_get(hass)
|
|
devices = []
|
|
|
|
for entity in entities:
|
|
if not entity.should_expose():
|
|
continue
|
|
|
|
try:
|
|
devices.append(entity.sync_serialize(agent_user_id, instance_uuid))
|
|
except Exception:
|
|
_LOGGER.exception("Error serializing %s", entity.entity_id)
|
|
|
|
return devices
|
|
|
|
|
|
@HANDLERS.register("action.devices.SYNC")
|
|
async def async_devices_sync(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.SYNC request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/process-intents#SYNC
|
|
"""
|
|
hass.bus.async_fire(
|
|
EVENT_SYNC_RECEIVED,
|
|
{"request_id": data.request_id, "source": data.source},
|
|
context=data.context,
|
|
)
|
|
|
|
agent_user_id = data.config.get_agent_user_id_from_context(data.context)
|
|
await data.config.async_connect_agent_user(agent_user_id)
|
|
|
|
devices = await async_devices_sync_response(hass, data.config, agent_user_id)
|
|
return create_sync_response(agent_user_id, devices)
|
|
|
|
|
|
@HANDLERS.register("action.devices.QUERY")
|
|
async def async_devices_query(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.QUERY request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/process-intents#QUERY
|
|
"""
|
|
payload_devices = payload.get("devices", [])
|
|
|
|
hass.bus.async_fire(
|
|
EVENT_QUERY_RECEIVED,
|
|
{
|
|
"request_id": data.request_id,
|
|
ATTR_ENTITY_ID: [device["id"] for device in payload_devices],
|
|
"source": data.source,
|
|
},
|
|
context=data.context,
|
|
)
|
|
|
|
return await async_devices_query_response(hass, data.config, payload_devices)
|
|
|
|
|
|
async def async_devices_query_response(hass, config, payload_devices):
|
|
"""Generate the device serialization."""
|
|
devices = {}
|
|
for device in payload_devices:
|
|
devid = device["id"]
|
|
|
|
if not (state := hass.states.get(devid)):
|
|
# If we can't find a state, the device is offline
|
|
devices[devid] = {"online": False}
|
|
continue
|
|
|
|
entity = GoogleEntity(hass, config, state)
|
|
try:
|
|
devices[devid] = entity.query_serialize()
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected error serializing query for %s", state)
|
|
devices[devid] = {"online": False}
|
|
|
|
return {"devices": devices}
|
|
|
|
|
|
async def _entity_execute(entity, data, executions):
|
|
"""Execute all commands for an entity.
|
|
|
|
Returns a dict if a special result needs to be set.
|
|
"""
|
|
for execution in executions:
|
|
try:
|
|
await entity.execute(data, execution)
|
|
except SmartHomeError as err:
|
|
return {
|
|
"ids": [entity.entity_id],
|
|
"status": "ERROR",
|
|
**err.to_response(),
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
@HANDLERS.register("action.devices.EXECUTE")
|
|
async def handle_devices_execute(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.EXECUTE request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/process-intents#EXECUTE
|
|
"""
|
|
entities: dict[str, GoogleEntity] = {}
|
|
executions: dict[str, list[Any]] = {}
|
|
results: dict[str, dict[str, Any]] = {}
|
|
|
|
for command in payload["commands"]:
|
|
hass.bus.async_fire(
|
|
EVENT_COMMAND_RECEIVED,
|
|
{
|
|
"request_id": data.request_id,
|
|
ATTR_ENTITY_ID: [device["id"] for device in command["devices"]],
|
|
"execution": command["execution"],
|
|
"source": data.source,
|
|
},
|
|
context=data.context,
|
|
)
|
|
|
|
for device, execution in product(command["devices"], command["execution"]):
|
|
entity_id = device["id"]
|
|
|
|
# Happens if error occurred. Skip entity for further processing
|
|
if entity_id in results:
|
|
continue
|
|
|
|
if entity_id in entities:
|
|
executions[entity_id].append(execution)
|
|
continue
|
|
|
|
if (state := hass.states.get(entity_id)) is None:
|
|
results[entity_id] = {
|
|
"ids": [entity_id],
|
|
"status": "ERROR",
|
|
"errorCode": ERR_DEVICE_OFFLINE,
|
|
}
|
|
continue
|
|
|
|
entities[entity_id] = GoogleEntity(hass, data.config, state)
|
|
executions[entity_id] = [execution]
|
|
|
|
try:
|
|
execute_results = await asyncio.wait_for(
|
|
asyncio.shield(
|
|
asyncio.gather(
|
|
*(
|
|
_entity_execute(entities[entity_id], data, execution)
|
|
for entity_id, execution in executions.items()
|
|
)
|
|
)
|
|
),
|
|
EXECUTE_LIMIT,
|
|
)
|
|
results.update(
|
|
{
|
|
entity_id: result
|
|
for entity_id, result in zip(executions, execute_results, strict=False)
|
|
if result is not None
|
|
}
|
|
)
|
|
except TimeoutError:
|
|
pass
|
|
|
|
final_results = list(results.values())
|
|
|
|
for entity in entities.values():
|
|
if entity.entity_id in results:
|
|
continue
|
|
|
|
entity.async_update()
|
|
|
|
final_results.append(
|
|
{
|
|
"ids": [entity.entity_id],
|
|
"status": "SUCCESS",
|
|
"states": entity.query_serialize(),
|
|
}
|
|
)
|
|
|
|
return {"commands": final_results}
|
|
|
|
|
|
@HANDLERS.register("action.devices.DISCONNECT")
|
|
async def async_devices_disconnect(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> None:
|
|
"""Handle action.devices.DISCONNECT request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/process-intents#DISCONNECT
|
|
"""
|
|
assert data.context.user_id is not None
|
|
await data.config.async_disconnect_agent_user(data.context.user_id)
|
|
|
|
|
|
@HANDLERS.register("action.devices.IDENTIFY")
|
|
async def async_devices_identify(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.IDENTIFY request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/local#implement_the_identify_handler
|
|
"""
|
|
return {
|
|
"device": {
|
|
"id": data.config.get_agent_user_id_from_context(data.context),
|
|
"isLocalOnly": True,
|
|
"isProxy": True,
|
|
"deviceInfo": {
|
|
"hwVersion": "UNKNOWN_HW_VERSION",
|
|
"manufacturer": "Home Assistant",
|
|
"model": "Home Assistant",
|
|
"swVersion": __version__,
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
@HANDLERS.register("action.devices.REACHABLE_DEVICES")
|
|
async def async_devices_reachable(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.REACHABLE_DEVICES request.
|
|
|
|
https://developers.google.com/assistant/smarthome/develop/local#implement_the_reachable_devices_handler_hub_integrations_only
|
|
"""
|
|
google_ids = {dev["id"] for dev in (data.devices or [])}
|
|
|
|
return {
|
|
"devices": [
|
|
entity.reachable_device_serialize()
|
|
for entity in async_get_entities(hass, data.config)
|
|
if entity.entity_id in google_ids and entity.should_expose_local()
|
|
]
|
|
}
|
|
|
|
|
|
@HANDLERS.register("action.devices.PROXY_SELECTED")
|
|
async def async_devices_proxy_selected(
|
|
hass: HomeAssistant, data: RequestData, payload: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
"""Handle action.devices.PROXY_SELECTED request.
|
|
|
|
When selected for local SDK.
|
|
"""
|
|
return {}
|
|
|
|
|
|
def create_sync_response(agent_user_id: str, devices: list):
|
|
"""Return an empty sync response."""
|
|
return {
|
|
"agentUserId": agent_user_id,
|
|
"devices": devices,
|
|
}
|
|
|
|
|
|
def api_disabled_response(message, agent_user_id):
|
|
"""Return a device turned off response."""
|
|
inputs: list = message.get("inputs")
|
|
|
|
if inputs and inputs[0].get("intent") == "action.devices.SYNC":
|
|
payload = create_sync_response(agent_user_id, [])
|
|
else:
|
|
payload = {"errorCode": "deviceTurnedOff"}
|
|
|
|
return {
|
|
"requestId": message.get("requestId"),
|
|
"payload": payload,
|
|
}
|