mirror of https://github.com/home-assistant/core
385 lines
13 KiB
Python
385 lines
13 KiB
Python
"""Blueprint models."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Awaitable, Callable
|
|
import logging
|
|
import pathlib
|
|
import shutil
|
|
from typing import Any
|
|
|
|
from awesomeversion import AwesomeVersion
|
|
import voluptuous as vol
|
|
from voluptuous.humanize import humanize_error
|
|
|
|
from homeassistant import loader
|
|
from homeassistant.const import (
|
|
CONF_DEFAULT,
|
|
CONF_DOMAIN,
|
|
CONF_NAME,
|
|
CONF_PATH,
|
|
__version__,
|
|
)
|
|
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.util import yaml
|
|
|
|
from .const import (
|
|
BLUEPRINT_FOLDER,
|
|
CONF_BLUEPRINT,
|
|
CONF_HOMEASSISTANT,
|
|
CONF_INPUT,
|
|
CONF_MIN_VERSION,
|
|
CONF_SOURCE_URL,
|
|
CONF_USE_BLUEPRINT,
|
|
DOMAIN,
|
|
)
|
|
from .errors import (
|
|
BlueprintException,
|
|
BlueprintInUse,
|
|
FailedToLoad,
|
|
FileAlreadyExists,
|
|
InvalidBlueprint,
|
|
InvalidBlueprintInputs,
|
|
MissingInput,
|
|
)
|
|
from .schemas import BLUEPRINT_INSTANCE_FIELDS
|
|
|
|
|
|
class Blueprint:
    """Blueprint of a configuration structure.

    Holds the schema-validated blueprint data in ``self.data`` and the
    blueprint's domain in ``self.domain``.
    """

    def __init__(
        self,
        data: dict[str, Any],
        *,
        path: str | None = None,
        expected_domain: str | None = None,
        schema: Callable[[Any], Any],
    ) -> None:
        """Initialize a blueprint.

        ``data`` is passed through ``schema`` and the result is stored as
        ``self.data``. ``path`` is only used to identify the blueprint in
        errors; when it is not given the blueprint name is used instead. If
        ``expected_domain`` is given, the domain declared in the blueprint
        must match it.

        Raises InvalidBlueprint when the schema rejects the data, when the
        declared domain differs from ``expected_domain``, or when
        ``yaml.extract_inputs`` reports an input name that has no definition
        in the blueprint inputs.
        """
        try:
            data = self.data = schema(data)
        except vol.Invalid as err:
            # ``data`` is still the unvalidated input here because the
            # assignment above never completed.
            raise InvalidBlueprint(expected_domain, path, data, err) from err

        # In future, we will treat this as "incorrect" and allow to recover from this
        data_domain = data[CONF_BLUEPRINT][CONF_DOMAIN]
        if expected_domain is not None and data_domain != expected_domain:
            raise InvalidBlueprint(
                expected_domain,
                path or self.name,
                data,
                (
                    f"Found incorrect blueprint type {data_domain}, expected"
                    f" {expected_domain}"
                ),
            )

        self.domain = data_domain

        missing = yaml.extract_inputs(data) - set(self.inputs)

        if missing:
            # Sort the names so the error message does not depend on set
            # iteration order.
            raise InvalidBlueprint(
                data_domain,
                path or self.name,
                data,
                f"Missing input definition for {', '.join(sorted(missing))}",
            )

    @property
    def name(self) -> str:
        """Return blueprint name."""
        return self.data[CONF_BLUEPRINT][CONF_NAME]  # type: ignore[no-any-return]

    @property
    def inputs(self) -> dict[str, Any]:
        """Return flattened blueprint inputs.

        An entry whose value itself contains an ``input`` mapping is replaced
        by the entries of that nested mapping; every other entry is kept under
        its own key. The mapping is rebuilt on every access.
        """
        inputs = {}
        for key, value in self.data[CONF_BLUEPRINT][CONF_INPUT].items():
            if value and CONF_INPUT in value:
                # Nested group of inputs: lift its members to the top level.
                inputs.update(dict(value[CONF_INPUT]))
            else:
                inputs[key] = value
        return inputs

    @property
    def metadata(self) -> dict[str, Any]:
        """Return blueprint metadata."""
        return self.data[CONF_BLUEPRINT]  # type: ignore[no-any-return]

    def update_metadata(self, *, source_url: str | None = None) -> None:
        """Update metadata.

        Stores ``source_url`` in the blueprint metadata; ``None`` leaves the
        metadata unchanged.
        """
        if source_url is not None:
            self.data[CONF_BLUEPRINT][CONF_SOURCE_URL] = source_url

    def yaml(self) -> str:
        """Dump blueprint as YAML."""
        return yaml.dump(self.data)

    @callback
    def validate(self) -> list[str] | None:
        """Test if the Home Assistant installation supports this blueprint.

        Return list of errors if not valid, otherwise None.
        """
        errors = []
        metadata = self.metadata
        min_version = metadata.get(CONF_HOMEASSISTANT, {}).get(CONF_MIN_VERSION)

        # The running version must not be older than the declared minimum.
        if min_version is not None and AwesomeVersion(__version__) < AwesomeVersion(
            min_version
        ):
            errors.append(f"Requires at least Home Assistant {min_version}")

        return errors or None
|
|
|
|
|
|
class BlueprintInputs:
    """Inputs for a blueprint.

    Pairs a blueprint with the configuration that uses it, so that the
    supplied inputs can be validated and substituted into the blueprint.
    """

    def __init__(
        self, blueprint: Blueprint, config_with_inputs: dict[str, Any]
    ) -> None:
        """Instantiate a blueprint inputs object."""
        self.blueprint = blueprint
        self.config_with_inputs = config_with_inputs

    @property
    def inputs(self) -> dict[str, Any]:
        """Return the inputs supplied by the configuration."""
        return self.config_with_inputs[CONF_USE_BLUEPRINT][CONF_INPUT]  # type: ignore[no-any-return]

    @property
    def inputs_with_default(self) -> dict[str, Any]:
        """Return the inputs and fallback to defaults.

        The result is a copy of the supplied inputs. Each blueprint input that
        was not supplied is added with its default, provided its definition is
        a dict that declares one; inputs without a default stay absent.
        """
        # Read both properties once: the blueprint's ``inputs`` property is
        # recomputed on each access, so looking it up inside the loop would
        # repeat that work for every missing input.
        blueprint_inputs = self.blueprint.inputs
        provided = self.inputs

        inputs_with_default = dict(provided)

        for inp in set(blueprint_inputs) - set(provided):
            blueprint_input = blueprint_inputs[inp]
            if isinstance(blueprint_input, dict) and CONF_DEFAULT in blueprint_input:
                inputs_with_default[inp] = blueprint_input[CONF_DEFAULT]

        return inputs_with_default

    def validate(self) -> None:
        """Validate the inputs.

        Raises MissingInput when a blueprint input was neither supplied nor
        has a default.
        """
        missing = set(self.blueprint.inputs) - set(self.inputs_with_default)

        if missing:
            raise MissingInput(self.blueprint.domain, self.blueprint.name, missing)

        # In future we can see if entities are correct domain, areas exist etc
        # using the new selector helper.

    @callback
    def async_substitute(self) -> dict:
        """Get the blueprint value with the inputs substituted.

        Keys from the using configuration take precedence over keys from the
        substituted blueprint. The ``use_blueprint`` and ``blueprint`` keys
        are removed from the result.
        """
        processed = yaml.substitute(self.blueprint.data, self.inputs_with_default)
        combined = {**processed, **self.config_with_inputs}
        # From config_with_inputs
        combined.pop(CONF_USE_BLUEPRINT)
        # From blueprint
        combined.pop(CONF_BLUEPRINT)
        return combined
|
|
|
|
|
|
class DomainBlueprints:
    """Blueprints for a specific domain.

    Keeps a cache of loaded blueprints keyed by their path relative to the
    domain's blueprint folder. A cached value of ``None`` marks a path that
    failed to load or whose file was removed.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        domain: str,
        logger: logging.Logger,
        blueprint_in_use: Callable[[HomeAssistant, str], bool],
        reload_blueprint_consumers: Callable[[HomeAssistant, str], Awaitable[None]],
        blueprint_schema: Callable[[Any], Any],
    ) -> None:
        """Initialize a domain blueprints instance.

        ``blueprint_in_use`` is consulted before a blueprint is removed,
        ``reload_blueprint_consumers`` is awaited after an existing blueprint
        file was overwritten, and ``blueprint_schema`` validates every
        blueprint that is loaded. The instance registers itself under
        ``hass.data[DOMAIN][domain]``.
        """
        self.hass = hass
        self.domain = domain
        self.logger = logger
        self._blueprint_in_use = blueprint_in_use
        self._reload_blueprint_consumers = reload_blueprint_consumers
        self._blueprints: dict[str, Blueprint | None] = {}
        self._load_lock = asyncio.Lock()
        self._blueprint_schema = blueprint_schema

        hass.data.setdefault(DOMAIN, {})[domain] = self

    @property
    def blueprint_folder(self) -> pathlib.Path:
        """Return the blueprint folder."""
        return pathlib.Path(self.hass.config.path(BLUEPRINT_FOLDER, self.domain))

    async def async_reset_cache(self) -> None:
        """Reset the blueprint cache."""
        async with self._load_lock:
            self._blueprints = {}

    def _load_blueprint(self, blueprint_path: str) -> Blueprint:
        """Load a blueprint.

        ``blueprint_path`` is relative to the blueprint folder. Raises
        FailedToLoad when the file is missing or reading it raises
        HomeAssistantError; errors raised while constructing the Blueprint
        propagate unchanged.
        """
        try:
            blueprint_data = yaml.load_yaml_dict(self.blueprint_folder / blueprint_path)
        except FileNotFoundError as err:
            raise FailedToLoad(
                self.domain,
                blueprint_path,
                FileNotFoundError(f"Unable to find {blueprint_path}"),
            ) from err
        except HomeAssistantError as err:
            raise FailedToLoad(self.domain, blueprint_path, err) from err

        return Blueprint(
            blueprint_data,
            expected_domain=self.domain,
            path=blueprint_path,
            schema=self._blueprint_schema,
        )

    def _load_blueprints(self) -> dict[str, Blueprint | BlueprintException | None]:
        """Load all the blueprints.

        Walks every ``*.yaml`` file below the blueprint folder. The result
        maps each relative path to the loaded blueprint, or to the
        BlueprintException raised while loading it.
        """
        # Same folder that _load_blueprint and async_remove_blueprint resolve
        # files against.
        blueprint_folder = self.blueprint_folder
        results: dict[str, Blueprint | BlueprintException | None] = {}

        for path in blueprint_folder.glob("**/*.yaml"):
            blueprint_path = str(path.relative_to(blueprint_folder))
            # Paths that are not cached yet and paths cached as None are both
            # (re)loaded.
            if self._blueprints.get(blueprint_path) is None:
                try:
                    self._blueprints[blueprint_path] = self._load_blueprint(
                        blueprint_path
                    )
                except BlueprintException as err:
                    self._blueprints[blueprint_path] = None
                    results[blueprint_path] = err
                    continue

            results[blueprint_path] = self._blueprints[blueprint_path]

        return results

    async def async_get_blueprints(
        self,
    ) -> dict[str, Blueprint | BlueprintException | None]:
        """Get all the blueprints.

        The folder scan runs in the executor while the load lock is held.
        """
        async with self._load_lock:
            return await self.hass.async_add_executor_job(self._load_blueprints)

    async def async_get_blueprint(self, blueprint_path: str) -> Blueprint:
        """Get a blueprint.

        Returns the cached blueprint when the path is already known and loads
        it in the executor otherwise. Raises FailedToLoad when the path is
        cached as ``None`` or when loading fails that way; such a failure is
        cached as ``None``.
        """

        def load_from_cache() -> Blueprint:
            """Load blueprint from cache."""
            if (blueprint := self._blueprints[blueprint_path]) is None:
                raise FailedToLoad(
                    self.domain,
                    blueprint_path,
                    FileNotFoundError(f"Unable to find {blueprint_path}"),
                )
            return blueprint

        if blueprint_path in self._blueprints:
            return load_from_cache()

        async with self._load_lock:
            # Check it again
            if blueprint_path in self._blueprints:
                return load_from_cache()

            try:
                blueprint = await self.hass.async_add_executor_job(
                    self._load_blueprint, blueprint_path
                )
            except FailedToLoad:
                self._blueprints[blueprint_path] = None
                raise

            self._blueprints[blueprint_path] = blueprint
            return blueprint

    async def async_inputs_from_config(
        self, config_with_blueprint: dict
    ) -> BlueprintInputs:
        """Process a blueprint config.

        Validates the configuration against BLUEPRINT_INSTANCE_FIELDS, fetches
        the blueprint it refers to and returns the validated inputs. Raises
        InvalidBlueprintInputs when the configuration does not match the
        schema.
        """
        try:
            config_with_blueprint = BLUEPRINT_INSTANCE_FIELDS(config_with_blueprint)
        except vol.Invalid as err:
            raise InvalidBlueprintInputs(
                self.domain, humanize_error(config_with_blueprint, err)
            ) from err

        bp_conf = config_with_blueprint[CONF_USE_BLUEPRINT]
        blueprint = await self.async_get_blueprint(bp_conf[CONF_PATH])
        inputs = BlueprintInputs(blueprint, config_with_blueprint)
        inputs.validate()
        return inputs

    async def async_remove_blueprint(self, blueprint_path: str) -> None:
        """Remove a blueprint file.

        Raises BlueprintInUse when the in-use callback reports that the
        blueprint is still used.
        """
        if self._blueprint_in_use(self.hass, blueprint_path):
            raise BlueprintInUse(self.domain, blueprint_path)
        path = self.blueprint_folder / blueprint_path
        await self.hass.async_add_executor_job(path.unlink)
        # Keep the key so that later cache lookups fail instead of reloading.
        self._blueprints[blueprint_path] = None

    def _create_file(
        self, blueprint: Blueprint, blueprint_path: str, allow_override: bool
    ) -> bool:
        """Create blueprint file.

        Returns true if the action overrides an existing blueprint. Raises
        FileAlreadyExists when the file exists and ``allow_override`` is
        false. Missing parent folders are created.
        """
        # Resolve against the same folder the load and remove paths use.
        path = self.blueprint_folder / blueprint_path
        exists = path.exists()

        if not allow_override and exists:
            raise FileAlreadyExists(self.domain, blueprint_path)

        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(blueprint.yaml(), encoding="utf-8")
        return exists

    async def async_add_blueprint(
        self, blueprint: Blueprint, blueprint_path: str, allow_override: bool = False
    ) -> bool:
        """Add a blueprint.

        Writes the blueprint file in the executor and caches the blueprint.
        When an existing file was overwritten, the reload callback is awaited
        for that path. Returns whether an existing file was overwritten.
        """
        overrides_existing = await self.hass.async_add_executor_job(
            self._create_file, blueprint, blueprint_path, allow_override
        )

        self._blueprints[blueprint_path] = blueprint

        if overrides_existing:
            await self._reload_blueprint_consumers(self.hass, blueprint_path)

        return overrides_existing

    async def async_populate(self) -> None:
        """Create folder if it doesn't exist and populate with examples."""
        if self._blueprints:
            # If we have already loaded some blueprint the blueprint folder must exist
            return

        integration = await loader.async_get_integration(self.hass, self.domain)

        def populate() -> None:
            """Copy the integration's blueprint folder unless one exists."""
            if self.blueprint_folder.exists():
                return

            shutil.copytree(
                integration.file_path / BLUEPRINT_FOLDER,
                self.blueprint_folder / HOMEASSISTANT_DOMAIN,
            )

        await self.hass.async_add_executor_job(populate)
|