mirror of https://github.com/home-assistant/core
495 lines
17 KiB
Python
495 lines
17 KiB
Python
"""Translation string lookup helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Iterable, Mapping
|
|
from contextlib import suppress
|
|
from dataclasses import dataclass
|
|
import logging
|
|
import pathlib
|
|
import string
|
|
from typing import Any
|
|
|
|
from homeassistant.const import (
|
|
EVENT_CORE_CONFIG_UPDATE,
|
|
STATE_UNAVAILABLE,
|
|
STATE_UNKNOWN,
|
|
)
|
|
from homeassistant.core import Event, HomeAssistant, async_get_hass, callback
|
|
from homeassistant.loader import (
|
|
Integration,
|
|
async_get_config_flows,
|
|
async_get_integrations,
|
|
bind_hass,
|
|
)
|
|
from homeassistant.util.json import load_json
|
|
|
|
from . import singleton
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
TRANSLATION_FLATTEN_CACHE = "translation_flatten_cache"
|
|
LOCALE_EN = "en"
|
|
|
|
|
|
def recursive_flatten(
|
|
prefix: str, data: dict[str, dict[str, Any] | str]
|
|
) -> dict[str, str]:
|
|
"""Return a flattened representation of dict data."""
|
|
output: dict[str, str] = {}
|
|
for key, value in data.items():
|
|
if isinstance(value, dict):
|
|
output.update(recursive_flatten(f"{prefix}{key}.", value))
|
|
else:
|
|
output[f"{prefix}{key}"] = value
|
|
return output
|
|
|
|
|
|
def _load_translations_files_by_language(
|
|
translation_files: dict[str, dict[str, pathlib.Path]],
|
|
) -> dict[str, dict[str, Any]]:
|
|
"""Load and parse translation.json files."""
|
|
loaded: dict[str, dict[str, Any]] = {}
|
|
for language, component_translation_file in translation_files.items():
|
|
loaded_for_language: dict[str, Any] = {}
|
|
loaded[language] = loaded_for_language
|
|
|
|
for component, translation_file in component_translation_file.items():
|
|
loaded_json = load_json(translation_file)
|
|
|
|
if not isinstance(loaded_json, dict):
|
|
_LOGGER.warning(
|
|
"Translation file is unexpected type %s. Expected dict for %s",
|
|
type(loaded_json),
|
|
translation_file,
|
|
)
|
|
continue
|
|
|
|
loaded_for_language[component] = loaded_json
|
|
|
|
return loaded
|
|
|
|
|
|
def build_resources(
|
|
translation_strings: dict[str, dict[str, dict[str, Any] | str]],
|
|
components: set[str],
|
|
category: str,
|
|
) -> dict[str, dict[str, Any] | str]:
|
|
"""Build the resources response for the given components."""
|
|
# Build response
|
|
return {
|
|
component: category_strings
|
|
for component in components
|
|
if (component_strings := translation_strings.get(component))
|
|
and (category_strings := component_strings.get(category))
|
|
}
|
|
|
|
|
|
async def _async_get_component_strings(
|
|
hass: HomeAssistant,
|
|
languages: Iterable[str],
|
|
components: set[str],
|
|
integrations: dict[str, Integration],
|
|
) -> dict[str, dict[str, Any]]:
|
|
"""Load translations."""
|
|
translations_by_language: dict[str, dict[str, Any]] = {}
|
|
# Determine paths of missing components/platforms
|
|
files_to_load_by_language: dict[str, dict[str, pathlib.Path]] = {}
|
|
loaded_translations_by_language: dict[str, dict[str, Any]] = {}
|
|
has_files_to_load = False
|
|
for language in languages:
|
|
file_name = f"{language}.json"
|
|
files_to_load: dict[str, pathlib.Path] = {
|
|
domain: integration.file_path / "translations" / file_name
|
|
for domain in components
|
|
if (
|
|
(integration := integrations.get(domain))
|
|
and integration.has_translations
|
|
)
|
|
}
|
|
files_to_load_by_language[language] = files_to_load
|
|
has_files_to_load |= bool(files_to_load)
|
|
|
|
if has_files_to_load:
|
|
loaded_translations_by_language = await hass.async_add_executor_job(
|
|
_load_translations_files_by_language, files_to_load_by_language
|
|
)
|
|
|
|
for language in languages:
|
|
loaded_translations = loaded_translations_by_language.setdefault(language, {})
|
|
for domain in components:
|
|
# Translations that miss "title" will get integration put in.
|
|
component_translations = loaded_translations.setdefault(domain, {})
|
|
if "title" not in component_translations and (
|
|
integration := integrations.get(domain)
|
|
):
|
|
component_translations["title"] = integration.name
|
|
|
|
translations_by_language.setdefault(language, {}).update(loaded_translations)
|
|
|
|
return translations_by_language
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _TranslationsCacheData:
|
|
"""Data for the translation cache.
|
|
|
|
This class contains data that is designed to be shared
|
|
between multiple instances of the translation cache so
|
|
we only have to load the data once.
|
|
"""
|
|
|
|
loaded: dict[str, set[str]]
|
|
cache: dict[str, dict[str, dict[str, dict[str, str]]]]
|
|
|
|
|
|
class _TranslationCache:
|
|
"""Cache for flattened translations."""
|
|
|
|
__slots__ = ("hass", "cache_data", "lock")
|
|
|
|
def __init__(self, hass: HomeAssistant) -> None:
|
|
"""Initialize the cache."""
|
|
self.hass = hass
|
|
self.cache_data = _TranslationsCacheData({}, {})
|
|
self.lock = asyncio.Lock()
|
|
|
|
@callback
|
|
def async_is_loaded(self, language: str, components: set[str]) -> bool:
|
|
"""Return if the given components are loaded for the language."""
|
|
return components.issubset(self.cache_data.loaded.get(language, set()))
|
|
|
|
async def async_load(
|
|
self,
|
|
language: str,
|
|
components: set[str],
|
|
) -> None:
|
|
"""Load resources into the cache."""
|
|
loaded = self.cache_data.loaded.setdefault(language, set())
|
|
if components_to_load := components - loaded:
|
|
# Translations are never unloaded so if there are no components to load
|
|
# we can skip the lock which reduces contention when multiple different
|
|
# translations categories are being fetched at the same time which is
|
|
# common from the frontend.
|
|
async with self.lock:
|
|
# Check components to load again, as another task might have loaded
|
|
# them while we were waiting for the lock.
|
|
if components_to_load := components - loaded:
|
|
await self._async_load(language, components_to_load)
|
|
|
|
async def async_fetch(
|
|
self,
|
|
language: str,
|
|
category: str,
|
|
components: set[str],
|
|
) -> dict[str, str]:
|
|
"""Load resources into the cache and return them."""
|
|
await self.async_load(language, components)
|
|
|
|
return self.get_cached(language, category, components)
|
|
|
|
def get_cached(
|
|
self,
|
|
language: str,
|
|
category: str,
|
|
components: set[str],
|
|
) -> dict[str, str]:
|
|
"""Read resources from the cache."""
|
|
category_cache = self.cache_data.cache.get(language, {}).get(category, {})
|
|
# If only one component was requested, return it directly
|
|
# to avoid merging the dictionaries and keeping additional
|
|
# copies of the same data in memory.
|
|
if len(components) == 1 and (component := next(iter(components))):
|
|
return category_cache.get(component, {})
|
|
|
|
result: dict[str, str] = {}
|
|
for component in components.intersection(category_cache):
|
|
result.update(category_cache[component])
|
|
return result
|
|
|
|
async def _async_load(self, language: str, components: set[str]) -> None:
|
|
"""Populate the cache for a given set of components."""
|
|
loaded = self.cache_data.loaded
|
|
_LOGGER.debug(
|
|
"Cache miss for %s: %s",
|
|
language,
|
|
components,
|
|
)
|
|
# Fetch the English resources, as a fallback for missing keys
|
|
languages = [LOCALE_EN] if language == LOCALE_EN else [LOCALE_EN, language]
|
|
|
|
integrations: dict[str, Integration] = {}
|
|
ints_or_excs = await async_get_integrations(self.hass, components)
|
|
for domain, int_or_exc in ints_or_excs.items():
|
|
if isinstance(int_or_exc, Exception):
|
|
_LOGGER.warning(
|
|
"Failed to load integration for translation: %s", int_or_exc
|
|
)
|
|
continue
|
|
integrations[domain] = int_or_exc
|
|
|
|
translation_by_language_strings = await _async_get_component_strings(
|
|
self.hass, languages, components, integrations
|
|
)
|
|
|
|
# English is always the fallback language so we load them first
|
|
self._build_category_cache(
|
|
language, components, translation_by_language_strings[LOCALE_EN]
|
|
)
|
|
|
|
if language != LOCALE_EN:
|
|
# Now overlay the requested language on top of the English
|
|
self._build_category_cache(
|
|
language, components, translation_by_language_strings[language]
|
|
)
|
|
|
|
loaded_english_components = loaded.setdefault(LOCALE_EN, set())
|
|
# Since we just loaded english anyway we can avoid loading
|
|
# again if they switch back to english.
|
|
if loaded_english_components.isdisjoint(components):
|
|
self._build_category_cache(
|
|
LOCALE_EN, components, translation_by_language_strings[LOCALE_EN]
|
|
)
|
|
loaded_english_components.update(components)
|
|
|
|
loaded[language].update(components)
|
|
|
|
def _validate_placeholders(
|
|
self,
|
|
language: str,
|
|
updated_resources: dict[str, str],
|
|
cached_resources: dict[str, str] | None = None,
|
|
) -> dict[str, str]:
|
|
"""Validate if updated resources have same placeholders as cached resources."""
|
|
if cached_resources is None:
|
|
return updated_resources
|
|
|
|
mismatches: set[str] = set()
|
|
|
|
for key, value in updated_resources.items():
|
|
if key not in cached_resources:
|
|
continue
|
|
try:
|
|
tuples = list(string.Formatter().parse(value))
|
|
except ValueError:
|
|
_LOGGER.error(
|
|
("Error while parsing localized (%s) string %s"), language, key
|
|
)
|
|
continue
|
|
updated_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
|
|
|
|
tuples = list(string.Formatter().parse(cached_resources[key]))
|
|
cached_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
|
|
if updated_placeholders != cached_placeholders:
|
|
_LOGGER.error(
|
|
(
|
|
"Validation of translation placeholders for localized (%s) string "
|
|
"%s failed: (%s != %s)"
|
|
),
|
|
language,
|
|
key,
|
|
updated_placeholders,
|
|
cached_placeholders,
|
|
)
|
|
mismatches.add(key)
|
|
|
|
for mismatch in mismatches:
|
|
del updated_resources[mismatch]
|
|
|
|
return updated_resources
|
|
|
|
@callback
|
|
def _build_category_cache(
|
|
self,
|
|
language: str,
|
|
components: set[str],
|
|
translation_strings: dict[str, dict[str, Any]],
|
|
) -> None:
|
|
"""Extract resources into the cache."""
|
|
resource: dict[str, Any] | str
|
|
cached = self.cache_data.cache.setdefault(language, {})
|
|
categories = {
|
|
category
|
|
for component in translation_strings.values()
|
|
for category in component
|
|
}
|
|
|
|
for category in categories:
|
|
new_resources = build_resources(translation_strings, components, category)
|
|
category_cache = cached.setdefault(category, {})
|
|
|
|
for component, resource in new_resources.items():
|
|
component_cache = category_cache.setdefault(component, {})
|
|
|
|
if not isinstance(resource, dict):
|
|
component_cache[f"component.{component}.{category}"] = resource
|
|
continue
|
|
|
|
prefix = f"component.{component}.{category}."
|
|
flat = recursive_flatten(prefix, resource)
|
|
flat = self._validate_placeholders(language, flat, component_cache)
|
|
component_cache.update(flat)
|
|
|
|
|
|
@bind_hass
|
|
async def async_get_translations(
|
|
hass: HomeAssistant,
|
|
language: str,
|
|
category: str,
|
|
integrations: Iterable[str] | None = None,
|
|
config_flow: bool | None = None,
|
|
) -> dict[str, str]:
|
|
"""Return all backend translations.
|
|
|
|
If integration is specified, load it for that one.
|
|
Otherwise, default to loaded integrations combined with config flow
|
|
integrations if config_flow is true.
|
|
"""
|
|
if integrations is None and config_flow:
|
|
components = (await async_get_config_flows(hass)) - hass.config.components
|
|
elif integrations is not None:
|
|
components = set(integrations)
|
|
else:
|
|
components = hass.config.top_level_components
|
|
|
|
return await _async_get_translations_cache(hass).async_fetch(
|
|
language, category, components
|
|
)
|
|
|
|
|
|
@callback
|
|
def async_get_cached_translations(
|
|
hass: HomeAssistant,
|
|
language: str,
|
|
category: str,
|
|
integration: str | None = None,
|
|
) -> dict[str, str]:
|
|
"""Return all cached backend translations.
|
|
|
|
If integration is specified, return translations for it.
|
|
Otherwise, default to all loaded integrations.
|
|
"""
|
|
components = {integration} if integration else hass.config.top_level_components
|
|
return _async_get_translations_cache(hass).get_cached(
|
|
language, category, components
|
|
)
|
|
|
|
|
|
@singleton.singleton(TRANSLATION_FLATTEN_CACHE)
|
|
def _async_get_translations_cache(hass: HomeAssistant) -> _TranslationCache:
|
|
"""Return the translation cache."""
|
|
return _TranslationCache(hass)
|
|
|
|
|
|
@callback
|
|
def async_setup(hass: HomeAssistant) -> None:
|
|
"""Create translation cache and register listeners for translation loaders.
|
|
|
|
Listeners load translations for every loaded component and after config change.
|
|
"""
|
|
cache = _TranslationCache(hass)
|
|
current_language = hass.config.language
|
|
_async_get_translations_cache(hass)
|
|
|
|
@callback
|
|
def _async_load_translations_filter(event_data: Mapping[str, Any]) -> bool:
|
|
"""Filter out unwanted events."""
|
|
nonlocal current_language
|
|
if (
|
|
new_language := event_data.get("language")
|
|
) and new_language != current_language:
|
|
current_language = new_language
|
|
return True
|
|
return False
|
|
|
|
async def _async_load_translations(event: Event) -> None:
|
|
new_language = event.data["language"]
|
|
_LOGGER.debug("Loading translations for language: %s", new_language)
|
|
await cache.async_load(new_language, hass.config.components)
|
|
|
|
hass.bus.async_listen(
|
|
EVENT_CORE_CONFIG_UPDATE,
|
|
_async_load_translations,
|
|
event_filter=_async_load_translations_filter,
|
|
)
|
|
|
|
|
|
async def async_load_integrations(hass: HomeAssistant, integrations: set[str]) -> None:
|
|
"""Load translations for integrations."""
|
|
await _async_get_translations_cache(hass).async_load(
|
|
hass.config.language, integrations
|
|
)
|
|
|
|
|
|
@callback
|
|
def async_translations_loaded(hass: HomeAssistant, components: set[str]) -> bool:
|
|
"""Return if the given components are loaded for the language."""
|
|
return _async_get_translations_cache(hass).async_is_loaded(
|
|
hass.config.language, components
|
|
)
|
|
|
|
|
|
@callback
|
|
def async_get_exception_message(
|
|
translation_domain: str,
|
|
translation_key: str,
|
|
translation_placeholders: dict[str, str] | None = None,
|
|
) -> str:
|
|
"""Return a translated exception message.
|
|
|
|
Defaults to English, requires translations to already be cached.
|
|
"""
|
|
language = "en"
|
|
hass = async_get_hass()
|
|
localize_key = (
|
|
f"component.{translation_domain}.exceptions.{translation_key}.message"
|
|
)
|
|
translations = async_get_cached_translations(hass, language, "exceptions")
|
|
if localize_key in translations:
|
|
if message := translations[localize_key]:
|
|
message = message.rstrip(".")
|
|
if not translation_placeholders:
|
|
return message
|
|
with suppress(KeyError):
|
|
message = message.format(**translation_placeholders)
|
|
return message
|
|
|
|
# We return the translation key when was not found in the cache
|
|
return translation_key
|
|
|
|
|
|
@callback
|
|
def async_translate_state(
|
|
hass: HomeAssistant,
|
|
state: str,
|
|
domain: str,
|
|
platform: str | None,
|
|
translation_key: str | None,
|
|
device_class: str | None,
|
|
) -> str:
|
|
"""Translate provided state using cached translations for currently selected language."""
|
|
if state in [STATE_UNAVAILABLE, STATE_UNKNOWN]:
|
|
return state
|
|
language = hass.config.language
|
|
if platform is not None and translation_key is not None:
|
|
localize_key = (
|
|
f"component.{platform}.entity.{domain}.{translation_key}.state.{state}"
|
|
)
|
|
translations = async_get_cached_translations(hass, language, "entity")
|
|
if localize_key in translations:
|
|
return translations[localize_key]
|
|
|
|
translations = async_get_cached_translations(hass, language, "entity_component")
|
|
if device_class is not None:
|
|
localize_key = (
|
|
f"component.{domain}.entity_component.{device_class}.state.{state}"
|
|
)
|
|
if localize_key in translations:
|
|
return translations[localize_key]
|
|
localize_key = f"component.{domain}.entity_component._.state.{state}"
|
|
if localize_key in translations:
|
|
return translations[localize_key]
|
|
|
|
return state
|