core/homeassistant/components/esphome/media_player.py

258 lines
8.3 KiB
Python

"""Support for ESPHome media players."""
from __future__ import annotations
from functools import partial
import logging
from typing import Any, cast
from urllib.parse import urlparse
from aioesphomeapi import (
EntityInfo,
MediaPlayerCommand,
MediaPlayerEntityState,
MediaPlayerFormatPurpose,
MediaPlayerInfo,
MediaPlayerState as EspMediaPlayerState,
MediaPlayerSupportedFormat,
)
from homeassistant.components import media_source
from homeassistant.components.media_player import (
ATTR_MEDIA_ANNOUNCE,
ATTR_MEDIA_EXTRA,
BrowseMedia,
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
async_process_play_media_url,
)
from homeassistant.core import callback
from .entity import (
EsphomeEntity,
convert_api_error_ha_error,
esphome_float_state_property,
esphome_state_property,
platform_async_setup_entry,
)
from .enum_mapper import EsphomeEnumMapper
from .ffmpeg_proxy import async_create_proxy_url
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Translation table between the ESPHome API media player states and the
# Home Assistant media player states. Only the three states listed here
# are mapped.
_STATES: EsphomeEnumMapper[EspMediaPlayerState, MediaPlayerState] = EsphomeEnumMapper(
    {
        EspMediaPlayerState.IDLE: MediaPlayerState.IDLE,
        EspMediaPlayerState.PLAYING: MediaPlayerState.PLAYING,
        EspMediaPlayerState.PAUSED: MediaPlayerState.PAUSED,
    }
)

# Key looked up inside the play_media "extra" mapping; a truthy value makes
# async_play_media skip the ffmpeg proxy URL substitution.
ATTR_BYPASS_PROXY = "bypass_proxy"
class EsphomeMediaPlayer(
    EsphomeEntity[MediaPlayerInfo, MediaPlayerEntityState], MediaPlayerEntity
):
    """A media player implementation for esphome.

    Commands are forwarded to the device through
    ``self._client.media_player_command``. The formats advertised by the
    device are cached in ``self._entry_data.media_player_formats`` keyed by
    the entity's static-info unique id, and are used to decide whether a
    media URL should be replaced by an ffmpeg proxy URL before playback.
    """

    _attr_device_class = MediaPlayerDeviceClass.SPEAKER

    @callback
    def _on_static_info_update(self, static_info: EntityInfo) -> None:
        """Set attrs from static info.

        Recomputes the supported feature flags and caches the device's
        supported media formats under ``static_info.unique_id``.
        """
        super()._on_static_info_update(static_info)
        flags = (
            MediaPlayerEntityFeature.PLAY_MEDIA
            | MediaPlayerEntityFeature.BROWSE_MEDIA
            | MediaPlayerEntityFeature.STOP
            | MediaPlayerEntityFeature.VOLUME_SET
            | MediaPlayerEntityFeature.VOLUME_MUTE
            | MediaPlayerEntityFeature.MEDIA_ANNOUNCE
        )
        # Pause/play are only advertised when the device reports pause support.
        if self._static_info.supports_pause:
            flags |= MediaPlayerEntityFeature.PAUSE | MediaPlayerEntityFeature.PLAY
        self._attr_supported_features = flags
        self._entry_data.media_player_formats[static_info.unique_id] = cast(
            MediaPlayerInfo, static_info
        ).supported_formats

    @property
    @esphome_state_property
    def state(self) -> MediaPlayerState | None:
        """Return current state."""
        return _STATES.from_esphome(self._state.state)

    @property
    @esphome_state_property
    def is_volume_muted(self) -> bool:
        """Return true if volume is muted."""
        return self._state.muted

    @property
    @esphome_float_state_property
    def volume_level(self) -> float | None:
        """Volume level of the media player (0..1)."""
        return self._state.volume

    @convert_api_error_ha_error
    async def async_play_media(
        self, media_type: MediaType | str, media_id: str, **kwargs: Any
    ) -> None:
        """Send the play command with media url to the media player.

        Media-source ids are resolved to a URL first, then the URL is passed
        through ``async_process_play_media_url``. Unless bypassed through
        ``extra["bypass_proxy"]``, a URL is swapped for an ffmpeg proxy URL
        when the device has advertised supported formats and a usable format
        is found.
        """
        if media_source.is_media_source_id(media_id):
            sourced_media = await media_source.async_resolve_media(
                self.hass, media_id, self.entity_id
            )
            media_id = sourced_media.url

        media_id = async_process_play_media_url(self.hass, media_id)
        announcement = kwargs.get(ATTR_MEDIA_ANNOUNCE)
        # "or {}" guards against callers that pass extra=None explicitly;
        # a plain .get(..., {}) default would not cover that case.
        bypass_proxy = (kwargs.get(ATTR_MEDIA_EXTRA) or {}).get(ATTR_BYPASS_PROXY)

        supported_formats: list[MediaPlayerSupportedFormat] | None = (
            self._entry_data.media_player_formats.get(self._static_info.unique_id)
        )

        if (
            not bypass_proxy
            and supported_formats
            and _is_url(media_id)
            and (
                proxy_url := self._get_proxy_url(
                    supported_formats, media_id, announcement is True
                )
            )
        ):
            # Substitute proxy URL
            media_id = proxy_url

        self._client.media_player_command(
            self._key, media_url=media_id, announcement=announcement
        )

    async def async_will_remove_from_hass(self) -> None:
        """Handle entity being removed.

        Drops this entity's cached supported formats.
        """
        await super().async_will_remove_from_hass()
        # The cache is written and read with the static-info unique id, so it
        # must be removed with that same key; popping by entity_id never
        # matched the stored entry and left it behind.
        self._entry_data.media_player_formats.pop(self._static_info.unique_id, None)

    def _get_proxy_url(
        self,
        supported_formats: list[MediaPlayerSupportedFormat],
        url: str,
        announcement: bool,
    ) -> str | None:
        """Get URL for ffmpeg proxy.

        Returns None when the entity has no device entry or when none of the
        supported formats has a DEFAULT purpose (or, for announcements, an
        ANNOUNCEMENT purpose).
        """
        if self.device_entry is None:
            # Device id is required
            return None

        # Choose the first default or announcement supported format
        format_to_use: MediaPlayerSupportedFormat | None = None
        for supported_format in supported_formats:
            if (format_to_use is None) and (
                supported_format.purpose == MediaPlayerFormatPurpose.DEFAULT
            ):
                # First default format
                format_to_use = supported_format
            elif announcement and (
                supported_format.purpose == MediaPlayerFormatPurpose.ANNOUNCEMENT
            ):
                # First announcement format; it overrides any default format
                # found earlier, so the search can stop here.
                format_to_use = supported_format
                break

        if format_to_use is None:
            # No format for conversion
            return None

        # Replace the media URL with a proxy URL pointing to Home
        # Assistant. When requested, Home Assistant will use ffmpeg to
        # convert the source URL to the supported format.
        _LOGGER.debug("Proxying media url %s with format %s", url, format_to_use)
        device_id = self.device_entry.id
        media_format = format_to_use.format

        # 0 = None: non-positive values are passed to the proxy as None.
        rate: int | None = None
        channels: int | None = None
        width: int | None = None
        if format_to_use.sample_rate > 0:
            rate = format_to_use.sample_rate
        if format_to_use.num_channels > 0:
            channels = format_to_use.num_channels
        if format_to_use.sample_bytes > 0:
            width = format_to_use.sample_bytes

        proxy_url = async_create_proxy_url(
            self.hass,
            device_id,
            url,
            media_format=media_format,
            rate=rate,
            channels=channels,
            width=width,
        )

        # Resolve URL
        return async_process_play_media_url(self.hass, proxy_url)

    async def async_browse_media(
        self,
        media_content_type: MediaType | str | None = None,
        media_content_id: str | None = None,
    ) -> BrowseMedia:
        """Implement the websocket media browsing helper.

        Only items whose content type starts with ``audio/`` are listed.
        """
        return await media_source.async_browse_media(
            self.hass,
            media_content_id,
            content_filter=lambda item: item.media_content_type.startswith("audio/"),
        )

    @convert_api_error_ha_error
    async def async_set_volume_level(self, volume: float) -> None:
        """Set volume level, range 0..1."""
        self._client.media_player_command(self._key, volume=volume)

    @convert_api_error_ha_error
    async def async_media_pause(self) -> None:
        """Send pause command."""
        self._client.media_player_command(self._key, command=MediaPlayerCommand.PAUSE)

    @convert_api_error_ha_error
    async def async_media_play(self) -> None:
        """Send play command."""
        self._client.media_player_command(self._key, command=MediaPlayerCommand.PLAY)

    @convert_api_error_ha_error
    async def async_media_stop(self) -> None:
        """Send stop command."""
        self._client.media_player_command(self._key, command=MediaPlayerCommand.STOP)

    @convert_api_error_ha_error
    async def async_mute_volume(self, mute: bool) -> None:
        """Mute the volume."""
        self._client.media_player_command(
            self._key,
            command=MediaPlayerCommand.MUTE if mute else MediaPlayerCommand.UNMUTE,
        )
def _is_url(url: str) -> bool:
"""Validate the URL can be parsed and at least has scheme + netloc."""
result = urlparse(url)
return all([result.scheme, result.netloc])
# Platform entry point: the shared ESPHome setup helper with the media
# player info, entity, and state types bound as keyword arguments.
async_setup_entry = partial(
    platform_async_setup_entry,
    info_type=MediaPlayerInfo,
    entity_type=EsphomeMediaPlayer,
    state_type=MediaPlayerEntityState,
)