mirror of https://github.com/home-assistant/core
581 lines
18 KiB
Python
581 lines
18 KiB
Python
"""Websocket API for the history integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable, Iterable
|
|
from dataclasses import dataclass
|
|
from datetime import datetime as dt, timedelta
|
|
import logging
|
|
from typing import Any, cast
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.components.recorder import get_instance, history
|
|
from homeassistant.components.websocket_api import ActiveConnection, messages
|
|
from homeassistant.const import (
|
|
COMPRESSED_STATE_ATTRIBUTES,
|
|
COMPRESSED_STATE_LAST_CHANGED,
|
|
COMPRESSED_STATE_LAST_UPDATED,
|
|
COMPRESSED_STATE_STATE,
|
|
)
|
|
from homeassistant.core import (
|
|
CALLBACK_TYPE,
|
|
Event,
|
|
EventStateChangedData,
|
|
HomeAssistant,
|
|
State,
|
|
callback,
|
|
is_callback,
|
|
valid_entity_id,
|
|
)
|
|
from homeassistant.helpers.event import (
|
|
async_track_point_in_utc_time,
|
|
async_track_state_change_event,
|
|
)
|
|
from homeassistant.helpers.json import json_bytes
|
|
from homeassistant.util.async_ import create_eager_task
|
|
import homeassistant.util.dt as dt_util
|
|
|
|
from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
|
|
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class HistoryLiveStream:
|
|
"""Track a history live stream."""
|
|
|
|
stream_queue: asyncio.Queue[Event]
|
|
subscriptions: list[CALLBACK_TYPE]
|
|
end_time_unsub: CALLBACK_TYPE | None = None
|
|
task: asyncio.Task | None = None
|
|
wait_sync_task: asyncio.Task | None = None
|
|
|
|
|
|
@callback
|
|
def async_setup(hass: HomeAssistant) -> None:
|
|
"""Set up the history websocket API."""
|
|
websocket_api.async_register_command(hass, ws_get_history_during_period)
|
|
websocket_api.async_register_command(hass, ws_stream)
|
|
|
|
|
|
def _ws_get_significant_states(
|
|
hass: HomeAssistant,
|
|
msg_id: int,
|
|
start_time: dt,
|
|
end_time: dt | None,
|
|
entity_ids: list[str] | None,
|
|
include_start_time_state: bool,
|
|
significant_changes_only: bool,
|
|
minimal_response: bool,
|
|
no_attributes: bool,
|
|
) -> bytes:
|
|
"""Fetch history significant_states and convert them to json in the executor."""
|
|
return json_bytes(
|
|
messages.result_message(
|
|
msg_id,
|
|
history.get_significant_states(
|
|
hass,
|
|
start_time,
|
|
end_time,
|
|
entity_ids,
|
|
None,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
True,
|
|
),
|
|
)
|
|
)
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "history/history_during_period",
|
|
vol.Required("start_time"): str,
|
|
vol.Optional("end_time"): str,
|
|
vol.Required("entity_ids"): [str],
|
|
vol.Optional("include_start_time_state", default=True): bool,
|
|
vol.Optional("significant_changes_only", default=True): bool,
|
|
vol.Optional("minimal_response", default=False): bool,
|
|
vol.Optional("no_attributes", default=False): bool,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
async def ws_get_history_during_period(
|
|
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
|
|
) -> None:
|
|
"""Handle history during period websocket command."""
|
|
start_time_str = msg["start_time"]
|
|
end_time_str = msg.get("end_time")
|
|
|
|
if start_time := dt_util.parse_datetime(start_time_str):
|
|
start_time = dt_util.as_utc(start_time)
|
|
else:
|
|
connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
|
|
return
|
|
|
|
if end_time_str:
|
|
if end_time := dt_util.parse_datetime(end_time_str):
|
|
end_time = dt_util.as_utc(end_time)
|
|
else:
|
|
connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
|
|
return
|
|
else:
|
|
end_time = None
|
|
|
|
if start_time > dt_util.utcnow():
|
|
connection.send_result(msg["id"], {})
|
|
return
|
|
|
|
entity_ids: list[str] = msg["entity_ids"]
|
|
for entity_id in entity_ids:
|
|
if not hass.states.get(entity_id) and not valid_entity_id(entity_id):
|
|
connection.send_error(msg["id"], "invalid_entity_ids", "Invalid entity_ids")
|
|
return
|
|
|
|
include_start_time_state = msg["include_start_time_state"]
|
|
no_attributes = msg["no_attributes"]
|
|
|
|
if (
|
|
(end_time and not has_recorder_run_after(hass, end_time))
|
|
or not include_start_time_state
|
|
and entity_ids
|
|
and not entities_may_have_state_changes_after(
|
|
hass, entity_ids, start_time, no_attributes
|
|
)
|
|
):
|
|
connection.send_result(msg["id"], {})
|
|
return
|
|
|
|
significant_changes_only = msg["significant_changes_only"]
|
|
minimal_response = msg["minimal_response"]
|
|
|
|
connection.send_message(
|
|
await get_instance(hass).async_add_executor_job(
|
|
_ws_get_significant_states,
|
|
hass,
|
|
msg["id"],
|
|
start_time,
|
|
end_time,
|
|
entity_ids,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
)
|
|
)
|
|
|
|
|
|
def _generate_stream_message(
|
|
states: dict[str, list[dict[str, Any]]],
|
|
start_day: dt,
|
|
end_day: dt,
|
|
) -> dict[str, Any]:
|
|
"""Generate a history stream message response."""
|
|
return {
|
|
"states": states,
|
|
"start_time": start_day.timestamp(),
|
|
"end_time": end_day.timestamp(),
|
|
}
|
|
|
|
|
|
@callback
|
|
def _async_send_empty_response(
|
|
connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None
|
|
) -> None:
|
|
"""Send an empty response when we know all results are filtered away."""
|
|
connection.send_result(msg_id)
|
|
stream_end_time = end_time or dt_util.utcnow()
|
|
connection.send_message(
|
|
_generate_websocket_response(msg_id, start_time, stream_end_time, {})
|
|
)
|
|
|
|
|
|
def _generate_websocket_response(
|
|
msg_id: int,
|
|
start_time: dt,
|
|
end_time: dt,
|
|
states: dict[str, list[dict[str, Any]]],
|
|
) -> bytes:
|
|
"""Generate a websocket response."""
|
|
return json_bytes(
|
|
messages.event_message(
|
|
msg_id, _generate_stream_message(states, start_time, end_time)
|
|
)
|
|
)
|
|
|
|
|
|
def _generate_historical_response(
|
|
hass: HomeAssistant,
|
|
msg_id: int,
|
|
start_time: dt,
|
|
end_time: dt,
|
|
entity_ids: list[str] | None,
|
|
include_start_time_state: bool,
|
|
significant_changes_only: bool,
|
|
minimal_response: bool,
|
|
no_attributes: bool,
|
|
send_empty: bool,
|
|
) -> tuple[float, dt | None, bytes | None]:
|
|
"""Generate a historical response."""
|
|
states = cast(
|
|
dict[str, list[dict[str, Any]]],
|
|
history.get_significant_states(
|
|
hass,
|
|
start_time,
|
|
end_time,
|
|
entity_ids,
|
|
None,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
True,
|
|
),
|
|
)
|
|
last_time_ts = 0.0
|
|
for state_list in states.values():
|
|
if (
|
|
state_list
|
|
and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED])
|
|
> last_time_ts
|
|
):
|
|
last_time_ts = cast(float, state_last_time)
|
|
|
|
if last_time_ts == 0:
|
|
# If we did not send any states ever, we need to send an empty response
|
|
# so the websocket client knows it should render/process/consume the
|
|
# data.
|
|
if not send_empty:
|
|
return last_time_ts, None, None
|
|
last_time_dt = end_time
|
|
else:
|
|
last_time_dt = dt_util.utc_from_timestamp(last_time_ts)
|
|
|
|
return (
|
|
last_time_ts,
|
|
last_time_dt,
|
|
_generate_websocket_response(msg_id, start_time, last_time_dt, states),
|
|
)
|
|
|
|
|
|
async def _async_send_historical_states(
|
|
hass: HomeAssistant,
|
|
connection: ActiveConnection,
|
|
msg_id: int,
|
|
start_time: dt,
|
|
end_time: dt,
|
|
entity_ids: list[str] | None,
|
|
include_start_time_state: bool,
|
|
significant_changes_only: bool,
|
|
minimal_response: bool,
|
|
no_attributes: bool,
|
|
send_empty: bool,
|
|
) -> dt | None:
|
|
"""Fetch history significant_states and send them to the client."""
|
|
instance = get_instance(hass)
|
|
last_time_ts, last_time_dt, payload = await instance.async_add_executor_job(
|
|
_generate_historical_response,
|
|
hass,
|
|
msg_id,
|
|
start_time,
|
|
end_time,
|
|
entity_ids,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
send_empty,
|
|
)
|
|
if payload:
|
|
connection.send_message(payload)
|
|
return last_time_dt if last_time_ts != 0 else None
|
|
|
|
|
|
def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]:
|
|
"""Convert a state to a compressed state."""
|
|
comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state}
|
|
if not no_attributes or state.domain in history.NEED_ATTRIBUTE_DOMAINS:
|
|
comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes
|
|
comp_state[COMPRESSED_STATE_LAST_UPDATED] = state.last_updated_timestamp
|
|
if state.last_changed != state.last_updated:
|
|
comp_state[COMPRESSED_STATE_LAST_CHANGED] = state.last_changed_timestamp
|
|
return comp_state
|
|
|
|
|
|
def _events_to_compressed_states(
|
|
events: Iterable[Event], no_attributes: bool
|
|
) -> dict[str, list[dict[str, Any]]]:
|
|
"""Convert events to a compressed states."""
|
|
states_by_entity_ids: dict[str, list[dict[str, Any]]] = {}
|
|
for event in events:
|
|
state: State = event.data["new_state"]
|
|
entity_id: str = state.entity_id
|
|
states_by_entity_ids.setdefault(entity_id, []).append(
|
|
_history_compressed_state(state, no_attributes)
|
|
)
|
|
return states_by_entity_ids
|
|
|
|
|
|
async def _async_events_consumer(
|
|
subscriptions_setup_complete_time: dt,
|
|
connection: ActiveConnection,
|
|
msg_id: int,
|
|
stream_queue: asyncio.Queue[Event],
|
|
no_attributes: bool,
|
|
) -> None:
|
|
"""Stream events from the queue."""
|
|
subscriptions_setup_complete_timestamp = (
|
|
subscriptions_setup_complete_time.timestamp()
|
|
)
|
|
while True:
|
|
events: list[Event] = [await stream_queue.get()]
|
|
# If the event is older than the last db
|
|
# event we already sent it so we skip it.
|
|
if events[0].time_fired_timestamp <= subscriptions_setup_complete_timestamp:
|
|
continue
|
|
# We sleep for the EVENT_COALESCE_TIME so
|
|
# we can group events together to minimize
|
|
# the number of websocket messages when the
|
|
# system is overloaded with an event storm
|
|
await asyncio.sleep(EVENT_COALESCE_TIME)
|
|
while not stream_queue.empty():
|
|
events.append(stream_queue.get_nowait())
|
|
|
|
if history_states := _events_to_compressed_states(events, no_attributes):
|
|
connection.send_message(
|
|
json_bytes(
|
|
messages.event_message(
|
|
msg_id,
|
|
{"states": history_states},
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
@callback
|
|
def _async_subscribe_events(
|
|
hass: HomeAssistant,
|
|
subscriptions: list[CALLBACK_TYPE],
|
|
target: Callable[[Event[Any]], None],
|
|
entity_ids: list[str],
|
|
significant_changes_only: bool,
|
|
minimal_response: bool,
|
|
) -> None:
|
|
"""Subscribe to events for the entities and devices or all.
|
|
|
|
These are the events we need to listen for to do
|
|
the live history stream.
|
|
"""
|
|
assert is_callback(target), "target must be a callback"
|
|
|
|
@callback
|
|
def _forward_state_events_filtered(event: Event[EventStateChangedData]) -> None:
|
|
"""Filter state events and forward them."""
|
|
if (new_state := event.data["new_state"]) is None or (
|
|
old_state := event.data["old_state"]
|
|
) is None:
|
|
return
|
|
if (
|
|
(significant_changes_only or minimal_response)
|
|
and new_state.state == old_state.state
|
|
and new_state.domain not in history.SIGNIFICANT_DOMAINS
|
|
):
|
|
return
|
|
target(event)
|
|
|
|
subscriptions.append(
|
|
async_track_state_change_event(hass, entity_ids, _forward_state_events_filtered)
|
|
)
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "history/stream",
|
|
vol.Required("start_time"): str,
|
|
vol.Optional("end_time"): str,
|
|
vol.Required("entity_ids"): [str],
|
|
vol.Optional("include_start_time_state", default=True): bool,
|
|
vol.Optional("significant_changes_only", default=True): bool,
|
|
vol.Optional("minimal_response", default=False): bool,
|
|
vol.Optional("no_attributes", default=False): bool,
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
async def ws_stream(
|
|
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
|
|
) -> None:
|
|
"""Handle history stream websocket command."""
|
|
start_time_str = msg["start_time"]
|
|
msg_id: int = msg["id"]
|
|
utc_now = dt_util.utcnow()
|
|
|
|
if start_time := dt_util.parse_datetime(start_time_str):
|
|
start_time = dt_util.as_utc(start_time)
|
|
|
|
if not start_time or start_time > utc_now:
|
|
connection.send_error(msg_id, "invalid_start_time", "Invalid start_time")
|
|
return
|
|
|
|
end_time_str = msg.get("end_time")
|
|
end_time: dt | None = None
|
|
if end_time_str:
|
|
if not (end_time := dt_util.parse_datetime(end_time_str)):
|
|
connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
|
|
return
|
|
end_time = dt_util.as_utc(end_time)
|
|
if end_time < start_time:
|
|
connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
|
|
return
|
|
|
|
entity_ids: list[str] = msg["entity_ids"]
|
|
for entity_id in entity_ids:
|
|
if not hass.states.get(entity_id) and not valid_entity_id(entity_id):
|
|
connection.send_error(msg["id"], "invalid_entity_ids", "Invalid entity_ids")
|
|
return
|
|
|
|
include_start_time_state = msg["include_start_time_state"]
|
|
significant_changes_only = msg["significant_changes_only"]
|
|
no_attributes = msg["no_attributes"]
|
|
minimal_response = msg["minimal_response"]
|
|
|
|
if end_time and end_time <= utc_now:
|
|
if (
|
|
not include_start_time_state
|
|
and entity_ids
|
|
and not entities_may_have_state_changes_after(
|
|
hass, entity_ids, start_time, no_attributes
|
|
)
|
|
):
|
|
_async_send_empty_response(connection, msg_id, start_time, end_time)
|
|
return
|
|
|
|
connection.subscriptions[msg_id] = callback(lambda: None)
|
|
connection.send_result(msg_id)
|
|
await _async_send_historical_states(
|
|
hass,
|
|
connection,
|
|
msg_id,
|
|
start_time,
|
|
end_time,
|
|
entity_ids,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
True,
|
|
)
|
|
return
|
|
|
|
subscriptions: list[CALLBACK_TYPE] = []
|
|
stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES)
|
|
live_stream = HistoryLiveStream(
|
|
subscriptions=subscriptions, stream_queue=stream_queue
|
|
)
|
|
|
|
@callback
|
|
def _unsub(*_utc_time: Any) -> None:
|
|
"""Unsubscribe from all events."""
|
|
for subscription in subscriptions:
|
|
subscription()
|
|
subscriptions.clear()
|
|
if live_stream.task:
|
|
live_stream.task.cancel()
|
|
if live_stream.wait_sync_task:
|
|
live_stream.wait_sync_task.cancel()
|
|
if live_stream.end_time_unsub:
|
|
live_stream.end_time_unsub()
|
|
live_stream.end_time_unsub = None
|
|
|
|
if end_time:
|
|
live_stream.end_time_unsub = async_track_point_in_utc_time(
|
|
hass, _unsub, end_time
|
|
)
|
|
|
|
@callback
|
|
def _queue_or_cancel(event: Event) -> None:
|
|
"""Queue an event to be processed or cancel."""
|
|
try:
|
|
stream_queue.put_nowait(event)
|
|
except asyncio.QueueFull:
|
|
_LOGGER.debug(
|
|
"Client exceeded max pending messages of %s",
|
|
MAX_PENDING_HISTORY_STATES,
|
|
)
|
|
_unsub()
|
|
|
|
_async_subscribe_events(
|
|
hass,
|
|
subscriptions,
|
|
_queue_or_cancel,
|
|
entity_ids,
|
|
significant_changes_only=significant_changes_only,
|
|
minimal_response=minimal_response,
|
|
)
|
|
subscriptions_setup_complete_time = dt_util.utcnow()
|
|
connection.subscriptions[msg_id] = _unsub
|
|
connection.send_result(msg_id)
|
|
# Fetch everything from history
|
|
last_event_time = await _async_send_historical_states(
|
|
hass,
|
|
connection,
|
|
msg_id,
|
|
start_time,
|
|
subscriptions_setup_complete_time,
|
|
entity_ids,
|
|
include_start_time_state,
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
True,
|
|
)
|
|
|
|
if msg_id not in connection.subscriptions:
|
|
# Unsubscribe happened while sending historical states
|
|
return
|
|
|
|
live_stream.task = create_eager_task(
|
|
_async_events_consumer(
|
|
subscriptions_setup_complete_time,
|
|
connection,
|
|
msg_id,
|
|
stream_queue,
|
|
no_attributes,
|
|
)
|
|
)
|
|
|
|
live_stream.wait_sync_task = create_eager_task(
|
|
get_instance(hass).async_block_till_done()
|
|
)
|
|
await live_stream.wait_sync_task
|
|
|
|
#
|
|
# Fetch any states from the database that have
|
|
# not been committed since the original fetch
|
|
# so we can switch over to using the subscriptions
|
|
#
|
|
# We only want states that happened after the last state
|
|
# we had from the last database query
|
|
#
|
|
await _async_send_historical_states(
|
|
hass,
|
|
connection,
|
|
msg_id,
|
|
# Add one microsecond so we are outside the window of
|
|
# the last event we got from the database since otherwise
|
|
# we could fetch the same event twice
|
|
(last_event_time or start_time) + timedelta(microseconds=1),
|
|
subscriptions_setup_complete_time,
|
|
entity_ids,
|
|
False, # We don't want the start time state again
|
|
significant_changes_only,
|
|
minimal_response,
|
|
no_attributes,
|
|
send_empty=not last_event_time,
|
|
)
|