core/tests/components/ecovacs/test_event.py

97 lines
3.2 KiB
Python

"""Tests for Ecovacs event entities."""
from datetime import timedelta
from deebot_client.events import CleanJobStatus, ReportStatsEvent
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy import SnapshotAssertion
from homeassistant.components.ecovacs.const import DOMAIN
from homeassistant.components.ecovacs.controller import EcovacsController
from homeassistant.components.event import ATTR_EVENT_TYPE
from homeassistant.const import STATE_UNKNOWN, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from .util import notify_and_wait
# Module-level pytest marker: every test in this file runs with the
# "init_integration" fixture applied.
pytestmark = [pytest.mark.usefixtures("init_integration")]
@pytest.fixture
def platforms() -> Platform | list[Platform]:
    """Limit the platforms loaded during the test to the event platform."""
    return Platform.EVENT
async def test_last_job(
    hass: HomeAssistant,
    device_registry: dr.DeviceRegistry,
    entity_registry: er.EntityRegistry,
    snapshot: SnapshotAssertion,
    freezer: FrozenDateTimeFactory,
    controller: EcovacsController,
) -> None:
    """Test the last job event entity.

    Covers the initial unknown state, the registry/device linkage, the state
    written for finished, finished-with-warnings and manually-stopped jobs,
    and that no-status / cleaning reports leave the state untouched.
    """
    freezer.move_to("2024-03-20T00:00:00+00:00")
    entity_id = "event.ozmo_950_last_job"
    step = timedelta(minutes=5)

    # Before any stats report arrives the entity has no event recorded.
    state = hass.states.get(entity_id)
    assert state
    assert state.state == STATE_UNKNOWN

    registry_entry = entity_registry.async_get(entity_id)
    assert registry_entry
    assert registry_entry == snapshot(name=f"{entity_id}-entity_entry")
    assert registry_entry.device_id

    # The entity's device entry must carry the identifier of the first
    # device known to the controller.
    vacuum = controller.devices[0]
    linked_device = device_registry.async_get(registry_entry.device_id)
    assert linked_device
    assert linked_device.identifiers == {(DOMAIN, vacuum.device_info["did"])}

    bus = vacuum.events

    # First finished job: the whole resulting state is compared to the snapshot.
    await notify_and_wait(
        hass,
        bus,
        ReportStatsEvent(10, 5, "spotArea", "1", CleanJobStatus.FINISHED, [1, 2]),
    )
    state = hass.states.get(entity_id)
    assert state
    assert state == snapshot(name=f"{entity_id}-state")

    # Two more jobs, each reported five minutes after the previous one:
    # (ReportStatsEvent arguments, expected state, expected event type).
    follow_ups = (
        (
            (100, 50, "spotArea", "2", CleanJobStatus.FINISHED_WITH_WARNINGS, [2, 3]),
            "2024-03-20T00:05:00.000+00:00",
            "finished_with_warnings",
        ),
        (
            (0, 1, "spotArea", "3", CleanJobStatus.MANUALLY_STOPPED, [1]),
            "2024-03-20T00:10:00.000+00:00",
            "manually_stopped",
        ),
    )
    for report_args, expected_state, expected_type in follow_ups:
        freezer.tick(step)
        await notify_and_wait(hass, bus, ReportStatsEvent(*report_args))
        state = hass.states.get(entity_id)
        assert state
        assert state.state == expected_state
        assert state.attributes[ATTR_EVENT_TYPE] == expected_type

    freezer.tick(step)
    for ignored_status in (CleanJobStatus.NO_STATUS, CleanJobStatus.CLEANING):
        # we should not trigger on these statuses
        await notify_and_wait(
            hass,
            bus,
            ReportStatsEvent(12, 11, "spotArea", "4", ignored_status, [1, 2, 3]),
        )
        # State must still reflect the manually stopped job from 00:10.
        state = hass.states.get(entity_id)
        assert state
        assert state.state == "2024-03-20T00:10:00.000+00:00"
        assert state.attributes[ATTR_EVENT_TYPE] == "manually_stopped"