mirror of https://github.com/home-assistant/core
2233 lines
78 KiB
Python
2233 lines
78 KiB
Python
"""Test data purging."""
|
|
|
|
from collections.abc import Generator
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import sqlite3
|
|
from unittest.mock import patch
|
|
|
|
from freezegun import freeze_time
|
|
import pytest
|
|
from sqlalchemy.exc import DatabaseError, OperationalError
|
|
from sqlalchemy.orm.session import Session
|
|
from voluptuous.error import MultipleInvalid
|
|
|
|
from homeassistant.components.recorder import DOMAIN as RECORDER_DOMAIN, Recorder
|
|
from homeassistant.components.recorder.const import SupportedDialect
|
|
from homeassistant.components.recorder.db_schema import (
|
|
Events,
|
|
EventTypes,
|
|
RecorderRuns,
|
|
StateAttributes,
|
|
States,
|
|
StatesMeta,
|
|
StatisticsRuns,
|
|
StatisticsShortTerm,
|
|
)
|
|
from homeassistant.components.recorder.history import get_significant_states
|
|
from homeassistant.components.recorder.purge import purge_old_data
|
|
from homeassistant.components.recorder.queries import select_event_type_ids
|
|
from homeassistant.components.recorder.services import (
|
|
SERVICE_PURGE,
|
|
SERVICE_PURGE_ENTITIES,
|
|
)
|
|
from homeassistant.components.recorder.tasks import PurgeTask
|
|
from homeassistant.components.recorder.util import session_scope
|
|
from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .common import (
|
|
async_recorder_block_till_done,
|
|
async_wait_purge_done,
|
|
async_wait_recording_done,
|
|
convert_pending_events_to_event_types,
|
|
convert_pending_states_to_meta,
|
|
)
|
|
|
|
from tests.typing import RecorderInstanceGenerator
|
|
|
|
# Event type names that the tests in this module hand to
# ``select_event_type_ids`` so their Events queries only match test rows.
TEST_EVENT_TYPES = (
    "EVENT_TEST_AUTOPURGE",
    "EVENT_TEST_PURGE",
    "EVENT_TEST",
    "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA",
    "EVENT_TEST_PURGE_WITH_EVENT_DATA",
    "EVENT_TEST_WITH_EVENT_DATA",
)
|
|
|
|
|
|
@pytest.fixture
async def mock_recorder_before_hass(
    async_test_recorder: RecorderInstanceGenerator,
) -> None:
    """Set up recorder.

    The body is intentionally empty: the fixture only exists to request
    ``async_test_recorder``.
    """
|
|
|
|
|
|
@pytest.fixture(name="use_sqlite")
def mock_use_sqlite(request: pytest.FixtureRequest) -> Generator[None]:
    """Patch the recorder dialect name for the duration of a test.

    A truthy ``request.param`` selects SQLite, a falsy one selects MySQL.
    """
    if request.param:
        dialect = SupportedDialect.SQLITE
    else:
        dialect = SupportedDialect.MYSQL

    with patch(
        "homeassistant.components.recorder.core.Recorder.dialect_name",
        return_value=dialect,
    ):
        yield
|
|
|
|
|
|
async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder) -> None:
    """Test deleting 2/3 old states from a big database."""
    # Add the test states twelve times, waiting for the recorder only once.
    for _ in range(12):
        await _add_test_states(hass, wait_recording_done=False)
    await async_wait_recording_done(hass)

    def _row_counts() -> tuple[int, int]:
        """Return the current (States, StateAttributes) row counts."""
        with session_scope(hass=hass) as session:
            state_rows = session.query(States).count()
            attribute_rows = session.query(StateAttributes).count()
        return state_rows, attribute_rows

    with (
        patch.object(recorder_mock, "max_bind_vars", 72),
        patch.object(recorder_mock.database_engine, "max_bind_vars", 72),
    ):
        state_rows, attribute_rows = _row_counts()
        assert state_rows == 72
        assert attribute_rows == 3

        cutoff = dt_util.utcnow() - timedelta(days=4)

        purge_done = purge_old_data(
            recorder_mock,
            cutoff,
            states_batch_size=1,
            events_batch_size=1,
            repack=False,
        )
        assert not purge_done

        state_rows, attribute_rows = _row_counts()
        assert state_rows == 24
        assert attribute_rows == 1
|
|
|
|
|
|
async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None:
    """Test deleting old states.

    Purges in two stages (older than four days, then everything) and checks
    the States / StateAttributes row counts, the ``old_state_id`` links and the
    states manager's ``_last_committed_id`` cache after each stage.
    """
    await _add_test_states(hass)

    # make sure we start with 6 states
    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)

        assert states.count() == 6
        assert states[0].old_state_id is None
        assert states[5].old_state_id == states[4].state_id
        assert state_attributes.count() == 3

        events = session.query(Events).filter(Events.event_type == "state_changed")
        assert events.count() == 0

    assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

    purge_before = dt_util.utcnow() - timedelta(days=4)

    # run purge_old_data()
    finished = purge_old_data(
        recorder_mock,
        purge_before,
        states_batch_size=1,
        events_batch_size=1,
        repack=False,
    )
    assert not finished

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 2
        assert state_attributes.count() == 1

    assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

    with session_scope(hass=hass) as session:
        states_after_purge = list(session.query(States))
        # Since these states are deleted in batches, we can't guarantee the order
        # but we can look them up by state
        state_map_by_state = {state.state: state for state in states_after_purge}
        dontpurgeme_5 = state_map_by_state["dontpurgeme_5"]
        dontpurgeme_4 = state_map_by_state["dontpurgeme_4"]

        assert dontpurgeme_5.old_state_id == dontpurgeme_4.state_id
        assert dontpurgeme_4.old_state_id is None

    finished = purge_old_data(recorder_mock, purge_before, repack=False)
    assert finished

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 2
        assert state_attributes.count() == 1

    assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

    # run purge_old_data again
    purge_before = dt_util.utcnow()
    finished = purge_old_data(
        recorder_mock,
        purge_before,
        states_batch_size=1,
        events_batch_size=1,
        repack=False,
    )
    assert not finished

    with session_scope(hass=hass) as session:
        # Build the queries from *this* session. Reusing the query objects from
        # the previous block would run them on a session that has already been
        # closed, outside of any session_scope.
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 0
        assert state_attributes.count() == 0

    assert "test.recorder2" not in recorder_mock.states_manager._last_committed_id

    # Add some more states
    await _add_test_states(hass)

    # make sure we start with 6 states
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 6
        assert states[0].old_state_id is None
        assert states[5].old_state_id == states[4].state_id

        events = session.query(Events).filter(Events.event_type == "state_changed")
        assert events.count() == 0
        assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

        state_attributes = session.query(StateAttributes)
        assert state_attributes.count() == 3
|
|
|
|
|
|
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("recorder_mock", "skip_by_db_engine")
async def test_purge_old_states_encouters_database_corruption(
    hass: HomeAssistant,
) -> None:
    """Test database image is malformed while deleting old states.

    This test is specific for SQLite, wiping the database on error only happens
    with SQLite.
    """
    await _add_test_states(hass)
    await async_wait_recording_done(hass)

    # SQLAlchemy error whose cause is a sqlite3 "not a database" error.
    corruption_error = DatabaseError("statement", {}, [])
    corruption_error.__cause__ = sqlite3.DatabaseError("not a database")

    purge_everything = {"keep_days": 0}

    with (
        patch(
            "homeassistant.components.recorder.core.move_away_broken_database"
        ) as mock_move_away,
        patch(
            "homeassistant.components.recorder.purge.purge_old_data",
            side_effect=corruption_error,
        ),
    ):
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, purge_everything)
        await hass.async_block_till_done()
        await async_wait_recording_done(hass)

    assert mock_move_away.called

    # Ensure the whole database was reset due to the database error
    with session_scope(hass=hass) as session:
        remaining_states = session.query(States).count()
        assert remaining_states == 0
|
|
|
|
|
|
async def test_purge_old_states_encounters_temporary_mysql_error(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test retry on specific mysql operational errors."""
    await _add_test_states(hass)
    await async_wait_recording_done(hass)

    # Operational error whose original exception carries code 1205.
    retryable_error = OperationalError("statement", {}, [])
    retryable_error.orig = Exception(1205, "retryable")

    purge_everything = {"keep_days": 0}

    with (
        patch("homeassistant.components.recorder.util.time.sleep") as mock_sleep,
        patch(
            "homeassistant.components.recorder.purge._purge_old_recorder_runs",
            # Fail on the first call, succeed on the second.
            side_effect=[retryable_error, None],
        ),
        patch.object(recorder_mock.engine.dialect, "name", "mysql"),
    ):
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, purge_everything)
        await hass.async_block_till_done()
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)

    assert "retrying" in caplog.text
    assert mock_sleep.called
|
|
|
|
|
|
@pytest.mark.usefixtures("recorder_mock")
async def test_purge_old_states_encounters_operational_error(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test error on operational errors that are not mysql does not retry."""
    await _add_test_states(hass)
    await async_wait_recording_done(hass)

    generic_error = OperationalError("statement", {}, [])
    purge_everything = {"keep_days": 0}

    with patch(
        "homeassistant.components.recorder.purge._purge_old_recorder_runs",
        side_effect=generic_error,
    ):
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, purge_everything)
        await hass.async_block_till_done()
        await async_wait_recording_done(hass)
        await async_wait_recording_done(hass)

    logged = caplog.text
    assert "retrying" not in logged
    assert "Error executing purge" in logged
|
|
|
|
|
|
async def test_purge_old_events(hass: HomeAssistant, recorder_mock: Recorder) -> None:
    """Test deleting old events."""
    await _add_test_events(hass)

    def _test_events(session: Session):
        """Build the query matching only events of the test event types."""
        wanted_type_ids = select_event_type_ids(TEST_EVENT_TYPES)
        return session.query(Events).filter(Events.event_type_id.in_(wanted_type_ids))

    with session_scope(hass=hass) as session:
        assert _test_events(session).count() == 6

    cutoff = dt_util.utcnow() - timedelta(days=4)
    purge_options = {
        "repack": False,
        "events_batch_size": 1,
        "states_batch_size": 1,
    }

    # run purge_old_data()
    purge_done = purge_old_data(recorder_mock, cutoff, **purge_options)
    assert not purge_done

    with session_scope(hass=hass) as session:
        remaining_query = _test_events(session)
        remaining = remaining_query.all()
        assert remaining_query.count() == 2, f"Should have 2 events left: {remaining}"

    # we should only have 2 events left
    purge_done = purge_old_data(recorder_mock, cutoff, **purge_options)
    assert purge_done

    with session_scope(hass=hass) as session:
        assert _test_events(session).count() == 2
|
|
|
|
|
|
async def test_purge_old_recorder_runs(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test deleting old recorder runs keeps current run."""
    await _add_test_recorder_runs(hass)

    def _run_count() -> int:
        """Return the number of rows in the RecorderRuns table."""
        with session_scope(hass=hass) as session:
            return session.query(RecorderRuns).count()

    # make sure we start with 7 recorder runs
    assert _run_count() == 7

    cutoff = dt_util.utcnow()
    purge_options = {
        "repack": False,
        "events_batch_size": 1,
        "states_batch_size": 1,
    }

    # run purge_old_data(): the first call reports unfinished work ...
    purge_done = purge_old_data(recorder_mock, cutoff, **purge_options)
    assert not purge_done

    # ... and the second call reports completion.
    purge_done = purge_old_data(recorder_mock, cutoff, **purge_options)
    assert purge_done

    assert _run_count() == 1
|
|
|
|
|
|
async def test_purge_old_statistics_runs(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test deleting old statistics runs keeps the latest run."""
    await _add_test_statistics_runs(hass)

    def _run_count() -> int:
        """Return the number of rows in the StatisticsRuns table."""
        with session_scope(hass=hass) as session:
            return session.query(StatisticsRuns).count()

    # make sure we start with 7 statistics runs
    assert _run_count() == 7

    cutoff = dt_util.utcnow()

    # run purge_old_data(): first call is unfinished, second call completes
    purge_done = purge_old_data(recorder_mock, cutoff, repack=False)
    assert not purge_done

    purge_done = purge_old_data(recorder_mock, cutoff, repack=False)
    assert purge_done

    assert _run_count() == 1
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.usefixtures("recorder_mock")
async def test_purge_method(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    use_sqlite: bool,
) -> None:
    """Test purge method.

    Calls the purge service three times (defaults, ``keep_days`` only,
    ``keep_days`` plus ``repack``) and checks the surviving rows after the
    first two calls and the repack log message after the third.
    """

    def assert_recorder_runs_equal(run1, run2):
        """Assert two RecorderRuns rows carry the same column values."""
        assert run1.run_id == run2.run_id
        assert run1.start == run2.start
        assert run1.end == run2.end
        assert run1.closed_incorrect == run2.closed_incorrect
        assert run1.created == run2.created

    def assert_statistic_runs_equal(run1, run2):
        """Assert two StatisticsRuns rows have the same id and start."""
        assert run1.run_id == run2.run_id
        assert run1.start == run2.start

    def _test_events(session: Session):
        """Build the query matching only events of the test event types."""
        return session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
        )

    service_data = {"keep_days": 4}
    await _add_test_events(hass)
    await _add_test_states(hass)
    await _add_test_statistics(hass)
    await _add_test_recorder_runs(hass)
    await _add_test_statistics_runs(hass)
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    # make sure we start with 6 states
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 6

        events = _test_events(session)
        assert events.count() == 6

        statistics = session.query(StatisticsShortTerm)
        assert statistics.count() == 6

        # The runs are compared by list position after the purge, so every
        # run query is explicitly ordered by primary key.
        recorder_runs = session.query(RecorderRuns).order_by(RecorderRuns.run_id)
        assert recorder_runs.count() == 7
        runs_before_purge = recorder_runs.all()

        statistics_runs = session.query(StatisticsRuns).order_by(StatisticsRuns.run_id)
        assert statistics_runs.count() == 7
        statistic_runs_before_purge = statistics_runs.all()

        # Detach the rows so they can still be read after the session closes.
        for itm in runs_before_purge:
            session.expunge(itm)
        for itm in statistic_runs_before_purge:
            session.expunge(itm)

    await hass.async_block_till_done()
    await async_wait_purge_done(hass)

    # run purge method - no service data, use defaults
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE)
    await hass.async_block_till_done()

    # Small wait for recorder thread
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        events = _test_events(session)
        statistics = session.query(StatisticsShortTerm)

        # only purged old states, events and statistics
        assert states.count() == 4
        assert events.count() == 4
        assert statistics.count() == 4

    # run purge method - correct service data
    await hass.services.async_call(
        RECORDER_DOMAIN, SERVICE_PURGE, service_data=service_data
    )
    await hass.async_block_till_done()

    # Small wait for recorder thread
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        events = _test_events(session)
        statistics = session.query(StatisticsShortTerm)
        recorder_runs = session.query(RecorderRuns).order_by(RecorderRuns.run_id)
        statistics_runs = session.query(StatisticsRuns).order_by(StatisticsRuns.run_id)

        # we should only have 2 states, events and statistics left after purging
        assert states.count() == 2
        assert events.count() == 2
        assert statistics.count() == 2

        # now we should only have 3 recorder runs left
        runs = recorder_runs.all()
        assert_recorder_runs_equal(runs[0], runs_before_purge[0])
        assert_recorder_runs_equal(runs[1], runs_before_purge[5])
        assert_recorder_runs_equal(runs[2], runs_before_purge[6])

        # now we should only have 3 statistics runs left
        runs = statistics_runs.all()
        assert_statistic_runs_equal(runs[0], statistic_runs_before_purge[0])
        assert_statistic_runs_equal(runs[1], statistic_runs_before_purge[5])
        assert_statistic_runs_equal(runs[2], statistic_runs_before_purge[6])

        assert "EVENT_TEST_PURGE" not in (event.event_type for event in events.all())

    # run purge method - correct service data, with repack
    service_data["repack"] = True
    await hass.services.async_call(
        RECORDER_DOMAIN, SERVICE_PURGE, service_data=service_data
    )
    await hass.async_block_till_done()
    await async_wait_purge_done(hass)
    assert (
        "Vacuuming SQL DB to free space" in caplog.text
        or "Optimizing SQL DB to free space" in caplog.text
    )
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
async def test_purge_edge_case(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    use_sqlite: bool,
) -> None:
    """Test states and events are purged even if they occurred shortly before purge_before."""

    async def _add_db_entries(hass: HomeAssistant, timestamp: datetime) -> None:
        """Insert one event, one state and one attributes row at ``timestamp``."""
        row_ts = dt_util.utc_to_timestamp(timestamp)
        with session_scope(hass=hass) as session:
            event_row = Events(
                event_id=1001,
                event_type="EVENT_TEST_PURGE",
                event_data="{}",
                origin="LOCAL",
                time_fired_ts=row_ts,
            )
            session.add(event_row)

            state_row = States(
                entity_id="test.recorder2",
                state="purgeme",
                attributes="{}",
                last_changed_ts=row_ts,
                last_updated_ts=row_ts,
                event_id=1001,
                attributes_id=1002,
            )
            session.add(state_row)

            attributes_row = StateAttributes(
                shared_attrs="{}",
                hash=1234,
                attributes_id=1002,
            )
            session.add(attributes_row)

            convert_pending_events_to_event_types(recorder_mock, session)
            convert_pending_states_to_meta(recorder_mock, session)

    def _test_event_count(session: Session) -> int:
        """Count the events whose type is one of the test event types."""
        wanted_type_ids = select_event_type_ids(TEST_EVENT_TYPES)
        return (
            session.query(Events)
            .filter(Events.event_type_id.in_(wanted_type_ids))
            .count()
        )

    await async_wait_purge_done(hass)

    service_data = {"keep_days": 2}
    # One minute older than the two-day retention requested above.
    just_too_old = dt_util.utcnow() - timedelta(days=2, minutes=1)

    await _add_db_entries(hass, just_too_old)
    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 1
        assert session.query(StateAttributes).count() == 1
        assert _test_event_count(session) == 1

    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 0
        assert _test_event_count(session) == 0
|
|
|
|
|
|
async def test_purge_cutoff_date(hass: HomeAssistant, recorder_mock: Recorder) -> None:
    """Test states and events are purged only if they occurred before "now() - keep_days"."""

    async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> None:
        """Insert one row set exactly at ``cutoff`` and ``rows - 1`` sets just before it."""
        keep_ts = dt_util.utc_to_timestamp(cutoff)
        # One microsecond older than the cutoff.
        purge_ts = dt_util.utc_to_timestamp(cutoff - timedelta(microseconds=1))

        with session_scope(hass=hass) as session:
            session.add(
                Events(
                    event_id=1000,
                    event_type="KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=keep_ts,
                )
            )
            session.add(
                States(
                    entity_id="test.cutoff",
                    state="keep",
                    attributes="{}",
                    last_changed_ts=keep_ts,
                    last_updated_ts=keep_ts,
                    event_id=1000,
                    attributes_id=1000,
                )
            )
            session.add(
                StateAttributes(
                    shared_attrs="{}",
                    hash=1234,
                    attributes_id=1000,
                )
            )
            for row in range(1, rows):
                session.add(
                    Events(
                        event_id=1000 + row,
                        event_type="PURGE",
                        event_data="{}",
                        origin="LOCAL",
                        time_fired_ts=purge_ts,
                    )
                )
                session.add(
                    States(
                        entity_id="test.cutoff",
                        state="purge",
                        attributes="{}",
                        last_changed_ts=purge_ts,
                        last_updated_ts=purge_ts,
                        event_id=1000 + row,
                        attributes_id=1000 + row,
                    )
                )
                session.add(
                    StateAttributes(
                        shared_attrs="{}",
                        hash=1234,
                        attributes_id=1000 + row,
                    )
                )
            convert_pending_events_to_event_types(recorder_mock, session)
            convert_pending_states_to_meta(recorder_mock, session)

    def _event_count(session: Session, event_type: str) -> int:
        """Count the events of a single event type."""
        return (
            session.query(Events)
            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
            .count()
        )

    def _attributes_count_for_state(session: Session, state: str) -> int:
        """Count StateAttributes rows joined to States rows with ``state``."""
        return (
            session.query(StateAttributes)
            .outerjoin(States, StateAttributes.attributes_id == States.attributes_id)
            .filter(States.state == state)
            .count()
        )

    await async_wait_purge_done(hass)

    service_data = {"keep_days": 2}

    # Force multiple purge batches to be run
    rows = 999
    cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
    await _add_db_entries(hass, cutoff, rows)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.filter(States.state == "purge").count() == rows - 1
        assert states.filter(States.state == "keep").count() == 1
        assert _attributes_count_for_state(session, "keep") == 1
        assert _event_count(session, "PURGE") == rows - 1
        assert _event_count(session, "KEEP") == 1

    recorder_mock.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
    await hass.async_block_till_done()
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        # Everything older than the cutoff is gone ...
        assert states.filter(States.state == "purge").count() == 0
        assert _attributes_count_for_state(session, "purge") == 0
        # ... and the row set exactly at the cutoff is still there.
        assert states.filter(States.state == "keep").count() == 1
        assert _attributes_count_for_state(session, "keep") == 1
        assert _event_count(session, "PURGE") == 0
        assert _event_count(session, "KEEP") == 1

    # Make sure we can purge everything
    recorder_mock.queue_task(
        PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
    )
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 0
        assert session.query(StateAttributes).count() == 0

    # Make sure we can purge everything when the db is already empty
    recorder_mock.queue_task(
        PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
    )
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 0
        assert session.query(StateAttributes).count() == 0
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    use_sqlite: bool,
) -> None:
    """Test filtered states are purged."""
    assert recorder_mock.entity_filter("sensor.excluded") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        """Populate the database; the insertion order below must not change.

        The assertions further down refer to rows by their generated ids.
        """
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                when = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_with_state_attributes(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        when,
                        event_id * days,
                    )

            # Add state **without** state_changed event that should be purged
            day_old_ts = dt_util.utc_to_timestamp(dt_util.utcnow() - timedelta(days=1))
            session.add(
                States(
                    entity_id="sensor.excluded",
                    state="purgeme",
                    attributes="{}",
                    last_changed_ts=day_old_ts,
                    last_updated_ts=day_old_ts,
                )
            )

            # Add states and state_changed events that should be kept
            when = dt_util.utcnow() - timedelta(days=2)
            for event_id in range(200, 210):
                _add_state_with_state_attributes(
                    session,
                    "sensor.keep",
                    "keep",
                    when,
                    event_id,
                )

            # Add states with linked old_state_ids that need to be handled
            recent_ts = dt_util.utc_to_timestamp(dt_util.utcnow() - timedelta(days=0))
            shared_attrs_row = StateAttributes(
                hash=0,
                shared_attrs=json.dumps(
                    {"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
                ),
            )
            four_days_ago = dt_util.utcnow() - timedelta(days=4)
            four_days_ago_ts = dt_util.utc_to_timestamp(four_days_ago)
            linked_states = [
                States(
                    entity_id="sensor.linked_old_state_id",
                    state="keep",
                    attributes="{}",
                    last_changed_ts=row_ts,
                    last_updated_ts=row_ts,
                    old_state_id=old_state_id,
                    state_attributes=shared_attrs_row,
                )
                for row_ts, old_state_id in (
                    (recent_ts, 1),
                    (four_days_ago_ts, 2),
                    (four_days_ago_ts, 62),  # keep
                )
            ]
            session.add_all((shared_attrs_row, *linked_states))

            # Add event that should be kept
            session.add(
                Events(
                    event_id=100,
                    event_type="EVENT_KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=four_days_ago_ts,
                )
            )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    def _keep_event_count(session: Session) -> int:
        """Count the events of type EVENT_KEEP."""
        return (
            session.query(Events)
            .filter(Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",))))
            .count()
        )

    def _state_by_id(session: Session, state_id: int):
        """Fetch the States row with the given primary key."""
        return session.query(States).filter(States.state_id == state_id).first()

    async def _wait_for_purge() -> None:
        """Wait for the recorder queue and then for the purge to settle."""
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    service_data = {"keep_days": 10}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 74
        assert _keep_event_count(session) == 1

    # Normal purge doesn't remove excluded entities
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await _wait_for_purge()

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 74
        assert _keep_event_count(session) == 1

    # Test with 'apply_filter' = True
    service_data["apply_filter"] = True
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    # Wait twice, as the original test does.
    await _wait_for_purge()
    await _wait_for_purge()

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 13
        assert _keep_event_count(session) == 1

        excluded_states = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.excluded")
        )
        assert excluded_states.count() == 0

        # The first two linked states lose their old_state_id link.
        for linked_state_id in (72, 73):
            linked_state = _state_by_id(session, linked_state_id)
            assert linked_state.old_state_id is None
            assert linked_state.attributes_id == 71

        final_keep_state = _state_by_id(session, 74)
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    # Do it again to make sure nothing changes
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await _wait_for_purge()

    with session_scope(hass=hass) as session:
        final_keep_state = _state_by_id(session, 74)
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    # Finally purge with no retention at all.
    service_data = {"keep_days": 0}
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await _wait_for_purge()

    with session_scope(hass=hass) as session:
        remaining = list(session.query(States))
        assert all(state.event_id is None for state in remaining)
        assert not remaining
        assert session.query(StateAttributes).count() == 0
|
|
|
|
|
|
@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states_multiple_rounds(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test filtered states are purged when there are multiple rounds to purge.

    The database is seeded with 74 states, a filtered purge is requested and
    the "hasn't fully completed yet" log line is expected. Afterwards 13
    states must remain, and a repeated purge must neither change them nor
    log the message again.
    """
    unfinished_message = "Cleanup filtered data hasn't fully completed yet"
    purge_request = {"keep_days": 10, "apply_filter": True}

    assert recorder_mock.entity_filter("sensor.excluded") is False

    def _linked_state(
        changed_at: datetime, old_state_id: int, shared: StateAttributes
    ) -> States:
        """Build a sensor.linked_old_state_id row referencing old_state_id."""
        changed_ts = dt_util.utc_to_timestamp(changed_at)
        return States(
            entity_id="sensor.linked_old_state_id",
            state="keep",
            attributes="{}",
            last_changed_ts=changed_ts,
            last_updated_ts=changed_ts,
            old_state_id=old_state_id,
            state_attributes=shared,
        )

    def _seed_database(hass: HomeAssistant) -> None:
        """Insert the rows the test works on; insertion order fixes the row ids."""
        with session_scope(hass=hass) as session:
            # Excluded entity: states with state_changed events, to be purged
            for age_days in range(1, 4):
                recorded_at = dt_util.utcnow() - timedelta(days=age_days)
                for base_event_id in range(1000, 1020):
                    _add_state_with_state_attributes(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        recorded_at,
                        base_event_id * age_days,
                    )

            # Excluded entity: one state **without** a state_changed event,
            # which should be purged as well
            orphan_ts = dt_util.utc_to_timestamp(
                dt_util.utcnow() - timedelta(days=1)
            )
            session.add(
                States(
                    entity_id="sensor.excluded",
                    state="purgeme",
                    attributes="{}",
                    last_changed_ts=orphan_ts,
                    last_updated_ts=orphan_ts,
                )
            )

            # States with state_changed events that should be kept
            kept_at = dt_util.utcnow() - timedelta(days=2)
            for kept_event_id in range(200, 210):
                _add_state_with_state_attributes(
                    session,
                    "sensor.keep",
                    "keep",
                    kept_at,
                    kept_event_id,
                )

            # States with linked old_state_ids that need to be handled
            shared_attrs = StateAttributes(
                hash=0,
                shared_attrs=json.dumps(
                    {"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
                ),
            )
            linked_to_1 = _linked_state(
                dt_util.utcnow() - timedelta(days=0), 1, shared_attrs
            )
            four_days_ago = dt_util.utcnow() - timedelta(days=4)
            linked_to_2 = _linked_state(four_days_ago, 2, shared_attrs)
            linked_to_62 = _linked_state(four_days_ago, 62, shared_attrs)  # keep
            session.add_all((shared_attrs, linked_to_1, linked_to_2, linked_to_62))

            # One event that should be kept
            session.add(
                Events(
                    event_id=100,
                    event_type="EVENT_KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(four_days_ago),
                )
            )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    def _kept_events(session: Session):
        """Return the query selecting the EVENT_KEEP rows."""
        return session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
        )

    async def _wait_for_purge_rounds(rounds: int) -> None:
        """Wait for the recorder and the purge task the given number of times."""
        for _ in range(rounds):
            await async_recorder_block_till_done(hass)
            await async_wait_purge_done(hass)

    _seed_database(hass)

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 74
        assert _kept_events(session).count() == 1

    await hass.services.async_call(
        RECORDER_DOMAIN, SERVICE_PURGE, purge_request, blocking=True
    )

    # Make sure the second round of purging runs
    await _wait_for_purge_rounds(2)

    assert unfinished_message in caplog.text
    caplog.clear()

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 13
        assert _kept_events(session).count() == 1

        excluded_states = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.excluded")
        )
        assert excluded_states.count() == 0

        all_states = session.query(States)
        for relinked_id in (72, 73):
            by_id = all_states.filter(States.state_id == relinked_id)
            assert by_id.first().old_state_id is None
            assert by_id.first().attributes_id == 71

        last_linked = session.query(States).filter(States.state_id == 74).first()
        assert last_linked.old_state_id == 62  # should have been kept
        assert last_linked.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    # Do it again to make sure nothing changes
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, purge_request)
    await _wait_for_purge_rounds(1)

    with session_scope(hass=hass) as session:
        last_linked = session.query(States).filter(States.state_id == 74).first()
        assert last_linked.old_state_id == 62  # should have been kept
        assert last_linked.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    # Give a possible second round of purging the chance to run
    await _wait_for_purge_rounds(2)

    assert unfinished_message not in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states_to_empty(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    use_sqlite: bool,
) -> None:
    """Test filtered states are purged all the way to an empty db.

    Only states of the excluded entity are inserted, so a purge with
    ``apply_filter`` must leave both the states and the state attributes
    tables empty, and a repeated purge must keep them empty.
    """
    assert recorder_mock.entity_filter("sensor.excluded") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_with_state_attributes(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            convert_pending_states_to_meta(recorder_mock, session)

    def _assert_table_sizes(expected: int) -> None:
        """Check that states and state attributes both hold `expected` rows."""
        with session_scope(hass=hass) as session:
            assert session.query(States).count() == expected
            assert session.query(StateAttributes).count() == expected

    async def _purge_and_wait() -> None:
        """Call the purge service and wait until the purge task is done."""
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # Test with 'apply_filter' = True
    service_data = {"keep_days": 10, "apply_filter": True}
    _add_db_entries(hass)
    _assert_table_sizes(60)

    await _purge_and_wait()
    _assert_table_sizes(0)

    # Do it again to make sure nothing changes: purging the already empty
    # tables must complete and leave them empty.
    await _purge_and_wait()
    _assert_table_sizes(0)
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"entities": ["sensor.old_format"]}}]
)
async def test_purge_without_state_attributes_filtered_states_to_empty(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    use_sqlite: bool,
) -> None:
    """Test filtered legacy states without state attributes are purged to an empty db.

    A single state row that stores its attributes inline (no StateAttributes
    row) is inserted for the excluded entity. A purge with ``apply_filter``
    must remove it, and a repeated purge must leave the tables empty.
    """
    assert recorder_mock.entity_filter("sensor.old_format") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            # in the legacy format
            timestamp = dt_util.utcnow() - timedelta(days=5)
            event_id = 1021
            session.add(
                States(
                    entity_id="sensor.old_format",
                    state=STATE_ON,
                    attributes=json.dumps({"old": "not_using_state_attributes"}),
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                    event_id=event_id,
                    state_attributes=None,
                )
            )
            session.add(
                Events(
                    event_id=event_id,
                    event_type=EVENT_STATE_CHANGED,
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            session.add(
                Events(
                    event_id=event_id + 1,
                    event_type=EVENT_THEMES_UPDATED,
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    def _assert_table_sizes(states: int, state_attributes: int) -> None:
        """Check the row counts of the states and state attributes tables."""
        with session_scope(hass=hass) as session:
            assert session.query(States).count() == states
            assert session.query(StateAttributes).count() == state_attributes

    async def _purge_and_wait() -> None:
        """Call the purge service and wait until the purge task is done."""
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # Test with 'apply_filter' = True
    service_data = {"keep_days": 10, "apply_filter": True}
    _add_db_entries(hass)
    _assert_table_sizes(states=1, state_attributes=0)

    await _purge_and_wait()
    _assert_table_sizes(states=0, state_attributes=0)

    # Do it again to make sure nothing changes: purging the already empty
    # tables must complete and leave them empty.
    await _purge_and_wait()
    _assert_table_sizes(states=0, state_attributes=0)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"event_types": ["EVENT_PURGE"]}}]
)
async def test_purge_filtered_events(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test filtered events are purged.

    A purge without ``apply_filter`` must leave the excluded events alone;
    a purge with ``apply_filter`` must remove them while keeping the states.
    """
    await async_wait_recording_done(hass)

    def _seed_database(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Events of the excluded type that should be purged
            for age_days in range(1, 4):
                fired_ts = dt_util.utc_to_timestamp(
                    dt_util.utcnow() - timedelta(days=age_days)
                )
                for base_event_id in range(1000, 1020):
                    session.add(
                        Events(
                            event_id=base_event_id * age_days,
                            event_type="EVENT_PURGE",
                            event_data="{}",
                            origin="LOCAL",
                            time_fired_ts=fired_ts,
                        )
                    )

            # States and state_changed events that should be kept
            kept_at = dt_util.utcnow() - timedelta(days=1)
            for kept_event_id in range(200, 210):
                _add_state_with_state_attributes(
                    session,
                    "sensor.keep",
                    "keep",
                    kept_at,
                    kept_event_id,
                )
            convert_pending_events_to_event_types(recorder_mock, session)
            convert_pending_states_to_meta(recorder_mock, session)

    def _assert_row_counts(excluded_events: int, kept_states: int) -> None:
        """Check how many EVENT_PURGE events and how many states exist."""
        with session_scope(hass=hass, read_only=True) as session:
            purge_candidates = session.query(Events).filter(
                Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
            )
            assert purge_candidates.count() == excluded_events
            assert session.query(States).count() == kept_states

    async def _purge_and_wait(request: dict, rounds: int) -> None:
        """Call the purge service, then wait `rounds` times for it to finish."""
        await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, request)
        await hass.async_block_till_done()
        for _ in range(rounds):
            await async_recorder_block_till_done(hass)
            await async_wait_purge_done(hass)

    await recorder_mock.async_add_executor_job(_seed_database, hass)
    await async_wait_recording_done(hass)
    _assert_row_counts(excluded_events=60, kept_states=10)

    # Normal purge doesn't remove excluded events
    await _purge_and_wait({"keep_days": 10}, rounds=1)
    _assert_row_counts(excluded_events=60, kept_states=10)

    # Test with 'apply_filter' = True
    await _purge_and_wait({"keep_days": 10, "apply_filter": True}, rounds=2)
    _assert_row_counts(excluded_events=0, kept_states=10)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "recorder_config",
    [
        {
            "exclude": {
                "event_types": ["excluded_event"],
                "entities": ["sensor.excluded", "sensor.old_format"],
            }
        }
    ],
)
async def test_purge_filtered_events_state_changed(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test filtered state_changed events are purged.

    This should also remove all states of the excluded entities; only the
    three sensor.linked_old_state_id states are expected to survive.
    """
    # The entity filter rejects the excluded entities and accepts the others
    assert recorder_mock.entity_filter("sensor.excluded") is False
    assert recorder_mock.entity_filter("sensor.old_format") is False
    assert recorder_mock.entity_filter("sensor.keep") is True
    assert "excluded_event" in recorder_mock.exclude_event_types

    def _linked_state(changed_at: datetime, old_state_id: int) -> States:
        """Build a sensor.linked_old_state_id row referencing old_state_id."""
        changed_ts = dt_util.utc_to_timestamp(changed_at)
        return States(
            entity_id="sensor.linked_old_state_id",
            state="keep",
            attributes="{}",
            last_changed_ts=changed_ts,
            last_updated_ts=changed_ts,
            old_state_id=old_state_id,
        )

    def _seed_database(hass: HomeAssistant) -> None:
        """Insert the rows the test works on; insertion order fixes the row ids."""
        with session_scope(hass=hass) as session:
            # States and state_changed events that should be purged
            for age_days in range(1, 4):
                recorded_at = dt_util.utcnow() - timedelta(days=age_days)
                for base_event_id in range(1000, 1020):
                    _add_state_with_state_attributes(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        recorded_at,
                        base_event_id * age_days,
                    )

            # Events that should be kept
            kept_ts = dt_util.utc_to_timestamp(dt_util.utcnow() - timedelta(days=1))
            for kept_event_id in range(200, 210):
                session.add(
                    Events(
                        event_id=kept_event_id,
                        event_type="EVENT_KEEP",
                        event_data="{}",
                        origin="LOCAL",
                        time_fired_ts=kept_ts,
                    )
                )

            # States with linked old_state_ids that need to be handled
            linked_to_1 = _linked_state(dt_util.utcnow() - timedelta(days=0), 1)
            four_days_ago = dt_util.utcnow() - timedelta(days=4)
            linked_to_2 = _linked_state(four_days_ago, 2)
            linked_to_62 = _linked_state(four_days_ago, 62)  # keep
            session.add_all((linked_to_1, linked_to_2, linked_to_62))

            four_days_ago_ts = dt_util.utc_to_timestamp(four_days_ago)
            session.add(
                Events(
                    event_id=231,
                    event_type="excluded_event",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=four_days_ago_ts,
                )
            )
            session.add(
                States(
                    entity_id="sensor.old_format",
                    state="remove",
                    attributes="{}",
                    last_changed_ts=four_days_ago_ts,
                    last_updated_ts=four_days_ago_ts,
                )
            )
            convert_pending_events_to_event_types(recorder_mock, session)
            convert_pending_states_to_meta(recorder_mock, session)

    def _events_of_type(session: Session, event_type: str):
        """Return the query selecting all events of the given type."""
        return session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids((event_type,)))
        )

    purge_request = {"keep_days": 10, "apply_filter": True}
    _seed_database(hass)

    with session_scope(hass=hass) as session:
        assert _events_of_type(session, "EVENT_KEEP").count() == 10
        assert _events_of_type(session, "excluded_event").count() == 1
        assert session.query(States).count() == 64

    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, purge_request)
    await hass.async_block_till_done()

    for _ in range(4):
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        assert _events_of_type(session, "EVENT_KEEP").count() == 10
        assert _events_of_type(session, "excluded_event").count() == 0
        assert session.query(States).count() == 3

        for cleared_id in (61, 62):
            cleared = session.query(States).filter(States.state_id == cleared_id)
            assert cleared.first().old_state_id is None

        # The link to state 62 should have been kept
        still_linked = session.query(States).filter(States.state_id == 63).first()
        assert still_linked.old_state_id == 62
|
|
|
|
|
|
async def test_purge_entities(hass: HomeAssistant, recorder_mock: Recorder) -> None:
    """Test purging of specific entities.

    Exercises the purge_entities service with an entity id, a domain and an
    entity glob, first combined and then one at a time, and finally checks
    that a call without any selector is rejected and purges nothing.
    """

    async def _purge_entities(
        hass: HomeAssistant,
        entity_ids: str | list[str],
        domains: str | list[str],
        entity_globs: str | list[str],
    ) -> None:
        """Call the purge_entities service and wait for the purge to finish."""
        service_data = {
            "entity_id": entity_ids,
            "domains": domains,
            "entity_globs": entity_globs,
        }

        await hass.services.async_call(
            RECORDER_DOMAIN, SERVICE_PURGE_ENTITIES, service_data
        )
        await hass.async_block_till_done()

        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # Entity id and base event id range for each kind of purge target
    purge_targets = (
        ("sensor.purge_entity", range(1000, 1020)),
        ("purge_domain.entity", range(10000, 10020)),
        ("binary_sensor.purge_glob", range(100000, 100020)),
    )

    def _add_purge_records(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for entity_id, event_ids in purge_targets:
                    for event_id in event_ids:
                        _add_state_with_state_attributes(
                            session,
                            entity_id,
                            "purgeme",
                            timestamp,
                            event_id * days,
                        )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    def _add_keep_records(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=2)
            for event_id in range(200, 210):
                _add_state_with_state_attributes(
                    session,
                    "sensor.keep",
                    "keep",
                    timestamp,
                    event_id,
                )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    def _states_of_entity(session: Session, entity_id: str):
        """Return the query selecting all states of one entity."""
        return (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == entity_id)
        )

    _add_purge_records(hass)
    _add_keep_records(hass)

    # Confirm standard service call
    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 190

    await _purge_entities(hass, "sensor.purge_entity", "purge_domain", "*purge_glob")

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 10
        assert _states_of_entity(session, "sensor.keep").count() == 10

    _add_purge_records(hass)

    # Confirm each parameter purges only the associated records
    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 190

    await _purge_entities(hass, "sensor.purge_entity", [], [])

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 130

    await _purge_entities(hass, [], "purge_domain", [])

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 70

    await _purge_entities(hass, [], [], "*purge_glob")

    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 10
        assert _states_of_entity(session, "sensor.keep").count() == 10

        # sensor.keep should remain in the StatesMeta table
        states_meta_remain = session.query(StatesMeta).filter(
            StatesMeta.entity_id == "sensor.keep"
        )
        assert states_meta_remain.count() == 1

        # sensor.purge_entity should be removed from the StatesMeta table
        states_meta_remain = session.query(StatesMeta).filter(
            StatesMeta.entity_id == "sensor.purge_entity"
        )
        assert states_meta_remain.count() == 0

    _add_purge_records(hass)

    # Confirm calling service without arguments is invalid
    with session_scope(hass=hass) as session:
        assert session.query(States).count() == 190

    with pytest.raises(MultipleInvalid):
        await _purge_entities(hass, [], [], [])

    with session_scope(hass=hass, read_only=True) as session:
        # The rejected call must not have purged anything
        assert session.query(States).count() == 190
        assert session.query(StatesMeta).count() == 4
|
|
|
|
|
|
async def _add_test_states(
    hass: HomeAssistant, wait_recording_done: bool = True
) -> None:
    """Add multiple states to the db for testing.

    Sets six states on ``test.recorder2`` while time is frozen: two dated
    eleven days ago, two dated five days ago and two dated now. When
    wait_recording_done is true, recording is awaited after every state.
    """
    now = dt_util.utcnow()
    shared_attributes = {"test_attr": 5, "test_attr_10": "nice"}
    # Each phase supplies two consecutive states: (timestamp, label)
    phases = (
        (now - timedelta(days=11), "autopurgeme"),
        (now - timedelta(days=5), "purgeme"),
        (now, "dontpurgeme"),
    )

    async def _record(entity_id, state, **kwargs):
        """Set the state and optionally wait until it has been recorded."""
        hass.states.async_set(entity_id, state, **kwargs)
        if not wait_recording_done:
            return
        await hass.async_block_till_done()
        await async_wait_recording_done(hass)

    with freeze_time() as frozen_clock:
        for index in range(6):
            moment, label = phases[index // 2]
            frozen_clock.move_to(moment)
            await _record(
                "test.recorder2",
                f"{label}_{index}",
                attributes={label: True, **shared_attributes},
            )
|
|
|
|
|
|
async def _add_test_events(hass: HomeAssistant, iterations: int = 1) -> None:
    """Add a few events for testing.

    Per iteration six events are fired while time is frozen: two
    EVENT_TEST_AUTOPURGE dated eleven days ago, two EVENT_TEST_PURGE dated
    five days ago and two EVENT_TEST dated now.
    """
    now = dt_util.utcnow()
    payload = {"test_attr": 5, "test_attr_10": "nice"}
    # Each phase supplies two consecutive events: (timestamp, event type)
    phases = (
        (now - timedelta(days=11), "EVENT_TEST_AUTOPURGE"),
        (now - timedelta(days=5), "EVENT_TEST_PURGE"),
        (now, "EVENT_TEST"),
    )
    # Make sure recording is done before freezing time
    # because the time freeze can affect the recorder
    # thread as well, which can cause the test to fail
    await async_wait_recording_done(hass)

    with freeze_time() as frozen_clock:
        for _ in range(iterations):
            for index in range(6):
                moment, event_type = phases[index // 2]
                frozen_clock.move_to(moment)
                hass.bus.async_fire(event_type, payload)

        await async_wait_recording_done(hass)
|
|
|
|
|
|
async def _add_test_statistics(hass: HomeAssistant) -> None:
    """Add multiple statistics to the db for testing.

    Inserts six short term statistics rows: two starting eleven days ago
    (state "-11"), two starting five days ago ("-5") and two starting now ("0").
    """
    now = dt_util.utcnow()
    # (start, state) for each pair of rows, oldest first
    samples = (
        (now - timedelta(days=11), "-11"),
        (now - timedelta(days=5), "-5"),
        (now, "0"),
    )

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for index in range(6):
            start, state = samples[index // 2]
            session.add(
                StatisticsShortTerm(
                    start_ts=start.timestamp(),
                    state=state,
                )
            )
|
|
|
|
|
|
async def _add_test_recorder_runs(hass: HomeAssistant) -> None:
    """Add a few recorder_runs for testing.

    Inserts six runs lasting one day each: two started eleven days ago,
    two started five days ago and two started now.
    """
    now = dt_util.utcnow()
    # Start time for each pair of rows, oldest first
    starts = (now - timedelta(days=11), now - timedelta(days=5), now)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for index in range(6):
            started = starts[index // 2]
            session.add(
                RecorderRuns(
                    start=started,
                    created=dt_util.utcnow(),
                    end=started + timedelta(days=1),
                )
            )
|
|
|
|
|
|
async def _add_test_statistics_runs(hass: HomeAssistant) -> None:
    """Add a few statistics_runs for testing.

    Inserts six runs: two started eleven days ago, two started five days
    ago and two started now.
    """
    now = dt_util.utcnow()
    # Start time for each pair of rows, oldest first
    starts = (now - timedelta(days=11), now - timedelta(days=5), now)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for index in range(6):
            session.add(
                StatisticsRuns(
                    start=starts[index // 2],
                )
            )
|
|
|
|
|
|
def _add_state_without_event_linkage(
    session: Session,
    entity_id: str,
    state: str,
    timestamp: datetime,
) -> None:
    """Add a state with state attributes but no event_id for testing.

    The attributes row uses the fixed hash 1234 and is added to the session
    before the state row that references it.
    """
    recorded_ts = dt_util.utc_to_timestamp(timestamp)
    attributes_row = StateAttributes(
        hash=1234, shared_attrs=json.dumps({entity_id: entity_id})
    )
    state_row = States(
        entity_id=entity_id,
        state=state,
        attributes=None,
        last_changed_ts=recorded_ts,
        last_updated_ts=recorded_ts,
        event_id=None,
        state_attributes=attributes_row,
    )
    session.add(attributes_row)
    session.add(state_row)
|
|
|
|
|
|
def _add_state_with_state_attributes(
    session: Session,
    entity_id: str,
    state: str,
    timestamp: datetime,
    event_id: int,
) -> None:
    """Add a state and its state attributes to the database for testing.

    event_id is stored on the state row and also used as the hash of the
    attributes row, which is added to the session before the state row.
    """
    recorded_ts = dt_util.utc_to_timestamp(timestamp)
    attributes_row = StateAttributes(
        hash=event_id, shared_attrs=json.dumps({entity_id: entity_id})
    )
    state_row = States(
        entity_id=entity_id,
        state=state,
        attributes=None,
        last_changed_ts=recorded_ts,
        last_updated_ts=recorded_ts,
        event_id=event_id,
        state_attributes=attributes_row,
    )
    session.add(attributes_row)
    session.add(state_row)
|
|
|
|
|
|
@pytest.mark.timeout(30)
async def test_purge_many_old_events(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test deleting old events.

    With max_bind_vars patched down to the number of iterations, the old
    events have to be purged in two passes before a final purge removes
    the remaining test events.
    """
    old_events_count = 5
    small_batches = {"states_batch_size": 3, "events_batch_size": 3}

    def _test_event_count() -> int:
        """Return how many events of the test event types are stored."""
        with session_scope(hass=hass) as session:
            return (
                session.query(Events)
                .filter(
                    Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
                )
                .count()
            )

    with (
        patch.object(recorder_mock, "max_bind_vars", old_events_count),
        patch.object(recorder_mock.database_engine, "max_bind_vars", old_events_count),
    ):
        await _add_test_events(hass, old_events_count)
        assert _test_event_count() == old_events_count * 6

        cutoff = dt_util.utcnow() - timedelta(days=4)

        # First pass of purge_old_data() cannot finish in one go
        done = purge_old_data(recorder_mock, cutoff, repack=False, **small_batches)
        assert not done
        assert _test_event_count() == old_events_count * 3

        # we should only have 2 groups of events left
        done = purge_old_data(recorder_mock, cutoff, repack=False, **small_batches)
        assert done
        assert _test_event_count() == old_events_count * 2

        # we should now purge everything
        done = purge_old_data(
            recorder_mock,
            dt_util.utcnow(),
            repack=False,
            states_batch_size=20,
            events_batch_size=20,
        )
        assert done
        assert _test_event_count() == 0
|
|
|
|
|
|
async def test_purge_old_events_purges_the_event_type_ids(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test deleting old events purges event type ids.

    Four event types are inserted, one of them without any events. Each
    purge is followed by a check of how many test events and test event
    types are still stored.
    """
    assert recorder_mock.event_type_manager.active is True

    now = dt_util.utcnow()
    five_days_ago = now - timedelta(days=5)
    eleven_days_ago = now - timedelta(days=11)
    far_past = now - timedelta(days=1000)
    type_names = (
        "EVENT_TEST_AUTOPURGE",
        "EVENT_TEST_PURGE",
        "EVENT_TEST",
        "EVENT_TEST_UNUSED",
    )

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    def _insert_events():
        """Insert the event types and 30 events, return the name to id map."""
        with session_scope(hass=hass) as session:
            autopurge_type, purge_type, current_type, unused_type = (
                EventTypes(event_type=name) for name in type_names
            )
            session.add_all((autopurge_type, purge_type, current_type, unused_type))
            # Flush so the event type ids read below are populated
            session.flush()
            # Each phase supplies two consecutive events per round
            phases = (
                (eleven_days_ago, autopurge_type),
                (five_days_ago, purge_type),
                (now, current_type),
            )
            for _ in range(5):
                for index in range(6):
                    fired_at, row_type = phases[index // 2]
                    session.add(
                        Events(
                            event_type=None,
                            event_type_id=row_type.event_type_id,
                            time_fired_ts=dt_util.utc_to_timestamp(fired_at),
                        )
                    )
            return recorder_mock.event_type_manager.get_many(
                list(type_names),
                session,
            )

    event_type_to_id = await recorder_mock.async_add_executor_job(_insert_events)
    test_event_type_ids = event_type_to_id.values()

    def _remaining() -> tuple[int, int]:
        """Return the number of test events and test event types stored."""
        with session_scope(hass=hass) as session:
            event_rows = session.query(Events).where(
                Events.event_type_id.in_(test_event_type_ids)
            )
            type_rows = session.query(EventTypes).where(
                EventTypes.event_type_id.in_(test_event_type_ids)
            )
            return event_rows.count(), type_rows.count()

    assert _remaining() == (30, 4)

    # run purge_old_data()
    done = purge_old_data(recorder_mock, far_past, repack=False)
    assert done

    # No event is removed, but we should remove the unused event type
    assert _remaining() == (30, 3)
    assert "EVENT_TEST_UNUSED" not in recorder_mock.event_type_manager._id_map

    # we should only have 10 events left since
    # only one event type was recorded now
    done = purge_old_data(recorder_mock, now, repack=False)
    assert done
    assert _remaining() == (10, 1)

    # Purge everything
    done = purge_old_data(recorder_mock, now + timedelta(seconds=1), repack=False)
    assert done
    assert _remaining() == (0, 0)
|
|
|
|
|
|
async def test_purge_old_states_purges_the_state_metadata_ids(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test deleting old states purges state metadata_ids.

    Four ``StatesMeta`` rows are inserted: three are referenced by ten
    ``States`` rows each (11 days old, 5 days old and "now"), and one is
    never referenced. Successive purges with later cut-offs must remove the
    states and then the metadata rows that are no longer referenced.
    """
    assert recorder_mock.states_meta_manager.active is True

    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    far_past = utcnow - timedelta(days=1000)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    def _insert_states():
        """Insert the fixture rows and return the entity_id -> metadata_id map."""
        with session_scope(hass=hass) as session:
            states_meta_sensor_one = StatesMeta(entity_id="sensor.one")
            states_meta_sensor_two = StatesMeta(entity_id="sensor.two")
            states_meta_sensor_three = StatesMeta(entity_id="sensor.three")
            states_meta_sensor_unused = StatesMeta(entity_id="sensor.unused")
            session.add_all(
                (
                    states_meta_sensor_one,
                    states_meta_sensor_two,
                    states_meta_sensor_three,
                    states_meta_sensor_unused,
                )
            )
            # Flush so the metadata_id primary keys are populated before
            # they are referenced by the States rows below.
            session.flush()
            for _ in range(5):
                for event_id in range(6):
                    if event_id < 2:
                        timestamp = eleven_days_ago
                        metadata_id = states_meta_sensor_one.metadata_id
                    elif event_id < 4:
                        timestamp = five_days_ago
                        metadata_id = states_meta_sensor_two.metadata_id
                    else:
                        timestamp = utcnow
                        metadata_id = states_meta_sensor_three.metadata_id

                    session.add(
                        States(
                            metadata_id=metadata_id,
                            state="any",
                            last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                        )
                    )
            return recorder_mock.states_meta_manager.get_many(
                ["sensor.one", "sensor.two", "sensor.three", "sensor.unused"],
                session,
                True,
            )

    entity_id_to_metadata_id = await recorder_mock.async_add_executor_job(
        _insert_states
    )
    test_metadata_ids = entity_id_to_metadata_id.values()

    def _row_counts() -> tuple[int, int]:
        """Return (States rows, StatesMeta rows) for the test metadata ids."""
        with session_scope(hass=hass) as session:
            states = session.query(States).where(
                States.metadata_id.in_(test_metadata_ids)
            )
            states_meta = session.query(StatesMeta).where(
                StatesMeta.metadata_id.in_(test_metadata_ids)
            )
            return states.count(), states_meta.count()

    assert _row_counts() == (30, 4)

    # run purge_old_data()
    finished = purge_old_data(
        recorder_mock,
        far_past,
        repack=False,
    )
    assert finished

    # No state is old enough to purge yet, but we should remove the
    # unused entity_id
    assert _row_counts() == (30, 3)

    # The purged entity_id must also be evicted from the states meta cache
    # (the event type cache is keyed by event types and would never hold it).
    assert "sensor.unused" not in recorder_mock.states_meta_manager._id_map

    # we should only have 10 states left since
    # only one entity was recorded now
    finished = purge_old_data(
        recorder_mock,
        utcnow,
        repack=False,
    )
    assert finished

    assert _row_counts() == (10, 1)

    # Purge everything
    finished = purge_old_data(
        recorder_mock,
        utcnow + timedelta(seconds=1),
        repack=False,
    )
    assert finished

    assert _row_counts() == (0, 0)
|
|
|
|
|
|
async def test_purge_entities_keep_days(
    hass: HomeAssistant, recorder_mock: Recorder
) -> None:
    """Test purging states with an entity filter and keep_days.

    ``sensor.purge`` gets three recorded states (a week ago, two days ago and
    now) and ``sensor.keep`` gets two. Purging ``sensor.purge`` with
    ``keep_days`` set to 1 must leave only its newest state, purging it again
    without ``keep_days`` must remove it entirely, and ``sensor.keep`` must be
    untouched throughout.
    """
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    start = dt_util.utcnow()
    two_days_ago = start - timedelta(days=2)
    one_week_ago = start - timedelta(days=7)
    one_month_ago = start - timedelta(days=30)

    async def _fetch_history():
        """Return the significant states of both sensors for the last month."""
        return await recorder_mock.async_add_executor_job(
            get_significant_states,
            hass,
            one_month_ago,
            None,
            ["sensor.keep", "sensor.purge"],
        )

    async def _purge_entities(service_data):
        """Call the purge_entities service and wait for the purge to finish."""
        await hass.services.async_call(
            RECORDER_DOMAIN, SERVICE_PURGE_ENTITIES, service_data
        )
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # Oldest sample for both entities.
    with freeze_time(one_week_ago):
        hass.states.async_set("sensor.keep", "initial")
        hass.states.async_set("sensor.purge", "initial")

    await async_wait_recording_done(hass)

    # Intermediate sample, only for the entity that will be purged.
    with freeze_time(two_days_ago):
        hass.states.async_set("sensor.purge", "two_days_ago")

    await async_wait_recording_done(hass)

    # Current sample for both entities.
    hass.states.async_set("sensor.purge", "now")
    hass.states.async_set("sensor.keep", "now")
    await async_recorder_block_till_done(hass)

    history = await _fetch_history()
    assert len(history["sensor.keep"]) == 2
    assert len(history["sensor.purge"]) == 3

    # Keep one day of history for the purged entity.
    await _purge_entities({"entity_id": "sensor.purge", "keep_days": 1})

    history = await _fetch_history()
    assert len(history["sensor.keep"]) == 2
    assert len(history["sensor.purge"]) == 1

    # Without keep_days the purged entity loses all of its history.
    await _purge_entities({"entity_id": "sensor.purge"})

    history = await _fetch_history()
    assert len(history["sensor.keep"]) == 2
    assert "sensor.purge" not in history
|