mirror of https://github.com/home-assistant/core
195 lines
6.6 KiB
Python
195 lines
6.6 KiB
Python
"""Component providing support to the Ring Door Bell camera."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from aiohttp import web
|
|
from haffmpeg.camera import CameraMjpeg
|
|
from ring_doorbell import RingDoorBell
|
|
|
|
from homeassistant.components import ffmpeg
|
|
from homeassistant.components.camera import Camera
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.aiohttp_client import async_aiohttp_proxy_stream
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from . import RingConfigEntry
|
|
from .coordinator import RingDataCoordinator
|
|
from .entity import RingEntity, exception_wrap
|
|
|
|
FORCE_REFRESH_INTERVAL = timedelta(minutes=3)
|
|
MOTION_DETECTION_CAPABILITY = "motion_detection"
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_setup_entry(
|
|
hass: HomeAssistant,
|
|
entry: RingConfigEntry,
|
|
async_add_entities: AddEntitiesCallback,
|
|
) -> None:
|
|
"""Set up a Ring Door Bell and StickUp Camera."""
|
|
ring_data = entry.runtime_data
|
|
devices_coordinator = ring_data.devices_coordinator
|
|
ffmpeg_manager = ffmpeg.get_ffmpeg_manager(hass)
|
|
|
|
cams = [
|
|
RingCam(camera, devices_coordinator, ffmpeg_manager)
|
|
for camera in ring_data.devices.video_devices
|
|
if camera.has_subscription
|
|
]
|
|
|
|
async_add_entities(cams)
|
|
|
|
|
|
class RingCam(RingEntity[RingDoorBell], Camera):
|
|
"""An implementation of a Ring Door Bell camera."""
|
|
|
|
_attr_name = None
|
|
|
|
def __init__(
|
|
self,
|
|
device: RingDoorBell,
|
|
coordinator: RingDataCoordinator,
|
|
ffmpeg_manager: ffmpeg.FFmpegManager,
|
|
) -> None:
|
|
"""Initialize a Ring Door Bell camera."""
|
|
super().__init__(device, coordinator)
|
|
Camera.__init__(self)
|
|
self._ffmpeg_manager = ffmpeg_manager
|
|
self._last_event: dict[str, Any] | None = None
|
|
self._last_video_id: int | None = None
|
|
self._video_url: str | None = None
|
|
self._image: bytes | None = None
|
|
self._expires_at = dt_util.utcnow() - FORCE_REFRESH_INTERVAL
|
|
self._attr_unique_id = str(device.id)
|
|
if device.has_capability(MOTION_DETECTION_CAPABILITY):
|
|
self._attr_motion_detection_enabled = device.motion_detection
|
|
|
|
@callback
|
|
def _handle_coordinator_update(self) -> None:
|
|
"""Call update method."""
|
|
self._device = self._get_coordinator_data().get_video_device(
|
|
self._device.device_api_id
|
|
)
|
|
history_data = self._device.last_history
|
|
if history_data:
|
|
self._last_event = history_data[0]
|
|
# will call async_update to update the attributes and get the
|
|
# video url from the api
|
|
self.async_schedule_update_ha_state(True)
|
|
else:
|
|
self._last_event = None
|
|
self._last_video_id = None
|
|
self._video_url = None
|
|
self._image = None
|
|
self._expires_at = dt_util.utcnow()
|
|
self.async_write_ha_state()
|
|
|
|
@property
|
|
def extra_state_attributes(self) -> dict[str, Any]:
|
|
"""Return the state attributes."""
|
|
return {
|
|
"video_url": self._video_url,
|
|
"last_video_id": self._last_video_id,
|
|
}
|
|
|
|
async def async_camera_image(
|
|
self, width: int | None = None, height: int | None = None
|
|
) -> bytes | None:
|
|
"""Return a still image response from the camera."""
|
|
if self._image is None and self._video_url is not None:
|
|
image = await ffmpeg.async_get_image(
|
|
self.hass,
|
|
self._video_url,
|
|
width=width,
|
|
height=height,
|
|
)
|
|
|
|
if image:
|
|
self._image = image
|
|
|
|
return self._image
|
|
|
|
async def handle_async_mjpeg_stream(
|
|
self, request: web.Request
|
|
) -> web.StreamResponse | None:
|
|
"""Generate an HTTP MJPEG stream from the camera."""
|
|
if self._video_url is None:
|
|
return None
|
|
|
|
stream = CameraMjpeg(self._ffmpeg_manager.binary)
|
|
await stream.open_camera(self._video_url)
|
|
|
|
try:
|
|
stream_reader = await stream.get_reader()
|
|
return await async_aiohttp_proxy_stream(
|
|
self.hass,
|
|
request,
|
|
stream_reader,
|
|
self._ffmpeg_manager.ffmpeg_stream_content_type,
|
|
)
|
|
finally:
|
|
await stream.close()
|
|
|
|
async def async_update(self) -> None:
|
|
"""Update camera entity and refresh attributes."""
|
|
if (
|
|
self._device.has_capability(MOTION_DETECTION_CAPABILITY)
|
|
and self._attr_motion_detection_enabled != self._device.motion_detection
|
|
):
|
|
self._attr_motion_detection_enabled = self._device.motion_detection
|
|
self.async_write_ha_state()
|
|
|
|
if TYPE_CHECKING:
|
|
# _last_event is set before calling update so will never be None
|
|
assert self._last_event
|
|
|
|
if self._last_event["recording"]["status"] != "ready":
|
|
return
|
|
|
|
utcnow = dt_util.utcnow()
|
|
if self._last_video_id == self._last_event["id"] and utcnow <= self._expires_at:
|
|
return
|
|
|
|
if self._last_video_id != self._last_event["id"]:
|
|
self._image = None
|
|
|
|
self._video_url = await self._async_get_video()
|
|
|
|
self._last_video_id = self._last_event["id"]
|
|
self._expires_at = FORCE_REFRESH_INTERVAL + utcnow
|
|
|
|
@exception_wrap
|
|
async def _async_get_video(self) -> str | None:
|
|
if TYPE_CHECKING:
|
|
# _last_event is set before calling update so will never be None
|
|
assert self._last_event
|
|
event_id = self._last_event.get("id")
|
|
assert event_id and isinstance(event_id, int)
|
|
return await self._device.async_recording_url(event_id)
|
|
|
|
@exception_wrap
|
|
async def _async_set_motion_detection_enabled(self, new_state: bool) -> None:
|
|
if not self._device.has_capability(MOTION_DETECTION_CAPABILITY):
|
|
_LOGGER.error(
|
|
"Entity %s does not have motion detection capability", self.entity_id
|
|
)
|
|
return
|
|
|
|
await self._device.async_set_motion_detection(new_state)
|
|
self._attr_motion_detection_enabled = new_state
|
|
self.async_write_ha_state()
|
|
|
|
async def async_enable_motion_detection(self) -> None:
|
|
"""Enable motion detection in the camera."""
|
|
await self._async_set_motion_detection_enabled(True)
|
|
|
|
async def async_disable_motion_detection(self) -> None:
|
|
"""Disable motion detection in camera."""
|
|
await self._async_set_motion_detection_enabled(False)
|