core/tests/components/recorder/test_init.py

2749 lines
95 KiB
Python

"""The tests for the Recorder component."""
from __future__ import annotations
import asyncio
from collections.abc import Generator
from datetime import datetime, timedelta
import sqlite3
import sys
import threading
from typing import Any, cast
from unittest.mock import MagicMock, Mock, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from sqlalchemy.exc import DatabaseError, OperationalError, SQLAlchemyError
from sqlalchemy.pool import QueuePool
from homeassistant.components import recorder
from homeassistant.components.lock import LockState
from homeassistant.components.recorder import (
CONF_AUTO_PURGE,
CONF_AUTO_REPACK,
CONF_COMMIT_INTERVAL,
CONF_DB_MAX_RETRIES,
CONF_DB_RETRY_WAIT,
CONF_DB_URL,
CONFIG_SCHEMA,
DOMAIN,
Recorder,
db_schema,
get_instance,
migration,
statistics,
)
from homeassistant.components.recorder.const import (
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
KEEPALIVE_TIME,
SupportedDialect,
)
from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION,
EventData,
Events,
EventTypes,
RecorderRuns,
StateAttributes,
States,
StatesMeta,
StatisticsRuns,
)
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.queries import select_event_type_ids
from homeassistant.components.recorder.services import (
SERVICE_DISABLE,
SERVICE_ENABLE,
SERVICE_PURGE,
SERVICE_PURGE_ENTITIES,
)
from homeassistant.components.recorder.table_managers import (
state_attributes as state_attributes_table_manager,
states_meta as states_meta_table_manager,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import (
EVENT_COMPONENT_LOADED,
EVENT_HOMEASSISTANT_CLOSE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_HOMEASSISTANT_STARTED,
EVENT_HOMEASSISTANT_STOP,
MATCH_ALL,
)
from homeassistant.core import Context, CoreState, Event, HomeAssistant, State, callback
from homeassistant.helpers import (
entity_registry as er,
issue_registry as ir,
recorder as recorder_helper,
)
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from homeassistant.util.json import json_loads
from .common import (
async_block_recorder,
async_recorder_block_till_done,
async_wait_recording_done,
convert_pending_states_to_meta,
corrupt_db_file,
run_information_with_session,
)
from tests.common import (
MockEntity,
MockEntityPlatform,
async_fire_time_changed,
async_test_home_assistant,
mock_platform,
)
from tests.typing import RecorderInstanceGenerator
@pytest.fixture
async def mock_recorder_before_hass(
    async_test_recorder: RecorderInstanceGenerator,
) -> None:
    """Set up recorder.

    The body is intentionally empty: merely depending on the
    async_test_recorder fixture performs the setup.
    """
@pytest.fixture
def setup_recorder(recorder_mock: Recorder) -> None:
    """Set up recorder.

    The body is intentionally empty: depending on recorder_mock is what
    installs the recorder for the requesting test.
    """
@pytest.fixture
def small_cache_size() -> Generator[None]:
    """Shrink the state-attributes and states-meta caches to 8 entries.

    A tiny cache makes eviction behavior observable in tests.
    """
    with patch.object(state_attributes_table_manager, "CACHE_SIZE", 8):
        with patch.object(states_meta_table_manager, "CACHE_SIZE", 8):
            yield
def _default_recorder(hass: HomeAssistant) -> Recorder:
    """Return a recorder configured with reasonable test defaults."""
    defaults = {
        "auto_purge": True,
        "auto_repack": True,
        "keep_days": 7,
        "commit_interval": 1,
        "uri": "sqlite://",  # in-memory database
        "db_max_retries": 10,
        "db_retry_wait": 3,
        "entity_filter": CONFIG_SCHEMA({DOMAIN: {}}),
        "exclude_event_types": set(),
    }
    return Recorder(hass, **defaults)
@pytest.mark.parametrize("persistent_database", [True])
async def test_shutdown_before_startup_finishes(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test shutdown before recorder starts is clean.

    On-disk database because this test does not play nice with the MutexPool.
    """
    config = {
        recorder.CONF_COMMIT_INTERVAL: 1,
    }
    hass.set_state(CoreState.not_running)
    recorder_helper.async_initialize_recorder(hass)
    # Start recorder setup as a task so we can stop hass before it finishes.
    hass.async_create_task(async_setup_recorder_instance(hass, config))
    await recorder_helper.async_wait_recorder(hass)
    instance = get_instance(hass)
    session = await instance.async_add_executor_job(instance.get_session)
    # Patch out the engine so shutdown cannot dispose the connections,
    # letting us inspect the run row after hass has stopped.
    with patch.object(instance, "engine"):
        hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
        await hass.async_block_till_done()
        await hass.async_stop()
        # The database executor is shutdown so we must run the
        # query in the main thread for testing
        instance.recorder_and_worker_thread_ids.add(threading.get_ident())
        run_info = run_information_with_session(session)
        assert run_info.run_id == 1
        assert run_info.start is not None
        assert run_info.end is not None
        # We patched out engine to prevent the close from happening
        # so we need to manually close the session
        session.close()
        await hass.async_add_executor_job(instance._shutdown)
async def test_canceled_before_startup_finishes(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test recorder shuts down when its startup future is canceled out from under it."""
    hass.set_state(CoreState.not_running)
    recorder_helper.async_initialize_recorder(hass)
    hass.async_create_task(async_setup_recorder_instance(hass))
    await recorder_helper.async_wait_recorder(hass)
    instance = get_instance(hass)
    # Cancel the "hass started" future before the recorder thread can await it.
    instance._hass_started.cancel()
    with patch.object(instance, "engine"):
        await hass.async_block_till_done()
        await hass.async_add_executor_job(instance.join)
    assert (
        "Recorder startup was externally canceled before it could complete"
        in caplog.text
    )
    # We patched out engine to prevent the close from happening
    # so we need to manually close the session
    await hass.async_add_executor_job(instance._shutdown)
async def test_shutdown_closes_connections(
    hass: HomeAssistant, setup_recorder: None
) -> None:
    """Test shutdown closes connections."""
    hass.set_state(CoreState.not_running)
    instance = recorder.get_instance(hass)
    await instance.async_db_ready
    await hass.async_block_till_done()
    pool = instance.engine

    def _ensure_connected():
        # Force at least one live connection so dispose() has work to do.
        with session_scope(hass=hass, read_only=True) as session:
            list(session.query(States))

    await instance.async_add_executor_job(_ensure_connected)
    # The final-write event must dispose the engine exactly once.
    with patch.object(pool, "dispose", wraps=pool.dispose) as dispose:
        hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
        await hass.async_block_till_done()
        assert len(dispose.mock_calls) == 1
    # After shutdown new sessions must be refused.
    with pytest.raises(RuntimeError):
        assert instance.get_session()
async def test_state_gets_saved_when_set_before_start_event(
    hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator
) -> None:
    """Test we can record an event when starting with not running."""
    hass.set_state(CoreState.not_running)
    recorder_helper.async_initialize_recorder(hass)
    hass.async_create_task(async_setup_recorder_instance(hass))
    await recorder_helper.async_wait_recorder(hass)
    # Write a state before the started event is fired.
    hass.states.async_set(
        "test.recorder", "restoring_from_db", {"test_attr": 5, "test_attr_10": "nice"}
    )
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
    await async_wait_recording_done(hass)
    # The pre-start state must have been persisted.
    with session_scope(hass=hass, read_only=True) as session:
        saved = list(session.query(States))
        assert len(saved) == 1
        assert saved[0].event_id is None
async def test_saving_state(hass: HomeAssistant, setup_recorder: None) -> None:
    """Test saving and restoring a state."""
    entity_id = "test.recorder"
    hass.states.async_set(
        entity_id, "restoring_from_db", {"test_attr": 5, "test_attr_10": "nice"}
    )
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        joined = (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        )
        db_states = []
        for db_state, db_state_attributes, states_meta in joined:
            db_state.entity_id = states_meta.entity_id
            db_states.append(db_state)
            # Convert the (single) row to a native State for comparison.
            state = db_state.to_native()
            state.attributes = db_state_attributes.to_native()
        assert len(db_states) == 1
        assert db_states[0].event_id is None
    assert state.as_dict() == _state_with_context(hass, entity_id).as_dict()
@pytest.mark.parametrize(
    ("db_engine", "expected_attributes"),
    [
        (SupportedDialect.MYSQL, {"test_attr": 5, "test_attr_10": "silly\0stuff"}),
        # Only the PostgreSQL dialect is expected to drop the NUL byte —
        # its text type cannot store \0, so the recorder strips it.
        (SupportedDialect.POSTGRESQL, {"test_attr": 5, "test_attr_10": "silly"}),
        (SupportedDialect.SQLITE, {"test_attr": 5, "test_attr_10": "silly\0stuff"}),
    ],
)
async def test_saving_state_with_nul(
    hass: HomeAssistant,
    db_engine: str,
    recorder_dialect_name: None,
    setup_recorder: None,
    expected_attributes: dict[str, Any],
) -> None:
    """Test saving and restoring a state with nul in attributes."""
    entity_id = "test.recorder"
    state = "restoring_from_db"
    attributes = {"test_attr": 5, "test_attr_10": "silly\0stuff"}
    hass.states.async_set(entity_id, state, attributes)
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        db_states = []
        for db_state, db_state_attributes, states_meta in (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        ):
            db_state.entity_id = states_meta.entity_id
            db_states.append(db_state)
            # Re-bound here from the input string to the native State row.
            state = db_state.to_native()
            state.attributes = db_state_attributes.to_native()
        assert len(db_states) == 1
        assert db_states[0].event_id is None
    expected = _state_with_context(hass, entity_id)
    expected.attributes = expected_attributes
    assert state.as_dict() == expected.as_dict()
async def test_saving_many_states(
    hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator
) -> None:
    """Test we expire after many commits."""
    instance = await async_setup_recorder_instance(
        hass, {recorder.CONF_COMMIT_INTERVAL: 0}
    )
    entity_id = "test.recorder"
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    with (
        patch.object(instance.event_session, "expire_all") as expire_all,
        patch.object(recorder.core, "EXPIRE_AFTER_COMMITS", 2),
    ):
        # Six writes/commits total; with the threshold lowered to two
        # commits, expire_all must fire along the way.
        for next_state in ("on", "off") * 3:
            hass.states.async_set(entity_id, next_state, attributes)
            await async_wait_recording_done(hass)
        assert expire_all.called
    with session_scope(hass=hass, read_only=True) as session:
        saved = list(session.query(States))
        assert len(saved) == 6
        assert saved[0].event_id is None
async def test_saving_state_with_intermixed_time_changes(
    hass: HomeAssistant, setup_recorder: None
) -> None:
    """Test saving states with intermixed time changes."""
    entity_id = "test.recorder"
    attribute_sets = (
        {"test_attr": 5, "test_attr_10": "nice"},
        {"test_attr": 10, "test_attr_10": "mean"},
    )
    # Fire a burst of time-changed events before each state write so the
    # keepalive machinery is exercised between recordings.
    for attrs in attribute_sets:
        for _ in range(KEEPALIVE_TIME + 1):
            async_fire_time_changed(hass, dt_util.utcnow())
        hass.states.async_set(entity_id, "restoring_from_db", attrs)
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        db_states = list(session.query(States))
        assert len(db_states) == 2
        assert db_states[0].event_id is None
async def test_saving_state_with_exception(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test saving and restoring a state."""
    entity_id = "test.recorder"
    state = "restoring_from_db"
    attributes = {"test_attr": 5, "test_attr_10": "nice"}

    def _throw_if_state_in_session(*args, **kwargs):
        # Raise only once a States row is pending in the session so the
        # failure is tied to the state insert, not an unrelated flush.
        for obj in get_instance(hass).event_session:
            if isinstance(obj, States):
                raise OperationalError(
                    "insert the state", "fake params", "forced to fail"
                )

    with (
        patch("time.sleep"),  # skip the retry back-off delay
        patch.object(
            get_instance(hass).event_session,
            "flush",
            side_effect=_throw_if_state_in_session,
        ),
    ):
        hass.states.async_set(entity_id, "fail", attributes)
        await async_wait_recording_done(hass)

    assert "Error executing query" in caplog.text
    assert "Error saving events" not in caplog.text
    caplog.clear()

    # A subsequent write must succeed once the flush no longer fails.
    hass.states.async_set(entity_id, state, attributes)
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        db_states = list(session.query(States))
        assert len(db_states) >= 1

    assert "Error executing query" not in caplog.text
    assert "Error saving events" not in caplog.text
async def test_saving_state_with_sqlalchemy_exception(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test saving state when there is an SQLAlchemyError."""
    entity_id = "test.recorder"
    state = "restoring_from_db"
    attributes = {"test_attr": 5, "test_attr_10": "nice"}

    def _throw_if_state_in_session(*args, **kwargs):
        # Fail the flush only once a States row is pending, so the error
        # is raised on the state insert rather than an unrelated flush.
        for obj in get_instance(hass).event_session:
            if isinstance(obj, States):
                raise SQLAlchemyError(
                    "insert the state", "fake params", "forced to fail"
                )

    with (
        patch("time.sleep"),  # skip the retry back-off delay
        patch.object(
            get_instance(hass).event_session,
            "flush",
            side_effect=_throw_if_state_in_session,
        ),
    ):
        hass.states.async_set(entity_id, "fail", attributes)
        await async_wait_recording_done(hass)

    assert "SQLAlchemyError error processing task" in caplog.text
    caplog.clear()

    # Recording must recover after the failure.
    hass.states.async_set(entity_id, state, attributes)
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        db_states = list(session.query(States))
        assert len(db_states) >= 1

    assert "Error executing query" not in caplog.text
    assert "Error saving events" not in caplog.text
    assert "SQLAlchemyError error processing task" not in caplog.text
async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test forcing shutdown."""
    instance = await async_setup_recorder_instance(hass)
    entity_id = "test.recorder"
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    await async_wait_recording_done(hass)
    with (
        # Short retry wait so the queued failures drain quickly.
        patch.object(instance, "db_retry_wait", 0.01),
        patch.object(
            instance.event_session,
            "flush",
            side_effect=OperationalError(
                "insert the state", "fake params", "forced to fail"
            ),
        ),
    ):
        # Queue many writes that will all fail to flush, then shut down.
        for _ in range(100):
            hass.states.async_set(entity_id, "on", attributes)
            hass.states.async_set(entity_id, "off", attributes)

        hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
        hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
        await hass.async_block_till_done()

    assert "Error executing query" in caplog.text
    assert "Error saving events" not in caplog.text
async def test_saving_event(hass: HomeAssistant, setup_recorder: None) -> None:
    """Test saving and restoring an event."""
    event_type = "EVENT_TEST"
    event_data = {"test_attr": 5, "test_attr_10": "nice"}
    events = []

    @callback
    def event_listener(event):
        """Record events from eventbus."""
        if event.event_type == event_type:
            events.append(event)

    hass.bus.async_listen(MATCH_ALL, event_listener)
    hass.bus.async_fire(event_type, event_data)
    await async_wait_recording_done(hass)
    assert len(events) == 1
    event: Event = events[0]
    await async_recorder_block_till_done(hass)
    # Reuse the name for the events read back from the database.
    events: list[Event] = []
    with session_scope(hass=hass, read_only=True) as session:
        for select_event, event_data, event_types in (
            session.query(Events, EventData, EventTypes)
            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
            .outerjoin(EventData, Events.data_id == EventData.data_id)
        ):
            select_event = cast(Events, select_event)
            event_data = cast(EventData, event_data)
            event_types = cast(EventTypes, event_types)
            # Rebuild a native Event from the normalized row triple.
            native_event = select_event.to_native()
            native_event.data = event_data.to_native()
            native_event.event_type = event_types.event_type
            events.append(native_event)
    db_event = events[0]
    assert event.event_type == db_event.event_type
    assert event.data == db_event.data
    assert event.origin == db_event.origin
    # Recorder uses SQLite and stores datetimes as integer unix timestamps
    assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace(
        microsecond=0
    )
async def test_saving_state_with_commit_interval_zero(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving a state with a commit interval of zero."""
    await async_setup_recorder_instance(hass, {"commit_interval": 0})
    assert get_instance(hass).commit_interval == 0
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    hass.states.async_set("test.recorder", "restoring_from_db", attributes)
    await async_wait_recording_done(hass)
    # With commit_interval=0 the write is committed immediately.
    with session_scope(hass=hass, read_only=True) as session:
        saved = list(session.query(States))
        assert len(saved) == 1
        assert saved[0].event_id is None
async def _add_entities(hass: HomeAssistant, entity_ids: list[str]) -> list[State]:
    """Set a state for each entity and return the states the recorder saved."""
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    for idx, entity_id in enumerate(entity_ids):
        hass.states.async_set(entity_id, f"state{idx}", attributes)
    await async_wait_recording_done(hass)
    with session_scope(hass=hass) as session:
        states: list[State] = []
        joined = (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        )
        for db_state, db_state_attributes, states_meta in joined:
            db_state.entity_id = states_meta.entity_id
            native_state = db_state.to_native()
            native_state.attributes = db_state_attributes.to_native()
            states.append(native_state)
        convert_pending_states_to_meta(get_instance(hass), session)
        return states
def _state_with_context(hass: HomeAssistant, entity_id: str) -> State | None:
# We don't restore context unless we need it by joining the
# events table on the event_id for state_changed events
return hass.states.get(entity_id)
async def test_setup_without_migration(
    hass: HomeAssistant, setup_recorder: None
) -> None:
    """Verify the schema version without a migration."""
    instance = recorder.get_instance(hass)
    assert instance.schema_version == SCHEMA_VERSION
async def test_saving_state_include_domains(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(hass, {"include": {"domains": "test2"}})
    states = await _add_entities(hass, ["test.recorder", "test2.recorder"])
    # Only the test2 domain is included.
    assert len(states) == 1
    included = _state_with_context(hass, "test2.recorder")
    assert included.as_dict() == states[0].as_dict()
async def test_saving_state_include_domains_globs(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(
        hass, {"include": {"domains": "test2", "entity_globs": "*.included_*"}}
    )
    states = await _add_entities(
        hass, ["test.recorder", "test2.recorder", "test3.included_entity"]
    )
    # Both the domain match and the glob match are recorded.
    assert len(states) == 2
    by_entity_id = {state.entity_id: state for state in states}
    for entity_id in ("test2.recorder", "test3.included_entity"):
        assert (
            _state_with_context(hass, entity_id).as_dict()
            == by_entity_id[entity_id].as_dict()
        )
async def test_saving_state_incl_entities(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(
        hass, {"include": {"entities": "test2.recorder"}}
    )
    states = await _add_entities(hass, ["test.recorder", "test2.recorder"])
    # Only the explicitly included entity is recorded.
    assert len(states) == 1
    included = _state_with_context(hass, "test2.recorder")
    assert included.as_dict() == states[0].as_dict()
async def test_saving_event_exclude_event_type(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring an event."""
    config = {
        "exclude": {
            "event_types": [
                "service_registered",
                "homeassistant_start",
                "component_loaded",
                "core_config_updated",
                "homeassistant_started",
                "test",
            ]
        }
    }
    instance = await async_setup_recorder_instance(hass, config)
    events = ["test", "test2"]
    for event_type in events:
        hass.bus.async_fire(event_type)
    await async_wait_recording_done(hass)

    def _get_events(hass: HomeAssistant, event_type_list: list[str]) -> list[Event]:
        # Runs in the recorder executor (see async_add_executor_job below);
        # reads back recorded events of the given types.
        with session_scope(hass=hass, read_only=True) as session:
            events = []
            for event, event_data, event_types in (
                session.query(Events, EventData, EventTypes)
                .outerjoin(
                    EventTypes, (Events.event_type_id == EventTypes.event_type_id)
                )
                .outerjoin(EventData, Events.data_id == EventData.data_id)
                .where(EventTypes.event_type.in_(event_type_list))
            ):
                event = cast(Events, event)
                event_data = cast(EventData, event_data)
                event_types = cast(EventTypes, event_types)
                native_event = event.to_native()
                if event_data:
                    native_event.data = event_data.to_native()
                native_event.event_type = event_types.event_type
                events.append(native_event)
            return events

    events = await instance.async_add_executor_job(_get_events, hass, ["test", "test2"])
    # "test" is excluded by config; only "test2" was recorded.
    assert len(events) == 1
    assert events[0].event_type == "test2"
async def test_saving_state_exclude_domains(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(hass, {"exclude": {"domains": "test"}})
    states = await _add_entities(hass, ["test.recorder", "test2.recorder"])
    # The test domain is excluded; only test2 remains.
    assert len(states) == 1
    kept = _state_with_context(hass, "test2.recorder")
    assert kept.as_dict() == states[0].as_dict()
async def test_saving_state_exclude_domains_globs(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(
        hass, {"exclude": {"domains": "test", "entity_globs": "*.excluded_*"}}
    )
    states = await _add_entities(
        hass, ["test.recorder", "test2.recorder", "test2.excluded_entity"]
    )
    # Domain exclusion and glob exclusion both apply; only test2.recorder remains.
    assert len(states) == 1
    kept = _state_with_context(hass, "test2.recorder")
    assert kept.as_dict() == states[0].as_dict()
async def test_saving_state_exclude_entities(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    await async_setup_recorder_instance(
        hass, {"exclude": {"entities": "test.recorder"}}
    )
    states = await _add_entities(hass, ["test.recorder", "test2.recorder"])
    # The excluded entity is dropped; only test2.recorder remains.
    assert len(states) == 1
    kept = _state_with_context(hass, "test2.recorder")
    assert kept.as_dict() == states[0].as_dict()
async def test_saving_state_exclude_domain_include_entity(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    config = {
        "include": {"entities": "test.recorder"},
        "exclude": {"domains": "test"},
    }
    await async_setup_recorder_instance(hass, config)
    states = await _add_entities(hass, ["test.recorder", "test2.recorder"])
    # The entity include overrides the domain exclude, so both are saved.
    assert len(states) == 2
async def test_saving_state_exclude_domain_glob_include_entity(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    config = {
        "include": {"entities": ["test.recorder", "test.excluded_entity"]},
        "exclude": {"domains": "test", "entity_globs": "*._excluded_*"},
    }
    await async_setup_recorder_instance(hass, config)
    states = await _add_entities(
        hass, ["test.recorder", "test2.recorder", "test.excluded_entity"]
    )
    # Explicit entity includes beat the domain/glob excludes: all saved.
    assert len(states) == 3
async def test_saving_state_include_domain_exclude_entity(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    config = {
        "exclude": {"entities": "test.recorder"},
        "include": {"domains": "test"},
    }
    await async_setup_recorder_instance(hass, config)
    states = await _add_entities(hass, ["test.recorder", "test2.recorder", "test.ok"])
    # Only test.ok survives: test.recorder is excluded, test2 is not included.
    assert len(states) == 1
    kept = _state_with_context(hass, "test.ok")
    assert kept.as_dict() == states[0].as_dict()
    assert kept.state == "state2"
async def test_saving_state_include_domain_glob_exclude_entity(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test saving and restoring a state."""
    config = {
        "exclude": {"entities": ["test.recorder", "test2.included_entity"]},
        "include": {"domains": "test", "entity_globs": "*._included_*"},
    }
    await async_setup_recorder_instance(hass, config)
    states = await _add_entities(
        hass, ["test.recorder", "test2.recorder", "test.ok", "test2.included_entity"]
    )
    # Entity excludes beat the domain/glob includes; only test.ok remains.
    assert len(states) == 1
    kept = _state_with_context(hass, "test.ok")
    assert kept.as_dict() == states[0].as_dict()
    assert kept.state == "state2"
async def test_saving_state_and_removing_entity(
    hass: HomeAssistant,
    setup_recorder: None,
) -> None:
    """Test saving the state of a removed entity."""
    entity_id = "lock.mine"
    hass.states.async_set(entity_id, LockState.LOCKED)
    hass.states.async_set(entity_id, LockState.UNLOCKED)
    hass.states.async_remove(entity_id)
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        rows = list(
            session.query(StatesMeta.entity_id, States.state)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .order_by(States.last_updated_ts)
        )
        assert len(rows) == 3
        # Removal is recorded as a final row with a NULL state.
        expected_states = (LockState.LOCKED, LockState.UNLOCKED, None)
        for row, expected in zip(rows, expected_states):
            assert row.entity_id == entity_id
            if expected is None:
                assert row.state is None
            else:
                assert row.state == expected
async def test_saving_state_with_oversized_attributes(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test saving states is limited to 16KiB of JSON encoded attributes."""
    massive_dict = {"a": "b" * 16384}
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    hass.states.async_set("switch.sane", "on", attributes)
    hass.states.async_set("switch.too_big", "on", massive_dict)
    await async_wait_recording_done(hass)
    states = []
    with session_scope(hass=hass, read_only=True) as session:
        joined = (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        )
        for db_state, db_state_attributes, states_meta in joined:
            db_state.entity_id = states_meta.entity_id
            native_state = db_state.to_native()
            native_state.attributes = db_state_attributes.to_native()
            states.append(native_state)
    assert "switch.too_big" in caplog.text
    assert len(states) == 2
    assert _state_with_context(hass, "switch.sane").as_dict() == states[0].as_dict()
    # The oversized state is kept but its attributes are dropped.
    oversized = states[1]
    assert oversized.state == "on"
    assert oversized.entity_id == "switch.too_big"
    assert oversized.attributes == {}
async def test_saving_event_with_oversized_data(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test saving events is limited to 32KiB of JSON encoded data."""
    massive_dict = {"a": "b" * 32768}
    event_data = {"test_attr": 5, "test_attr_10": "nice"}
    hass.bus.async_fire("test_event", event_data)
    hass.bus.async_fire("test_event_too_big", massive_dict)
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        rows = (
            session.query(Events.event_id, EventData.shared_data, EventTypes.event_type)
            .outerjoin(EventData, Events.data_id == EventData.data_id)
            .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
            .where(EventTypes.event_type.in_(["test_event", "test_event_too_big"]))
        )
        events = {event_type: shared for _, shared, event_type in rows}
    assert "test_event_too_big" in caplog.text
    assert len(events) == 2
    assert json_loads(events["test_event"]) == event_data
    # The oversized event is kept but its data payload is dropped.
    assert json_loads(events["test_event_too_big"]) == {}
async def test_saving_event_invalid_context_ulid(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test we handle invalid manually injected context ids."""
    event_data = {"test_attr": 5, "test_attr_10": "nice"}
    hass.bus.async_fire("test_event", event_data, context=Context(id="invalid"))
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        rows = (
            session.query(Events.event_id, EventData.shared_data, EventTypes.event_type)
            .outerjoin(EventData, Events.data_id == EventData.data_id)
            .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
            .where(EventTypes.event_type.in_(["test_event"]))
        )
        events = {event_type: shared for _, shared, event_type in rows}
    # The event is recorded despite the bad context id.
    assert len(events) == 1
    assert json_loads(events["test_event"]) == event_data
async def test_recorder_setup_failure(hass: HomeAssistant) -> None:
    """Test some exceptions."""
    recorder_helper.async_initialize_recorder(hass)
    with (
        patch.object(
            Recorder, "_setup_connection", side_effect=ImportError("driver not found")
        ),
        patch("homeassistant.components.recorder.core.time.sleep"),
    ):
        rec = _default_recorder(hass)
        rec.async_initialize()
        rec.start()
        # The thread exits cleanly even though connection setup failed.
        rec.join()
    hass.stop()
async def test_recorder_validate_schema_failure(hass: HomeAssistant) -> None:
    """Test some exceptions."""
    recorder_helper.async_initialize_recorder(hass)
    with (
        patch(
            "homeassistant.components.recorder.migration._get_schema_version",
            side_effect=ImportError("driver not found"),
        ),
        patch("homeassistant.components.recorder.core.time.sleep"),
    ):
        rec = _default_recorder(hass)
        rec.async_initialize()
        rec.start()
        # The thread exits cleanly even though schema inspection failed.
        rec.join()
    hass.stop()
async def test_recorder_setup_failure_without_event_listener(
    hass: HomeAssistant,
) -> None:
    """Test recorder setup failure when the event listener is not setup."""
    recorder_helper.async_initialize_recorder(hass)
    with (
        patch.object(
            Recorder, "_setup_connection", side_effect=ImportError("driver not found")
        ),
        patch("homeassistant.components.recorder.core.time.sleep"),
    ):
        # Note: async_initialize() is deliberately NOT called here.
        rec = _default_recorder(hass)
        rec.start()
        rec.join()
    hass.stop()
async def test_defaults_set(hass: HomeAssistant) -> None:
    """Test the config defaults are set."""
    captured: list[ConfigType] = []

    async def mock_setup(hass: HomeAssistant, config: ConfigType) -> bool:
        """Capture the recorder config handed to async_setup."""
        captured.append(config["recorder"])
        return True

    with patch("homeassistant.components.recorder.async_setup", side_effect=mock_setup):
        assert await async_setup_component(hass, "history", {})
    assert captured
    recorder_config = captured[0]
    assert recorder_config["auto_purge"]
    assert recorder_config["auto_repack"]
    assert recorder_config["purge_keep_days"] == 10
async def run_tasks_at_time(hass: HomeAssistant, test_time: datetime) -> None:
    """Advance the clock and wait for any callbacks to finish."""
    async_fire_time_changed(hass, test_time)
    await hass.async_block_till_done(wait_background_tasks=True)
    await async_recorder_block_till_done(hass)
    # Recorder work may schedule follow-up callbacks on the event loop,
    # so drain it once more after the recorder queue is flushed.
    await hass.async_block_till_done(wait_background_tasks=True)
@pytest.mark.parametrize("enable_nightly_purge", [True])
async def test_auto_purge(hass: HomeAssistant, setup_recorder: None) -> None:
    """Test periodic purge scheduling."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    tz = dt_util.get_time_zone(timezone)
    # Purging is scheduled to happen at 4:12am every day. Exercise this behavior by
    # firing time changed events and advancing the clock around this time. Pick an
    # arbitrary year in the future to avoid boundary conditions relative to the current
    # date.
    #
    # The clock is started at 4:15am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
    await run_tasks_at_time(hass, test_time)
    with (
        patch(
            "homeassistant.components.recorder.purge.purge_old_data", return_value=True
        ) as purge_old_data,
        patch(
            "homeassistant.components.recorder.tasks.periodic_db_cleanups"
        ) as periodic_db_cleanups,
    ):
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance one day, and the purge task should run
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        assert len(periodic_db_cleanups.mock_calls) == 1
        purge_old_data.reset_mock()
        periodic_db_cleanups.reset_mock()
        # Advance one day, and the purge task should run again
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        assert len(periodic_db_cleanups.mock_calls) == 1
        purge_old_data.reset_mock()
        periodic_db_cleanups.reset_mock()
        # Advance less than one full day. The alarm should not yet fire.
        test_time = test_time + timedelta(hours=23)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance to the next day and fire the alarm again
        test_time = test_time + timedelta(hours=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        assert len(periodic_db_cleanups.mock_calls) == 1
@pytest.mark.parametrize("enable_nightly_purge", [True])
async def test_auto_purge_auto_repack_on_second_sunday(
    hass: HomeAssistant,
    setup_recorder: None,
) -> None:
    """Test periodic purge scheduling does a repack on the 2nd sunday."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    tz = dt_util.get_time_zone(timezone)
    # Purging is scheduled to happen at 4:12am every day. Exercise this behavior by
    # firing time changed events and advancing the clock around this time. Pick an
    # arbitrary year in the future to avoid boundary conditions relative to the current
    # date.
    #
    # The clock is started at 4:15am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
    await run_tasks_at_time(hass, test_time)
    with (
        # Force the "second sunday" check to be true so the nightly purge
        # requests a repack regardless of the actual test date.
        patch(
            "homeassistant.components.recorder.core.is_second_sunday", return_value=True
        ),
        patch(
            "homeassistant.components.recorder.purge.purge_old_data", return_value=True
        ) as purge_old_data,
        patch(
            "homeassistant.components.recorder.tasks.periodic_db_cleanups"
        ) as periodic_db_cleanups,
    ):
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance one day, and the purge task should run
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        # The third positional argument of purge_old_data is the repack flag.
        args, _ = purge_old_data.call_args_list[0]
        assert args[2] is True  # repack
        assert len(periodic_db_cleanups.mock_calls) == 1
@pytest.mark.parametrize("enable_nightly_purge", [True])
async def test_auto_purge_auto_repack_disabled_on_second_sunday(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test periodic purge scheduling does not auto repack on the 2nd sunday if disabled."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    # Recorder is set up explicitly so auto repack can be disabled via config.
    await async_setup_recorder_instance(hass, {CONF_AUTO_REPACK: False})
    tz = dt_util.get_time_zone(timezone)
    # Purging is scheduled to happen at 4:12am every day. Exercise this behavior by
    # firing time changed events and advancing the clock around this time. Pick an
    # arbitrary year in the future to avoid boundary conditions relative to the current
    # date.
    #
    # The clock is started at 4:15am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
    await run_tasks_at_time(hass, test_time)
    with (
        # Even with the "second sunday" check forced true, the disabled
        # auto-repack config must keep the repack flag off.
        patch(
            "homeassistant.components.recorder.core.is_second_sunday", return_value=True
        ),
        patch(
            "homeassistant.components.recorder.purge.purge_old_data", return_value=True
        ) as purge_old_data,
        patch(
            "homeassistant.components.recorder.tasks.periodic_db_cleanups"
        ) as periodic_db_cleanups,
    ):
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance one day, and the purge task should run
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        # The third positional argument of purge_old_data is the repack flag.
        args, _ = purge_old_data.call_args_list[0]
        assert args[2] is False  # repack
        assert len(periodic_db_cleanups.mock_calls) == 1
@pytest.mark.parametrize("enable_nightly_purge", [True])
async def test_auto_purge_no_auto_repack_on_not_second_sunday(
    hass: HomeAssistant,
    setup_recorder: None,
) -> None:
    """Test periodic purge scheduling does not do a repack unless its the 2nd sunday."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    tz = dt_util.get_time_zone(timezone)
    # Purging is scheduled to happen at 4:12am every day. Exercise this behavior by
    # firing time changed events and advancing the clock around this time. Pick an
    # arbitrary year in the future to avoid boundary conditions relative to the current
    # date.
    #
    # The clock is started at 4:15am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
    await run_tasks_at_time(hass, test_time)
    with (
        # Force the "second sunday" check to be false: the nightly purge
        # must then run without requesting a repack.
        patch(
            "homeassistant.components.recorder.core.is_second_sunday",
            return_value=False,
        ),
        patch(
            "homeassistant.components.recorder.purge.purge_old_data", return_value=True
        ) as purge_old_data,
        patch(
            "homeassistant.components.recorder.tasks.periodic_db_cleanups"
        ) as periodic_db_cleanups,
    ):
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance one day, and the purge task should run
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 1
        # The third positional argument of purge_old_data is the repack flag.
        args, _ = purge_old_data.call_args_list[0]
        assert args[2] is False  # repack
        assert len(periodic_db_cleanups.mock_calls) == 1
@pytest.mark.parametrize("enable_nightly_purge", [True])
async def test_auto_purge_disabled(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test periodic db cleanup still run when auto purge is disabled."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    # Recorder is set up explicitly so auto purge can be disabled via config.
    await async_setup_recorder_instance(hass, {CONF_AUTO_PURGE: False})
    tz = dt_util.get_time_zone(timezone)
    # Purging is scheduled to happen at 4:12am every day. We want
    # to verify that when auto purge is disabled periodic db cleanups
    # are still scheduled
    #
    # The clock is started at 4:15am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
    await run_tasks_at_time(hass, test_time)
    with (
        patch(
            "homeassistant.components.recorder.purge.purge_old_data", return_value=True
        ) as purge_old_data,
        patch(
            "homeassistant.components.recorder.tasks.periodic_db_cleanups"
        ) as periodic_db_cleanups,
    ):
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 0
        # Advance one day: purge must NOT run, but periodic cleanups must.
        test_time = test_time + timedelta(days=1)
        await run_tasks_at_time(hass, test_time)
        assert len(purge_old_data.mock_calls) == 0
        assert len(periodic_db_cleanups.mock_calls) == 1
        purge_old_data.reset_mock()
        periodic_db_cleanups.reset_mock()
@pytest.mark.parametrize("enable_statistics", [True])
async def test_auto_statistics(
    hass: HomeAssistant,
    setup_recorder: None,
    freezer: FrozenDateTimeFactory,
) -> None:
    """Test periodic statistics scheduling."""
    timezone = "Europe/Copenhagen"
    await hass.config.async_set_time_zone(timezone)
    tz = dt_util.get_time_zone(timezone)
    stats_5min = []
    stats_hourly = []
    @callback
    def async_5min_stats_updated_listener(event: Event) -> None:
        """Handle recorder 5 min stat updated."""
        stats_5min.append(event)
    @callback
    def async_hourly_stats_updated_listener(event: Event) -> None:
        """Handle recorder hourly stat updated."""
        stats_hourly.append(event)
    # Statistics is scheduled to happen every 5 minutes. Exercise this behavior by
    # firing time changed events and advancing the clock around this time. Pick an
    # arbitrary year in the future to avoid boundary conditions relative to the current
    # date.
    #
    # The clock is started at 4:51am then advanced forward below
    now = dt_util.utcnow()
    test_time = datetime(now.year + 2, 1, 1, 4, 51, 0, tzinfo=tz)
    freezer.move_to(test_time.isoformat())
    await run_tasks_at_time(hass, test_time)
    hass.bus.async_listen(
        EVENT_RECORDER_5MIN_STATISTICS_GENERATED, async_5min_stats_updated_listener
    )
    hass.bus.async_listen(
        EVENT_RECORDER_HOURLY_STATISTICS_GENERATED, async_hourly_stats_updated_listener
    )
    # Wrap (not replace) compile_statistics so calls can be counted while
    # statistics are still really compiled.
    real_compile_statistics = statistics.compile_statistics
    with patch(
        "homeassistant.components.recorder.statistics.compile_statistics",
        side_effect=real_compile_statistics,
        autospec=True,
    ) as compile_statistics:
        # Advance 5 minutes, and the statistics task should run
        test_time = test_time + timedelta(minutes=5)
        freezer.move_to(test_time.isoformat())
        await run_tasks_at_time(hass, test_time)
        assert len(compile_statistics.mock_calls) == 1
        assert len(stats_5min) == 1
        assert len(stats_hourly) == 0
        compile_statistics.reset_mock()
        # Advance 5 minutes, and the statistics task should run again
        # (crossing the top of the hour also generates the hourly event)
        test_time = test_time + timedelta(minutes=5, seconds=1)
        freezer.move_to(test_time.isoformat())
        await run_tasks_at_time(hass, test_time)
        assert len(compile_statistics.mock_calls) == 1
        assert len(stats_5min) == 2
        assert len(stats_hourly) == 1
        compile_statistics.reset_mock()
        # Advance less than 5 minutes. The task should not run.
        test_time = test_time + timedelta(minutes=3)
        freezer.move_to(test_time.isoformat())
        await run_tasks_at_time(hass, test_time)
        assert len(compile_statistics.mock_calls) == 0
        assert len(stats_5min) == 2
        assert len(stats_hourly) == 1
        # Advance 5 minutes, and the statistics task should run again
        test_time = test_time + timedelta(minutes=5, seconds=1)
        freezer.move_to(test_time.isoformat())
        await run_tasks_at_time(hass, test_time)
        assert len(compile_statistics.mock_calls) == 1
        assert len(stats_5min) == 3
        assert len(stats_hourly) == 1
async def test_statistics_runs_initiated(
    hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator
) -> None:
    """Test statistics_runs is initiated when DB is created."""
    now = dt_util.utcnow()
    # Freeze the recorder's notion of "now" so the expected first-run
    # timestamp can be computed deterministically below.
    with patch(
        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
    ):
        await async_setup_recorder_instance(hass)
        await async_wait_recording_done(hass)
        with session_scope(hass=hass, read_only=True) as session:
            statistics_runs = list(session.query(StatisticsRuns))
            assert len(statistics_runs) == 1
            last_run = process_timestamp(statistics_runs[0].start)
            # The first run is seeded one 5-minute period before the current
            # 5-minute boundary.  (last_run is already processed above; do not
            # wrap it in process_timestamp a second time.)
            assert last_run == now.replace(
                minute=now.minute - now.minute % 5, second=0, microsecond=0
            ) - timedelta(minutes=5)
@pytest.mark.freeze_time("2022-09-13 09:00:00+02:00")
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_missing_statistics", [True])
@pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
async def test_compile_missing_statistics(
    async_test_recorder: RecorderInstanceGenerator, freezer: FrozenDateTimeFactory
) -> None:
    """Test missing statistics are compiled on startup."""
    now = dt_util.utcnow().replace(minute=0, second=0, microsecond=0)

    def get_statistic_runs(hass: HomeAssistant) -> list:
        """Return all StatisticsRuns rows (run in the executor)."""
        with session_scope(hass=hass, read_only=True) as session:
            return list(session.query(StatisticsRuns))

    # First instance: verify the initial statistics run is seeded.
    async with (
        async_test_home_assistant() as hass,
        async_test_recorder(hass, wait_recorder=False) as instance,
    ):
        await hass.async_start()
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)
        statistics_runs = await instance.async_add_executor_job(
            get_statistic_runs, hass
        )
        assert len(statistics_runs) == 1
        last_run = process_timestamp(statistics_runs[0].start)
        assert last_run == now - timedelta(minutes=5)
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)
        await hass.async_stop()
    # Start Home Assistant one hour later
    stats_5min = []
    stats_hourly = []

    @callback
    def async_5min_stats_updated_listener(event: Event) -> None:
        """Handle recorder 5 min stat updated."""
        stats_5min.append(event)

    # Decorated with @callback for consistency with the 5-min listener above
    # (previously missing, which ran this listener outside the event loop).
    @callback
    def async_hourly_stats_updated_listener(event: Event) -> None:
        """Handle recorder hourly stat updated."""
        stats_hourly.append(event)

    freezer.tick(timedelta(hours=1))
    async with (
        async_test_home_assistant() as hass,
        async_test_recorder(hass, wait_recorder=False) as instance,
    ):
        hass.bus.async_listen(
            EVENT_RECORDER_5MIN_STATISTICS_GENERATED, async_5min_stats_updated_listener
        )
        hass.bus.async_listen(
            EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
            async_hourly_stats_updated_listener,
        )
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)
        statistics_runs = await instance.async_add_executor_job(
            get_statistic_runs, hass
        )
        # Initial run plus the 12 missed 5-minute runs of the skipped hour.
        assert len(statistics_runs) == 13
        last_run = process_timestamp(statistics_runs[1].start)
        assert last_run == now
        assert len(stats_5min) == 1
        assert len(stats_hourly) == 1
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)
        await hass.async_stop()
async def test_saving_sets_old_state(hass: HomeAssistant, setup_recorder: None) -> None:
    """Test saving sets old state."""
    hass.states.async_set("test.one", "s1", {})
    hass.states.async_set("test.two", "s2", {})
    hass.states.async_set("test.one", "s3", {})
    hass.states.async_set("test.two", "s4", {})
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        # entity_id lives in StatesMeta; join it back onto the state rows.
        states = list(
            session.query(
                StatesMeta.entity_id, States.state_id, States.old_state_id, States.state
            ).outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        )
        assert len(states) == 4
        states_by_state = {state.state: state for state in states}
        assert states_by_state["s1"].entity_id == "test.one"
        assert states_by_state["s2"].entity_id == "test.two"
        assert states_by_state["s3"].entity_id == "test.one"
        assert states_by_state["s4"].entity_id == "test.two"
        # First state of each entity has no predecessor; subsequent states
        # must link back to the previous row of the same entity.
        assert states_by_state["s1"].old_state_id is None
        assert states_by_state["s2"].old_state_id is None
        assert states_by_state["s3"].old_state_id == states_by_state["s1"].state_id
        assert states_by_state["s4"].old_state_id == states_by_state["s2"].state_id
async def test_saving_state_with_serializable_data(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture, setup_recorder: None
) -> None:
    """Test saving data that cannot be serialized does not crash."""
    # The first two writes carry a non-JSON-serializable payload and must be
    # dropped; the recorder must keep recording afterwards.
    hass.bus.async_fire("bad_event", {"fail": CannotSerializeMe()})
    hass.states.async_set("test.one", "s1", {"fail": CannotSerializeMe()})
    hass.states.async_set("test.two", "s2", {})
    hass.states.async_set("test.two", "s3", {})
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        states = list(
            session.query(
                StatesMeta.entity_id, States.state_id, States.old_state_id, States.state
            ).outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        )
        # Only the two serializable states were recorded.
        assert len(states) == 2
        states_by_state = {state.state: state for state in states}
        assert states_by_state["s2"].entity_id == "test.two"
        assert states_by_state["s3"].entity_id == "test.two"
        assert states_by_state["s2"].old_state_id is None
        assert states_by_state["s3"].old_state_id == states_by_state["s2"].state_id
    assert "State is not JSON serializable" in caplog.text
async def test_has_services(hass: HomeAssistant, setup_recorder: None) -> None:
    """Test the services exist."""
    # All recorder services must be registered after setup.
    for service in (
        SERVICE_DISABLE,
        SERVICE_ENABLE,
        SERVICE_PURGE,
        SERVICE_PURGE_ENTITIES,
    ):
        assert hass.services.has_service(DOMAIN, service)
async def test_service_disable_events_not_recording(
    hass: HomeAssistant,
    setup_recorder: None,
) -> None:
    """Test that events are not recorded when recorder is disabled using service."""
    await hass.services.async_call(
        DOMAIN,
        SERVICE_DISABLE,
        {},
        blocking=True,
    )
    event_type = "EVENT_TEST"
    events = []
    @callback
    def event_listener(event):
        """Record events from eventbus."""
        if event.event_type == event_type:
            events.append(event)
    hass.bus.async_listen(MATCH_ALL, event_listener)
    event_data1 = {"test_attr": 5, "test_attr_10": "nice"}
    hass.bus.async_fire(event_type, event_data1)
    await async_wait_recording_done(hass)
    # The event still reaches bus listeners even though recording is off.
    assert len(events) == 1
    event = events[0]
    with session_scope(hass=hass, read_only=True) as session:
        db_events = list(
            session.query(Events)
            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
        )
        # Nothing was written to the database while disabled.
        assert len(db_events) == 0
    await hass.services.async_call(
        DOMAIN,
        SERVICE_ENABLE,
        {},
        blocking=True,
    )
    event_data2 = {"attr_one": 5, "attr_two": "nice"}
    hass.bus.async_fire(event_type, event_data2)
    await async_wait_recording_done(hass)
    assert len(events) == 2
    assert events[0] != events[1]
    assert events[0].data != events[1].data
    db_events = []
    with session_scope(hass=hass, read_only=True) as session:
        # Rebuild native events from the normalized Events/EventData/EventTypes
        # rows for comparison with the bus event.
        for select_event, event_data, event_types in (
            session.query(Events, EventData, EventTypes)
            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
            .outerjoin(EventData, Events.data_id == EventData.data_id)
        ):
            select_event = cast(Events, select_event)
            event_data = cast(EventData, event_data)
            event_types = cast(EventTypes, event_types)
            native_event = select_event.to_native()
            native_event.data = event_data.to_native()
            native_event.event_type = event_types.event_type
            db_events.append(native_event)
    # Only the event fired after re-enabling was recorded.
    assert len(db_events) == 1
    db_event = db_events[0]
    event = events[1]
    assert event.event_type == db_event.event_type
    assert event.data == db_event.data
    assert event.origin == db_event.origin
    # Sub-second precision may be lost in storage; compare at second precision.
    assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace(
        microsecond=0
    )
async def test_service_disable_states_not_recording(
    hass: HomeAssistant,
    setup_recorder: None,
) -> None:
    """Test that state changes are not recorded when recorder is disabled using service."""
    await hass.services.async_call(
        DOMAIN,
        SERVICE_DISABLE,
        {},
        blocking=True,
    )
    hass.states.async_set("test.one", "on", {})
    await async_wait_recording_done(hass)
    # Nothing is recorded while the recorder is disabled.
    with session_scope(hass=hass, read_only=True) as session:
        assert len(list(session.query(States))) == 0
    await hass.services.async_call(
        DOMAIN,
        SERVICE_ENABLE,
        {},
        blocking=True,
    )
    hass.states.async_set("test.two", "off", {})
    await async_wait_recording_done(hass)
    with session_scope(hass=hass, read_only=True) as session:
        db_states = list(session.query(States))
        assert len(db_states) == 1
        assert db_states[0].event_id is None
        # The entity_id is stored in StatesMeta rather than on the States row,
        # so restore it here to allow the to_native() comparison below.
        db_states[0].entity_id = "test.two"
        assert (
            db_states[0].to_native().as_dict()
            == _state_with_context(hass, "test.two").as_dict()
        )
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage")  # Prevent test hass from writing to storage
async def test_service_disable_run_information_recorded(
    async_test_recorder: RecorderInstanceGenerator,
) -> None:
    """Test that runs are still recorded when recorder is disabled."""
    def get_recorder_runs(hass: HomeAssistant) -> list:
        # Fetch all RecorderRuns rows; executed in the recorder's executor.
        with session_scope(hass=hass, read_only=True) as session:
            return list(session.query(RecorderRuns))
    async with (
        async_test_home_assistant() as hass,
        async_test_recorder(hass) as instance,
    ):
        await hass.async_start()
        await async_wait_recording_done(hass)
        # First run: open-ended (no end) while Home Assistant is running.
        db_run_info = await instance.async_add_executor_job(get_recorder_runs, hass)
        assert len(db_run_info) == 1
        assert db_run_info[0].start is not None
        assert db_run_info[0].end is None
        await hass.services.async_call(
            DOMAIN,
            SERVICE_DISABLE,
            {},
            blocking=True,
        )
        await async_wait_recording_done(hass)
        await hass.async_stop()
    # Second start against the same persistent DB: the first run must have
    # been closed on shutdown even though recording was disabled.
    async with (
        async_test_home_assistant() as hass,
        async_test_recorder(hass) as instance,
    ):
        await hass.async_start()
        await async_wait_recording_done(hass)
        db_run_info = await instance.async_add_executor_job(get_recorder_runs, hass)
        assert len(db_run_info) == 2
        assert db_run_info[0].start is not None
        assert db_run_info[0].end is not None
        assert db_run_info[1].start is not None
        assert db_run_info[1].end is None
        await hass.async_stop()
class CannotSerializeMe:
    """A class that the JSONEncoder cannot serialize.

    Used as attribute/event payload to exercise the recorder's handling of
    non-JSON-serializable data.
    """
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("recorder_config", [{CONF_COMMIT_INTERVAL: 0}])
async def test_database_corruption_while_running(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test we can recover from sqlite3 db corruption."""
    await hass.async_block_till_done()
    caplog.clear()
    instance = get_instance(hass)
    original_start_time = instance.recorder_runs_manager.recording_start
    hass.states.async_set("test.lost", "on", {})
    # Build the exception the recorder treats as unrecoverable corruption:
    # a DatabaseError caused by sqlite3's "malformed disk image" error.
    sqlite3_exception = DatabaseError("statement", {}, [])
    sqlite3_exception.__cause__ = sqlite3.DatabaseError(
        "database disk image is malformed"
    )
    await async_wait_recording_done(hass)
    with patch.object(
        get_instance(hass).event_session,
        "close",
        side_effect=OperationalError("statement", {}, []),
    ):
        await async_wait_recording_done(hass)
        # Corrupt the actual on-disk database file.
        test_db_file = recorder_db_url.removeprefix("sqlite:///")
        await hass.async_add_executor_job(corrupt_db_file, test_db_file)
        await async_wait_recording_done(hass)
    with patch.object(
        get_instance(hass).event_session,
        "commit",
        side_effect=[sqlite3_exception, None],
    ):
        # This state will not be recorded because
        # the database corruption will be discovered
        # and we will have to rollback to recover
        hass.states.async_set("test.one", "off", {})
        await async_wait_recording_done(hass)
    assert "Unrecoverable sqlite3 database corruption detected" in caplog.text
    assert "The system will rename the corrupt database file" in caplog.text
    assert "Connected to recorder database" in caplog.text
    # This state should go into the new database
    hass.states.async_set("test.two", "on", {})
    await async_wait_recording_done(hass)
    def _get_last_state():
        # Runs in the executor; verifies the fresh DB has exactly one state.
        with session_scope(hass=hass, read_only=True) as session:
            db_states = list(session.query(States))
            assert len(db_states) == 1
            db_states[0].entity_id = "test.two"
            assert db_states[0].event_id is None
            return db_states[0].to_native()
    state = await instance.async_add_executor_job(_get_last_state)
    assert state.entity_id == "test.two"
    assert state.state == "on"
    # Recovery creates a new recorder run with a later start time.
    new_start_time = instance.recorder_runs_manager.recording_start
    assert original_start_time < new_start_time
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
    await hass.async_block_till_done()
    hass.stop()
async def test_entity_id_filter(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test that entity ID filtering filters string and list."""
    await async_setup_recorder_instance(
        hass,
        {
            "include": {"domains": "hello"},
            "exclude": {"domains": "hidden_domain"},
        },
    )
    event_types = ("hello",)
    # Each payload here must be recorded: no entity_id, included entity_id
    # as a string, as a list, a mixed list, and a malformed entity_id value.
    for idx, data in enumerate(
        (
            {},
            {"entity_id": "hello.world"},
            {"entity_id": ["hello.world"]},
            {"entity_id": ["hello.world", "hidden_domain.person"]},
            {"entity_id": {"unexpected": "data"}},
        )
    ):
        hass.bus.async_fire("hello", data)
        await async_wait_recording_done(hass)
        with session_scope(hass=hass, read_only=True) as session:
            db_events = list(
                session.query(Events).filter(
                    Events.event_type_id.in_(select_event_type_ids(event_types))
                )
            )
            assert len(db_events) == idx + 1, data
    # Excluded-only entity ids (string and list form) must be filtered out.
    for data in (
        {"entity_id": "hidden_domain.person"},
        {"entity_id": ["hidden_domain.person"]},
    ):
        hass.bus.async_fire("hello", data)
        await async_wait_recording_done(hass)
        with session_scope(hass=hass, read_only=True) as session:
            db_events = list(
                session.query(Events).filter(
                    Events.event_type_id.in_(select_event_type_ids(event_types))
                )
            )
            # Keep referring idx + 1, as no new events are being added
            assert len(db_events) == idx + 1, data
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.parametrize("persistent_database", [True])
async def test_database_lock_and_unlock(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test writing events during lock getting written after unlocking.
    This test is specific for SQLite: Locking is not implemented for other engines.
    Use file DB, in memory DB cannot do write locks.
    """
    config = {
        recorder.CONF_COMMIT_INTERVAL: 0,
    }
    await async_setup_recorder_instance(hass, config)
    await hass.async_block_till_done()
    event_type = "EVENT_TEST"
    event_types = (event_type,)
    def _get_db_events():
        # Executor helper: all recorded events of the test event type.
        with session_scope(hass=hass, read_only=True) as session:
            return list(
                session.query(Events).filter(
                    Events.event_type_id.in_(select_event_type_ids(event_types))
                )
            )
    instance = get_instance(hass)
    assert await instance.lock_database()
    # Locking twice must fail while the lock is held.
    assert not await instance.lock_database()
    event_data = {"test_attr": 5, "test_attr_10": "nice"}
    hass.bus.async_fire(event_type, event_data)
    task = asyncio.create_task(async_wait_recording_done(hass))
    # Recording can't be finished while lock is held
    with pytest.raises(TimeoutError):
        # shield() keeps the task alive so it can complete after unlocking.
        await asyncio.wait_for(asyncio.shield(task), timeout=0.25)
    db_events = await hass.async_add_executor_job(_get_db_events)
    assert len(db_events) == 0
    assert instance.unlock_database()
    await task
    # The queued event is flushed once the database is unlocked.
    db_events = await instance.async_add_executor_job(_get_db_events)
    assert len(db_events) == 1
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.parametrize("persistent_database", [True])
async def test_database_lock_and_overflow(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
    issue_registry: ir.IssueRegistry,
) -> None:
    """Test writing events during lock leading to overflow the queue causes the database to unlock.
    This test is specific for SQLite: Locking is not implemented for other engines.
    Use file DB, in memory DB cannot do write locks.
    """
    config = {
        recorder.CONF_COMMIT_INTERVAL: 0,
    }
    def _get_db_events():
        # Executor helper: all recorded events of the test event type.
        with session_scope(hass=hass, read_only=True) as session:
            return list(
                session.query(Events).filter(
                    Events.event_type_id.in_(select_event_type_ids(event_types))
                )
            )
    with (
        # Shrink the queue limit to 1 and disable the memory-based extension
        # so a single event overflows the queue while the DB is locked.
        patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
        patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
        patch.object(
            recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
        ),
    ):
        await async_setup_recorder_instance(hass, config)
        await hass.async_block_till_done()
        event_type = "EVENT_TEST"
        event_types = (event_type,)
        instance = get_instance(hass)
        await instance.lock_database()
        event_data = {"test_attr": 5, "test_attr_10": "nice"}
        hass.bus.async_fire(event_type, event_data)
        # Check that this causes the queue to overflow and write succeeds
        # even before unlocking.
        await async_wait_recording_done(hass)
        db_events = await instance.async_add_executor_job(_get_db_events)
        assert len(db_events) == 1
        assert "Database queue backlog reached more than" in caplog.text
        # The overflow already released the lock, so unlock now fails.
        assert not instance.unlock_database()
    # The aborted backup is surfaced as a repair issue with the backup start time.
    issue = issue_registry.async_get_issue(DOMAIN, "backup_failed_out_of_resources")
    assert issue is not None
    assert "start_time" in issue.translation_placeholders
    start_time = issue.translation_placeholders["start_time"]
    assert start_time is not None
    # Should be in H:M:S format
    assert start_time.count(":") == 2
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.parametrize("persistent_database", [True])
async def test_database_lock_and_overflow_checks_available_memory(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
    issue_registry: ir.IssueRegistry,
) -> None:
    """Test writing events during lock leading to overflow the queue causes the database to unlock.
    This test is specific for SQLite: Locking is not implemented for other engines.
    Use file DB, in memory DB cannot do write locks.
    """
    config = {
        recorder.CONF_COMMIT_INTERVAL: 0,
    }
    def _get_db_events():
        # Executor helper: all recorded events of the test event type.
        with session_scope(hass=hass, read_only=True) as session:
            return list(
                session.query(Events).filter(
                    Events.event_type_id.in_(select_event_type_ids(event_types))
                )
            )
    with patch(
        "homeassistant.components.recorder.core.QUEUE_CHECK_INTERVAL",
        timedelta(seconds=1),
    ):
        await async_setup_recorder_instance(hass, config)
        await hass.async_block_till_done()
    event_type = "EVENT_TEST"
    event_types = (event_type,)
    await async_wait_recording_done(hass)
    min_available_memory = 256 * 1024**2
    # Toggled by the test to simulate the system running out of memory.
    out_of_ram = False
    def _get_available_memory(*args: Any, **kwargs: Any) -> int:
        nonlocal out_of_ram
        return min_available_memory / 2 if out_of_ram else min_available_memory
    with (
        patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
        patch.object(
            recorder.core,
            "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG",
            min_available_memory,
        ),
        patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
        patch.object(
            recorder.core.Recorder,
            "_available_memory",
            side_effect=_get_available_memory,
        ),
    ):
        instance = get_instance(hass)
        assert await instance.lock_database()
        db_events = await instance.async_add_executor_job(_get_db_events)
        assert len(db_events) == 0
        # Record up to the extended limit (which takes into account the available memory)
        for _ in range(2):
            event_data = {"test_attr": 5, "test_attr_10": "nice"}
            hass.bus.async_fire(event_type, event_data)
        def _wait_database_unlocked():
            # Returns True only if the recorder force-unlocked itself.
            return instance._database_lock_task.database_unlock.wait(0.2)
        databack_unlocked = await hass.async_add_executor_job(_wait_database_unlocked)
        # With memory still available the queue is allowed to grow: no unlock.
        assert not databack_unlocked
        db_events = await instance.async_add_executor_job(_get_db_events)
        assert len(db_events) == 0
        assert "Database queue backlog reached more than" not in caplog.text
        out_of_ram = True
        # Record beyond the extended limit (which takes into account the available memory)
        for _ in range(20):
            event_data = {"test_attr": 5, "test_attr_10": "nice"}
            hass.bus.async_fire(event_type, event_data)
        # Check that this causes the queue to overflow and write succeeds
        # even before unlocking.
        await async_wait_recording_done(hass)
        assert not instance.unlock_database()
        assert "Database queue backlog reached more than" in caplog.text
        db_events = await instance.async_add_executor_job(_get_db_events)
        assert len(db_events) >= 2
    # The aborted backup is surfaced as a repair issue with the backup start time.
    issue = issue_registry.async_get_issue(DOMAIN, "backup_failed_out_of_resources")
    assert issue is not None
    assert "start_time" in issue.translation_placeholders
    start_time = issue.translation_placeholders["start_time"]
    assert start_time is not None
    # Should be in H:M:S format
    assert start_time.count(":") == 2
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_lock_timeout(
    hass: HomeAssistant, setup_recorder: None, recorder_db_url: str
) -> None:
    """Test locking database timeout when recorder stopped.
    This test is specific for SQLite: Locking is not implemented for other engines.
    """
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
    instance = get_instance(hass)
    class BlockQueue(recorder.tasks.RecorderTask):
        # Task that parks the recorder thread until the event is set, so the
        # lock request queued behind it can never be processed in time.
        event: threading.Event = threading.Event()
        def run(self, instance: Recorder) -> None:
            self.event.wait()
    block_task = BlockQueue()
    instance.queue_task(block_task)
    with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0.1):
        try:
            with pytest.raises(TimeoutError):
                await instance.lock_database()
        finally:
            # Always release the lock and unblock the recorder thread, even
            # if the assertion above fails.
            instance.unlock_database()
            block_task.event.set()
async def test_database_lock_without_instance(
    hass: HomeAssistant, setup_recorder: None
) -> None:
    """Test database lock doesn't fail if instance is not initialized."""
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
    instance = get_instance(hass)
    # Replace the engine with a plain MagicMock so no real SQLite locking
    # happens; lock/unlock must still succeed.
    with patch.object(instance, "engine"):
        try:
            assert await instance.lock_database()
        finally:
            assert instance.unlock_database()
async def test_in_memory_database(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test connecting to an in-memory recorder is not allowed."""
    # "sqlite://" (no path) is SQLite's in-memory database URL.
    config = {recorder.DOMAIN: {recorder.CONF_DB_URL: "sqlite://"}}
    setup_ok = await async_setup_component(hass, recorder.DOMAIN, config)
    assert not setup_ok
    assert "In-memory SQLite database is not supported" in caplog.text
@pytest.mark.parametrize("db_engine", ["mysql"])
async def test_database_connection_keep_alive(
    hass: HomeAssistant,
    recorder_dialect_name: None,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test we keep alive socket based dialects."""
    instance = await async_setup_recorder_instance(hass)
    # We have to mock this since we don't have a mock
    # MySQL server available in tests.
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
    await instance.async_recorder_ready.wait()
    # Advance past the keepalive interval to trigger the keepalive query.
    async_fire_time_changed(
        hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME)
    )
    await async_wait_recording_done(hass)
    assert "Sending keepalive" in caplog.text
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_connection_keep_alive_disabled_on_sqlite(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
    recorder_db_url: str,
) -> None:
    """Test we do not do keep alive for sqlite.
    This test is specific for SQLite, keepalive runs on other engines.
    """
    instance = await async_setup_recorder_instance(hass)
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
    await instance.async_recorder_ready.wait()
    # Advance past the keepalive interval; on SQLite no keepalive may be sent.
    async_fire_time_changed(
        hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME)
    )
    await async_wait_recording_done(hass)
    assert "Sending keepalive" not in caplog.text
async def test_deduplication_event_data_inside_commit_interval(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture, setup_recorder: None
) -> None:
    """Test deduplication of event data inside the commit interval."""
    # Fire the same payload repeatedly; every stored event should end up
    # referencing one shared EventData row.
    for _ in range(10):
        hass.bus.async_fire("this_event", {"de": "dupe"})
    for _ in range(10):
        hass.bus.async_fire("this_event", {"de": "dupe"})
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        event_types = ("this_event",)
        query = (
            session.query(Events)
            .filter(Events.event_type_id.in_(select_event_type_ids(event_types)))
            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
            .outerjoin(EventData, (Events.data_id == EventData.data_id))
        )
        events = list(query)
        assert len(events) == 20
        # All 20 rows must share a single data_id.
        assert len({event.data_id for event in events}) == 1
async def test_deduplication_state_attributes_inside_commit_interval(
    small_cache_size: None,
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    setup_recorder: None,
) -> None:
    """Test deduplication of state attributes inside the commit interval."""
    entity_id = "test.recorder"
    shared_attributes = {"test_attr": 5, "test_attr_10": "nice"}

    # Two states with identical attributes -> one StateAttributes row.
    hass.states.async_set(entity_id, "on", shared_attributes)
    hass.states.async_set(entity_id, "off", shared_attributes)

    # Now exhaust the cache to ensure we go back to the db
    for attr_id in range(5):
        hass.states.async_set(entity_id, "on", {"test_attr": attr_id})
        hass.states.async_set(entity_id, "off", {"test_attr": attr_id})

    # Repeat the original attributes: deduplication must find the
    # existing row in the database even after cache eviction.
    for _ in range(5):
        hass.states.async_set(entity_id, "on", shared_attributes)
        hass.states.async_set(entity_id, "off", shared_attributes)
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        query = session.query(States).outerjoin(
            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
        )
        states = list(query)
        assert len(states) == 22
        # First and last state written share the same attributes row.
        assert states[0].attributes_id == states[-1].attributes_id
async def test_async_block_till_done(
    hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator
) -> None:
    """Test we can block until recording is done."""
    instance = await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)

    entity_id = "test.recorder"
    attributes = {"test_attr": 5, "test_attr_10": "nice"}
    hass.states.async_set(entity_id, "on", attributes)
    hass.states.async_set(entity_id, "off", attributes)

    def _read_all_states():
        # Runs in the recorder's executor since DB sessions must not
        # cross threads.
        with session_scope(hass=hass, read_only=True) as session:
            return list(session.query(States))

    # Stall the recorder briefly, then wait until its queue drains.
    await async_block_recorder(hass, 0.1)
    await instance.async_block_till_done()
    states = await instance.async_add_executor_job(_read_all_states)
    assert len(states) == 2
    await hass.async_block_till_done()
@pytest.mark.parametrize(
    ("db_url", "echo"),
    [
        ("sqlite://blabla", None),
        ("mariadb://blabla", False),
        ("mysql://blabla", False),
        ("mariadb+pymysql://blabla", False),
        ("mysql+pymysql://blabla", False),
        ("postgresql://blabla", False),
    ],
)
async def test_disable_echo(
    hass: HomeAssistant, db_url, echo, caplog: pytest.LogCaptureFixture
) -> None:
    """Test echo is disabled for non sqlite databases."""
    recorder_helper.async_initialize_recorder(hass)

    class MockEvent:
        """Stand-in for the sqlalchemy event module."""

        # Invoke the registered listener immediately instead of waiting
        # for a real engine connect event.
        def listen(self, _, _2, callback):
            callback(None, None)

    mock_event = MockEvent()
    with (
        patch(
            "homeassistant.components.recorder.core.create_engine"
        ) as create_engine_mock,
        patch("homeassistant.components.recorder.core.sqlalchemy_event", mock_event),
    ):
        await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: db_url}})
        create_engine_mock.assert_called_once()
        # mock_calls[0][2] is the kwargs dict of the create_engine call.
        assert create_engine_mock.mock_calls[0][2].get("echo") == echo
@pytest.mark.parametrize(
    ("config_url", "expected_connect_args"),
    [
        (
            "mariadb://user:password@SERVER_IP/DB_NAME",
            {"charset": "utf8mb4"},
        ),
        (
            "mariadb+pymysql://user:password@SERVER_IP/DB_NAME",
            {"charset": "utf8mb4"},
        ),
        (
            "mysql://user:password@SERVER_IP/DB_NAME",
            {"charset": "utf8mb4"},
        ),
        (
            "mysql+pymysql://user:password@SERVER_IP/DB_NAME",
            {"charset": "utf8mb4"},
        ),
        (
            "mysql://user:password@SERVER_IP/DB_NAME?charset=utf8mb4",
            {"charset": "utf8mb4"},
        ),
        (
            # An explicit non-utf8mb4 charset in the URL is overridden.
            "mysql://user:password@SERVER_IP/DB_NAME?blah=bleh&charset=other",
            {"charset": "utf8mb4"},
        ),
        (
            "postgresql://blabla",
            {},
        ),
        (
            "sqlite://blabla",
            {},
        ),
    ],
)
async def test_mysql_missing_utf8mb4(
    hass: HomeAssistant, config_url, expected_connect_args
) -> None:
    """Test charset=utf8mb4 is passed in connect_args for MySQL/MariaDB URLs.

    For MySQL/MariaDB the recorder must inject charset=utf8mb4 into
    connect_args even when the db_url omits it or specifies another
    charset; other engines get no charset connect arg.
    """
    recorder_helper.async_initialize_recorder(hass)

    class MockEvent:
        # Invoke the registered listener immediately instead of waiting
        # for a real engine connect event.
        def listen(self, _, _2, callback):
            callback(None, None)

    mock_event = MockEvent()
    with (
        patch(
            "homeassistant.components.recorder.core.create_engine"
        ) as create_engine_mock,
        patch("homeassistant.components.recorder.core.sqlalchemy_event", mock_event),
    ):
        await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: config_url}})
        create_engine_mock.assert_called_once()
        # mock_calls[0][2] is the kwargs dict of the create_engine call.
        connect_args = create_engine_mock.mock_calls[0][2].get("connect_args", {})
        for key, value in expected_connect_args.items():
            assert connect_args[key] == value
@pytest.mark.parametrize(
    "config_url",
    [
        "mysql://user:password@SERVER_IP/DB_NAME",
        "mysql://user:password@SERVER_IP/DB_NAME?charset=utf8mb4",
        "mysql://user:password@SERVER_IP/DB_NAME?blah=bleh&charset=other",
    ],
)
async def test_connect_args_priority(hass: HomeAssistant, config_url) -> None:
    """Test connect_args has priority over URL query.

    The recorder's connect_args (charset=utf8mb4) must win over the
    "invalid" charset that the mock dialect derives from the URL.
    """
    connect_params = []
    recorder_helper.async_initialize_recorder(hass)

    class MockDialect:
        """Non functioning dialect, good enough that SQLAlchemy tries connecting."""

        __bases__ = []
        _has_events = False

        def __init__(self, *args: Any, **kwargs: Any) -> None: ...

        @property
        def is_async(self):
            return False

        def connect(self, *args, **params):
            # Capture the final merged connect parameters for assertion.
            nonlocal connect_params
            connect_params.append(params)
            return True

        def create_connect_args(self, url):
            # Deliberately return a charset that must be overridden by
            # the recorder's connect_args.
            return ([], {"charset": "invalid"})

        @property
        def name(self) -> str:
            return "mysql"

        @classmethod
        def import_dbapi(cls): ...

        def engine_created(*args): ...

        def get_dialect_pool_class(self, *args):
            return QueuePool

        def initialize(*args): ...

        def on_connect_url(self, url):
            return False

        def _builtin_onconnect(self): ...

    class MockEntrypoint:
        # Minimal SQLAlchemy URL entrypoint returning the mock dialect.
        def engine_created(*_): ...

        def get_dialect_cls(*_):
            return MockDialect

    with (
        patch("sqlalchemy.engine.url.URL._get_entrypoint", MockEntrypoint),
        patch("sqlalchemy.engine.create.util.get_cls_kwargs", return_value=["echo"]),
    ):
        await async_setup_component(
            hass,
            DOMAIN,
            {
                DOMAIN: {
                    CONF_DB_URL: config_url,
                    CONF_DB_MAX_RETRIES: 1,
                    CONF_DB_RETRY_WAIT: 0,
                }
            },
        )
    assert connect_params[0]["charset"] == "utf8mb4"
async def test_excluding_attributes_by_integration(
    hass: HomeAssistant,
    entity_registry: er.EntityRegistry,
    setup_recorder: None,
) -> None:
    """Test that an entity can exclude attributes from being recorded.

    Attributes listed in _entity_component_unrecorded_attributes and
    _unrecorded_attributes must be stripped before the state is stored;
    the remaining attribute is recorded unchanged.
    """
    # NOTE: the original dead assignment `state = "restoring_from_db"` was
    # removed; `state` is always rebound in the query loop before use.
    attributes = {"test_attr": 5, "excluded_component": 10, "excluded_integration": 20}
    # Register a fake integration with a recorder platform so its
    # exclude_attributes hook is loaded.
    mock_platform(
        hass,
        "fake_integration.recorder",
        Mock(exclude_attributes=lambda hass: {"excluded"}),
    )
    hass.config.components.add("fake_integration")
    hass.bus.async_fire(EVENT_COMPONENT_LOADED, {"component": "fake_integration"})
    await hass.async_block_till_done()

    class EntityWithExcludedAttributes(MockEntity):
        # Excluded via the entity component and via the integration.
        _entity_component_unrecorded_attributes = frozenset({"excluded_component"})
        _unrecorded_attributes = frozenset({"excluded_integration"})

    entity_id = "test.fake_integration_recorder"
    entity_platform = MockEntityPlatform(hass, platform_name="fake_integration")
    entity = EntityWithExcludedAttributes(
        entity_id=entity_id,
        extra_state_attributes=attributes,
    )
    await entity_platform.async_add_entities([entity])
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        db_states = []
        for db_state, db_state_attributes, states_meta in (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        ):
            db_state.entity_id = states_meta.entity_id
            db_states.append(db_state)
            state = db_state.to_native()
            state.attributes = db_state_attributes.to_native()
        assert len(db_states) == 1
        assert db_states[0].event_id is None

    # Only the non-excluded attribute should have been recorded.
    expected = _state_with_context(hass, entity_id)
    expected.attributes = {"test_attr": 5}
    assert state.as_dict() == expected.as_dict()
async def test_excluding_all_attributes_by_integration(
    hass: HomeAssistant,
    entity_registry: er.EntityRegistry,
    setup_recorder: None,
) -> None:
    """Test that an entity can exclude all attributes from being recorded using MATCH_ALL.

    With _unrecorded_attributes = {MATCH_ALL}, only the always-kept
    attributes (device_class, state_class, friendly_name,
    unit_of_measurement) survive into the database.
    """
    # NOTE: the original dead assignment `state = "restoring_from_db"` was
    # removed; `state` is always rebound in the query loop before use.
    attributes = {
        "test_attr": 5,
        "excluded_component": 10,
        "excluded_integration": 20,
        "device_class": "test",
        "state_class": "test",
        "friendly_name": "Test entity",
        "unit_of_measurement": "mm",
    }
    # Register a fake integration with a recorder platform so its
    # exclude_attributes hook is loaded.
    mock_platform(
        hass,
        "fake_integration.recorder",
        Mock(exclude_attributes=lambda hass: {"excluded"}),
    )
    hass.config.components.add("fake_integration")
    hass.bus.async_fire(EVENT_COMPONENT_LOADED, {"component": "fake_integration"})
    await hass.async_block_till_done()

    class EntityWithExcludedAttributes(MockEntity):
        # MATCH_ALL excludes everything except the always-kept attributes.
        _unrecorded_attributes = frozenset({MATCH_ALL})

    entity_id = "test.fake_integration_recorder"
    entity_platform = MockEntityPlatform(hass, platform_name="fake_integration")
    entity = EntityWithExcludedAttributes(
        entity_id=entity_id,
        extra_state_attributes=attributes,
    )
    await entity_platform.async_add_entities([entity])
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass, read_only=True) as session:
        db_states = []
        for db_state, db_state_attributes, states_meta in (
            session.query(States, StateAttributes, StatesMeta)
            .outerjoin(
                StateAttributes, States.attributes_id == StateAttributes.attributes_id
            )
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
        ):
            db_state.entity_id = states_meta.entity_id
            db_states.append(db_state)
            state = db_state.to_native()
            state.attributes = db_state_attributes.to_native()
        assert len(db_states) == 1
        assert db_states[0].event_id is None

    # Only the always-kept attributes should have been recorded.
    expected = _state_with_context(hass, entity_id)
    expected.attributes = {
        "device_class": "test",
        "state_class": "test",
        "friendly_name": "Test entity",
        "unit_of_measurement": "mm",
    }
    assert state.as_dict() == expected.as_dict()
async def test_lru_increases_with_many_entities(
    small_cache_size: None, hass: HomeAssistant, setup_recorder: None
) -> None:
    """Test that the recorder's internal LRU cache increases with many entities."""
    entity_count = 16
    for number in range(entity_count):
        hass.states.async_set(f"test.entity{number}", "on")
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=10))
    await async_wait_recording_done(hass)

    # Both id-map caches should have grown to twice the entity count.
    recorder_instance = get_instance(hass)
    expected_size = entity_count * 2
    assert (
        recorder_instance.state_attributes_manager._id_map.get_size() == expected_size
    )
    assert recorder_instance.states_meta_manager._id_map.get_size() == expected_size
async def test_clean_shutdown_when_recorder_thread_raises_during_initialize_database(
    hass: HomeAssistant,
) -> None:
    """Test we still shutdown cleanly when the recorder thread raises during initialize_database."""
    with (
        # Force the recorder thread to fail before any schema work starts.
        patch.object(migration, "initialize_database", side_effect=Exception),
        patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True),
    ):
        if recorder.DOMAIN not in hass.data:
            recorder_helper.async_initialize_recorder(hass)
        # Setup must report failure when the recorder thread dies.
        assert not await async_setup_component(
            hass,
            recorder.DOMAIN,
            {
                recorder.DOMAIN: {
                    CONF_DB_URL: "sqlite://",
                    CONF_DB_RETRY_WAIT: 0,
                    CONF_DB_MAX_RETRIES: 1,
                }
            },
        )
        await hass.async_block_till_done()
    instance = recorder.get_instance(hass)
    await hass.async_stop()
    # A clean shutdown leaves the engine disposed and cleared.
    assert instance.engine is None
async def test_clean_shutdown_when_recorder_thread_raises_during_validate_db_schema(
    hass: HomeAssistant,
) -> None:
    """Test we still shutdown cleanly when the recorder thread raises during validate_db_schema."""
    with (
        # Force the recorder thread to fail during schema validation.
        patch.object(migration, "validate_db_schema", side_effect=Exception),
        patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True),
    ):
        if recorder.DOMAIN not in hass.data:
            recorder_helper.async_initialize_recorder(hass)
        # Setup must report failure when the recorder thread dies.
        assert not await async_setup_component(
            hass,
            recorder.DOMAIN,
            {
                recorder.DOMAIN: {
                    CONF_DB_URL: "sqlite://",
                    CONF_DB_RETRY_WAIT: 0,
                    CONF_DB_MAX_RETRIES: 1,
                }
            },
        )
        await hass.async_block_till_done()
    instance = recorder.get_instance(hass)
    await hass.async_stop()
    # A clean shutdown leaves the engine disposed and cleared.
    assert instance.engine is None
@pytest.mark.parametrize(
    ("func_to_patch", "expected_setup_result"),
    [("migrate_schema_non_live", False), ("migrate_schema_live", False)],
)
async def test_clean_shutdown_when_schema_migration_fails(
    hass: HomeAssistant, func_to_patch: str, expected_setup_result: bool
) -> None:
    """Test we still shutdown cleanly when schema migration fails.

    Covers a failure in either migration phase (non-live and live);
    both must cause setup to fail and still dispose the engine.
    """
    with (
        # Report an invalid schema so a migration is attempted ...
        patch.object(
            migration,
            "validate_db_schema",
            return_value=MagicMock(valid=False, current_version=1),
        ),
        patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True),
        # ... and make that migration phase raise.
        patch.object(
            migration,
            func_to_patch,
            side_effect=Exception,
        ),
    ):
        if recorder.DOMAIN not in hass.data:
            recorder_helper.async_initialize_recorder(hass)
        setup_result = await async_setup_component(
            hass,
            recorder.DOMAIN,
            {
                recorder.DOMAIN: {
                    CONF_DB_URL: "sqlite://",
                    CONF_DB_RETRY_WAIT: 0,
                    CONF_DB_MAX_RETRIES: 1,
                }
            },
        )
        assert setup_result == expected_setup_result
        await hass.async_block_till_done()
    instance = recorder.get_instance(hass)
    await hass.async_stop()
    # A clean shutdown leaves the engine disposed and cleared.
    assert instance.engine is None
async def test_events_are_recorded_until_final_write(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
    """Test that events are recorded until the final write.

    Events fired after EVENT_HOMEASSISTANT_STOP but before
    EVENT_HOMEASSISTANT_FINAL_WRITE must still be recorded; the final
    write then shuts the recorder down and disposes the engine.
    """
    instance = await async_setup_recorder_instance(hass, {})
    await hass.async_block_till_done()
    hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
    await hass.async_block_till_done()
    # Fired after STOP but before FINAL_WRITE: must still be recorded.
    hass.bus.async_fire("fake_event")
    await async_wait_recording_done(hass)

    def get_events() -> list[Event]:
        """Fetch the recorded fake_event/after_final_write events (executor job)."""
        events: list[Event] = []
        with session_scope(hass=hass, read_only=True) as session:
            for select_event, event_types in (
                session.query(Events, EventTypes)
                .filter(
                    Events.event_type_id.in_(
                        select_event_type_ids(("fake_event", "after_final_write"))
                    )
                )
                .outerjoin(
                    EventTypes, (Events.event_type_id == EventTypes.event_type_id)
                )
            ):
                select_event = cast(Events, select_event)
                event_types = cast(EventTypes, event_types)
                native_event = select_event.to_native()
                native_event.event_type = event_types.event_type
                events.append(native_event)
        return events

    events = await instance.async_add_executor_job(get_events)
    assert len(events) == 1
    db_event = events[0]
    assert db_event.event_type == "fake_event"
    # The final write shuts the recorder down and disposes the engine.
    hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
    await hass.async_block_till_done()
    assert not instance.engine
async def test_commit_before_commits_pending_writes(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    recorder_db_url: str,
) -> None:
    """Test commit_before with a non-zero commit interval.

    All of our test run with a commit interval of 0 by
    default, so we need to test this with a non-zero commit
    """
    config = {
        recorder.CONF_DB_URL: recorder_db_url,
        recorder.CONF_COMMIT_INTERVAL: 60,
    }
    recorder_helper.async_initialize_recorder(hass)
    hass.async_create_task(async_setup_recorder_instance(hass, config))
    await recorder_helper.async_wait_recorder(hass)
    instance = get_instance(hass)
    assert instance.commit_interval == 60
    # Futures resolved from the recorder thread via call_soon_threadsafe.
    verify_states_in_queue_future = hass.loop.create_future()
    verify_session_commit_future = hass.loop.create_future()

    class VerifyCommitBeforeTask(recorder.tasks.RecorderTask):
        """Task to verify that commit before ran.

        If commit_before is true, we should have no pending writes.
        """

        commit_before = True

        def run(self, instance: Recorder) -> None:
            # Runs on the recorder thread; report back to the event loop.
            if not instance._event_session_has_pending_writes:
                hass.loop.call_soon_threadsafe(
                    verify_session_commit_future.set_result, None
                )
                return
            hass.loop.call_soon_threadsafe(
                verify_session_commit_future.set_exception,
                RuntimeError("Session still has pending write"),
            )

    class VerifyStatesInQueueTask(recorder.tasks.RecorderTask):
        """Task to verify that states are in the queue."""

        commit_before = False

        def run(self, instance: Recorder) -> None:
            # Runs on the recorder thread; report back to the event loop.
            if instance._event_session_has_pending_writes:
                hass.loop.call_soon_threadsafe(
                    verify_states_in_queue_future.set_result, None
                )
                return
            hass.loop.call_soon_threadsafe(
                verify_states_in_queue_future.set_exception,
                RuntimeError("Session has no pending write"),
            )

    # First insert an event
    instance.queue_task(Event("fake_event"))
    # Next verify that the event session has pending writes
    instance.queue_task(VerifyStatesInQueueTask())
    # Finally, verify that the session was committed
    instance.queue_task(VerifyCommitBeforeTask())
    await verify_states_in_queue_future
    await verify_session_commit_future
async def test_all_tables_use_default_table_args(hass: HomeAssistant) -> None:
    """Test that all tables use the default table args."""
    # Every table's kwargs must be a superset of the default table args.
    default_args = db_schema._DEFAULT_TABLE_ARGS.items()
    for table in db_schema.Base.metadata.tables.values():
        assert table.kwargs.items() >= default_args
async def test_empty_entity_id(
    hass: HomeAssistant,
    async_setup_recorder_instance: RecorderInstanceGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test the recorder can handle an empty entity_id."""
    config = {"exclude": {"domains": "hidden_domain"}}
    await async_setup_recorder_instance(hass, config)
    # An event carrying an empty entity_id must not trigger a warning.
    hass.bus.async_fire("hello", {"entity_id": ""})
    await async_wait_recording_done(hass)
    assert "Invalid entity ID" not in caplog.text