mirror of https://github.com/home-assistant/core
311 lines
10 KiB
Python
311 lines
10 KiB
Python
"""Validate dependencies."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import json
|
|
import pathlib
|
|
import re
|
|
from typing import Any
|
|
|
|
import voluptuous as vol
|
|
from voluptuous.humanize import humanize_error
|
|
|
|
from homeassistant.const import CONF_SELECTOR
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import config_validation as cv, selector, service
|
|
from homeassistant.util.yaml import load_yaml_dict
|
|
|
|
from .model import Config, Integration
|
|
|
|
|
|
def exists(value: Any) -> Any:
    """Return ``value`` unchanged, rejecting ``None``.

    Used as a voluptuous validator for keys that may hold any value
    (including falsy ones such as ``0`` or ``""``) but must not be null.

    Raises:
        vol.Invalid: If ``value`` is ``None``.
    """
    if value is not None:
        return value
    raise vol.Invalid("Value cannot be None")
|
|
|
|
|
|
def unique_field_validator(fields: Any) -> Any:
    """Validate the inputs don't have duplicate keys under different sections.

    ``fields`` maps a name to either a field definition or a section (a
    truthy mapping containing a ``"fields"`` key). Field names are collected
    across the top level and all sections; a section's own name is not
    counted as a field name.

    Returns:
        The ``fields`` mapping, unchanged, so this can be chained in ``vol.All``.

    Raises:
        vol.Invalid: If the same field name is used more than once.
    """
    seen: set[str] = set()
    for name, definition in fields.items():
        if definition and "fields" in definition:
            # Section: only the nested field names take part in the check.
            candidates = definition["fields"]
        else:
            candidates = (name,)

        for field_name in candidates:
            if field_name in seen:
                raise vol.Invalid(f"Duplicate use of field {field_name} in service.")
            seen.add(field_name)

    return fields
|
|
|
|
|
|
# Schema for a single field of a core integration's service definition.
# "example" and "default" accept any value except None (see `exists`).
# Inside "filter", "attribute" and "supported_features" belong to the same
# exclusion group ("field_filter"), so at most one of them may be given.
CORE_INTEGRATION_FIELD_SCHEMA = vol.Schema(
    {
        vol.Optional("example"): exists,
        vol.Optional("default"): exists,
        vol.Optional("required"): bool,
        vol.Optional("advanced"): bool,
        vol.Optional(CONF_SELECTOR): selector.validate_selector,
        vol.Optional("filter"): {
            vol.Exclusive("attribute", "field_filter"): {
                # Maps a string key to a list of validated option strings.
                vol.Required(str): [vol.All(str, service.validate_attribute_option)],
            },
            vol.Exclusive("supported_features", "field_filter"): [
                vol.All(str, service.validate_supported_feature)
            ],
        },
    }
)
|
|
|
|
# Schema for a section of a core integration's service: a mandatory "fields"
# mapping of field name to field definition, plus an optional "collapsed" flag.
CORE_INTEGRATION_SECTION_SCHEMA = vol.Schema(
    {
        vol.Optional("collapsed"): bool,
        vol.Required("fields"): vol.Schema({str: CORE_INTEGRATION_FIELD_SCHEMA}),
    }
)
|
|
|
|
# Field schema for custom integrations: everything the core field schema
# allows, plus an inline "name" and "description" in services.yaml.
CUSTOM_INTEGRATION_FIELD_SCHEMA = CORE_INTEGRATION_FIELD_SCHEMA.extend(
    {
        vol.Optional("description"): str,
        vol.Optional("name"): str,
    }
)
|
|
|
|
# Section schema for custom integrations; same shape as the core section
# schema, but its fields are validated with the custom field schema.
CUSTOM_INTEGRATION_SECTION_SCHEMA = vol.Schema(
    {
        vol.Optional("collapsed"): bool,
        vol.Required("fields"): vol.Schema({str: CUSTOM_INTEGRATION_FIELD_SCHEMA}),
    }
)
|
|
|
|
|
|
# Schema for one service of a core integration. A service entry is either
# None (nothing declared) or a mapping with an optional "target" and an
# optional "fields" mapping.
CORE_INTEGRATION_SERVICE_SCHEMA = vol.Any(
    vol.Schema(
        {
            vol.Optional("target"): vol.Any(
                selector.TargetSelector.CONFIG_SCHEMA, None
            ),
            vol.Optional("fields"): vol.All(
                vol.Schema(
                    {
                        # Each entry is either a plain field or a section.
                        str: vol.Any(
                            CORE_INTEGRATION_FIELD_SCHEMA,
                            CORE_INTEGRATION_SECTION_SCHEMA,
                        )
                    }
                ),
                # Runs after the shape check: rejects repeated field names.
                unique_field_validator,
            ),
        }
    ),
    None,
)
|
|
|
|
# Schema for one service of a custom integration. Same structure as the core
# service schema, but "name" and "description" may be given inline and the
# fields/sections are validated with the custom variants.
CUSTOM_INTEGRATION_SERVICE_SCHEMA = vol.Any(
    vol.Schema(
        {
            vol.Optional("description"): str,
            vol.Optional("name"): str,
            vol.Optional("target"): vol.Any(
                selector.TargetSelector.CONFIG_SCHEMA, None
            ),
            vol.Optional("fields"): vol.All(
                vol.Schema(
                    {
                        # Each entry is either a plain field or a section.
                        str: vol.Any(
                            CUSTOM_INTEGRATION_FIELD_SCHEMA,
                            CUSTOM_INTEGRATION_SECTION_SCHEMA,
                        )
                    }
                ),
                # Runs after the shape check: rejects repeated field names.
                unique_field_validator,
            ),
        }
    ),
    None,
)
|
|
|
|
# Schema for a whole services.yaml of a core integration: slug -> service.
CORE_INTEGRATION_SERVICES_SCHEMA = vol.Schema(
    {
        # Keys accepted by service.starts_with_dot are dropped from the
        # validated output, whatever their value.
        # NOTE(review): presumably these are dot-prefixed helper keys such as
        # YAML anchors - confirm against service.starts_with_dot.
        vol.Remove(vol.All(str, service.starts_with_dot)): object,
        cv.slug: CORE_INTEGRATION_SERVICE_SCHEMA,
    }
)
# Schema for a whole services.yaml of a custom integration: slug -> service.
CUSTOM_INTEGRATION_SERVICES_SCHEMA = vol.Schema(
    {cv.slug: CUSTOM_INTEGRATION_SERVICE_SCHEMA}
)
|
|
|
|
# Domains of core integrations whose services.yaml is nevertheless validated
# with CUSTOM_INTEGRATION_SERVICES_SCHEMA by validate_services.
VALIDATE_AS_CUSTOM_INTEGRATION = {
    # Adding translations would be a breaking change
    "foursquare",
}
|
|
|
|
|
|
def grep_dir(path: pathlib.Path, glob_pattern: str, search_pattern: str) -> bool:
    """Recursively go through a dir and its children and find the regex.

    Args:
        path: Directory used as the root of the glob.
        glob_pattern: Glob (relative to ``path``) selecting candidate files;
            use a ``**/`` prefix to descend into subdirectories.
        search_pattern: Regular expression searched for in each file's text.

    Returns:
        True as soon as one regular file matching ``glob_pattern`` contains a
        match for ``search_pattern``, otherwise False. Directories that match
        the glob are skipped.
    """
    pattern = re.compile(search_pattern)

    # Decode explicitly as UTF-8 so the result does not depend on the
    # locale's preferred encoding.
    return any(
        pattern.search(candidate.read_text(encoding="utf-8"))
        for candidate in path.glob(glob_pattern)
        if candidate.is_file()
    )
|
|
|
|
|
|
def _load_json(json_file: pathlib.Path) -> Any:
    """Return the parsed content of ``json_file``, or ``{}`` if absent or invalid."""
    if not json_file.is_file():
        return {}
    try:
        # JSON is UTF-8; decoding errors are ValueErrors and are treated
        # like malformed JSON.
        return json.loads(json_file.read_text(encoding="utf-8"))
    except ValueError:
        return {}


def _has_translation(strings: Any, *keys: str) -> bool:
    """Return True if following ``keys`` through nested mappings succeeds."""
    node = strings
    try:
        for key in keys:
            node = node[key]
    except KeyError:
        return False
    return True


def _validate_service_translations(
    integration: Integration,
    strings: Any,
    service_name: str,
    service_schema: dict[str, Any],
    error_msg_suffix: str,
) -> None:
    """Report missing names/descriptions for one service, its fields and sections."""
    # A name/description given inline in services.yaml needs no translation.
    for attribute in ("name", "description"):
        if attribute not in service_schema and not _has_translation(
            strings, "services", service_name, attribute
        ):
            integration.add_error(
                "services",
                f"Service {service_name} has no {attribute} {error_msg_suffix}",
            )

    fields = service_schema.get("fields", {})

    # The same check is done for the name and description in each of the
    # fields of the service schema.
    for field_name, field_schema in fields.items():
        if "fields" in field_schema:
            # This is a section
            continue

        for attribute in ("name", "description"):
            if attribute not in field_schema and not _has_translation(
                strings, "services", service_name, "fields", field_name, attribute
            ):
                integration.add_error(
                    "services",
                    f"Service {service_name} has a field {field_name} with no {attribute} {error_msg_suffix}",
                )

        if "selector" in field_schema:
            # Only select selectors with a translation key are checked; any
            # other selector raises KeyError here and is skipped.
            with contextlib.suppress(KeyError):
                translation_key = field_schema["selector"]["select"][
                    "translation_key"
                ]
                if not _has_translation(strings, "selector", translation_key):
                    integration.add_error(
                        "services",
                        f"Service {service_name} has a field {field_name} with a selector with a translation key {translation_key} that is not in the translations file",
                    )

    # Sections are checked after all fields, and only for a translated name.
    for section_name, section_schema in fields.items():
        if "fields" not in section_schema:
            # This is not a section
            continue
        if not _has_translation(
            strings, "services", service_name, "sections", section_name, "name"
        ):
            integration.add_error(
                "services",
                f"Service {service_name} has a section {section_name} with no name {error_msg_suffix}",
            )


def validate_services(config: Config, integration: Integration) -> None:
    """Validate services.

    Loads ``services.yaml`` from the integration directory and records every
    problem found through ``integration.add_error("services", ...)``:

    * no ``services.yaml`` although the Python sources register services,
    * a ``services.yaml`` that cannot be loaded or fails schema validation,
    * (core integrations only) a service without an icon in ``icons.json``,
    * a service, field or section whose name/description is neither given
      inline nor present in the translations file,
    * a select selector whose translation key is not in the translations file.

    Core integrations read translations from ``strings.json``; custom
    integrations from ``translations/en.json``.
    """
    try:
        data = load_yaml_dict(str(integration.path / "services.yaml"))
    except FileNotFoundError:
        # Find if integration uses services
        has_services = grep_dir(
            integration.path,
            "**/*.py",
            r"(hass\.services\.(register|async_register))|async_register_entity_service|async_register_admin_service",
        )

        if has_services:
            integration.add_error(
                "services", "Registers services but has no services.yaml"
            )
        return
    except HomeAssistantError:
        integration.add_error("services", "Invalid services.yaml")
        return

    validate_as_core = (
        integration.core
        and integration.domain not in VALIDATE_AS_CUSTOM_INTEGRATION
    )
    services_schema = (
        CORE_INTEGRATION_SERVICES_SCHEMA
        if validate_as_core
        else CUSTOM_INTEGRATION_SERVICES_SCHEMA
    )
    try:
        services = services_schema(data)
    except vol.Invalid as err:
        integration.add_error(
            "services", f"Invalid services.yaml: {humanize_error(data, err)}"
        )
        return

    service_icons = _load_json(integration.path / "icons.json").get("services", {})

    # Try loading translation strings
    if integration.core:
        strings_file = integration.path / "strings.json"
    else:
        # For custom integrations, use the en.json file
        strings_file = integration.path / "translations/en.json"
    strings = _load_json(strings_file)

    error_msg_suffix = "in the translations file"
    if not integration.core:
        error_msg_suffix = f"and is not {error_msg_suffix}"

    # For each service in the integration:
    # 1. Check if the service has an icon set in icons.json,
    #    add an error if not (core integrations only).
    # 2. Check if the service name/description is set; if not,
    #    check if it's in the strings file, else add an error.
    for service_name, service_schema in services.items():
        if integration.core and service_name not in service_icons:
            # This is enforced for Core integrations only
            integration.add_error(
                "services",
                f"Service {service_name} has no icon in icons.json.",
            )
        if service_schema is None:
            continue
        _validate_service_translations(
            integration, strings, service_name, service_schema, error_msg_suffix
        )
|
|
|
|
|
|
def validate(integrations: dict[str, Integration], config: Config) -> None:
    """Validate the services of every integration.

    Each value of ``integrations`` is passed to ``validate_services``
    together with ``config``; the mapping's keys are not used.
    """
    # check services.yaml is cool
    for current in integrations.values():
        validate_services(config, current)
|