mirror of https://github.com/home-assistant/core
544 lines
15 KiB
Python
544 lines
15 KiB
Python
"""KNX Websocket API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Awaitable, Callable
|
|
from functools import wraps
|
|
from typing import TYPE_CHECKING, Any, Final, overload
|
|
|
|
import knx_frontend as knx_panel
|
|
import voluptuous as vol
|
|
from xknx.telegram import Telegram
|
|
from xknxproject.exceptions import XknxProjectException
|
|
|
|
from homeassistant.components import panel_custom, websocket_api
|
|
from homeassistant.components.http import StaticPathConfig
|
|
from homeassistant.const import CONF_ENTITY_ID, CONF_PLATFORM
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
from homeassistant.helpers.typing import UNDEFINED
|
|
from homeassistant.util.ulid import ulid_now
|
|
|
|
from .const import DOMAIN, KNX_MODULE_KEY
|
|
from .storage.config_store import ConfigStoreException
|
|
from .storage.const import CONF_DATA
|
|
from .storage.entity_store_schema import (
|
|
CREATE_ENTITY_BASE_SCHEMA,
|
|
UPDATE_ENTITY_BASE_SCHEMA,
|
|
)
|
|
from .storage.entity_store_validation import (
|
|
EntityStoreValidationException,
|
|
EntityStoreValidationSuccess,
|
|
validate_entity_data,
|
|
)
|
|
from .telegrams import SIGNAL_KNX_TELEGRAM, TelegramDict
|
|
|
|
if TYPE_CHECKING:
|
|
from . import KNXModule
|
|
|
|
URL_BASE: Final = "/knx_static"
|
|
|
|
|
|
async def register_panel(hass: HomeAssistant) -> None:
|
|
"""Register the KNX Panel and Websocket API."""
|
|
websocket_api.async_register_command(hass, ws_info)
|
|
websocket_api.async_register_command(hass, ws_project_file_process)
|
|
websocket_api.async_register_command(hass, ws_project_file_remove)
|
|
websocket_api.async_register_command(hass, ws_group_monitor_info)
|
|
websocket_api.async_register_command(hass, ws_group_telegrams)
|
|
websocket_api.async_register_command(hass, ws_subscribe_telegram)
|
|
websocket_api.async_register_command(hass, ws_get_knx_project)
|
|
websocket_api.async_register_command(hass, ws_validate_entity)
|
|
websocket_api.async_register_command(hass, ws_create_entity)
|
|
websocket_api.async_register_command(hass, ws_update_entity)
|
|
websocket_api.async_register_command(hass, ws_delete_entity)
|
|
websocket_api.async_register_command(hass, ws_get_entity_config)
|
|
websocket_api.async_register_command(hass, ws_get_entity_entries)
|
|
websocket_api.async_register_command(hass, ws_create_device)
|
|
|
|
if DOMAIN not in hass.data.get("frontend_panels", {}):
|
|
await hass.http.async_register_static_paths(
|
|
[
|
|
StaticPathConfig(
|
|
URL_BASE,
|
|
path=knx_panel.locate_dir(),
|
|
cache_headers=knx_panel.is_prod_build,
|
|
)
|
|
]
|
|
)
|
|
await panel_custom.async_register_panel(
|
|
hass=hass,
|
|
frontend_url_path=DOMAIN,
|
|
webcomponent_name=knx_panel.webcomponent_name,
|
|
sidebar_title=DOMAIN.upper(),
|
|
sidebar_icon="mdi:bus-electric",
|
|
module_url=f"{URL_BASE}/{knx_panel.entrypoint_js}",
|
|
embed_iframe=True,
|
|
require_admin=True,
|
|
)
|
|
|
|
|
|
type KnxWebSocketCommandHandler = Callable[
|
|
[HomeAssistant, KNXModule, websocket_api.ActiveConnection, dict[str, Any]], None
|
|
]
|
|
type KnxAsyncWebSocketCommandHandler = Callable[
|
|
[HomeAssistant, KNXModule, websocket_api.ActiveConnection, dict[str, Any]],
|
|
Awaitable[None],
|
|
]
|
|
|
|
|
|
@overload
|
|
def provide_knx(
|
|
func: KnxAsyncWebSocketCommandHandler,
|
|
) -> websocket_api.const.AsyncWebSocketCommandHandler: ...
|
|
@overload
|
|
def provide_knx(
|
|
func: KnxWebSocketCommandHandler,
|
|
) -> websocket_api.const.WebSocketCommandHandler: ...
|
|
|
|
|
|
def provide_knx(
|
|
func: KnxAsyncWebSocketCommandHandler | KnxWebSocketCommandHandler,
|
|
) -> (
|
|
websocket_api.const.AsyncWebSocketCommandHandler
|
|
| websocket_api.const.WebSocketCommandHandler
|
|
):
|
|
"""Websocket decorator to provide a KNXModule instance."""
|
|
|
|
def _send_not_loaded_error(
|
|
connection: websocket_api.ActiveConnection, msg_id: int
|
|
) -> None:
|
|
connection.send_error(
|
|
msg_id,
|
|
websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
|
|
"KNX integration not loaded.",
|
|
)
|
|
|
|
if asyncio.iscoroutinefunction(func):
|
|
|
|
@wraps(func)
|
|
async def with_knx(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict[str, Any],
|
|
) -> None:
|
|
"""Add KNX Module to call function."""
|
|
try:
|
|
knx = hass.data[KNX_MODULE_KEY]
|
|
except KeyError:
|
|
_send_not_loaded_error(connection, msg["id"])
|
|
return
|
|
await func(hass, knx, connection, msg)
|
|
|
|
else:
|
|
|
|
@wraps(func)
|
|
def with_knx(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict[str, Any],
|
|
) -> None:
|
|
"""Add KNX Module to call function."""
|
|
try:
|
|
knx = hass.data[KNX_MODULE_KEY]
|
|
except KeyError:
|
|
_send_not_loaded_error(connection, msg["id"])
|
|
return
|
|
func(hass, knx, connection, msg)
|
|
|
|
return with_knx
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/info",
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_info(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get info command."""
|
|
_project_info = None
|
|
if project_info := knx.project.info:
|
|
_project_info = {
|
|
"name": project_info["name"],
|
|
"last_modified": project_info["last_modified"],
|
|
"tool_version": project_info["tool_version"],
|
|
"xknxproject_version": project_info["xknxproject_version"],
|
|
}
|
|
|
|
connection.send_result(
|
|
msg["id"],
|
|
{
|
|
"version": knx.xknx.version,
|
|
"connected": knx.xknx.connection_manager.connected.is_set(),
|
|
"current_address": str(knx.xknx.current_address),
|
|
"project": _project_info,
|
|
},
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/get_knx_project",
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_get_knx_project(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get KNX project."""
|
|
knxproject = await knx.project.get_knxproject()
|
|
connection.send_result(
|
|
msg["id"],
|
|
{
|
|
"project_loaded": knx.project.loaded,
|
|
"knxproject": knxproject,
|
|
},
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/project_file_process",
|
|
vol.Required("file_id"): str,
|
|
vol.Required("password"): str,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_project_file_process(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get info command."""
|
|
try:
|
|
await knx.project.process_project_file(
|
|
xknx=knx.xknx,
|
|
file_id=msg["file_id"],
|
|
password=msg["password"],
|
|
)
|
|
except (ValueError, XknxProjectException) as err:
|
|
# ValueError could raise from file_upload integration
|
|
connection.send_error(
|
|
msg["id"], websocket_api.ERR_HOME_ASSISTANT_ERROR, str(err)
|
|
)
|
|
return
|
|
|
|
connection.send_result(msg["id"])
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/project_file_remove",
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_project_file_remove(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get info command."""
|
|
await knx.project.remove_project_file()
|
|
connection.send_result(msg["id"])
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/group_monitor_info",
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_group_monitor_info(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get info command of group monitor."""
|
|
recent_telegrams = [*knx.telegrams.recent_telegrams]
|
|
connection.send_result(
|
|
msg["id"],
|
|
{
|
|
"project_loaded": knx.project.loaded,
|
|
"recent_telegrams": recent_telegrams,
|
|
},
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/group_telegrams",
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_group_telegrams(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Handle get group telegrams command."""
|
|
connection.send_result(
|
|
msg["id"],
|
|
knx.telegrams.last_ga_telegrams,
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/subscribe_telegrams",
|
|
}
|
|
)
|
|
@callback
|
|
def ws_subscribe_telegram(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Subscribe to incoming and outgoing KNX telegrams."""
|
|
|
|
@callback
|
|
def forward_telegram(_telegram: Telegram, telegram_dict: TelegramDict) -> None:
|
|
"""Forward telegram to websocket subscription."""
|
|
connection.send_event(
|
|
msg["id"],
|
|
telegram_dict,
|
|
)
|
|
|
|
connection.subscriptions[msg["id"]] = async_dispatcher_connect(
|
|
hass,
|
|
signal=SIGNAL_KNX_TELEGRAM,
|
|
target=forward_telegram,
|
|
)
|
|
connection.send_result(msg["id"])
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/validate_entity",
|
|
**CREATE_ENTITY_BASE_SCHEMA,
|
|
}
|
|
)
|
|
@callback
|
|
def ws_validate_entity(
|
|
hass: HomeAssistant,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Validate entity data."""
|
|
try:
|
|
validate_entity_data(msg)
|
|
except EntityStoreValidationException as exc:
|
|
connection.send_result(msg["id"], exc.validation_error)
|
|
return
|
|
connection.send_result(
|
|
msg["id"], EntityStoreValidationSuccess(success=True, entity_id=None)
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/create_entity",
|
|
**CREATE_ENTITY_BASE_SCHEMA,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_create_entity(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Create entity in entity store and load it."""
|
|
try:
|
|
validated_data = validate_entity_data(msg)
|
|
except EntityStoreValidationException as exc:
|
|
connection.send_result(msg["id"], exc.validation_error)
|
|
return
|
|
try:
|
|
entity_id = await knx.config_store.create_entity(
|
|
# use validation result so defaults are applied
|
|
validated_data[CONF_PLATFORM],
|
|
validated_data[CONF_DATA],
|
|
)
|
|
except ConfigStoreException as err:
|
|
connection.send_error(
|
|
msg["id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR, str(err)
|
|
)
|
|
return
|
|
connection.send_result(
|
|
msg["id"], EntityStoreValidationSuccess(success=True, entity_id=entity_id)
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/update_entity",
|
|
**UPDATE_ENTITY_BASE_SCHEMA,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_update_entity(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Update entity in entity store and reload it."""
|
|
try:
|
|
validated_data = validate_entity_data(msg)
|
|
except EntityStoreValidationException as exc:
|
|
connection.send_result(msg["id"], exc.validation_error)
|
|
return
|
|
try:
|
|
await knx.config_store.update_entity(
|
|
validated_data[CONF_PLATFORM],
|
|
validated_data[CONF_ENTITY_ID],
|
|
validated_data[CONF_DATA],
|
|
)
|
|
except ConfigStoreException as err:
|
|
connection.send_error(
|
|
msg["id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR, str(err)
|
|
)
|
|
return
|
|
connection.send_result(
|
|
msg["id"], EntityStoreValidationSuccess(success=True, entity_id=None)
|
|
)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/delete_entity",
|
|
vol.Required(CONF_ENTITY_ID): str,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@provide_knx
|
|
async def ws_delete_entity(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Delete entity from entity store and remove it."""
|
|
try:
|
|
await knx.config_store.delete_entity(msg[CONF_ENTITY_ID])
|
|
except ConfigStoreException as err:
|
|
connection.send_error(
|
|
msg["id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR, str(err)
|
|
)
|
|
return
|
|
connection.send_result(msg["id"])
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/get_entity_entries",
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_get_entity_entries(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Get entities configured from entity store."""
|
|
entity_entries = [
|
|
entry.extended_dict for entry in knx.config_store.get_entity_entries()
|
|
]
|
|
connection.send_result(msg["id"], entity_entries)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/get_entity_config",
|
|
vol.Required(CONF_ENTITY_ID): str,
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_get_entity_config(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Get entity configuration from entity store."""
|
|
try:
|
|
config_info = knx.config_store.get_entity_config(msg[CONF_ENTITY_ID])
|
|
except ConfigStoreException as err:
|
|
connection.send_error(
|
|
msg["id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR, str(err)
|
|
)
|
|
return
|
|
connection.send_result(msg["id"], config_info)
|
|
|
|
|
|
@websocket_api.require_admin
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "knx/create_device",
|
|
vol.Required("name"): str,
|
|
vol.Optional("area_id"): str,
|
|
}
|
|
)
|
|
@provide_knx
|
|
@callback
|
|
def ws_create_device(
|
|
hass: HomeAssistant,
|
|
knx: KNXModule,
|
|
connection: websocket_api.ActiveConnection,
|
|
msg: dict,
|
|
) -> None:
|
|
"""Create a new KNX device."""
|
|
identifier = f"knx_vdev_{ulid_now()}"
|
|
device_registry = dr.async_get(hass)
|
|
_device = device_registry.async_get_or_create(
|
|
config_entry_id=knx.entry.entry_id,
|
|
manufacturer="KNX",
|
|
name=msg["name"],
|
|
identifiers={(DOMAIN, identifier)},
|
|
)
|
|
device_registry.async_update_device(
|
|
_device.id,
|
|
area_id=msg.get("area_id") or UNDEFINED,
|
|
configuration_url=f"homeassistant://knx/entities/view?device_id={_device.id}",
|
|
)
|
|
connection.send_result(msg["id"], _device.dict_repr)
|