core/tests/components/logbook/test_websocket_api.py

3044 lines
103 KiB
Python

"""The tests for the logbook component."""
import asyncio
from collections.abc import Callable
from datetime import timedelta
from typing import Any
from unittest.mock import ANY, patch
from freezegun import freeze_time
import pytest
from homeassistant import core
from homeassistant.components import logbook, recorder
from homeassistant.components.automation import ATTR_SOURCE, EVENT_AUTOMATION_TRIGGERED
from homeassistant.components.logbook import websocket_api
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.util import get_instance
from homeassistant.components.script import EVENT_SCRIPT_STARTED
from homeassistant.components.websocket_api import TYPE_RESULT
from homeassistant.const import (
ATTR_DOMAIN,
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
ATTR_NAME,
ATTR_UNIT_OF_MEASUREMENT,
CONF_DOMAINS,
CONF_ENTITIES,
CONF_EXCLUDE,
CONF_INCLUDE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_HOMEASSISTANT_START,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import Event, HomeAssistant, State, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import MockConfigEntry, async_fire_time_changed
from tests.components.recorder.common import (
async_block_recorder,
async_recorder_block_till_done,
async_wait_recording_done,
)
from tests.typing import RecorderInstanceGenerator, WebSocketGenerator
def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]:
"""Return listeners without final write listeners since we are not testing for these."""
return {
key: value
for key, value in listeners.items()
if key != EVENT_HOMEASSISTANT_FINAL_WRITE
}
async def _async_mock_logbook_platform_with_broken_describe(
hass: HomeAssistant,
) -> None:
class MockLogbookPlatform:
"""Mock a logbook platform with broken describe."""
@core.callback
def async_describe_events(
hass: HomeAssistant, # noqa: N805
async_describe_event: Callable[
[str, str, Callable[[Event], dict[str, str]]], None
],
) -> None:
"""Describe logbook events."""
@core.callback
def async_describe_test_event(event: Event) -> dict[str, str]:
"""Describe mock logbook event."""
raise ValueError("Broken")
async_describe_event("test", "mock_event", async_describe_test_event)
logbook._process_logbook_platform(hass, "test", MockLogbookPlatform)
async def _async_mock_logbook_platform(hass: HomeAssistant) -> None:
class MockLogbookPlatform:
"""Mock a logbook platform."""
@core.callback
def async_describe_events(
hass: HomeAssistant, # noqa: N805
async_describe_event: Callable[
[str, str, Callable[[Event], dict[str, str]]], None
],
) -> None:
"""Describe logbook events."""
@core.callback
def async_describe_test_event(event: Event) -> dict[str, str]:
"""Describe mock logbook event."""
return {
"name": "device name",
"message": event.data.get("message", "is on fire"),
}
async_describe_event("test", "mock_event", async_describe_test_event)
logbook._process_logbook_platform(hass, "test", MockLogbookPlatform)
async def _async_mock_entity_with_broken_logbook_platform(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> er.RegistryEntry:
"""Mock an integration that provides an entity that are described by the logbook that raises."""
entry = MockConfigEntry(domain="test", data={"first": True}, options=None)
entry.add_to_hass(hass)
entry = entity_registry.async_get_or_create(
platform="test",
domain="sensor",
config_entry=entry,
unique_id="1234",
suggested_object_id="test",
)
await _async_mock_logbook_platform_with_broken_describe(hass)
return entry
async def _async_mock_entity_with_logbook_platform(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> er.RegistryEntry:
"""Mock an integration that provides an entity that are described by the logbook."""
entry = MockConfigEntry(domain="test", data={"first": True}, options=None)
entry.add_to_hass(hass)
entry = entity_registry.async_get_or_create(
platform="test",
domain="sensor",
config_entry=entry,
unique_id="1234",
suggested_object_id="test",
)
await _async_mock_logbook_platform(hass)
return entry
async def _async_mock_devices_with_logbook_platform(
hass: HomeAssistant, device_registry: dr.DeviceRegistry
) -> list[dr.DeviceEntry]:
"""Mock an integration that provides a device that are described by the logbook."""
entry = MockConfigEntry(domain="test", data={"first": True}, options=None)
entry.add_to_hass(hass)
device = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
identifiers={("bridgeid", "0123")},
sw_version="sw-version",
name="device name",
manufacturer="manufacturer",
model="model",
suggested_area="Game Room",
)
device2 = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:CC")},
identifiers={("bridgeid", "4567")},
sw_version="sw-version",
name="device name",
manufacturer="manufacturer",
model="model",
suggested_area="Living Room",
)
await _async_mock_logbook_platform(hass)
return [device, device2]
async def test_get_events(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test logbook get_events."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
await async_recorder_block_till_done(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
hass.states.async_set("light.kitchen", STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 100})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 200})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 300})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = core.Context(
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("light.kitchen", STATE_OFF, context=context)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"end_time": now.isoformat(),
"entity_ids": ["light.kitchen"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == []
await client.send_json(
{
"id": 2,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"entity_ids": ["sensor.test"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 2
assert response["result"] == []
await client.send_json(
{
"id": 3,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"entity_ids": ["light.kitchen"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 3
results = response["result"]
assert results[0]["entity_id"] == "light.kitchen"
assert results[0]["state"] == "on"
assert results[1]["entity_id"] == "light.kitchen"
assert results[1]["state"] == "off"
await client.send_json(
{
"id": 4,
"type": "logbook/get_events",
"start_time": now.isoformat(),
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 4
results = response["result"]
assert len(results) == 3
assert results[0]["message"] == "started"
assert results[1]["entity_id"] == "light.kitchen"
assert results[1]["state"] == "on"
assert isinstance(results[1]["when"], float)
assert results[2]["entity_id"] == "light.kitchen"
assert results[2]["state"] == "off"
assert isinstance(results[2]["when"], float)
await client.send_json(
{
"id": 5,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 5
results = response["result"]
assert len(results) == 1
assert results[0]["entity_id"] == "light.kitchen"
assert results[0]["state"] == "off"
assert isinstance(results[0]["when"], float)
async def test_get_events_entities_filtered_away(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test logbook get_events all entities filtered away."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
await async_recorder_block_till_done(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
hass.states.async_set("light.kitchen", STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(
"light.filtered", STATE_ON, {"brightness": 100, ATTR_UNIT_OF_MEASUREMENT: "any"}
)
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_OFF, {"brightness": 200})
await hass.async_block_till_done()
hass.states.async_set(
"light.filtered",
STATE_OFF,
{"brightness": 300, ATTR_UNIT_OF_MEASUREMENT: "any"},
)
await async_wait_recording_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"entity_ids": ["light.kitchen"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
results = response["result"]
assert results[0]["entity_id"] == "light.kitchen"
assert results[0]["state"] == "off"
await client.send_json(
{
"id": 2,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"entity_ids": ["light.filtered"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 2
results = response["result"]
assert len(results) == 0
async def test_get_events_future_start_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test get_events with a future start time."""
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
future = dt_util.utcnow() + timedelta(hours=10)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": future.isoformat(),
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
results = response["result"]
assert isinstance(results, list)
assert len(results) == 0
async def test_get_events_bad_start_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test get_events bad start time."""
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": "cats",
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_start_time"
async def test_get_events_bad_end_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test get_events bad end time."""
now = dt_util.utcnow()
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"end_time": "dogs",
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_end_time"
async def test_get_events_invalid_filters(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test get_events invalid filters."""
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"entity_ids": [],
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_format"
await client.send_json(
{
"id": 2,
"type": "logbook/get_events",
"device_ids": [],
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_format"
async def test_get_events_with_device_ids(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test logbook get_events for device ids."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
devices = await _async_mock_devices_with_logbook_platform(hass, device_registry)
device = devices[0]
device2 = devices[1]
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
hass.bus.async_fire("mock_event", {"device_id": device.id})
hass.bus.async_fire("mock_event", {"device_id": device2.id})
hass.states.async_set("light.kitchen", STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 100})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 200})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 300})
await hass.async_block_till_done()
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = core.Context(
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("light.kitchen", STATE_OFF, context=context)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"device_ids": [device.id, device2.id],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
results = response["result"]
assert len(results) == 2
assert results[0]["name"] == "device name"
assert results[0]["message"] == "is on fire"
assert isinstance(results[0]["when"], float)
assert results[1]["name"] == "device name"
assert results[1]["message"] == "is on fire"
assert isinstance(results[1]["when"], float)
await client.send_json(
{
"id": 2,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"entity_ids": ["light.kitchen"],
"device_ids": [device.id],
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 2
results = response["result"]
assert results[0]["domain"] == "test"
assert results[0]["message"] == "is on fire"
assert results[0]["name"] == "device name"
assert results[1]["entity_id"] == "light.kitchen"
assert results[1]["state"] == "on"
assert results[2]["entity_id"] == "light.kitchen"
assert results[2]["state"] == "off"
await client.send_json(
{
"id": 3,
"type": "logbook/get_events",
"start_time": now.isoformat(),
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 3
results = response["result"]
assert len(results) == 5
assert results[0]["message"] == "started"
assert results[1]["name"] == "device name"
assert results[1]["message"] == "is on fire"
assert isinstance(results[1]["when"], float)
assert results[2]["name"] == "device name"
assert results[2]["message"] == "is on fire"
assert isinstance(results[2]["when"], float)
assert results[3]["entity_id"] == "light.kitchen"
assert results[3]["state"] == "on"
assert isinstance(results[3]["when"], float)
assert results[4]["entity_id"] == "light.kitchen"
assert results[4]["state"] == "off"
assert isinstance(results[4]["when"], float)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_excluded_entities(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with excluded entities."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "automation", "script")
]
)
await async_setup_component(
hass,
logbook.DOMAIN,
{
logbook.DOMAIN: {
CONF_EXCLUDE: {
CONF_ENTITIES: ["light.exc"],
CONF_DOMAINS: ["switch"],
CONF_ENTITY_GLOBS: ["*.excluded"],
}
},
},
)
await hass.async_block_till_done()
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
hass.states.async_set("switch.any", STATE_OFF)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
hass.states.async_set("switch.any", STATE_OFF)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
hass.states.async_set("light.alpha", "on")
hass.states.async_set("light.alpha", "off")
alpha_off_state: State = hass.states.get("light.alpha")
hass.states.async_set("light.zulu", "on", {"color": "blue"})
hass.states.async_set("light.zulu", "off", {"effect": "help"})
zulu_off_state: State = hass.states.get("light.zulu")
hass.states.async_set(
"light.zulu", "on", {"effect": "help", "color": ["blue", "green"]}
)
zulu_on_state: State = hass.states.get("light.zulu")
await hass.async_block_till_done()
hass.states.async_remove("light.zulu")
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "light.alpha",
"state": "off",
"when": alpha_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "off",
"when": zulu_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "on",
"when": zulu_on_state.last_updated_timestamp,
},
]
await async_wait_recording_done(hass)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "cover.excluded"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{
ATTR_NAME: "Mock automation switch matching entity",
ATTR_ENTITY_ID: "switch.match_domain",
},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation switch matching domain", ATTR_DOMAIN: "switch"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation matches nothing"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "light.keep"},
)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
await hass.async_block_till_done()
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": ANY,
"domain": "automation",
"entity_id": None,
"message": "triggered",
"name": "Mock automation matches nothing",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": "light.keep",
"message": "triggered",
"name": "Mock automation 3",
"source": None,
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_included_entities(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with included entities."""
test_entities = (
"light.inc",
"switch.any",
"cover.included",
"cover.not_included",
"automation.not_included",
"binary_sensor.is_light",
)
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "automation", "script")
]
)
await async_setup_component(
hass,
logbook.DOMAIN,
{
logbook.DOMAIN: {
CONF_INCLUDE: {
CONF_ENTITIES: ["light.inc"],
CONF_DOMAINS: ["switch"],
CONF_ENTITY_GLOBS: ["*.included"],
}
},
},
)
await hass.async_block_till_done()
for entity_id in test_entities:
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"entity_id": "light.inc", "state": "off", "when": ANY},
{"entity_id": "switch.any", "state": "off", "when": ANY},
{"entity_id": "cover.included", "state": "off", "when": ANY},
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
for entity_id in test_entities:
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
hass.states.async_remove("light.zulu")
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{"entity_id": "light.inc", "state": "on", "when": ANY},
{"entity_id": "light.inc", "state": "off", "when": ANY},
{"entity_id": "switch.any", "state": "on", "when": ANY},
{"entity_id": "switch.any", "state": "off", "when": ANY},
{"entity_id": "cover.included", "state": "on", "when": ANY},
{"entity_id": "cover.included", "state": "off", "when": ANY},
]
for _ in range(3):
for entity_id in test_entities:
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
await async_wait_recording_done(hass)
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"entity_id": "light.inc", "state": "on", "when": ANY},
{"entity_id": "light.inc", "state": "off", "when": ANY},
{"entity_id": "switch.any", "state": "on", "when": ANY},
{"entity_id": "switch.any", "state": "off", "when": ANY},
{"entity_id": "cover.included", "state": "on", "when": ANY},
{"entity_id": "cover.included", "state": "off", "when": ANY},
]
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "cover.included"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "cover.excluded"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{
ATTR_NAME: "Mock automation switch matching entity",
ATTR_ENTITY_ID: "switch.match_domain",
},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation switch matching domain", ATTR_DOMAIN: "switch"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation matches nothing"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "light.inc"},
)
await hass.async_block_till_done()
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": ANY,
"domain": "automation",
"entity_id": "cover.included",
"message": "triggered",
"name": "Mock automation 3",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": "switch.match_domain",
"message": "triggered",
"name": "Mock automation switch matching entity",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": None,
"message": "triggered",
"name": "Mock automation switch matching domain",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": None,
"message": "triggered",
"name": "Mock automation matches nothing",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": "light.inc",
"message": "triggered",
"name": "Mock automation 3",
"source": None,
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_logbook_stream_excluded_entities_inherits_filters_from_recorder(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream inherits filters from recorder."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "automation", "script")
]
)
await async_setup_component(
hass,
logbook.DOMAIN,
{
logbook.DOMAIN: {
CONF_EXCLUDE: {
CONF_ENTITIES: ["light.additional_excluded"],
}
},
recorder.DOMAIN: {
CONF_EXCLUDE: {
CONF_ENTITIES: ["light.exc"],
CONF_DOMAINS: ["switch"],
CONF_ENTITY_GLOBS: ["*.excluded", "*.no_matches"],
}
},
},
)
await hass.async_block_till_done()
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
hass.states.async_set("switch.any", STATE_OFF)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
hass.states.async_set("light.additional_excluded", STATE_ON)
hass.states.async_set("light.additional_excluded", STATE_OFF)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
hass.states.async_set("switch.any", STATE_OFF)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
hass.states.async_set("light.additional_excluded", STATE_ON)
hass.states.async_set("light.additional_excluded", STATE_OFF)
hass.states.async_set("light.alpha", "on")
hass.states.async_set("light.alpha", "off")
alpha_off_state: State = hass.states.get("light.alpha")
hass.states.async_set("light.zulu", "on", {"color": "blue"})
hass.states.async_set("light.zulu", "off", {"effect": "help"})
zulu_off_state: State = hass.states.get("light.zulu")
hass.states.async_set(
"light.zulu", "on", {"effect": "help", "color": ["blue", "green"]}
)
zulu_on_state: State = hass.states.get("light.zulu")
await hass.async_block_till_done()
hass.states.async_remove("light.zulu")
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "light.alpha",
"state": "off",
"when": alpha_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "off",
"when": zulu_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "on",
"when": zulu_on_state.last_updated_timestamp,
},
]
await async_wait_recording_done(hass)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "cover.excluded"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{
ATTR_NAME: "Mock automation switch matching entity",
ATTR_ENTITY_ID: "switch.match_domain",
},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation switch matching domain", ATTR_DOMAIN: "switch"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation matches nothing"},
)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: "light.keep"},
)
hass.states.async_set("cover.excluded", STATE_ON)
hass.states.async_set("cover.excluded", STATE_OFF)
await hass.async_block_till_done()
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": ANY,
"domain": "automation",
"entity_id": None,
"message": "triggered",
"name": "Mock automation matches nothing",
"source": None,
"when": ANY,
},
{
"context_id": ANY,
"domain": "automation",
"entity_id": "light.keep",
"message": "triggered",
"name": "Mock automation 3",
"source": None,
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
init_listeners = {
**init_listeners,
EVENT_HOMEASSISTANT_START: init_listeners[EVENT_HOMEASSISTANT_START] - 1,
}
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", "on")
hass.states.async_set("light.alpha", "off")
alpha_off_state: State = hass.states.get("light.alpha")
hass.states.async_set("light.zulu", "on", {"color": "blue"})
hass.states.async_set("light.zulu", "off", {"effect": "help"})
zulu_off_state: State = hass.states.get("light.zulu")
hass.states.async_set(
"light.zulu", "on", {"effect": "help", "color": ["blue", "green"]}
)
zulu_on_state: State = hass.states.get("light.zulu")
await hass.async_block_till_done()
hass.states.async_remove("light.zulu")
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "light.alpha",
"state": "off",
"when": alpha_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "off",
"when": zulu_off_state.last_updated_timestamp,
},
{
"entity_id": "light.zulu",
"state": "on",
"when": zulu_on_state.last_updated_timestamp,
},
]
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{
ATTR_NAME: "Mock automation",
ATTR_ENTITY_ID: "automation.mock_automation",
ATTR_SOURCE: "numeric state of sensor.hungry_dogs",
},
)
hass.bus.async_fire(
EVENT_SCRIPT_STARTED,
{
ATTR_NAME: "Mock script",
ATTR_ENTITY_ID: "script.mock_script",
ATTR_SOURCE: "numeric state of sensor.hungry_dogs",
},
)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": ANY,
"domain": "automation",
"entity_id": "automation.mock_automation",
"message": "triggered by numeric state of sensor.hungry_dogs",
"name": "Mock automation",
"source": "numeric state of sensor.hungry_dogs",
"when": ANY,
},
{
"context_id": ANY,
"domain": "script",
"entity_id": "script.mock_script",
"message": "started",
"name": "Mock script",
"when": ANY,
},
{
"domain": "homeassistant",
"icon": "mdi:home-assistant",
"message": "started",
"name": "Home Assistant",
"when": ANY,
},
]
context = core.Context(
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
automation_entity_id_test = "automation.alarm"
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{
ATTR_NAME: "Mock automation",
ATTR_ENTITY_ID: automation_entity_id_test,
ATTR_SOURCE: "state of binary_sensor.dog_food_ready",
},
context=context,
)
hass.bus.async_fire(
EVENT_SCRIPT_STARTED,
{ATTR_NAME: "Mock script", ATTR_ENTITY_ID: "script.mock_script"},
context=context,
)
hass.states.async_set(
automation_entity_id_test,
STATE_ON,
{ATTR_FRIENDLY_NAME: "Alarm Automation"},
context=context,
)
entity_id_test = "alarm_control_panel.area_001"
hass.states.async_set(entity_id_test, STATE_OFF, context=context)
hass.states.async_set(entity_id_test, STATE_ON, context=context)
entity_id_second = "alarm_control_panel.area_002"
hass.states.async_set(entity_id_second, STATE_OFF, context=context)
hass.states.async_set(entity_id_second, STATE_ON, context=context)
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "automation",
"entity_id": "automation.alarm",
"message": "triggered by state of binary_sensor.dog_food_ready",
"name": "Mock automation",
"source": "state of binary_sensor.dog_food_ready",
"when": ANY,
},
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "script",
"entity_id": "script.mock_script",
"message": "started",
"name": "Mock script",
"when": ANY,
},
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"entity_id": "alarm_control_panel.area_001",
"state": "on",
"when": ANY,
},
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"entity_id": "alarm_control_panel.area_002",
"state": "on",
"when": ANY,
},
]
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 2", ATTR_ENTITY_ID: automation_entity_id_test},
context=context,
)
await hass.async_block_till_done()
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "automation",
"entity_id": "automation.alarm",
"message": "triggered",
"name": "Mock automation 2",
"source": None,
"when": ANY,
}
]
await async_wait_recording_done(hass)
hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
{ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: automation_entity_id_test},
context=context,
)
await hass.async_block_till_done()
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "automation",
"entity_id": "automation.alarm",
"message": "triggered",
"name": "Mock automation 3",
"source": None,
"when": ANY,
}
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_entities(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with specific entities."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": ["light.small", "binary_sensor.is_light"],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "start_time" in msg["event"]
assert "end_time" in msg["event"]
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "start_time" in msg["event"]
assert "end_time" in msg["event"]
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == [
{
"entity_id": "light.small",
"state": "off",
"when": ANY,
},
]
hass.states.async_remove("light.alpha")
hass.states.async_remove("light.small")
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with specific entities and an end_time."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"end_time": (now + timedelta(minutes=10)).isoformat(),
"entity_ids": ["light.small", "binary_sensor.is_light"],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == [
{
"entity_id": "light.small",
"state": "off",
"when": ANY,
},
]
hass.states.async_remove("light.alpha")
hass.states.async_remove("light.small")
await hass.async_block_till_done()
async_fire_time_changed(hass, now + timedelta(minutes=11))
await hass.async_block_till_done()
# These states should not be sent since we should be unsubscribed
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("light.small", STATE_OFF)
await hass.async_block_till_done()
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_entities_past_only(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with specific entities in the past."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"end_time": (dt_util.utcnow() - timedelta(microseconds=1)).isoformat(),
"entity_ids": ["light.small", "binary_sensor.is_light"],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
# These states should not be sent since we should be unsubscribed
# since we only asked for the past
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("light.small", STATE_OFF)
await hass.async_block_till_done()
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_big_query(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream and ask for a large time frame.
We should get the data for the first 24 hours in the first message, and
anything older will come in a followup message.
"""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
four_days_ago = now - timedelta(days=4)
five_days_ago = now - timedelta(days=5)
with freeze_time(four_days_ago):
hass.states.async_set("binary_sensor.four_days_ago", STATE_ON)
hass.states.async_set("binary_sensor.four_days_ago", STATE_OFF)
four_day_old_state: State = hass.states.get("binary_sensor.four_days_ago")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# Verify our state was recorded in the past
assert (now - four_day_old_state.last_updated).total_seconds() > 86400 * 3
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
current_state: State = hass.states.get("binary_sensor.is_light")
# Verify our new state was recorded in the recent timeframe
assert (now - current_state.last_updated).total_seconds() < 2
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": five_days_ago.isoformat(),
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# With a big query we get the current state first
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "on",
"when": current_state.last_updated_timestamp,
}
]
# With a big query we get the old states second
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.four_days_ago",
"state": "off",
"when": four_day_old_state.last_updated_timestamp,
}
]
# And finally a response without partial set to indicate no more
# historical data is coming
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_unsubscribe_logbook_stream_device(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test subscribe/unsubscribe logbook stream with a device."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
devices = await _async_mock_devices_with_logbook_platform(hass, device_registry)
device = devices[0]
device2 = devices[1]
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"device_ids": [device.id, device2.id],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
await async_wait_recording_done(hass)
# There are no answers to our initial query
# so we get an empty reply. This is to ensure
# consumers of the api know there are no results
# and its not a failure case. This is useful
# in the frontend so we can tell the user there
# are no results vs waiting for them to appear
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" in msg["event"]
await async_wait_recording_done(hass)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" not in msg["event"]
await async_wait_recording_done(hass)
hass.states.async_set("binary_sensor.should_not_appear", STATE_ON)
hass.states.async_set("binary_sensor.should_not_appear", STATE_OFF)
hass.bus.async_fire("mock_event", {"device_id": device.id})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"domain": "test", "message": "is on fire", "name": "device name", "when": ANY}
]
for _ in range(3):
hass.bus.async_fire("mock_event", {"device_id": device.id})
hass.bus.async_fire("mock_event", {"device_id": device2.id})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
async def test_event_stream_bad_start_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test event_stream bad start time."""
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/event_stream",
"start_time": "cats",
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_start_time"
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_logbook_stream_match_multiple_entities(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
entity_registry: er.EntityRegistry,
) -> None:
"""Test logbook stream with a described integration that uses multiple entities."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
entry = await _async_mock_entity_with_logbook_platform(hass, entity_registry)
entity_id = entry.entity_id
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": [entity_id],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# There are no answers to our initial query
# so we get an empty reply. This is to ensure
# consumers of the api know there are no results
# and its not a failure case. This is useful
# in the frontend so we can tell the user there
# are no results vs waiting for them to appear
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" in msg["event"]
await async_wait_recording_done(hass)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" not in msg["event"]
await async_wait_recording_done(hass)
hass.states.async_set("binary_sensor.should_not_appear", STATE_ON)
hass.states.async_set("binary_sensor.should_not_appear", STATE_OFF)
context = core.Context(
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
"mock_event", {"entity_id": ["sensor.any", entity_id]}, context=context
)
hass.bus.async_fire("mock_event", {"entity_id": [f"sensor.any,{entity_id}"]})
hass.bus.async_fire("mock_event", {"entity_id": ["sensor.no_match", "light.off"]})
hass.states.async_set(entity_id, STATE_OFF, context=context)
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
{
"context_domain": "test",
"context_event_type": "mock_event",
"context_message": "is on fire",
"context_name": "device name",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"entity_id": "sensor.test",
"state": "off",
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_logbook_stream_match_multiple_entities_one_with_broken_logbook_platform(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
entity_registry: er.EntityRegistry,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test logbook stream with a described integration that uses multiple entities.
One of the entities has a broken logbook platform.
"""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
entry = await _async_mock_entity_with_broken_logbook_platform(hass, entity_registry)
entity_id = entry.entity_id
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": [entity_id],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# There are no answers to our initial query
# so we get an empty reply. This is to ensure
# consumers of the api know there are no results
# and its not a failure case. This is useful
# in the frontend so we can tell the user there
# are no results vs waiting for them to appear
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" in msg["event"]
await async_wait_recording_done(hass)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" not in msg["event"]
await async_wait_recording_done(hass)
hass.states.async_set("binary_sensor.should_not_appear", STATE_ON)
hass.states.async_set("binary_sensor.should_not_appear", STATE_OFF)
context = core.Context(
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
"mock_event", {"entity_id": ["sensor.any", entity_id]}, context=context
)
hass.bus.async_fire("mock_event", {"entity_id": [f"sensor.any,{entity_id}"]})
hass.bus.async_fire("mock_event", {"entity_id": ["sensor.no_match", "light.off"]})
hass.states.async_set(entity_id, STATE_OFF, context=context)
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "sensor.test",
"context_domain": "test",
"context_event_type": "mock_event",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"state": "off",
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
assert "Error with test describe event" in caplog.text
async def test_event_stream_bad_end_time(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test event_stream bad end time."""
await async_setup_component(hass, "logbook", {})
await async_recorder_block_till_done(hass)
utc_now = dt_util.utcnow()
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "logbook/event_stream",
"start_time": utc_now.isoformat(),
"end_time": "cats",
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_end_time"
await client.send_json(
{
"id": 2,
"type": "logbook/event_stream",
"start_time": utc_now.isoformat(),
"end_time": (utc_now - timedelta(hours=5)).isoformat(),
}
)
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_end_time"
async def test_live_stream_with_one_second_commit_interval(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test the recorder with a 1s commit interval."""
config = {recorder.CONF_COMMIT_INTERVAL: 0.5}
await async_setup_recorder_instance(hass, config)
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
devices = await _async_mock_devices_with_logbook_platform(hass, device_registry)
device = devices[0]
await hass.async_block_till_done()
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"})
await async_wait_recording_done(hass)
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "2"})
await hass.async_block_till_done()
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "3"})
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"device_ids": [device.id],
}
)
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "4"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "5"})
recieved_rows = []
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"]["events"])
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "6"})
await hass.async_block_till_done()
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "7"})
while len(recieved_rows) < 7:
msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"]["events"])
# Make sure we get rows back in order
assert recieved_rows == [
{"domain": "test", "message": "1", "name": "device name", "when": ANY},
{"domain": "test", "message": "2", "name": "device name", "when": ANY},
{"domain": "test", "message": "3", "name": "device name", "when": ANY},
{"domain": "test", "message": "4", "name": "device name", "when": ANY},
{"domain": "test", "message": "5", "name": "device name", "when": ANY},
{"domain": "test", "message": "6", "name": "device name", "when": ANY},
{"domain": "test", "message": "7", "name": "device name", "when": ANY},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_disconnected(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream gets disconnected."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# We will compare event subscriptions after closing the websocket connection,
# count the listeners before setting it up
init_listeners = hass.bus.async_listeners()
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": ["light.small", "binary_sensor.is_light"],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
await websocket_client.close()
await hass.async_block_till_done()
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_stream_consumer_stop_processing(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test we unsubscribe if the stream consumer fails or is canceled."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
init_listeners = hass.bus.async_listeners()
hass.states.async_set("light.small", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
after_ws_created_listeners = hass.bus.async_listeners()
with (
patch.object(websocket_api, "MAX_PENDING_LOGBOOK_EVENTS", 5),
patch.object(websocket_api, "_async_events_consumer"),
):
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": ["light.small", "binary_sensor.is_light"],
}
)
await async_wait_recording_done(hass)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
assert listeners_without_writes(
hass.bus.async_listeners()
) != listeners_without_writes(init_listeners)
for _ in range(5):
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
await async_wait_recording_done(hass)
# Check our listener got unsubscribed because
# the queue got full and the overload safety tripped
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(after_ws_created_listeners)
await websocket_client.close()
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_recorder_is_far_behind(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test we still start live streaming if the recorder is far behind."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
devices = await _async_mock_devices_with_logbook_platform(hass, device_registry)
device = devices[0]
await async_wait_recording_done(hass)
# Block the recorder queue
await async_block_recorder(hass, 0.3)
await hass.async_block_till_done()
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"device_ids": [device.id],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# There are no answers to our initial query
# so we get an empty reply. This is to ensure
# consumers of the api know there are no results
# and its not a failure case. This is useful
# in the frontend so we can tell the user there
# are no results vs waiting for them to appear
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
await async_wait_recording_done(hass)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"domain": "test", "message": "1", "name": "device name", "when": ANY}
]
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "2"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"domain": "test", "message": "2", "name": "device name", "when": ANY}
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_all_entities_are_continuous(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test subscribe/unsubscribe logbook stream with entities that are always filtered."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
entity_ids = ("sensor.uom", "sensor.uom_two")
def _cycle_entities():
for entity_id in entity_ids:
for state in ("1", "2", "3"):
hass.states.async_set(
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
)
hass.states.async_set("counter.any", state)
hass.states.async_set("proximity.any", state)
# We will compare event subscriptions after closing the websocket connection,
# count the listeners before setting it up
init_listeners = hass.bus.async_listeners()
_cycle_entities()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": ["sensor.uom", "counter.any", "proximity.any"],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
_cycle_entities()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
await websocket_client.close()
await hass.async_block_till_done()
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_all_entities_have_uom_multiple(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test logbook stream with specific request for multiple entities that are always filtered."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
entity_ids = ("sensor.uom", "sensor.uom_two")
def _cycle_entities():
for entity_id in entity_ids:
for state in ("1", "2", "3"):
hass.states.async_set(
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
)
# We will compare event subscriptions after closing the websocket connection,
# count the listeners before setting it up
init_listeners = hass.bus.async_listeners()
_cycle_entities()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": [*entity_ids],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
_cycle_entities()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
await websocket_client.close()
await hass.async_block_till_done()
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_entities_some_have_uom_multiple(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test logbook stream with uom filtered entities and non-filtered entities."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
filtered_entity_ids = ("sensor.uom", "sensor.uom_two")
non_filtered_entity_ids = ("sensor.keep", "sensor.keep_two")
def _cycle_entities():
for entity_id in filtered_entity_ids:
for state in ("1", "2", "3"):
hass.states.async_set(
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
)
for entity_id in non_filtered_entity_ids:
for state in (STATE_ON, STATE_OFF):
hass.states.async_set(entity_id, state)
# We will compare event subscriptions after closing the websocket connection,
# count the listeners before setting it up
init_listeners = hass.bus.async_listeners()
_cycle_entities()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": [*filtered_entity_ids, *non_filtered_entity_ids],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert msg["event"]["partial"] is True
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
_cycle_entities()
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
_cycle_entities()
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert "partial" not in msg["event"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert "partial" not in msg["event"]
await websocket_client.close()
await hass.async_block_till_done()
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_logbook_stream_ignores_forced_updates(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test logbook live stream ignores forced updates."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await hass.async_block_till_done()
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
state: State = hass.states.get("binary_sensor.is_light")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated_timestamp,
}
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("binary_sensor.is_light", STATE_ON)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": STATE_ON,
"when": ANY,
},
{
"entity_id": "binary_sensor.is_light",
"state": STATE_OFF,
"when": ANY,
},
]
# Now we force an update to make sure we ignore
# forced updates when the state has not actually changed
hass.states.async_set("binary_sensor.is_light", STATE_ON)
for _ in range(3):
hass.states.async_set("binary_sensor.is_light", STATE_OFF, force_update=True)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": STATE_ON,
"when": ANY,
},
# We should only get the first one and ignore
# the other forced updates since the state
# has not actually changed
{
"entity_id": "binary_sensor.is_light",
"state": STATE_OFF,
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_all_entities_are_continuous_with_device(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test subscribe/unsubscribe logbook stream with entities that are always filtered and a device."""
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
await async_wait_recording_done(hass)
devices = await _async_mock_devices_with_logbook_platform(hass, device_registry)
device = devices[0]
device2 = devices[1]
entity_ids = ("sensor.uom", "sensor.uom_two")
def _create_events():
for entity_id in entity_ids:
for state in ("1", "2", "3"):
hass.states.async_set(
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
)
hass.states.async_set("counter.any", state)
hass.states.async_set("proximity.any", state)
hass.bus.async_fire("mock_event", {"device_id": device.id})
hass.bus.async_fire("mock_event", {"device_id": device2.id})
# We will compare event subscriptions after closing the websocket connection,
# count the listeners before setting it up
init_listeners = hass.bus.async_listeners()
_create_events()
await async_wait_recording_done(hass)
websocket_client = await hass_ws_client()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"entity_ids": ["sensor.uom", "counter.any", "proximity.any"],
"device_ids": [device.id, device2.id],
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{"domain": "test", "message": "is on fire", "name": "device name", "when": ANY},
{"domain": "test", "message": "is on fire", "name": "device name", "when": ANY},
]
assert msg["event"]["partial"] is True
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert "partial" not in msg["event"]
for _ in range(2):
_create_events()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
]
assert "partial" not in msg["event"]
await websocket_client.close()
await hass.async_block_till_done()
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@pytest.mark.parametrize("params", [{"entity_ids": ["binary_sensor.is_light"]}, {}])
async def test_live_stream_with_changed_state_change(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
params: dict[str, Any],
) -> None:
"""Test the live logbook stream with chained events."""
config = {recorder.CONF_COMMIT_INTERVAL: 0.5}
await async_setup_recorder_instance(hass, config)
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
hass.states.async_set("binary_sensor.is_light", "unavailable")
hass.states.async_set("binary_sensor.is_light", "unknown")
await async_wait_recording_done(hass)
@callback
def auto_off_listener(event):
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
async_track_state_change_event(hass, ["binary_sensor.is_light"], auto_off_listener)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
**params,
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
await hass.async_block_till_done()
hass.states.async_set("binary_sensor.is_light", STATE_ON)
recieved_rows = []
while len(recieved_rows) < 3:
msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"]["events"])
# Make sure we get rows back in order
assert recieved_rows == [
{"entity_id": "binary_sensor.is_light", "state": "unknown", "when": ANY},
{"entity_id": "binary_sensor.is_light", "state": "on", "when": ANY},
{"entity_id": "binary_sensor.is_light", "state": "off", "when": ANY},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)