core/homeassistant/helpers/translation.py

495 lines
17 KiB
Python

"""Translation string lookup helpers."""
from __future__ import annotations
import asyncio
from collections.abc import Iterable, Mapping
from contextlib import suppress
from dataclasses import dataclass
import logging
import pathlib
import string
from typing import Any
from homeassistant.const import (
EVENT_CORE_CONFIG_UPDATE,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant, async_get_hass, callback
from homeassistant.loader import (
Integration,
async_get_config_flows,
async_get_integrations,
bind_hass,
)
from homeassistant.util.json import load_json
from . import singleton
_LOGGER = logging.getLogger(__name__)
TRANSLATION_FLATTEN_CACHE = "translation_flatten_cache"
LOCALE_EN = "en"
def recursive_flatten(
prefix: str, data: dict[str, dict[str, Any] | str]
) -> dict[str, str]:
"""Return a flattened representation of dict data."""
output: dict[str, str] = {}
for key, value in data.items():
if isinstance(value, dict):
output.update(recursive_flatten(f"{prefix}{key}.", value))
else:
output[f"{prefix}{key}"] = value
return output
def _load_translations_files_by_language(
translation_files: dict[str, dict[str, pathlib.Path]],
) -> dict[str, dict[str, Any]]:
"""Load and parse translation.json files."""
loaded: dict[str, dict[str, Any]] = {}
for language, component_translation_file in translation_files.items():
loaded_for_language: dict[str, Any] = {}
loaded[language] = loaded_for_language
for component, translation_file in component_translation_file.items():
loaded_json = load_json(translation_file)
if not isinstance(loaded_json, dict):
_LOGGER.warning(
"Translation file is unexpected type %s. Expected dict for %s",
type(loaded_json),
translation_file,
)
continue
loaded_for_language[component] = loaded_json
return loaded
def build_resources(
translation_strings: dict[str, dict[str, dict[str, Any] | str]],
components: set[str],
category: str,
) -> dict[str, dict[str, Any] | str]:
"""Build the resources response for the given components."""
# Build response
return {
component: category_strings
for component in components
if (component_strings := translation_strings.get(component))
and (category_strings := component_strings.get(category))
}
async def _async_get_component_strings(
hass: HomeAssistant,
languages: Iterable[str],
components: set[str],
integrations: dict[str, Integration],
) -> dict[str, dict[str, Any]]:
"""Load translations."""
translations_by_language: dict[str, dict[str, Any]] = {}
# Determine paths of missing components/platforms
files_to_load_by_language: dict[str, dict[str, pathlib.Path]] = {}
loaded_translations_by_language: dict[str, dict[str, Any]] = {}
has_files_to_load = False
for language in languages:
file_name = f"{language}.json"
files_to_load: dict[str, pathlib.Path] = {
domain: integration.file_path / "translations" / file_name
for domain in components
if (
(integration := integrations.get(domain))
and integration.has_translations
)
}
files_to_load_by_language[language] = files_to_load
has_files_to_load |= bool(files_to_load)
if has_files_to_load:
loaded_translations_by_language = await hass.async_add_executor_job(
_load_translations_files_by_language, files_to_load_by_language
)
for language in languages:
loaded_translations = loaded_translations_by_language.setdefault(language, {})
for domain in components:
# Translations that miss "title" will get integration put in.
component_translations = loaded_translations.setdefault(domain, {})
if "title" not in component_translations and (
integration := integrations.get(domain)
):
component_translations["title"] = integration.name
translations_by_language.setdefault(language, {}).update(loaded_translations)
return translations_by_language
@dataclass(slots=True)
class _TranslationsCacheData:
"""Data for the translation cache.
This class contains data that is designed to be shared
between multiple instances of the translation cache so
we only have to load the data once.
"""
loaded: dict[str, set[str]]
cache: dict[str, dict[str, dict[str, dict[str, str]]]]
class _TranslationCache:
"""Cache for flattened translations."""
__slots__ = ("hass", "cache_data", "lock")
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the cache."""
self.hass = hass
self.cache_data = _TranslationsCacheData({}, {})
self.lock = asyncio.Lock()
@callback
def async_is_loaded(self, language: str, components: set[str]) -> bool:
"""Return if the given components are loaded for the language."""
return components.issubset(self.cache_data.loaded.get(language, set()))
async def async_load(
self,
language: str,
components: set[str],
) -> None:
"""Load resources into the cache."""
loaded = self.cache_data.loaded.setdefault(language, set())
if components_to_load := components - loaded:
# Translations are never unloaded so if there are no components to load
# we can skip the lock which reduces contention when multiple different
# translations categories are being fetched at the same time which is
# common from the frontend.
async with self.lock:
# Check components to load again, as another task might have loaded
# them while we were waiting for the lock.
if components_to_load := components - loaded:
await self._async_load(language, components_to_load)
async def async_fetch(
self,
language: str,
category: str,
components: set[str],
) -> dict[str, str]:
"""Load resources into the cache and return them."""
await self.async_load(language, components)
return self.get_cached(language, category, components)
def get_cached(
self,
language: str,
category: str,
components: set[str],
) -> dict[str, str]:
"""Read resources from the cache."""
category_cache = self.cache_data.cache.get(language, {}).get(category, {})
# If only one component was requested, return it directly
# to avoid merging the dictionaries and keeping additional
# copies of the same data in memory.
if len(components) == 1 and (component := next(iter(components))):
return category_cache.get(component, {})
result: dict[str, str] = {}
for component in components.intersection(category_cache):
result.update(category_cache[component])
return result
async def _async_load(self, language: str, components: set[str]) -> None:
"""Populate the cache for a given set of components."""
loaded = self.cache_data.loaded
_LOGGER.debug(
"Cache miss for %s: %s",
language,
components,
)
# Fetch the English resources, as a fallback for missing keys
languages = [LOCALE_EN] if language == LOCALE_EN else [LOCALE_EN, language]
integrations: dict[str, Integration] = {}
ints_or_excs = await async_get_integrations(self.hass, components)
for domain, int_or_exc in ints_or_excs.items():
if isinstance(int_or_exc, Exception):
_LOGGER.warning(
"Failed to load integration for translation: %s", int_or_exc
)
continue
integrations[domain] = int_or_exc
translation_by_language_strings = await _async_get_component_strings(
self.hass, languages, components, integrations
)
# English is always the fallback language so we load them first
self._build_category_cache(
language, components, translation_by_language_strings[LOCALE_EN]
)
if language != LOCALE_EN:
# Now overlay the requested language on top of the English
self._build_category_cache(
language, components, translation_by_language_strings[language]
)
loaded_english_components = loaded.setdefault(LOCALE_EN, set())
# Since we just loaded english anyway we can avoid loading
# again if they switch back to english.
if loaded_english_components.isdisjoint(components):
self._build_category_cache(
LOCALE_EN, components, translation_by_language_strings[LOCALE_EN]
)
loaded_english_components.update(components)
loaded[language].update(components)
def _validate_placeholders(
self,
language: str,
updated_resources: dict[str, str],
cached_resources: dict[str, str] | None = None,
) -> dict[str, str]:
"""Validate if updated resources have same placeholders as cached resources."""
if cached_resources is None:
return updated_resources
mismatches: set[str] = set()
for key, value in updated_resources.items():
if key not in cached_resources:
continue
try:
tuples = list(string.Formatter().parse(value))
except ValueError:
_LOGGER.error(
("Error while parsing localized (%s) string %s"), language, key
)
continue
updated_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
tuples = list(string.Formatter().parse(cached_resources[key]))
cached_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
if updated_placeholders != cached_placeholders:
_LOGGER.error(
(
"Validation of translation placeholders for localized (%s) string "
"%s failed: (%s != %s)"
),
language,
key,
updated_placeholders,
cached_placeholders,
)
mismatches.add(key)
for mismatch in mismatches:
del updated_resources[mismatch]
return updated_resources
@callback
def _build_category_cache(
self,
language: str,
components: set[str],
translation_strings: dict[str, dict[str, Any]],
) -> None:
"""Extract resources into the cache."""
resource: dict[str, Any] | str
cached = self.cache_data.cache.setdefault(language, {})
categories = {
category
for component in translation_strings.values()
for category in component
}
for category in categories:
new_resources = build_resources(translation_strings, components, category)
category_cache = cached.setdefault(category, {})
for component, resource in new_resources.items():
component_cache = category_cache.setdefault(component, {})
if not isinstance(resource, dict):
component_cache[f"component.{component}.{category}"] = resource
continue
prefix = f"component.{component}.{category}."
flat = recursive_flatten(prefix, resource)
flat = self._validate_placeholders(language, flat, component_cache)
component_cache.update(flat)
@bind_hass
async def async_get_translations(
hass: HomeAssistant,
language: str,
category: str,
integrations: Iterable[str] | None = None,
config_flow: bool | None = None,
) -> dict[str, str]:
"""Return all backend translations.
If integration is specified, load it for that one.
Otherwise, default to loaded integrations combined with config flow
integrations if config_flow is true.
"""
if integrations is None and config_flow:
components = (await async_get_config_flows(hass)) - hass.config.components
elif integrations is not None:
components = set(integrations)
else:
components = hass.config.top_level_components
return await _async_get_translations_cache(hass).async_fetch(
language, category, components
)
@callback
def async_get_cached_translations(
hass: HomeAssistant,
language: str,
category: str,
integration: str | None = None,
) -> dict[str, str]:
"""Return all cached backend translations.
If integration is specified, return translations for it.
Otherwise, default to all loaded integrations.
"""
components = {integration} if integration else hass.config.top_level_components
return _async_get_translations_cache(hass).get_cached(
language, category, components
)
@singleton.singleton(TRANSLATION_FLATTEN_CACHE)
def _async_get_translations_cache(hass: HomeAssistant) -> _TranslationCache:
"""Return the translation cache."""
return _TranslationCache(hass)
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Create translation cache and register listeners for translation loaders.
Listeners load translations for every loaded component and after config change.
"""
cache = _TranslationCache(hass)
current_language = hass.config.language
_async_get_translations_cache(hass)
@callback
def _async_load_translations_filter(event_data: Mapping[str, Any]) -> bool:
"""Filter out unwanted events."""
nonlocal current_language
if (
new_language := event_data.get("language")
) and new_language != current_language:
current_language = new_language
return True
return False
async def _async_load_translations(event: Event) -> None:
new_language = event.data["language"]
_LOGGER.debug("Loading translations for language: %s", new_language)
await cache.async_load(new_language, hass.config.components)
hass.bus.async_listen(
EVENT_CORE_CONFIG_UPDATE,
_async_load_translations,
event_filter=_async_load_translations_filter,
)
async def async_load_integrations(hass: HomeAssistant, integrations: set[str]) -> None:
"""Load translations for integrations."""
await _async_get_translations_cache(hass).async_load(
hass.config.language, integrations
)
@callback
def async_translations_loaded(hass: HomeAssistant, components: set[str]) -> bool:
"""Return if the given components are loaded for the language."""
return _async_get_translations_cache(hass).async_is_loaded(
hass.config.language, components
)
@callback
def async_get_exception_message(
translation_domain: str,
translation_key: str,
translation_placeholders: dict[str, str] | None = None,
) -> str:
"""Return a translated exception message.
Defaults to English, requires translations to already be cached.
"""
language = "en"
hass = async_get_hass()
localize_key = (
f"component.{translation_domain}.exceptions.{translation_key}.message"
)
translations = async_get_cached_translations(hass, language, "exceptions")
if localize_key in translations:
if message := translations[localize_key]:
message = message.rstrip(".")
if not translation_placeholders:
return message
with suppress(KeyError):
message = message.format(**translation_placeholders)
return message
# We return the translation key when was not found in the cache
return translation_key
@callback
def async_translate_state(
hass: HomeAssistant,
state: str,
domain: str,
platform: str | None,
translation_key: str | None,
device_class: str | None,
) -> str:
"""Translate provided state using cached translations for currently selected language."""
if state in [STATE_UNAVAILABLE, STATE_UNKNOWN]:
return state
language = hass.config.language
if platform is not None and translation_key is not None:
localize_key = (
f"component.{platform}.entity.{domain}.{translation_key}.state.{state}"
)
translations = async_get_cached_translations(hass, language, "entity")
if localize_key in translations:
return translations[localize_key]
translations = async_get_cached_translations(hass, language, "entity_component")
if device_class is not None:
localize_key = (
f"component.{domain}.entity_component.{device_class}.state.{state}"
)
if localize_key in translations:
return translations[localize_key]
localize_key = f"component.{domain}.entity_component._.state.{state}"
if localize_key in translations:
return translations[localize_key]
return state