mirror of https://github.com/home-assistant/core
258 lines
8.3 KiB
Python
258 lines
8.3 KiB
Python
"""Support for ESPHome media players."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from functools import partial
|
|
import logging
|
|
from typing import Any, cast
|
|
from urllib.parse import urlparse
|
|
|
|
from aioesphomeapi import (
|
|
EntityInfo,
|
|
MediaPlayerCommand,
|
|
MediaPlayerEntityState,
|
|
MediaPlayerFormatPurpose,
|
|
MediaPlayerInfo,
|
|
MediaPlayerState as EspMediaPlayerState,
|
|
MediaPlayerSupportedFormat,
|
|
)
|
|
|
|
from homeassistant.components import media_source
|
|
from homeassistant.components.media_player import (
|
|
ATTR_MEDIA_ANNOUNCE,
|
|
ATTR_MEDIA_EXTRA,
|
|
BrowseMedia,
|
|
MediaPlayerDeviceClass,
|
|
MediaPlayerEntity,
|
|
MediaPlayerEntityFeature,
|
|
MediaPlayerState,
|
|
MediaType,
|
|
async_process_play_media_url,
|
|
)
|
|
from homeassistant.core import callback
|
|
|
|
from .entity import (
|
|
EsphomeEntity,
|
|
convert_api_error_ha_error,
|
|
esphome_float_state_property,
|
|
esphome_state_property,
|
|
platform_async_setup_entry,
|
|
)
|
|
from .enum_mapper import EsphomeEnumMapper
|
|
from .ffmpeg_proxy import async_create_proxy_url
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
_STATES: EsphomeEnumMapper[EspMediaPlayerState, MediaPlayerState] = EsphomeEnumMapper(
|
|
{
|
|
EspMediaPlayerState.IDLE: MediaPlayerState.IDLE,
|
|
EspMediaPlayerState.PLAYING: MediaPlayerState.PLAYING,
|
|
EspMediaPlayerState.PAUSED: MediaPlayerState.PAUSED,
|
|
}
|
|
)
|
|
|
|
ATTR_BYPASS_PROXY = "bypass_proxy"
|
|
|
|
|
|
class EsphomeMediaPlayer(
|
|
EsphomeEntity[MediaPlayerInfo, MediaPlayerEntityState], MediaPlayerEntity
|
|
):
|
|
"""A media player implementation for esphome."""
|
|
|
|
_attr_device_class = MediaPlayerDeviceClass.SPEAKER
|
|
|
|
@callback
|
|
def _on_static_info_update(self, static_info: EntityInfo) -> None:
|
|
"""Set attrs from static info."""
|
|
super()._on_static_info_update(static_info)
|
|
flags = (
|
|
MediaPlayerEntityFeature.PLAY_MEDIA
|
|
| MediaPlayerEntityFeature.BROWSE_MEDIA
|
|
| MediaPlayerEntityFeature.STOP
|
|
| MediaPlayerEntityFeature.VOLUME_SET
|
|
| MediaPlayerEntityFeature.VOLUME_MUTE
|
|
| MediaPlayerEntityFeature.MEDIA_ANNOUNCE
|
|
)
|
|
if self._static_info.supports_pause:
|
|
flags |= MediaPlayerEntityFeature.PAUSE | MediaPlayerEntityFeature.PLAY
|
|
self._attr_supported_features = flags
|
|
self._entry_data.media_player_formats[static_info.unique_id] = cast(
|
|
MediaPlayerInfo, static_info
|
|
).supported_formats
|
|
|
|
@property
|
|
@esphome_state_property
|
|
def state(self) -> MediaPlayerState | None:
|
|
"""Return current state."""
|
|
return _STATES.from_esphome(self._state.state)
|
|
|
|
@property
|
|
@esphome_state_property
|
|
def is_volume_muted(self) -> bool:
|
|
"""Return true if volume is muted."""
|
|
return self._state.muted
|
|
|
|
@property
|
|
@esphome_float_state_property
|
|
def volume_level(self) -> float | None:
|
|
"""Volume level of the media player (0..1)."""
|
|
return self._state.volume
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_play_media(
|
|
self, media_type: MediaType | str, media_id: str, **kwargs: Any
|
|
) -> None:
|
|
"""Send the play command with media url to the media player."""
|
|
if media_source.is_media_source_id(media_id):
|
|
sourced_media = await media_source.async_resolve_media(
|
|
self.hass, media_id, self.entity_id
|
|
)
|
|
media_id = sourced_media.url
|
|
|
|
media_id = async_process_play_media_url(self.hass, media_id)
|
|
announcement = kwargs.get(ATTR_MEDIA_ANNOUNCE)
|
|
bypass_proxy = kwargs.get(ATTR_MEDIA_EXTRA, {}).get(ATTR_BYPASS_PROXY)
|
|
|
|
supported_formats: list[MediaPlayerSupportedFormat] | None = (
|
|
self._entry_data.media_player_formats.get(self._static_info.unique_id)
|
|
)
|
|
|
|
if (
|
|
not bypass_proxy
|
|
and supported_formats
|
|
and _is_url(media_id)
|
|
and (
|
|
proxy_url := self._get_proxy_url(
|
|
supported_formats, media_id, announcement is True
|
|
)
|
|
)
|
|
):
|
|
# Substitute proxy URL
|
|
media_id = proxy_url
|
|
|
|
self._client.media_player_command(
|
|
self._key, media_url=media_id, announcement=announcement
|
|
)
|
|
|
|
async def async_will_remove_from_hass(self) -> None:
|
|
"""Handle entity being removed."""
|
|
await super().async_will_remove_from_hass()
|
|
self._entry_data.media_player_formats.pop(self.entity_id, None)
|
|
|
|
def _get_proxy_url(
|
|
self,
|
|
supported_formats: list[MediaPlayerSupportedFormat],
|
|
url: str,
|
|
announcement: bool,
|
|
) -> str | None:
|
|
"""Get URL for ffmpeg proxy."""
|
|
if self.device_entry is None:
|
|
# Device id is required
|
|
return None
|
|
|
|
# Choose the first default or announcement supported format
|
|
format_to_use: MediaPlayerSupportedFormat | None = None
|
|
for supported_format in supported_formats:
|
|
if (format_to_use is None) and (
|
|
supported_format.purpose == MediaPlayerFormatPurpose.DEFAULT
|
|
):
|
|
# First default format
|
|
format_to_use = supported_format
|
|
elif announcement and (
|
|
supported_format.purpose == MediaPlayerFormatPurpose.ANNOUNCEMENT
|
|
):
|
|
# First announcement format
|
|
format_to_use = supported_format
|
|
break
|
|
|
|
if format_to_use is None:
|
|
# No format for conversion
|
|
return None
|
|
|
|
# Replace the media URL with a proxy URL pointing to Home
|
|
# Assistant. When requested, Home Assistant will use ffmpeg to
|
|
# convert the source URL to the supported format.
|
|
_LOGGER.debug("Proxying media url %s with format %s", url, format_to_use)
|
|
device_id = self.device_entry.id
|
|
media_format = format_to_use.format
|
|
|
|
# 0 = None
|
|
rate: int | None = None
|
|
channels: int | None = None
|
|
width: int | None = None
|
|
if format_to_use.sample_rate > 0:
|
|
rate = format_to_use.sample_rate
|
|
|
|
if format_to_use.num_channels > 0:
|
|
channels = format_to_use.num_channels
|
|
|
|
if format_to_use.sample_bytes > 0:
|
|
width = format_to_use.sample_bytes
|
|
|
|
proxy_url = async_create_proxy_url(
|
|
self.hass,
|
|
device_id,
|
|
url,
|
|
media_format=media_format,
|
|
rate=rate,
|
|
channels=channels,
|
|
width=width,
|
|
)
|
|
|
|
# Resolve URL
|
|
return async_process_play_media_url(self.hass, proxy_url)
|
|
|
|
async def async_browse_media(
|
|
self,
|
|
media_content_type: MediaType | str | None = None,
|
|
media_content_id: str | None = None,
|
|
) -> BrowseMedia:
|
|
"""Implement the websocket media browsing helper."""
|
|
return await media_source.async_browse_media(
|
|
self.hass,
|
|
media_content_id,
|
|
content_filter=lambda item: item.media_content_type.startswith("audio/"),
|
|
)
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_set_volume_level(self, volume: float) -> None:
|
|
"""Set volume level, range 0..1."""
|
|
self._client.media_player_command(self._key, volume=volume)
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_media_pause(self) -> None:
|
|
"""Send pause command."""
|
|
self._client.media_player_command(self._key, command=MediaPlayerCommand.PAUSE)
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_media_play(self) -> None:
|
|
"""Send play command."""
|
|
self._client.media_player_command(self._key, command=MediaPlayerCommand.PLAY)
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_media_stop(self) -> None:
|
|
"""Send stop command."""
|
|
self._client.media_player_command(self._key, command=MediaPlayerCommand.STOP)
|
|
|
|
@convert_api_error_ha_error
|
|
async def async_mute_volume(self, mute: bool) -> None:
|
|
"""Mute the volume."""
|
|
self._client.media_player_command(
|
|
self._key,
|
|
command=MediaPlayerCommand.MUTE if mute else MediaPlayerCommand.UNMUTE,
|
|
)
|
|
|
|
|
|
def _is_url(url: str) -> bool:
|
|
"""Validate the URL can be parsed and at least has scheme + netloc."""
|
|
result = urlparse(url)
|
|
return all([result.scheme, result.netloc])
|
|
|
|
|
|
async_setup_entry = partial(
|
|
platform_async_setup_entry,
|
|
info_type=MediaPlayerInfo,
|
|
entity_type=EsphomeMediaPlayer,
|
|
state_type=MediaPlayerEntityState,
|
|
)
|