core/homeassistant/components/config/config_entries.py

591 lines
20 KiB
Python

"""Http views to control the config manager."""
from __future__ import annotations
from collections.abc import Callable
from http import HTTPStatus
from typing import Any, NoReturn
from aiohttp import web
import aiohttp.web_exceptions
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.auth.permissions.const import CAT_CONFIG_ENTRIES, POLICY_EDIT
from homeassistant.components import websocket_api
from homeassistant.components.http import KEY_HASS, HomeAssistantView, require_admin
from homeassistant.components.http.data_validator import RequestDataValidator
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import DependencyError, Unauthorized
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.data_entry_flow import (
FlowManagerIndexView,
FlowManagerResourceView,
)
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.json import json_fragment
from homeassistant.loader import (
Integration,
IntegrationNotFound,
async_get_config_flows,
async_get_integrations,
async_get_loaded_integration,
)
@callback
def async_setup(hass: HomeAssistant) -> bool:
"""Enable the Home Assistant views."""
hass.http.register_view(ConfigManagerEntryIndexView)
hass.http.register_view(ConfigManagerEntryResourceView)
hass.http.register_view(ConfigManagerEntryResourceReloadView)
hass.http.register_view(ConfigManagerFlowIndexView(hass.config_entries.flow))
hass.http.register_view(ConfigManagerFlowResourceView(hass.config_entries.flow))
hass.http.register_view(ConfigManagerAvailableFlowView)
hass.http.register_view(OptionManagerFlowIndexView(hass.config_entries.options))
hass.http.register_view(OptionManagerFlowResourceView(hass.config_entries.options))
websocket_api.async_register_command(hass, config_entries_get)
websocket_api.async_register_command(hass, config_entry_disable)
websocket_api.async_register_command(hass, config_entry_get_single)
websocket_api.async_register_command(hass, config_entry_update)
websocket_api.async_register_command(hass, config_entries_subscribe)
websocket_api.async_register_command(hass, config_entries_progress)
websocket_api.async_register_command(hass, ignore_config_flow)
return True
class ConfigManagerEntryIndexView(HomeAssistantView):
"""View to get available config entries."""
url = "/api/config/config_entries/entry"
name = "api:config:config_entries:entry"
async def get(self, request: web.Request) -> web.Response:
"""List available config entries."""
hass = request.app[KEY_HASS]
domain = None
if "domain" in request.query:
domain = request.query["domain"]
type_filter = None
if "type" in request.query:
type_filter = [request.query["type"]]
fragments = await _async_matching_config_entries_json_fragments(
hass, type_filter, domain
)
return self.json(fragments)
class ConfigManagerEntryResourceView(HomeAssistantView):
"""View to interact with a config entry."""
url = "/api/config/config_entries/entry/{entry_id}"
name = "api:config:config_entries:entry:resource"
async def delete(self, request: web.Request, entry_id: str) -> web.Response:
"""Delete a config entry."""
if not request["hass_user"].is_admin:
raise Unauthorized(config_entry_id=entry_id, permission="remove")
hass = request.app[KEY_HASS]
try:
result = await hass.config_entries.async_remove(entry_id)
except config_entries.UnknownEntry:
return self.json_message("Invalid entry specified", HTTPStatus.NOT_FOUND)
return self.json(result)
class ConfigManagerEntryResourceReloadView(HomeAssistantView):
"""View to reload a config entry."""
url = "/api/config/config_entries/entry/{entry_id}/reload"
name = "api:config:config_entries:entry:resource:reload"
async def post(self, request: web.Request, entry_id: str) -> web.Response:
"""Reload a config entry."""
if not request["hass_user"].is_admin:
raise Unauthorized(config_entry_id=entry_id, permission="remove")
hass = request.app[KEY_HASS]
entry = hass.config_entries.async_get_entry(entry_id)
if not entry:
return self.json_message("Invalid entry specified", HTTPStatus.NOT_FOUND)
assert isinstance(entry, config_entries.ConfigEntry)
try:
await hass.config_entries.async_reload(entry_id)
except config_entries.OperationNotAllowed:
return self.json_message("Entry cannot be reloaded", HTTPStatus.FORBIDDEN)
return self.json({"require_restart": not entry.state.recoverable})
def _prepare_config_flow_result_json(
result: data_entry_flow.FlowResult,
prepare_result_json: Callable[
[data_entry_flow.FlowResult], data_entry_flow.FlowResult
],
) -> data_entry_flow.FlowResult:
"""Convert result to JSON."""
if result["type"] != data_entry_flow.FlowResultType.CREATE_ENTRY:
return prepare_result_json(result)
data = result.copy()
entry: config_entries.ConfigEntry = data["result"]
data["result"] = entry.as_json_fragment
data.pop("data")
data.pop("context")
return data
class ConfigManagerFlowIndexView(
FlowManagerIndexView[config_entries.ConfigEntriesFlowManager]
):
"""View to create config flows."""
url = "/api/config/config_entries/flow"
name = "api:config:config_entries:flow"
async def get(self, request: web.Request) -> NoReturn:
"""Not implemented."""
raise aiohttp.web_exceptions.HTTPMethodNotAllowed("GET", ["POST"])
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission="add")
)
@RequestDataValidator(
vol.Schema(
{
vol.Required("handler"): vol.Any(str, list),
vol.Optional("show_advanced_options", default=False): cv.boolean,
vol.Optional("entry_id"): cv.string,
},
extra=vol.ALLOW_EXTRA,
)
)
async def post(self, request: web.Request, data: dict[str, Any]) -> web.Response:
"""Initialize a POST request for a config entry flow."""
return await self._post_impl(request, data)
async def _post_impl(
self, request: web.Request, data: dict[str, Any]
) -> web.Response:
"""Handle a POST request for a config entry flow."""
try:
return await super()._post_impl(request, data)
except DependencyError as exc:
return web.Response(
text=f"Failed dependencies {', '.join(exc.failed_dependencies)}",
status=HTTPStatus.BAD_REQUEST,
)
def get_context(self, data: dict[str, Any]) -> dict[str, Any]:
"""Return context."""
context = super().get_context(data)
context["source"] = config_entries.SOURCE_USER
if entry_id := data.get("entry_id"):
context["source"] = config_entries.SOURCE_RECONFIGURE
context["entry_id"] = entry_id
return context
def _prepare_result_json(
self, result: data_entry_flow.FlowResult
) -> data_entry_flow.FlowResult:
"""Convert result to JSON."""
return _prepare_config_flow_result_json(result, super()._prepare_result_json)
class ConfigManagerFlowResourceView(
FlowManagerResourceView[config_entries.ConfigEntriesFlowManager]
):
"""View to interact with the flow manager."""
url = "/api/config/config_entries/flow/{flow_id}"
name = "api:config:config_entries:flow:resource"
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission="add")
)
async def get(self, request: web.Request, /, flow_id: str) -> web.Response:
"""Get the current state of a data_entry_flow."""
return await super().get(request, flow_id)
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission="add")
)
async def post(self, request: web.Request, flow_id: str) -> web.Response:
"""Handle a POST request."""
return await super().post(request, flow_id)
def _prepare_result_json(
self, result: data_entry_flow.FlowResult
) -> data_entry_flow.FlowResult:
"""Convert result to JSON."""
return _prepare_config_flow_result_json(result, super()._prepare_result_json)
class ConfigManagerAvailableFlowView(HomeAssistantView):
"""View to query available flows."""
url = "/api/config/config_entries/flow_handlers"
name = "api:config:config_entries:flow_handlers"
async def get(self, request: web.Request) -> web.Response:
"""List available flow handlers."""
hass = request.app[KEY_HASS]
kwargs: dict[str, Any] = {}
if "type" in request.query:
kwargs["type_filter"] = request.query["type"]
return self.json(await async_get_config_flows(hass, **kwargs))
class OptionManagerFlowIndexView(
FlowManagerIndexView[config_entries.OptionsFlowManager]
):
"""View to create option flows."""
url = "/api/config/config_entries/options/flow"
name = "api:config:config_entries:option:flow"
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission=POLICY_EDIT)
)
async def post(self, request: web.Request) -> web.Response:
"""Handle a POST request.
handler in request is entry_id.
"""
return await super().post(request)
class OptionManagerFlowResourceView(
FlowManagerResourceView[config_entries.OptionsFlowManager]
):
"""View to interact with the option flow manager."""
url = "/api/config/config_entries/options/flow/{flow_id}"
name = "api:config:config_entries:options:flow:resource"
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission=POLICY_EDIT)
)
async def get(self, request: web.Request, /, flow_id: str) -> web.Response:
"""Get the current state of a data_entry_flow."""
return await super().get(request, flow_id)
@require_admin(
error=Unauthorized(perm_category=CAT_CONFIG_ENTRIES, permission=POLICY_EDIT)
)
async def post(self, request: web.Request, flow_id: str) -> web.Response:
"""Handle a POST request."""
return await super().post(request, flow_id)
@websocket_api.require_admin
@websocket_api.websocket_command({"type": "config_entries/flow/progress"})
def config_entries_progress(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""List flows that are in progress but not started by a user.
Example of a non-user initiated flow is a discovered Hue hub that
requires user interaction to finish setup.
"""
connection.send_result(
msg["id"],
[
flw
for flw in hass.config_entries.flow.async_progress()
if flw["context"]["source"] != config_entries.SOURCE_USER
],
)
def send_entry_not_found(
connection: websocket_api.ActiveConnection, msg_id: int
) -> None:
"""Send Config entry not found error."""
connection.send_error(msg_id, websocket_api.ERR_NOT_FOUND, "Config entry not found")
def get_entry(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
entry_id: str,
msg_id: int,
) -> config_entries.ConfigEntry | None:
"""Get entry, send error message if it doesn't exist."""
if (entry := hass.config_entries.async_get_entry(entry_id)) is None:
send_entry_not_found(connection, msg_id)
return entry
@websocket_api.require_admin
@websocket_api.websocket_command(
{
"type": "config_entries/get_single",
"entry_id": str,
}
)
@websocket_api.async_response
async def config_entry_get_single(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Update config entry."""
entry = get_entry(hass, connection, msg["entry_id"], msg["id"])
if entry is None:
return
result = {"config_entry": entry.as_json_fragment}
connection.send_result(msg["id"], result)
@websocket_api.require_admin
@websocket_api.websocket_command(
{
"type": "config_entries/update",
"entry_id": str,
vol.Optional("title"): str,
vol.Optional("pref_disable_new_entities"): bool,
vol.Optional("pref_disable_polling"): bool,
}
)
@websocket_api.async_response
async def config_entry_update(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Update config entry."""
changes = dict(msg)
changes.pop("id")
changes.pop("type")
changes.pop("entry_id")
entry = get_entry(hass, connection, msg["entry_id"], msg["id"])
if entry is None:
return
old_disable_polling = entry.pref_disable_polling
hass.config_entries.async_update_entry(entry, **changes)
result = {
"config_entry": entry.as_json_fragment,
"require_restart": False,
}
initial_state = entry.state
if (
old_disable_polling != entry.pref_disable_polling
and initial_state is config_entries.ConfigEntryState.LOADED
):
if not await hass.config_entries.async_reload(entry.entry_id):
result["require_restart"] = (
entry.state is config_entries.ConfigEntryState.FAILED_UNLOAD
)
connection.send_result(msg["id"], result)
@websocket_api.require_admin
@websocket_api.websocket_command(
{
"type": "config_entries/disable",
"entry_id": str,
# We only allow setting disabled_by user via API.
# No Enum support like this in voluptuous, use .value
"disabled_by": vol.Any(config_entries.ConfigEntryDisabler.USER.value, None),
}
)
@websocket_api.async_response
async def config_entry_disable(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Disable config entry."""
if (disabled_by := msg["disabled_by"]) is not None:
disabled_by = config_entries.ConfigEntryDisabler(disabled_by)
success = False
try:
success = await hass.config_entries.async_set_disabled_by(
msg["entry_id"], disabled_by
)
except config_entries.OperationNotAllowed:
# Failed to unload the config entry
pass
except config_entries.UnknownEntry:
send_entry_not_found(connection, msg["id"])
return
result = {"require_restart": not success}
connection.send_result(msg["id"], result)
@websocket_api.require_admin
@websocket_api.websocket_command(
{"type": "config_entries/ignore_flow", "flow_id": str, "title": str}
)
@websocket_api.async_response
async def ignore_config_flow(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Ignore a config flow."""
flow = next(
(
flw
for flw in hass.config_entries.flow.async_progress()
if flw["flow_id"] == msg["flow_id"]
),
None,
)
if flow is None:
send_entry_not_found(connection, msg["id"])
return
if "unique_id" not in flow["context"]:
connection.send_error(
msg["id"], "no_unique_id", "Specified flow has no unique ID."
)
return
context = config_entries.ConfigFlowContext(source=config_entries.SOURCE_IGNORE)
if "discovery_key" in flow["context"]:
context["discovery_key"] = flow["context"]["discovery_key"]
await hass.config_entries.flow.async_init(
flow["handler"],
context=context,
data={"unique_id": flow["context"]["unique_id"], "title": msg["title"]},
)
connection.send_result(msg["id"])
@websocket_api.websocket_command(
{
vol.Required("type"): "config_entries/get",
vol.Optional("type_filter"): vol.All(cv.ensure_list, [str]),
vol.Optional("domain"): str,
}
)
@websocket_api.async_response
async def config_entries_get(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Return matching config entries by type and/or domain."""
fragments = await _async_matching_config_entries_json_fragments(
hass, msg.get("type_filter"), msg.get("domain")
)
connection.send_result(msg["id"], fragments)
@websocket_api.websocket_command(
{
vol.Required("type"): "config_entries/subscribe",
vol.Optional("type_filter"): vol.All(cv.ensure_list, [str]),
}
)
@websocket_api.async_response
async def config_entries_subscribe(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Subscribe to config entry updates."""
type_filter = msg.get("type_filter")
@callback
def async_forward_config_entry_changes(
change: config_entries.ConfigEntryChange, entry: config_entries.ConfigEntry
) -> None:
"""Forward config entry state events to websocket."""
if type_filter:
integration = async_get_loaded_integration(hass, entry.domain)
if integration.integration_type not in type_filter:
return
connection.send_message(
websocket_api.event_message(
msg["id"],
[
{
"type": change,
"entry": entry.as_json_fragment,
}
],
)
)
current_entries = await _async_matching_config_entries_json_fragments(
hass, type_filter, None
)
connection.subscriptions[msg["id"]] = async_dispatcher_connect(
hass,
config_entries.SIGNAL_CONFIG_ENTRY_CHANGED,
async_forward_config_entry_changes,
)
connection.send_result(msg["id"])
connection.send_message(
websocket_api.event_message(
msg["id"], [{"type": None, "entry": entry} for entry in current_entries]
)
)
async def _async_matching_config_entries_json_fragments(
hass: HomeAssistant, type_filter: list[str] | None, domain: str | None
) -> list[json_fragment]:
"""Return matching config entries by type and/or domain."""
if domain:
entries = hass.config_entries.async_entries(domain)
else:
entries = hass.config_entries.async_entries()
if not type_filter:
return [entry.as_json_fragment for entry in entries]
integrations: dict[str, Integration] = {}
# Fetch all the integrations so we can check their type
domains = {entry.domain for entry in entries}
for domain_key, integration_or_exc in (
await async_get_integrations(hass, domains)
).items():
if isinstance(integration_or_exc, Integration):
integrations[domain_key] = integration_or_exc
elif not isinstance(integration_or_exc, IntegrationNotFound):
raise integration_or_exc
# Filter out entries that don't match the type filter
# when only helpers are requested, also filter out entries
# from unknown integrations. This prevent them from showing
# up in the helpers UI.
filter_is_not_helper = type_filter != ["helper"]
filter_set = set(type_filter)
return [
entry.as_json_fragment
for entry in entries
# If the filter is not 'helper', we still include the integration
# even if its not returned from async_get_integrations for backwards
# compatibility.
if (
(integration := integrations.get(entry.domain))
and integration.integration_type in filter_set
)
or (filter_is_not_helper and entry.domain not in integrations)
]