core/homeassistant/components/spotify/media_player.py

406 lines
13 KiB
Python

"""Support for interacting with Spotify Connect."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
import datetime as dt
import logging
from typing import TYPE_CHECKING, Any, Concatenate
from spotifyaio import (
Device,
Episode,
Item,
ItemType,
PlaybackState,
ProductType,
RepeatMode as SpotifyRepeatMode,
Track,
)
from yarl import URL
from homeassistant.components.media_player import (
ATTR_MEDIA_ENQUEUE,
BrowseMedia,
MediaPlayerEnqueue,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
RepeatMode,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .browse_media import async_browse_media_internal
from .const import MEDIA_PLAYER_PREFIX, PLAYABLE_MEDIA_TYPES
from .coordinator import SpotifyConfigEntry, SpotifyCoordinator
from .entity import SpotifyEntity
_LOGGER = logging.getLogger(__name__)
SUPPORT_SPOTIFY = (
MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.REPEAT_SET
| MediaPlayerEntityFeature.SEEK
| MediaPlayerEntityFeature.SELECT_SOURCE
| MediaPlayerEntityFeature.SHUFFLE_SET
| MediaPlayerEntityFeature.VOLUME_SET
)
REPEAT_MODE_MAPPING_TO_HA = {
SpotifyRepeatMode.CONTEXT: RepeatMode.ALL,
SpotifyRepeatMode.OFF: RepeatMode.OFF,
SpotifyRepeatMode.TRACK: RepeatMode.ONE,
}
REPEAT_MODE_MAPPING_TO_SPOTIFY = {
value: key for key, value in REPEAT_MODE_MAPPING_TO_HA.items()
}
AFTER_REQUEST_SLEEP = 1
async def async_setup_entry(
hass: HomeAssistant,
entry: SpotifyConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Spotify based on a config entry."""
data = entry.runtime_data
assert entry.unique_id is not None
spotify = SpotifyMediaPlayer(
data.coordinator,
data.devices,
)
async_add_entities([spotify])
def ensure_item[_R](
func: Callable[[SpotifyMediaPlayer, Item], _R],
) -> Callable[[SpotifyMediaPlayer], _R | None]:
"""Ensure that the currently playing item is available."""
def wrapper(self: SpotifyMediaPlayer) -> _R | None:
if not self.currently_playing or not self.currently_playing.item:
return None
return func(self, self.currently_playing.item)
return wrapper
def async_refresh_after[_T: SpotifyEntity, **_P](
func: Callable[Concatenate[_T, _P], Awaitable[None]],
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
"""Define a wrapper to yield and refresh after."""
async def _async_wrap(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
await func(self, *args, **kwargs)
await asyncio.sleep(AFTER_REQUEST_SLEEP)
await self.coordinator.async_refresh()
return _async_wrap
class SpotifyMediaPlayer(SpotifyEntity, MediaPlayerEntity):
"""Representation of a Spotify controller."""
_attr_media_image_remotely_accessible = False
_attr_name = None
_attr_translation_key = "spotify"
def __init__(
self,
coordinator: SpotifyCoordinator,
device_coordinator: DataUpdateCoordinator[list[Device]],
) -> None:
"""Initialize."""
super().__init__(coordinator)
self.devices = device_coordinator
self._attr_unique_id = coordinator.current_user.user_id
@property
def currently_playing(self) -> PlaybackState | None:
"""Return the current playback."""
return self.coordinator.data.current_playback
@property
def supported_features(self) -> MediaPlayerEntityFeature:
"""Return the supported features."""
if self.coordinator.current_user.product != ProductType.PREMIUM:
return MediaPlayerEntityFeature(0)
if not self.currently_playing or self.currently_playing.device.is_restricted:
return MediaPlayerEntityFeature.SELECT_SOURCE
return SUPPORT_SPOTIFY
@property
def state(self) -> MediaPlayerState:
"""Return the playback state."""
if not self.currently_playing:
return MediaPlayerState.IDLE
if self.currently_playing.is_playing:
return MediaPlayerState.PLAYING
return MediaPlayerState.PAUSED
@property
def volume_level(self) -> float | None:
"""Return the device volume."""
if not self.currently_playing:
return None
return self.currently_playing.device.volume_percent / 100
@property
@ensure_item
def media_content_id(self, item: Item) -> str: # noqa: PLR0206
"""Return the media URL."""
return item.uri
@property
@ensure_item
def media_content_type(self, item: Item) -> str: # noqa: PLR0206
"""Return the media type."""
return MediaType.PODCAST if item.type == ItemType.EPISODE else MediaType.MUSIC
@property
@ensure_item
def media_duration(self, item: Item) -> int: # noqa: PLR0206
"""Duration of current playing media in seconds."""
return round(item.duration_ms / 1000)
@property
def media_position(self) -> int | None:
"""Position of current playing media in seconds."""
if not self.currently_playing or self.currently_playing.progress_ms is None:
return None
return round(self.currently_playing.progress_ms / 1000)
@property
def media_position_updated_at(self) -> dt.datetime | None:
"""When was the position of the current playing media valid."""
if not self.currently_playing:
return None
return self.coordinator.data.position_updated_at
@property
@ensure_item
def media_image_url(self, item: Item) -> str | None: # noqa: PLR0206
"""Return the media image URL."""
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
if item.images:
return item.images[0].url
if item.show and item.show.images:
return item.show.images[0].url
return None
if TYPE_CHECKING:
assert isinstance(item, Track)
if not item.album.images:
return None
return item.album.images[0].url
@property
@ensure_item
def media_title(self, item: Item) -> str: # noqa: PLR0206
"""Return the media title."""
return item.name
@property
@ensure_item
def media_artist(self, item: Item) -> str: # noqa: PLR0206
"""Return the media artist."""
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
return item.show.publisher
if TYPE_CHECKING:
assert isinstance(item, Track)
return ", ".join(artist.name for artist in item.artists)
@property
@ensure_item
def media_album_name(self, item: Item) -> str: # noqa: PLR0206
"""Return the media album."""
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
return item.show.name
if TYPE_CHECKING:
assert isinstance(item, Track)
return item.album.name
@property
@ensure_item
def media_track(self, item: Item) -> int | None: # noqa: PLR0206
"""Track number of current playing media, music track only."""
if item.type == ItemType.EPISODE:
return None
if TYPE_CHECKING:
assert isinstance(item, Track)
return item.track_number
@property
def media_playlist(self) -> str | None:
"""Title of Playlist currently playing."""
if self.coordinator.data.dj_playlist:
return "DJ"
if self.coordinator.data.playlist is None:
return None
return self.coordinator.data.playlist.name
@property
def source(self) -> str | None:
"""Return the current playback device."""
if not self.currently_playing:
return None
return self.currently_playing.device.name
@property
def source_list(self) -> list[str] | None:
"""Return a list of source devices."""
return [device.name for device in self.devices.data]
@property
def shuffle(self) -> bool | None:
"""Shuffling state."""
if not self.currently_playing:
return None
return self.currently_playing.shuffle
@property
def repeat(self) -> RepeatMode | None:
"""Return current repeat mode."""
if not self.currently_playing:
return None
return REPEAT_MODE_MAPPING_TO_HA.get(self.currently_playing.repeat_mode)
@async_refresh_after
async def async_set_volume_level(self, volume: float) -> None:
"""Set the volume level."""
await self.coordinator.client.set_volume(int(volume * 100))
@async_refresh_after
async def async_media_play(self) -> None:
"""Start or resume playback."""
await self.coordinator.client.start_playback()
@async_refresh_after
async def async_media_pause(self) -> None:
"""Pause playback."""
await self.coordinator.client.pause_playback()
@async_refresh_after
async def async_media_previous_track(self) -> None:
"""Skip to previous track."""
await self.coordinator.client.previous_track()
@async_refresh_after
async def async_media_next_track(self) -> None:
"""Skip to next track."""
await self.coordinator.client.next_track()
@async_refresh_after
async def async_media_seek(self, position: float) -> None:
"""Send seek command."""
await self.coordinator.client.seek_track(int(position * 1000))
@async_refresh_after
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play media."""
media_type = media_type.removeprefix(MEDIA_PLAYER_PREFIX)
enqueue: MediaPlayerEnqueue = kwargs.get(
ATTR_MEDIA_ENQUEUE, MediaPlayerEnqueue.REPLACE
)
kwargs = {}
# Spotify can't handle URI's with query strings or anchors
# Yet, they do generate those types of URI in their official clients.
media_id = str(URL(media_id).with_query(None).with_fragment(None))
if media_type in {MediaType.TRACK, MediaType.EPISODE, MediaType.MUSIC}:
kwargs["uris"] = [media_id]
elif media_type in PLAYABLE_MEDIA_TYPES:
kwargs["context_uri"] = media_id
else:
_LOGGER.error("Media type %s is not supported", media_type)
return
if not self.currently_playing and self.devices.data:
kwargs["device_id"] = self.devices.data[0].device_id
if enqueue == MediaPlayerEnqueue.ADD:
if media_type not in {
MediaType.TRACK,
MediaType.EPISODE,
MediaType.MUSIC,
}:
raise ValueError(
f"Media type {media_type} is not supported when enqueue is ADD"
)
await self.coordinator.client.add_to_queue(
media_id, kwargs.get("device_id")
)
return
await self.coordinator.client.start_playback(**kwargs)
@async_refresh_after
async def async_select_source(self, source: str) -> None:
"""Select playback device."""
for device in self.devices.data:
if device.name == source:
await self.coordinator.client.transfer_playback(device.device_id)
return
@async_refresh_after
async def async_set_shuffle(self, shuffle: bool) -> None:
"""Enable/Disable shuffle mode."""
await self.coordinator.client.set_shuffle(state=shuffle)
@async_refresh_after
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat mode."""
if repeat not in REPEAT_MODE_MAPPING_TO_SPOTIFY:
raise ValueError(f"Unsupported repeat mode: {repeat}")
await self.coordinator.client.set_repeat(REPEAT_MODE_MAPPING_TO_SPOTIFY[repeat])
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
return await async_browse_media_internal(
self.hass,
self.coordinator.client,
media_content_type,
media_content_id,
)
@callback
def _handle_devices_update(self) -> None:
"""Handle updated data from the coordinator."""
if not self.enabled:
return
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
self.async_on_remove(
self.devices.async_add_listener(self._handle_devices_update)
)