"""Statistics helper for sensor."""
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable, Iterable
from contextlib import suppress
import datetime
import itertools
import logging
import math
from typing import Any
from sqlalchemy.orm.session import Session
from homeassistant.components.recorder import (
DOMAIN as RECORDER_DOMAIN,
get_instance,
history,
statistics,
)
from homeassistant.components.recorder.models import (
StatisticData,
StatisticMetaData,
StatisticResult,
)
from homeassistant.const import (
ATTR_UNIT_OF_MEASUREMENT,
REVOLUTIONS_PER_MINUTE,
UnitOfIrradiance,
UnitOfSoundPressure,
UnitOfVolume,
)
from homeassistant.core import HomeAssistant, State, callback, split_entity_id
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.entity import entity_sources
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
from homeassistant.loader import async_suggest_report_issue
from homeassistant.util import dt as dt_util
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.util.enum import try_parse_enum
from homeassistant.util.hass_dict import HassKey
from .const import (
ATTR_LAST_RESET,
ATTR_STATE_CLASS,
DOMAIN,
SensorStateClass,
UnitOfVolumeFlowRate,
)

_LOGGER = logging.getLogger(__name__)

DEFAULT_STATISTICS = {
    SensorStateClass.MEASUREMENT: {"mean", "min", "max"},
    SensorStateClass.TOTAL: {"sum"},
    SensorStateClass.TOTAL_INCREASING: {"sum"},
}

EQUIVALENT_UNITS = {
    "BTU/(h×ft²)": UnitOfIrradiance.BTUS_PER_HOUR_SQUARE_FOOT,
    "dBa": UnitOfSoundPressure.WEIGHTED_DECIBEL_A,
    "RPM": REVOLUTIONS_PER_MINUTE,
    "ft3": UnitOfVolume.CUBIC_FEET,
    "m3": UnitOfVolume.CUBIC_METERS,
    "ft³/m": UnitOfVolumeFlowRate.CUBIC_FEET_PER_MINUTE,
}

# Keep track of entities for which a warning about decreasing value has been logged
SEEN_DIP: HassKey[set[str]] = HassKey(f"{DOMAIN}_seen_total_increasing_dip")
WARN_DIP: HassKey[set[str]] = HassKey(f"{DOMAIN}_warn_total_increasing_dip")
# Keep track of entities for which a warning about negative value has been logged
WARN_NEGATIVE: HassKey[set[str]] = HassKey(f"{DOMAIN}_warn_total_increasing_negative")
# Keep track of entities for which a warning about unsupported unit has been logged
WARN_UNSUPPORTED_UNIT: HassKey[set[str]] = HassKey(f"{DOMAIN}_warn_unsupported_unit")
WARN_UNSTABLE_UNIT: HassKey[set[str]] = HassKey(f"{DOMAIN}_warn_unstable_unit")
# Link to dev statistics where issues around LTS can be fixed
LINK_DEV_STATISTICS = "https://my.home-assistant.io/redirect/developer_statistics"


def _get_sensor_states(hass: HomeAssistant) -> list[State]:
    """Get the current state of all sensors for which to compile statistics."""
    instance = get_instance(hass)
    # We check for state class first before calling the filter
    # function as the filter function is much more expensive
    # than checking the state class
    entity_filter = instance.entity_filter
    return [
        state
        for state in hass.states.all(DOMAIN)
        if (state_class := state.attributes.get(ATTR_STATE_CLASS))
        and (
            type(state_class) is SensorStateClass
            or try_parse_enum(SensorStateClass, state_class)
        )
        and (not entity_filter or entity_filter(state.entity_id))
    ]


def _time_weighted_average(
    fstates: list[tuple[float, State]], start: datetime.datetime, end: datetime.datetime
) -> float:
    """Calculate a time weighted average.

    The average is calculated by weighting the states by duration in seconds between
    state changes.
    Note: there's no interpolation of values between state changes.
    """
    old_fstate: float | None = None
    old_start_time: datetime.datetime | None = None
    accumulated = 0.0

    for fstate, state in fstates:
        # The recorder will give us the last known state, which may be well
        # before the requested start time for the statistics
        start_time = max(state.last_updated, start)
        if old_start_time is None:
            # Adjust start time, if there was no last known state
            start = start_time
        else:
            duration = start_time - old_start_time
            # Accumulate the value, weighted by duration until next state change
            assert old_fstate is not None
            accumulated += old_fstate * duration.total_seconds()

        old_fstate = fstate
        old_start_time = start_time

    if old_fstate is not None:
        # Accumulate the value, weighted by duration until end of the period
        assert old_start_time is not None
        duration = end - old_start_time
        accumulated += old_fstate * duration.total_seconds()

    period_seconds = (end - start).total_seconds()
    if period_seconds == 0:
        # If the only state change happened at the exact end of the period we
        # can't calculate a meaningful average, so we return 0.0 since it
        # represents a time duration smaller than we can measure. This can
        # happen if the state change event fired at the exact microsecond the
        # period ended.
        return 0.0
    return accumulated / period_seconds
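
# Illustrative sketch of the weighting above (timestamps and values assumed,
# not recorder data): for a period from 00:00 to 01:00, with a state of 10.0
# carried in from before the period (clamped to 00:00) and a change to 20.0
# at 00:30, the loop accumulates 10.0 * 1800 + 20.0 * 1800 = 54000, and
# 54000 / 3600 gives an average of 15.0.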


def _get_units(fstates: list[tuple[float, State]]) -> set[str | None]:
    """Return a set of all units."""
    return {item[1].attributes.get(ATTR_UNIT_OF_MEASUREMENT) for item in fstates}


def _equivalent_units(units: set[str | None]) -> bool:
    """Return True if the units are equivalent."""
    if len(units) == 1:
        return True
    units = {
        EQUIVALENT_UNITS[unit] if unit in EQUIVALENT_UNITS else unit  # noqa: SIM401
        for unit in units
    }
    return len(units) == 1
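
# Illustrative sketch (unit strings assumed): _equivalent_units({"rpm", "RPM"})
# is True because "RPM" is mapped to REVOLUTIONS_PER_MINUTE ("rpm") via
# EQUIVALENT_UNITS, while _equivalent_units({"rpm", "kWh"}) still contains two
# distinct units after mapping and is False.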


def _entity_history_to_float_and_state(
    entity_history: Iterable[State],
) -> list[tuple[float, State]]:
    """Return a list of (float, state) tuples for the given entity."""
    float_states: list[tuple[float, State]] = []
    append = float_states.append
    isfinite = math.isfinite
    for state in entity_history:
        try:
            if (float_state := float(state.state)) is not None and isfinite(
                float_state
            ):
                append((float_state, state))
        except (ValueError, TypeError):
            pass
    return float_states


def _is_numeric(state: State) -> bool:
    """Return if the state is numeric."""
    with suppress(ValueError, TypeError):
        if (num_state := float(state.state)) is not None and math.isfinite(num_state):
            return True
    return False


def _normalize_states(
    hass: HomeAssistant,
    old_metadatas: dict[str, tuple[int, StatisticMetaData]],
    fstates: list[tuple[float, State]],
    entity_id: str,
) -> tuple[str | None, list[tuple[float, State]]]:
    """Normalize units."""
    state_unit: str | None = None
    statistics_unit: str | None
    state_unit = fstates[0][1].attributes.get(ATTR_UNIT_OF_MEASUREMENT)
    old_metadata = old_metadatas[entity_id][1] if entity_id in old_metadatas else None
    if not old_metadata:
        # We've not seen this sensor before, the first valid state determines the unit
        # used for statistics
        statistics_unit = state_unit
    else:
        # We have seen this sensor before, use the unit from metadata
        statistics_unit = old_metadata["unit_of_measurement"]

    if statistics_unit not in statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER:
        # The unit used by this sensor doesn't support unit conversion

        all_units = _get_units(fstates)
        if not _equivalent_units(all_units):
            if WARN_UNSTABLE_UNIT not in hass.data:
                hass.data[WARN_UNSTABLE_UNIT] = set()
            if entity_id not in hass.data[WARN_UNSTABLE_UNIT]:
                hass.data[WARN_UNSTABLE_UNIT].add(entity_id)
                extra = ""
                if old_metadata:
                    extra = (
                        " and matches the unit of already compiled statistics "
                        f"({old_metadata['unit_of_measurement']})"
                    )
                _LOGGER.warning(
                    (
                        "The unit of %s is changing, got multiple %s, generation of"
                        " long term statistics will be suppressed unless the unit is"
                        " stable%s. Go to %s to fix this"
                    ),
                    entity_id,
                    all_units,
                    extra,
                    LINK_DEV_STATISTICS,
                )
            return None, []

        return state_unit, fstates

    converter = statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER[statistics_unit]
    valid_fstates: list[tuple[float, State]] = []
    convert: Callable[[float], float] | None = None
    last_unit: str | None | UndefinedType = UNDEFINED
    valid_units = converter.VALID_UNITS

    for fstate, state in fstates:
        state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
        # Exclude states with unsupported unit from statistics
        if state_unit not in valid_units:
            if WARN_UNSUPPORTED_UNIT not in hass.data:
                hass.data[WARN_UNSUPPORTED_UNIT] = set()
            if entity_id not in hass.data[WARN_UNSUPPORTED_UNIT]:
                hass.data[WARN_UNSUPPORTED_UNIT].add(entity_id)
                _LOGGER.warning(
                    (
                        "The unit of %s (%s) cannot be converted to the unit of"
                        " previously compiled statistics (%s). Generation of long term"
                        " statistics will be suppressed unless the unit changes back to"
                        " %s or a compatible unit. Go to %s to fix this"
                    ),
                    entity_id,
                    state_unit,
                    statistics_unit,
                    statistics_unit,
                    LINK_DEV_STATISTICS,
                )
            continue

        if state_unit != last_unit:
            # The unit of measurement has changed since the last state change
            # recreate the converter factory
            if state_unit == statistics_unit:
                convert = None
            else:
                convert = converter.converter_factory(state_unit, statistics_unit)
            last_unit = state_unit

        if convert is not None:
            fstate = convert(fstate)

        valid_fstates.append((fstate, state))

    return statistics_unit, valid_fstates
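
# Illustrative sketch (entity id and units assumed): if statistics for
# sensor.energy were first compiled in kWh and the sensor later reports 1500.0
# with unit Wh, converter.converter_factory("Wh", "kWh") rescales the value to
# 1.5 before it is appended to valid_fstates; a state with an incompatible
# unit such as "°C" would instead be skipped with a one-time warning.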


def _suggest_report_issue(hass: HomeAssistant, entity_id: str) -> str:
    """Suggest to report an issue."""
    entity_info = entity_sources(hass).get(entity_id)
    return async_suggest_report_issue(
        hass, integration_domain=entity_info["domain"] if entity_info else None
    )


def warn_dip(
    hass: HomeAssistant, entity_id: str, state: State, previous_fstate: float
) -> None:
    """Log a warning once if a sensor with state_class_total has a decreasing value.

    The log will be suppressed until two dips have been seen to prevent warning due to
    rounding issues with databases storing the state as a single precision float, which
    was fixed in recorder DB version 20.
    """
    if SEEN_DIP not in hass.data:
        hass.data[SEEN_DIP] = set()
    if entity_id not in hass.data[SEEN_DIP]:
        hass.data[SEEN_DIP].add(entity_id)
        return
    if WARN_DIP not in hass.data:
        hass.data[WARN_DIP] = set()
    if entity_id not in hass.data[WARN_DIP]:
        hass.data[WARN_DIP].add(entity_id)
        entity_info = entity_sources(hass).get(entity_id)
        domain = entity_info["domain"] if entity_info else None
        if domain in ["energy", "growatt_server", "solaredge"]:
            return
        _LOGGER.warning(
            (
                "Entity %s %shas state class total_increasing, but its state is not"
                " strictly increasing. Triggered by state %s (%s) with last_updated set"
                " to %s. Please %s"
            ),
            entity_id,
            f"from integration {domain} " if domain else "",
            state.state,
            previous_fstate,
            state.last_updated.isoformat(),
            _suggest_report_issue(hass, entity_id),
        )


def warn_negative(hass: HomeAssistant, entity_id: str, state: State) -> None:
    """Log a warning once if a sensor with state_class_total has a negative value."""
    if WARN_NEGATIVE not in hass.data:
        hass.data[WARN_NEGATIVE] = set()
    if entity_id not in hass.data[WARN_NEGATIVE]:
        hass.data[WARN_NEGATIVE].add(entity_id)
        entity_info = entity_sources(hass).get(entity_id)
        domain = entity_info["domain"] if entity_info else None
        _LOGGER.warning(
            (
                "Entity %s %shas state class total_increasing, but its state is "
                "negative. Triggered by state %s with last_updated set to %s. Please %s"
            ),
            entity_id,
            f"from integration {domain} " if domain else "",
            state.state,
            state.last_updated.isoformat(),
            _suggest_report_issue(hass, entity_id),
        )


def reset_detected(
    hass: HomeAssistant,
    entity_id: str,
    fstate: float,
    previous_fstate: float | None,
    state: State,
) -> bool:
    """Test if a total_increasing sensor has been reset."""
    if previous_fstate is None:
        return False

    if 0.9 * previous_fstate <= fstate < previous_fstate:
        warn_dip(hass, entity_id, state, previous_fstate)

    if fstate < 0:
        warn_negative(hass, entity_id, state)
        raise HomeAssistantError

    return fstate < 0.9 * previous_fstate
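
# Worked example of the 10% heuristic above (values assumed): with
# previous_fstate == 100.0, a new state of 95.0 falls in the dip band
# (90.0 <= 95.0 < 100.0), so warn_dip is called and False is returned; 50.0 is
# below 0.9 * 100.0 and counts as a reset (True); -5.0 calls warn_negative and
# raises HomeAssistantError, so the caller drops that state entirely.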


def _wanted_statistics(sensor_states: list[State]) -> dict[str, set[str]]:
    """Prepare a dict with wanted statistics for entities."""
    return {
        state.entity_id: DEFAULT_STATISTICS[state.attributes[ATTR_STATE_CLASS]]
        for state in sensor_states
    }
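
# Illustrative sketch (entity ids assumed): a temperature sensor with state
# class "measurement" and an energy meter with state class "total_increasing"
# produce {"sensor.temperature": {"mean", "min", "max"},
# "sensor.energy": {"sum"}}, per DEFAULT_STATISTICS.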


def _last_reset_as_utc_isoformat(last_reset_s: Any, entity_id: str) -> str | None:
    """Parse last_reset and convert it to UTC."""
    if last_reset_s is None:
        return None
    if isinstance(last_reset_s, str):
        last_reset = dt_util.parse_datetime(last_reset_s)
    else:
        last_reset = None
    if last_reset is None:
        _LOGGER.warning(
            "Ignoring invalid last reset '%s' for %s", last_reset_s, entity_id
        )
        return None
    return dt_util.as_utc(last_reset).isoformat()
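
# Illustrative sketch (timestamp assumed): the offset-aware string
# "2024-06-01T00:00:00+02:00" parses and converts to
# "2024-05-31T22:00:00+00:00", while a non-string value or an unparsable
# string is logged and ignored (returns None).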


def _timestamp_to_isoformat_or_none(timestamp: float | None) -> str | None:
    """Convert a timestamp to ISO format or return None."""
    if timestamp is None:
        return None
    return dt_util.utc_from_timestamp(timestamp).isoformat()


def compile_statistics(  # noqa: C901
    hass: HomeAssistant,
    session: Session,
    start: datetime.datetime,
    end: datetime.datetime,
) -> statistics.PlatformCompiledStatistics:
    """Compile statistics for all entities during start-end."""
    result: list[StatisticResult] = []

    sensor_states = _get_sensor_states(hass)
    wanted_statistics = _wanted_statistics(sensor_states)

    # Get history between start and end
    entities_full_history = [
        i.entity_id for i in sensor_states if "sum" in wanted_statistics[i.entity_id]
    ]
    history_list: dict[str, list[State]] = {}
    if entities_full_history:
        history_list = history.get_full_significant_states_with_session(
            hass,
            session,
            start - datetime.timedelta.resolution,
            end,
            entity_ids=entities_full_history,
            significant_changes_only=False,
        )
    entities_significant_history = [
        i.entity_id
        for i in sensor_states
        if "sum" not in wanted_statistics[i.entity_id]
    ]
    if entities_significant_history:
        _history_list = history.get_full_significant_states_with_session(
            hass,
            session,
            start - datetime.timedelta.resolution,
            end,
            entity_ids=entities_significant_history,
        )
        history_list = {**history_list, **_history_list}

    entities_with_float_states: dict[str, list[tuple[float, State]]] = {}
    for _state in sensor_states:
        entity_id = _state.entity_id
        # If there are no recent state changes, the sensor's state may already
        # be pruned from the recorder. Get the state from the state machine.
        if not (entity_history := history_list.get(entity_id, [_state])):
            continue
        if not (float_states := _entity_history_to_float_and_state(entity_history)):
            continue
        entities_with_float_states[entity_id] = float_states

    # Only lookup metadata for entities that have valid float states
    # since it will result in cache misses for statistic_ids
    # that are not in the metadata table and we are not working
    # with them anyway.
    old_metadatas = statistics.get_metadata_with_session(
        get_instance(hass), session, statistic_ids=set(entities_with_float_states)
    )
    to_process: list[tuple[str, str | None, str, list[tuple[float, State]]]] = []
    to_query: set[str] = set()
    for _state in sensor_states:
        entity_id = _state.entity_id
        if not (maybe_float_states := entities_with_float_states.get(entity_id)):
            continue
        statistics_unit, valid_float_states = _normalize_states(
            hass,
            old_metadatas,
            maybe_float_states,
            entity_id,
        )
        if not valid_float_states:
            continue
        state_class: str = _state.attributes[ATTR_STATE_CLASS]
        to_process.append(
            (entity_id, statistics_unit, state_class, valid_float_states)
        )
        if "sum" in wanted_statistics[entity_id]:
            to_query.add(entity_id)

    last_stats = statistics.get_latest_short_term_statistics_with_session(
        hass, session, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
    )
    for (  # pylint: disable=too-many-nested-blocks
        entity_id,
        statistics_unit,
        state_class,
        valid_float_states,
    ) in to_process:
        # Check metadata
        if old_metadata := old_metadatas.get(entity_id):
            if not _equivalent_units(
                {old_metadata[1]["unit_of_measurement"], statistics_unit}
            ):
                if WARN_UNSTABLE_UNIT not in hass.data:
                    hass.data[WARN_UNSTABLE_UNIT] = set()
                if entity_id not in hass.data[WARN_UNSTABLE_UNIT]:
                    hass.data[WARN_UNSTABLE_UNIT].add(entity_id)
                    _LOGGER.warning(
                        (
                            "The unit of %s (%s) cannot be converted to the unit of"
                            " previously compiled statistics (%s). Generation of long"
                            " term statistics will be suppressed unless the unit"
                            " changes back to %s or a compatible unit. Go to %s to fix"
                            " this"
                        ),
                        entity_id,
                        statistics_unit,
                        old_metadata[1]["unit_of_measurement"],
                        old_metadata[1]["unit_of_measurement"],
                        LINK_DEV_STATISTICS,
                    )
                continue

        # Set meta data
        meta: StatisticMetaData = {
            "has_mean": "mean" in wanted_statistics[entity_id],
            "has_sum": "sum" in wanted_statistics[entity_id],
            "name": None,
            "source": RECORDER_DOMAIN,
            "statistic_id": entity_id,
            "unit_of_measurement": statistics_unit,
        }

        # Make calculations
        stat: StatisticData = {"start": start}
        if "max" in wanted_statistics[entity_id]:
            stat["max"] = max(
                *itertools.islice(zip(*valid_float_states, strict=False), 1)
            )
        if "min" in wanted_statistics[entity_id]:
            stat["min"] = min(
                *itertools.islice(zip(*valid_float_states, strict=False), 1)
            )

        if "mean" in wanted_statistics[entity_id]:
            stat["mean"] = _time_weighted_average(valid_float_states, start, end)

        if "sum" in wanted_statistics[entity_id]:
            last_reset = old_last_reset = None
            new_state = old_state = None
            _sum = 0.0
            if entity_id in last_stats:
                # We have compiled history for this sensor before,
                # use that as a starting point.
                last_stat = last_stats[entity_id][0]
                last_reset = _timestamp_to_isoformat_or_none(last_stat["last_reset"])
                old_last_reset = last_reset
                # If there are no previous values and has_sum
                # was previously false there will be no last_stat
                # for state or sum
                new_state = old_state = last_stat.get("state")
                _sum = last_stat.get("sum") or 0.0

            for fstate, state in valid_float_states:
                reset = False
                if (
                    state_class != SensorStateClass.TOTAL_INCREASING
                    and (
                        last_reset := _last_reset_as_utc_isoformat(
                            state.attributes.get("last_reset"), entity_id
                        )
                    )
                    != old_last_reset
                    and last_reset is not None
                ):
                    if old_state is None:
                        _LOGGER.info(
                            (
                                "Compiling initial sum statistics for %s, zero point"
                                " set to %s"
                            ),
                            entity_id,
                            fstate,
                        )
                    else:
                        _LOGGER.info(
                            (
                                "Detected new cycle for %s, last_reset set to %s (old"
                                " last_reset %s)"
                            ),
                            entity_id,
                            last_reset,
                            old_last_reset,
                        )
                    reset = True
                elif old_state is None and last_reset is None:
                    reset = True
                    _LOGGER.info(
                        "Compiling initial sum statistics for %s, zero point set to %s",
                        entity_id,
                        fstate,
                    )
                elif state_class == SensorStateClass.TOTAL_INCREASING:
                    try:
                        if old_state is None or reset_detected(
                            hass, entity_id, fstate, new_state, state
                        ):
                            reset = True
                            _LOGGER.info(
                                (
                                    "Detected new cycle for %s, value dropped from %s"
                                    " to %s, triggered by state with last_updated set"
                                    " to %s"
                                ),
                                entity_id,
                                new_state,
                                fstate,
                                state.last_updated.isoformat(),
                            )
                    except HomeAssistantError:
                        continue

                if reset:
                    # The sensor has been reset, update the sum
                    if old_state is not None and new_state is not None:
                        _sum += new_state - old_state
                    # ..and update the starting point
                    new_state = fstate
                    old_last_reset = last_reset
                    # Force a new cycle for an existing sensor to start at 0
                    if old_state is not None:
                        old_state = 0.0
                    else:
                        old_state = new_state
                else:
                    new_state = fstate

            if new_state is None or old_state is None:
                # No valid updates
                continue

            # Update the sum with the last state
            _sum += new_state - old_state
            if last_reset is not None:
                stat["last_reset"] = dt_util.parse_datetime(last_reset)
            stat["sum"] = _sum
            stat["state"] = new_state

        result.append({"meta": meta, "stat": stat})

    return statistics.PlatformCompiledStatistics(result, old_metadatas)
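
# Hand-worked sketch of the sum bookkeeping above (all values assumed): for a
# total_increasing sensor whose previous short term statistics ended at
# state=100.0 and sum=500.0, period states 120.0, 5.0 and 30.0 give:
#     120.0 -> no reset, new_state=120.0
#       5.0 -> reset (5.0 < 0.9 * 120.0): _sum += 120.0 - 100.0, old_state=0.0
#      30.0 -> no reset, new_state=30.0
# and the final _sum += 30.0 - 0.0 yields stat entries sum=550.0, state=30.0.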


def list_statistic_ids(
    hass: HomeAssistant,
    statistic_ids: list[str] | tuple[str] | None = None,
    statistic_type: str | None = None,
) -> dict:
    """Return all or filtered statistic_ids and meta data."""
    entities = _get_sensor_states(hass)

    result: dict[str, StatisticMetaData] = {}

    for state in entities:
        entity_id = state.entity_id
        if statistic_ids is not None and entity_id not in statistic_ids:
            continue

        attributes = state.attributes
        state_class = attributes[ATTR_STATE_CLASS]
        provided_statistics = DEFAULT_STATISTICS[state_class]
        if statistic_type is not None and statistic_type not in provided_statistics:
            continue

        if (
            (has_sum := "sum" in provided_statistics)
            and ATTR_LAST_RESET not in attributes
            and state_class == SensorStateClass.MEASUREMENT
        ):
            continue

        result[entity_id] = {
            "has_mean": "mean" in provided_statistics,
            "has_sum": has_sum,
            "name": None,
            "source": RECORDER_DOMAIN,
            "statistic_id": entity_id,
            "unit_of_measurement": attributes.get(ATTR_UNIT_OF_MEASUREMENT),
        }

    return result
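
# Illustrative return value (entity id and unit assumed): an outdoor
# temperature sensor with state class "measurement" yields
# {"sensor.outdoor_temperature": {"has_mean": True, "has_sum": False,
#  "name": None, "source": "recorder",
#  "statistic_id": "sensor.outdoor_temperature", "unit_of_measurement": "°C"}}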


@callback
def _update_issues(
    report_issue: Callable[[str, str, dict[str, Any]], None],
    sensor_states: list[State],
    metadatas: dict[str, tuple[int, StatisticMetaData]],
) -> None:
    """Update repair issues."""
    for state in sensor_states:
        entity_id = state.entity_id
        numeric = _is_numeric(state)
        state_class = try_parse_enum(
            SensorStateClass, state.attributes.get(ATTR_STATE_CLASS)
        )
        state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
        if metadata := metadatas.get(entity_id):
            if numeric and state_class is None:
                # Sensor no longer has a valid state class
                report_issue(
                    "state_class_removed",
                    entity_id,
                    {"statistic_id": entity_id},
                )
            metadata_unit = metadata[1]["unit_of_measurement"]
            converter = statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER.get(metadata_unit)
            if not converter:
                if numeric and not _equivalent_units({state_unit, metadata_unit}):
                    # The unit has changed, and it's not possible to convert
                    report_issue(
                        "units_changed",
                        entity_id,
                        {
                            "statistic_id": entity_id,
                            "state_unit": state_unit,
                            "metadata_unit": metadata_unit,
                            "supported_unit": metadata_unit,
                        },
                    )
            elif numeric and state_unit not in converter.VALID_UNITS:
                # The state unit can't be converted to the unit in metadata
                valid_units = (unit or "<None>" for unit in converter.VALID_UNITS)
                valid_units_str = ", ".join(sorted(valid_units))
                report_issue(
                    "units_changed",
                    entity_id,
                    {
                        "statistic_id": entity_id,
                        "state_unit": state_unit,
                        "metadata_unit": metadata_unit,
                        "supported_unit": valid_units_str,
                    },
                )
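
# Illustrative issue payload (entity id, units and the supported-unit list
# assumed): a sensor compiled in "kWh" that later reports "W" would trigger
# report_issue("units_changed", "sensor.energy",
#     {"statistic_id": "sensor.energy", "state_unit": "W",
#      "metadata_unit": "kWh", "supported_unit": "GJ, MWh, Wh, kWh, ..."})
# where supported_unit lists the converter's valid units, sorted.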


def update_statistics_issues(
    hass: HomeAssistant,
    session: Session,
) -> None:
    """Validate statistics."""
    instance = get_instance(hass)
    sensor_states = hass.states.all(DOMAIN)
    metadatas = statistics.get_metadata_with_session(
        instance, session, statistic_source=RECORDER_DOMAIN
    )

    @callback
    def get_sensor_statistics_issues(hass: HomeAssistant) -> set[str]:
        """Return a set of statistics issue ids."""
        issues = set()
        issue_registry = ir.async_get(hass)
        for issue in issue_registry.issues.values():
            if (
                issue.domain != DOMAIN
                or not (issue_data := issue.data)
                or issue_data.get("issue_type")
                not in ("state_class_removed", "units_changed")
            ):
                continue
            issues.add(issue.issue_id)
        return issues

    issues = run_callback_threadsafe(
        hass.loop, get_sensor_statistics_issues, hass
    ).result()

    def create_issue_registry_issue(
        issue_type: str, statistic_id: str, data: dict[str, Any]
    ) -> None:
        """Create an issue registry issue."""
        issue_id = f"{issue_type}_{statistic_id}"
        issues.discard(issue_id)
        ir.create_issue(
            hass,
            DOMAIN,
            issue_id,
            data=data | {"issue_type": issue_type},
            is_fixable=False,
            severity=ir.IssueSeverity.WARNING,
            translation_key=issue_type,
            translation_placeholders=data,
        )

    _update_issues(
        create_issue_registry_issue,
        sensor_states,
        metadatas,
    )

    for issue_id in issues:
        hass.loop.call_soon_threadsafe(ir.async_delete_issue, hass, DOMAIN, issue_id)


def validate_statistics(
    hass: HomeAssistant,
) -> dict[str, list[statistics.ValidationIssue]]:
    """Validate statistics."""
    validation_result = defaultdict(list)

    sensor_states = hass.states.all(DOMAIN)
    metadatas = statistics.get_metadata(hass, statistic_source=RECORDER_DOMAIN)
    sensor_entity_ids = {i.entity_id for i in sensor_states}
    sensor_statistic_ids = set(metadatas)
    instance = get_instance(hass)
    entity_filter = instance.entity_filter

    def create_statistic_validation_issue(
        issue_type: str, statistic_id: str, data: dict[str, Any]
    ) -> None:
        """Create a statistic validation issue."""
        validation_result[statistic_id].append(
            statistics.ValidationIssue(issue_type, data)
        )

    _update_issues(
        create_statistic_validation_issue,
        sensor_states,
        metadatas,
    )

    for state in sensor_states:
        entity_id = state.entity_id
        state_class = try_parse_enum(
            SensorStateClass, state.attributes.get(ATTR_STATE_CLASS)
        )

        if entity_id in metadatas:
            if entity_filter and not entity_filter(state.entity_id):
                # Sensor was previously recorded, but no longer is
                validation_result[entity_id].append(
                    statistics.ValidationIssue(
                        "entity_no_longer_recorded",
                        {"statistic_id": entity_id},
                    )
                )
        elif state_class is not None:
            if entity_filter and not entity_filter(state.entity_id):
                # Sensor is not recorded
                validation_result[entity_id].append(
                    statistics.ValidationIssue(
                        "entity_not_recorded",
                        {"statistic_id": entity_id},
                    )
                )

    for statistic_id in sensor_statistic_ids - sensor_entity_ids:
        if split_entity_id(statistic_id)[0] != DOMAIN:
            continue

        # There is no sensor matching the statistics_id
        validation_result[statistic_id].append(
            statistics.ValidationIssue(
                "no_state",
                {
                    "statistic_id": statistic_id,
                },
            )
        )

    return validation_result
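
# Illustrative result shape (entity id assumed): a sensor excluded from
# recording while still having compiled statistics maps to
# {"sensor.kitchen_power": [statistics.ValidationIssue(
#     "entity_no_longer_recorded", {"statistic_id": "sensor.kitchen_power"})]}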