core/homeassistant/components/stream/core.py

544 lines
19 KiB
Python

"""Provides core stream functionality."""
from __future__ import annotations
import asyncio
from collections import deque
from collections.abc import Callable, Coroutine, Iterable
from dataclasses import dataclass, field
import datetime
from enum import IntEnum
import logging
from typing import TYPE_CHECKING, Any, cast
from aiohttp import web
import numpy as np
from homeassistant.components.http import KEY_HASS, HomeAssistantView
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.event import async_call_later
from homeassistant.util.decorator import Registry
from .const import (
ATTR_STREAMS,
DOMAIN,
SEGMENT_DURATION_ADJUSTER,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
if TYPE_CHECKING:
from av import Packet, VideoCodecContext
from homeassistant.components.camera import DynamicStreamSettings
from . import Stream
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Registry mapping a string key to a StreamOutput subclass.
PROVIDERS: Registry[str, type[StreamOutput]] = Registry()
class Orientation(IntEnum):
    """Orientations for stream transforms. These are based on EXIF orientation tags."""

    # The integer values double as indexes into TRANSFORM_IMAGE_FUNCTION
    # (see KeyFrameConverter.transform_image), so they must stay in sync with
    # the order of that tuple.
    NO_TRANSFORM = 1
    MIRROR = 2
    ROTATE_180 = 3
    FLIP = 4
    # Values >= 5 are the ones for which KeyFrameConverter swaps the requested
    # width and height before reformatting the frame.
    ROTATE_LEFT_AND_FLIP = 5
    ROTATE_LEFT = 6
    ROTATE_RIGHT_AND_FLIP = 7
    ROTATE_RIGHT = 8
@dataclass(slots=True)
class StreamSettings:
    """Stream settings."""

    # False in STREAM_SETTINGS_NON_LL_HLS; presumably toggles low-latency HLS
    # behaviour in the providers - confirm against the HLS output.
    ll_hls: bool
    # NOTE(review): the duration/timeout fields below look like seconds, the
    # same unit as TARGET_SEGMENT_DURATION_NON_LL_HLS - confirm in .const.
    min_segment_duration: float
    part_target_duration: float
    hls_advance_part_limit: int
    hls_part_timeout: float
# Settings instance with ll_hls disabled. Every duration-like field is derived
# from TARGET_SEGMENT_DURATION_NON_LL_HLS; the minimum segment duration is that
# target reduced by SEGMENT_DURATION_ADJUSTER.
STREAM_SETTINGS_NON_LL_HLS = StreamSettings(
    ll_hls=False,
    min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
    part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
    hls_advance_part_limit=3,
    hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
@dataclass(slots=True)
class Part:
    """Represent a segment part."""

    # Rendered as the DURATION attribute of the part's #EXT-X-PART playlist line
    duration: float
    # When true, the part's #EXT-X-PART line is tagged INDEPENDENT=YES
    has_keyframe: bool
    # video data (moof+mdat)
    data: bytes
@dataclass(slots=True)
class Segment:
    """A media segment assembled from one or more parts.

    Constructing a Segment immediately hands it to every output in
    ``_stream_outputs``. Parts are appended afterwards with
    ``async_add_part``; the segment is considered complete as soon as its
    duration becomes non zero.
    """

    sequence: int
    # the init of the mp4 the segment is based on
    init: bytes
    # For detecting discontinuities across stream restarts
    stream_id: int
    start_time: datetime.datetime
    _stream_outputs: Iterable[StreamOutput]
    duration: float = 0
    parts: list[Part] = field(default_factory=list)
    # Cached text of this segment's hls playlist section.
    # Kept as list[str] so new lines can be appended cheaply; after a render
    # each list is collapsed to a single joined element.
    hls_playlist_template: list[str] = field(default_factory=list)
    hls_playlist_parts: list[str] = field(default_factory=list)
    # How many entries of ``parts`` have already been rendered
    hls_num_parts_rendered: int = 0
    # True once the segment was complete at render time (template is final)
    hls_playlist_complete: bool = False

    def __post_init__(self) -> None:
        """Publish the freshly created segment to every stream output."""
        for stream_output in self._stream_outputs:
            stream_output.put(self)

    @property
    def complete(self) -> bool:
        """Return True once a non zero duration has been recorded."""
        return self.duration > 0

    @property
    def data_size_with_init(self) -> int:
        """Return the byte size of the init section plus all part data."""
        return self.data_size + len(self.init)

    @property
    def data_size(self) -> int:
        """Return the byte size of all part data, excluding the init section."""
        total = 0
        for part in self.parts:
            total += len(part.data)
        return total

    @callback
    def async_add_part(
        self,
        part: Part,
        duration: float,
    ) -> None:
        """Append a part and notify every output that a part arrived.

        Duration is non zero only for the last part, which is what marks the
        segment as complete.
        """
        self.parts.append(part)
        self.duration = duration
        for stream_output in self._stream_outputs:
            stream_output.part_put()

    def get_data(self) -> bytes:
        """Return the concatenated data of all parts, without the init section."""
        return b"".join(part.data for part in self.parts)

    def _render_hls_template(self, last_stream_id: int, render_parts: bool) -> str:
        """Build (or reuse) the playlist template for this segment.

        The segment may still be in progress. Intermediate results are kept in
        hls_playlist_template, hls_playlist_parts, hls_num_parts_rendered and
        hls_playlist_complete so later calls only render what is new.

        The returned template contains a single ``{}`` placeholder where the
        rendered parts are substituted by ``render_hls``.
        """
        # A finished template never changes again
        if self.hls_playlist_complete:
            return self.hls_playlist_template[0]

        discontinuity = last_stream_id != self.stream_id

        if not self.hls_playlist_template:
            # Logically EXT-X-DISCONTINUITY makes sense above the parts, but
            # Apple's media stream validator seems to only want it before the
            # segment
            if discontinuity:
                self.hls_playlist_template.append("#EXT-X-DISCONTINUITY")
            # Placeholder that the rendered parts get formatted into
            self.hls_playlist_template.append("{}")

        if render_parts:
            # Only render the parts added since the previous call
            first_new = self.hls_num_parts_rendered
            for index, part in enumerate(self.parts[first_new:], start=first_new):
                independent = ",INDEPENDENT=YES" if part.has_keyframe else ""
                self.hls_playlist_parts.append(
                    f"#EXT-X-PART:DURATION={part.duration:.3f},URI="
                    f'"./segment/{self.sequence}.{index}.m4s"'
                    f"{independent}"
                )

        if self.complete:
            # Build the final template. The placeholder shares a line with the
            # first metadata element so no blank line appears when no parts are
            # rendered; the empty string appended to the parts supplies the
            # trailing newline when parts are rendered.
            self.hls_playlist_parts.append("")
            timestamp = self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
            final_lines = ["#EXT-X-DISCONTINUITY"] if discontinuity else []
            final_lines.append("{}#EXT-X-PROGRAM-DATE-TIME:" + timestamp + "Z")
            final_lines.append(
                f"#EXTINF:{self.duration:.3f},\n./segment/{self.sequence}.m4s"
            )
            self.hls_playlist_template = final_lines

        # Collapse both caches to a single joined string for reuse
        self.hls_playlist_template = ["\n".join(self.hls_playlist_template)]
        # lstrip discards extra preceding newline in case first render was empty
        self.hls_playlist_parts = ["\n".join(self.hls_playlist_parts).lstrip()]
        self.hls_num_parts_rendered = len(self.parts)
        self.hls_playlist_complete = self.complete

        return self.hls_playlist_template[0]

    def render_hls(
        self, last_stream_id: int, render_parts: bool, add_hint: bool
    ) -> str:
        """Render this segment's playlist section, optionally with a preload hint."""
        template = self._render_hls_template(last_stream_id, render_parts)
        rendered_parts = self.hls_playlist_parts[0] if render_parts else ""
        playlist = template.format(rendered_parts)
        if not add_hint:
            return playlist

        # A preload hint saves the client a round trip by naming the next part.
        # That part normally lives in this segment, but it is the first part
        # of the following segment when this one is already complete.
        if self.complete:
            hint_sequence, hint_part = self.sequence + 1, 0
        else:
            hint_sequence, hint_part = self.sequence, len(self.parts)
        hint = (
            f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{hint_sequence}.{hint_part}.m4s"'
        )
        if playlist:
            return playlist + "\n" + hint
        return hint
class IdleTimer:
    """Invoke a callback after an inactivity timeout.

    The IdleTimer invokes the callback after some timeout has passed. The awake() method
    resets the internal alarm, extending the inactivity time.

    ``_unsub`` holds the handle returned by ``async_call_later`` while an alarm
    is pending and is None otherwise; ``idle`` becomes True once the alarm has
    fired and is reset by ``start()`` / ``awake()``.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        timeout: int,
        idle_callback: Callable[[], Coroutine[Any, Any, None]],
    ) -> None:
        """Initialize IdleTimer.

        ``timeout`` is the delay handed to ``async_call_later``;
        ``idle_callback`` is a coroutine function scheduled as a task when the
        alarm fires.
        """
        self._hass = hass
        self._timeout = timeout
        self._callback = idle_callback
        self._unsub: CALLBACK_TYPE | None = None
        self.idle = False

    def start(self) -> None:
        """Start the idle timer if not already started."""
        self.idle = False
        if self._unsub is None:
            self._unsub = async_call_later(self._hass, self._timeout, self.fire)

    def awake(self) -> None:
        """Keep the idle time alive by resetting the timeout."""
        self.idle = False
        # Reset idle timeout
        self.clear()
        self._unsub = async_call_later(self._hass, self._timeout, self.fire)

    def clear(self) -> None:
        """Clear and disable the timer if it has not already fired."""
        if self._unsub is not None:
            self._unsub()
            # Drop the spent handle: otherwise start() would believe an alarm
            # is still pending and never re-arm, and a repeated clear() would
            # invoke the same cancel handle a second time.
            self._unsub = None

    @callback
    def fire(self, _now: datetime.datetime) -> None:
        """Invoke the idle timeout callback, called when the alarm fires."""
        self.idle = True
        # The alarm has run, so there is nothing left to cancel
        self._unsub = None
        self._hass.async_create_task(self._callback())
class StreamOutput:
    """Base class for a consumer of stream segments.

    Segments are kept in a deque (optionally bounded by ``deque_maxlen``).
    Two asyncio events are pulsed to wake waiters: one when a segment is
    stored and one when a part is added to a segment.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        idle_timer: IdleTimer,
        stream_settings: StreamSettings,
        dynamic_stream_settings: DynamicStreamSettings,
        deque_maxlen: int | None = None,
    ) -> None:
        """Store collaborators and create the segment buffer and events."""
        self._hass = hass
        self.idle_timer = idle_timer
        self.stream_settings = stream_settings
        self.dynamic_stream_settings = dynamic_stream_settings
        self._segments: deque[Segment] = deque(maxlen=deque_maxlen)
        self._event = asyncio.Event()
        self._part_event = asyncio.Event()

    @property
    def name(self) -> str | None:
        """Return the provider name; the base class has none."""
        return None

    @property
    def idle(self) -> bool:
        """Return True if the idle timer reports this output as idle."""
        return self.idle_timer.idle

    @property
    def last_sequence(self) -> int:
        """Return the newest segment's sequence number, or -1 when empty."""
        return self._segments[-1].sequence if self._segments else -1

    @property
    def sequences(self) -> list[int]:
        """Return the sequence numbers of all stored segments, oldest first."""
        return [segment.sequence for segment in self._segments]

    @property
    def last_segment(self) -> Segment | None:
        """Return the newest stored segment, or None when empty."""
        return self._segments[-1] if self._segments else None

    def get_segment(self, sequence: int) -> Segment | None:
        """Return the stored segment with the given sequence, or None."""
        # Lookups mostly target recent segments, so search newest first
        return next(
            (
                segment
                for segment in reversed(self._segments)
                if segment.sequence == sequence
            ),
            None,
        )

    def get_segments(self) -> deque[Segment]:
        """Return the underlying deque holding all stored segments."""
        return self._segments

    async def part_recv(self, timeout: float | None = None) -> bool:
        """Wait for the next part notification.

        Returns True when a part was signalled and False when ``timeout``
        elapsed first.
        """
        try:
            async with asyncio.timeout(timeout):
                await self._part_event.wait()
        except TimeoutError:
            return False
        else:
            return True

    def part_put(self) -> None:
        """Signal that a new part has been added to a segment."""
        # Pulse the event: wake the current waiters, then immediately re-arm
        # it for the next part
        self._part_event.set()
        self._part_event.clear()

    async def recv(self) -> bool:
        """Wait for the next segment notification.

        Returns False only when woken while no segment is stored.
        """
        await self._event.wait()
        return self.last_segment is not None

    def put(self, segment: Segment) -> None:
        """Schedule storing the segment on the event loop (thread safe)."""
        self._hass.loop.call_soon_threadsafe(self._async_put, segment)

    @callback
    def _async_put(self, segment: Segment) -> None:
        """Store the segment and wake waiters; runs in the event loop."""
        # Start idle timeout when we start receiving data
        self.idle_timer.start()
        self._segments.append(segment)
        # Pulse the event so current waiters wake and later ones block again
        self._event.set()
        self._event.clear()

    def cleanup(self) -> None:
        """Release waiters blocked in recv() and cancel the idle timer."""
        self._event.set()
        self.idle_timer.clear()
class StreamView(HomeAssistantView):
    """Base view shared by all stream formats.

    To implement a new stream format, subclass this view, define the `url`
    and `name` attributes, and implement the `handle` method.
    """

    # No login is required; a request is only served when its token matches
    # the access token of a registered stream (see ``get``).
    requires_auth = False

    async def get(
        self, request: web.Request, token: str, sequence: str = "", part_num: str = ""
    ) -> web.StreamResponse:
        """Resolve the stream for ``token`` and delegate to ``handle``.

        Raises web.HTTPNotFound when no registered stream has that token.
        """
        hass = request.app[KEY_HASS]

        stream = None
        for candidate in hass.data[DOMAIN][ATTR_STREAMS]:
            if candidate.access_token == token:
                stream = candidate
                break
        if not stream:
            raise web.HTTPNotFound

        # Start worker if not already started
        await stream.start()
        return await self.handle(request, stream, sequence, part_num)

    async def handle(
        self, request: web.Request, stream: Stream, sequence: str, part_num: str
    ) -> web.StreamResponse:
        """Serve the request for the resolved stream; subclasses must override."""
        raise NotImplementedError
# Image transforms indexed by orientation value (1-8); index 0 is a filler so
# the orientation can be used directly as the tuple index. Every real transform
# ends with copy() so the caller receives a new array rather than the view
# NumPy returns from flip/rot90.
TRANSFORM_IMAGE_FUNCTION = (
    lambda img: img,  # Unused
    lambda img: img,  # No transform
    lambda img: np.fliplr(img).copy(),  # Mirror
    lambda img: np.rot90(img, k=2).copy(),  # Rotate 180
    lambda img: np.flipud(img).copy(),  # Flip
    lambda img: np.flipud(np.rot90(img, k=1)).copy(),  # Rotate left and flip
    lambda img: np.rot90(img, k=1).copy(),  # Rotate left
    lambda img: np.flipud(np.rot90(img, k=-1)).copy(),  # Rotate right and flip
    lambda img: np.rot90(img, k=-1).copy(),  # Rotate right
)
class KeyFrameConverter:
    """Generate and serve a JPEG image from the last keyframe seen in the stream.

    How the threads and state interact:
        the worker thread stores a keyframe packet
        async_get_image is called from the main asyncio loop
        async_get_image runs _generate_image in an executor thread
        _generate_image tries to decode the stored packet into an image
        _generate_image clears the packet, so each packet is attempted only once
        On success self._image is replaced and returned by async_get_image
        On failure async_get_image returns the previous image
    """

    def __init__(
        self,
        hass: HomeAssistant,
        stream_settings: StreamSettings,
        dynamic_stream_settings: DynamicStreamSettings,
    ) -> None:
        """Initialize the converter state."""

        # Imported lazily so the stream integration can be imported without
        # its requirements being installed
        # pylint: disable-next=import-outside-toplevel
        from homeassistant.components.camera.img_util import TurboJPEGSingleton

        self._packet: Packet | None = None
        self._event: asyncio.Event = asyncio.Event()
        self._hass = hass
        self._image: bytes | None = None
        self._turbojpeg = TurboJPEGSingleton.instance()
        self._lock = asyncio.Lock()
        self._codec_context: VideoCodecContext | None = None
        self._stream_settings = stream_settings
        self._dynamic_stream_settings = dynamic_stream_settings

    def stash_keyframe_packet(self, packet: Packet) -> None:
        """Remember the keyframe packet and set the asyncio.Event on the loop.

        This is called from the worker thread, so the event is set through
        call_soon_threadsafe.
        """
        self._packet = packet
        self._hass.loop.call_soon_threadsafe(self._event.set)

    def create_codec_context(self, codec_context: VideoCodecContext) -> None:
        """Create the codec context used for decoding keyframes.

        This is run by the worker thread and will only be called once per
        worker. It does nothing when a context already exists.
        """
        if self._codec_context:
            return

        # Imported lazily so the stream integration can be imported without
        # its requirements being installed
        # pylint: disable-next=import-outside-toplevel
        from av import CodecContext

        decoder = cast(
            "VideoCodecContext", CodecContext.create(codec_context.name, "r")
        )
        self._codec_context = decoder
        decoder.extradata = codec_context.extradata
        decoder.skip_frame = "NONKEY"
        decoder.thread_type = "NONE"

    @staticmethod
    def transform_image(image: np.ndarray, orientation: int) -> np.ndarray:
        """Apply the transform registered for ``orientation`` to the image."""
        transform = TRANSFORM_IMAGE_FUNCTION[orientation]
        return transform(image)

    def _generate_image(self, width: int | None, height: int | None) -> None:
        """Decode the stashed keyframe packet into ``self._image``.

        This is run in an executor thread, but since it is called within
        the asyncio lock from the main thread, there will only be one entry
        at a time per instance.
        """
        if not self._turbojpeg or not self._packet or not self._codec_context:
            return

        # Take ownership of the packet so it is only attempted once
        packet = self._packet
        self._packet = None
        decoder = self._codec_context

        for _attempt in range(2):  # Retry once if codec context needs to be flushed
            try:
                # Decode the packet, then drain the decoder (at most twice)
                # until it yields a frame
                frames = decoder.decode(packet)
                for _drain in range(2):
                    if frames:
                        break
                    frames = decoder.decode(None)
                break
            except EOFError:
                _LOGGER.debug("Codec context needs flushing")
                decoder.flush_buffers()
        else:
            # Both attempts ended in EOFError
            _LOGGER.debug("Unable to decode keyframe")
            return

        if not frames:
            return

        frame = frames[0]
        if width and height:
            # For orientations >= 5 the requested dimensions are swapped
            # before the orientation transform is applied
            if self._dynamic_stream_settings.orientation >= 5:
                target_width, target_height = height, width
            else:
                target_width, target_height = width, height
            frame = frame.reformat(width=target_width, height=target_height)

        oriented = self.transform_image(
            frame.to_ndarray(format="bgr24"),
            self._dynamic_stream_settings.orientation,
        )
        self._image = bytes(self._turbojpeg.encode(oriented))

    async def async_get_image(
        self,
        width: int | None = None,
        height: int | None = None,
        wait_for_next_keyframe: bool = False,
    ) -> bytes | None:
        """Fetch an image from the Stream and return it as a jpeg in bytes.

        With ``wait_for_next_keyframe`` the call first waits until the next
        keyframe packet has been stashed. Returns the most recent image, which
        may be None if none has been generated yet.
        """
        if wait_for_next_keyframe:
            self._event.clear()
            await self._event.wait()

        # The lock ensures only one executor job works on the keyframe at a time
        async with self._lock:
            await self._hass.async_add_executor_job(self._generate_image, width, height)
        return self._image