core/homeassistant/components/music_assistant/media_player.py

562 lines
22 KiB
Python

"""MediaPlayer platform for Music Assistant integration."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Coroutine, Mapping
from contextlib import suppress
import functools
import os
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import (
EventType,
MediaType,
PlayerFeature,
QueueOption,
RepeatMode as MassRepeatMode,
)
from music_assistant_models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant_models.event import MassEvent
from music_assistant_models.media_items import ItemMapping, MediaItemType, Track
from homeassistant.components import media_source
from homeassistant.components.media_player import (
ATTR_MEDIA_EXTRA,
BrowseMedia,
MediaPlayerDeviceClass,
MediaPlayerEnqueue,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType as HAMediaType,
RepeatMode,
async_process_play_media_url,
)
from homeassistant.const import STATE_OFF
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utc_from_timestamp
from . import MusicAssistantConfigEntry
from .const import ATTR_ACTIVE_QUEUE, ATTR_MASS_PLAYER_TYPE, DOMAIN
from .entity import MusicAssistantEntity
if TYPE_CHECKING:
from music_assistant_client import MusicAssistantClient
from music_assistant_models.player import Player
from music_assistant_models.player_queue import PlayerQueue
# Feature flags advertised by every Music Assistant player entity.
# Optional capabilities are OR-ed in per player at entity construction.
SUPPORTED_FEATURES = (
    # power
    MediaPlayerEntityFeature.TURN_ON
    | MediaPlayerEntityFeature.TURN_OFF
    # transport controls
    | MediaPlayerEntityFeature.PLAY
    | MediaPlayerEntityFeature.PAUSE
    | MediaPlayerEntityFeature.STOP
    | MediaPlayerEntityFeature.SEEK
    | MediaPlayerEntityFeature.PREVIOUS_TRACK
    | MediaPlayerEntityFeature.NEXT_TRACK
    # volume
    | MediaPlayerEntityFeature.VOLUME_SET
    | MediaPlayerEntityFeature.VOLUME_STEP
    # queue behaviour
    | MediaPlayerEntityFeature.SHUFFLE_SET
    | MediaPlayerEntityFeature.REPEAT_SET
    | MediaPlayerEntityFeature.CLEAR_PLAYLIST
    # media selection
    | MediaPlayerEntityFeature.PLAY_MEDIA
    | MediaPlayerEntityFeature.BROWSE_MEDIA
    | MediaPlayerEntityFeature.MEDIA_ENQUEUE
    | MediaPlayerEntityFeature.MEDIA_ANNOUNCE
)
# Explicit translation table from Home Assistant enqueue options to the
# Music Assistant queue options of the same name. Keeping the mapping
# explicit avoids relying on both enums sharing identical values.
QUEUE_OPTION_MAP: dict[MediaPlayerEnqueue, QueueOption] = {
    MediaPlayerEnqueue.ADD: QueueOption.ADD,
    MediaPlayerEnqueue.NEXT: QueueOption.NEXT,
    MediaPlayerEnqueue.PLAY: QueueOption.PLAY,
    MediaPlayerEnqueue.REPLACE: QueueOption.REPLACE,
}
# Keys describing the media to play.
ATTR_MEDIA_ID = "media_id"
ATTR_MEDIA_TYPE = "media_type"
ATTR_ARTIST = "artist"
ATTR_ALBUM = "album"
ATTR_URL = "url"
ATTR_RADIO_MODE = "radio_mode"

# Keys tuning how an announcement is played.
ATTR_USE_PRE_ANNOUNCE = "use_pre_announce"
ATTR_ANNOUNCE_VOLUME = "announce_volume"

# Remaining option keys declared by this module.
ATTR_SOURCE_PLAYER = "source_player"
ATTR_AUTO_PLAY = "auto_play"
def catch_musicassistant_error[_R, **P](
func: Callable[..., Awaitable[_R]],
) -> Callable[..., Coroutine[Any, Any, _R | None]]:
"""Check and log commands to players."""
@functools.wraps(func)
async def wrapper(
self: MusicAssistantPlayer, *args: P.args, **kwargs: P.kwargs
) -> _R | None:
"""Catch Music Assistant errors and convert to Home Assistant error."""
try:
return await func(self, *args, **kwargs)
except MusicAssistantError as err:
error_msg = str(err) or err.__class__.__name__
raise HomeAssistantError(error_msg) from err
return wrapper
async def async_setup_entry(
    hass: HomeAssistant,
    entry: MusicAssistantConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Create media player entities for a Music Assistant config entry.

    Entities are created for every player the client currently knows about,
    and a ``PLAYER_ADDED`` listener (removed again when the entry unloads)
    creates entities for players that show up later. Each player id is
    turned into an entity at most once.
    """
    mass = entry.runtime_data.mass
    # ids that already have an entity, shared with the event handler below
    known_player_ids = set()

    async def handle_player_added(event: MassEvent) -> None:
        """Create an entity for a newly announced player, once per id."""
        if TYPE_CHECKING:
            assert event.object_id is not None
        new_id = event.object_id
        if new_id not in known_player_ids:
            known_player_ids.add(new_id)
            async_add_entities([MusicAssistantPlayer(mass, new_id)])

    # listen for new players before registering the ones that already exist
    entry.async_on_unload(mass.subscribe(handle_player_added, EventType.PLAYER_ADDED))

    # register every player that is already present
    existing_ids = [player.player_id for player in mass.players]
    known_player_ids.update(existing_ids)
    async_add_entities(
        [MusicAssistantPlayer(mass, existing_id) for existing_id in existing_ids]
    )
class MusicAssistantPlayer(MusicAssistantEntity, MediaPlayerEntity):
    """Representation of MediaPlayerEntity from Music Assistant Player.

    Player commands are forwarded to the Music Assistant client
    (``self.mass``) and the Home Assistant state attributes are refreshed
    from the player and its active queue in ``async_on_update``.
    """

    _attr_name = None
    _attr_media_image_remotely_accessible = True
    _attr_media_content_type = HAMediaType.MUSIC

    def __init__(self, mass: MusicAssistantClient, player_id: str) -> None:
        """Initialize MediaPlayer entity."""
        super().__init__(mass, player_id)
        self._attr_icon = self.player.icon.replace("mdi-", "mdi:")
        # start from the common feature set, then add what this player offers
        self._attr_supported_features = SUPPORTED_FEATURES
        if PlayerFeature.SYNC in self.player.supported_features:
            self._attr_supported_features |= MediaPlayerEntityFeature.GROUPING
        if PlayerFeature.VOLUME_MUTE in self.player.supported_features:
            self._attr_supported_features |= MediaPlayerEntityFeature.VOLUME_MUTE
        self._attr_device_class = MediaPlayerDeviceClass.SPEAKER
        # last elapsed time we pushed to HA; used to detect big time jumps
        self._prev_time: float = 0

    async def async_added_to_hass(self) -> None:
        """Register callbacks."""
        await super().async_added_to_hass()

        # we subscribe to player queue time update but we only
        # accept a state change on big time jumps (e.g. seeking)
        async def queue_time_updated(event: MassEvent) -> None:
            """Write a new state when the active queue's time jumps > 5."""
            if event.object_id != self.player.active_source:
                return
            if abs((self._prev_time or 0) - event.data) > 5:
                await self.async_on_update()
                self.async_write_ha_state()
            self._prev_time = event.data

        self.async_on_remove(
            self.mass.subscribe(
                queue_time_updated,
                EventType.QUEUE_TIME_UPDATED,
            )
        )

    @property
    def active_queue(self) -> PlayerQueue | None:
        """Return the active queue for this player (if any)."""
        if not self.player.active_source:
            return None
        return self.mass.player_queues.get(self.player.active_source)

    @property
    def extra_state_attributes(self) -> Mapping[str, Any]:
        """Return additional state attributes."""
        queue = self.active_queue
        return {
            ATTR_MASS_PLAYER_TYPE: self.player.type.value,
            ATTR_ACTIVE_QUEUE: queue.queue_id if queue else None,
        }

    async def async_on_update(self) -> None:
        """Refresh the entity attributes from the player and its queue.

        Does nothing while the entity is unavailable.
        """
        if not self.available:
            return
        player = self.player
        active_queue = self.active_queue
        # The entity state follows the player's own state; a player that is
        # not powered (or reports no state) is shown as off.
        if player.powered and player.state is not None:
            self._attr_state = MediaPlayerState(player.state.value)
        else:
            self._attr_state = MediaPlayerState(STATE_OFF)

        group_members_entity_ids: list[str] = []
        if player.group_childs:
            # translate MA group_childs to HA group_members as entity id's
            entity_registry = er.async_get(self.hass)
            group_members_entity_ids = [
                entity_id
                for child_id in player.group_childs
                if (
                    entity_id := entity_registry.async_get_entity_id(
                        self.platform.domain, DOMAIN, child_id
                    )
                )
            ]
        # NOTE: we sort the group_members for now,
        # until the MA API returns them sorted (group_childs is now a set)
        self._attr_group_members = sorted(group_members_entity_ids)

        # MA reports volume as 0..100, HA expects 0..1
        self._attr_volume_level = (
            player.volume_level / 100 if player.volume_level is not None else None
        )
        self._attr_is_volume_muted = player.volume_muted
        self._update_media_attributes(player, active_queue)
        self._update_media_image_url(player, active_queue)

    @catch_musicassistant_error
    async def async_media_play(self) -> None:
        """Send play command to device."""
        await self.mass.players.player_command_play(self.player_id)

    @catch_musicassistant_error
    async def async_media_pause(self) -> None:
        """Send pause command to device."""
        await self.mass.players.player_command_pause(self.player_id)

    @catch_musicassistant_error
    async def async_media_stop(self) -> None:
        """Send stop command to device."""
        await self.mass.players.player_command_stop(self.player_id)

    @catch_musicassistant_error
    async def async_media_next_track(self) -> None:
        """Send next track command to device."""
        await self.mass.players.player_command_next_track(self.player_id)

    @catch_musicassistant_error
    async def async_media_previous_track(self) -> None:
        """Send previous track command to device."""
        await self.mass.players.player_command_previous_track(self.player_id)

    @catch_musicassistant_error
    async def async_media_seek(self, position: float) -> None:
        """Send seek command (position is truncated to a whole number)."""
        await self.mass.players.player_command_seek(self.player_id, int(position))

    @catch_musicassistant_error
    async def async_mute_volume(self, mute: bool) -> None:
        """Mute the volume."""
        await self.mass.players.player_command_volume_mute(self.player_id, mute)

    @catch_musicassistant_error
    async def async_set_volume_level(self, volume: float) -> None:
        """Send new volume_level (0..1) to device as a 0..100 integer."""
        # round instead of truncating: int(0.29 * 100) would yield 28
        # because 0.29 * 100 evaluates to 28.999999999999996
        await self.mass.players.player_command_volume_set(
            self.player_id, round(volume * 100)
        )

    @catch_musicassistant_error
    async def async_volume_up(self) -> None:
        """Send volume up command to device."""
        await self.mass.players.player_command_volume_up(self.player_id)

    @catch_musicassistant_error
    async def async_volume_down(self) -> None:
        """Send volume down command to device."""
        await self.mass.players.player_command_volume_down(self.player_id)

    @catch_musicassistant_error
    async def async_turn_on(self) -> None:
        """Turn on device."""
        await self.mass.players.player_command_power(self.player_id, True)

    @catch_musicassistant_error
    async def async_turn_off(self) -> None:
        """Turn off device."""
        await self.mass.players.player_command_power(self.player_id, False)

    @catch_musicassistant_error
    async def async_set_shuffle(self, shuffle: bool) -> None:
        """Set shuffle state (no-op when there is no active queue)."""
        if not (queue := self.active_queue):
            return
        await self.mass.player_queues.queue_command_shuffle(queue.queue_id, shuffle)

    @catch_musicassistant_error
    async def async_set_repeat(self, repeat: RepeatMode) -> None:
        """Set repeat state (no-op when there is no active queue)."""
        if not (queue := self.active_queue):
            return
        await self.mass.player_queues.queue_command_repeat(
            queue.queue_id, MassRepeatMode(repeat)
        )

    @catch_musicassistant_error
    async def async_clear_playlist(self) -> None:
        """Clear the player's active queue (no-op when there is none)."""
        if queue := self.active_queue:
            await self.mass.player_queues.queue_command_clear(queue.queue_id)

    @catch_musicassistant_error
    async def async_play_media(
        self,
        media_type: MediaType | str,
        media_id: str,
        enqueue: MediaPlayerEnqueue | None = None,
        announce: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Send the play_media command to the media player.

        Media source ids are resolved to a URL first. With ``announce`` set
        the media is played as an announcement; otherwise it is handed to
        the queue. Optional settings (pre-announce, announce volume, radio
        mode) are read from the ``extra`` mapping in ``kwargs`` when present.
        """
        if media_source.is_media_source_id(media_id):
            # Handle media_source
            sourced_media = await media_source.async_resolve_media(
                self.hass, media_id, self.entity_id
            )
            media_id = sourced_media.url
            media_id = async_process_play_media_url(self.hass, media_id)

        # callers that invoke the entity method directly may omit `extra`
        extra: Mapping[str, Any] = kwargs.get(ATTR_MEDIA_EXTRA) or {}

        if announce:
            await self._async_handle_play_announcement(
                media_id,
                use_pre_announce=extra.get(ATTR_USE_PRE_ANNOUNCE),
                announce_volume=extra.get(ATTR_ANNOUNCE_VOLUME),
            )
            return

        # forward to our advanced play_media handler
        await self._async_handle_play_media(
            media_id=[media_id],
            enqueue=enqueue,
            media_type=media_type,
            radio_mode=extra.get(ATTR_RADIO_MODE),
        )

    @catch_musicassistant_error
    async def async_join_players(self, group_members: list[str]) -> None:
        """Join `group_members` as a player group with the current player.

        Entity ids without a state, or whose state has no ``mass_player_id``
        attribute, are skipped.
        """
        player_ids: list[str] = []
        for child_entity_id in group_members:
            # resolve HA entity_id to MA player_id
            if (hass_state := self.hass.states.get(child_entity_id)) is None:
                continue
            if (mass_player_id := hass_state.attributes.get("mass_player_id")) is None:
                continue
            player_ids.append(mass_player_id)
        await self.mass.players.player_command_sync_many(self.player_id, player_ids)

    @catch_musicassistant_error
    async def async_unjoin_player(self) -> None:
        """Remove this player from any group."""
        await self.mass.players.player_command_unsync(self.player_id)

    @catch_musicassistant_error
    async def _async_handle_play_media(
        self,
        media_id: list[str],
        enqueue: MediaPlayerEnqueue | QueueOption | None = None,
        radio_mode: bool | None = None,
        media_type: str | None = None,
    ) -> None:
        """Resolve the given media ids and send them to the player's queue.

        Each id is accepted when it is a URL/URI (contains ``://``), a
        numeric library id that resolves for ``media_type``, or the path of
        an existing local file. Ids that cannot be resolved are skipped.

        Raises HomeAssistantError when none of the ids could be resolved.
        """
        media_uris: list[str] = []
        # work out (all) uri(s) to play
        for media_id_str in media_id:
            # URL or URI string
            if "://" in media_id_str:
                media_uris.append(media_id_str)
                continue
            # try content id as library id
            if media_type and media_id_str.isnumeric():
                # a missing library item just means this id is skipped
                with suppress(MediaNotFoundError):
                    item = await self.mass.music.get_item(
                        MediaType(media_type), media_id_str, "library"
                    )
                    if isinstance(item, MediaItemType | ItemMapping) and item.uri:
                        media_uris.append(item.uri)
                continue
            # try local accessible filename (checked off the event loop)
            if await asyncio.to_thread(os.path.isfile, media_id_str):
                media_uris.append(media_id_str)

        if not media_uris:
            raise HomeAssistantError(
                f"Could not resolve {media_id} to playable media item"
            )

        # determine active queue to send the play request to,
        # falling back to the player's own id when no queue is active
        queue = self.active_queue
        queue_id = queue.queue_id if queue else self.player_id
        await self.mass.player_queues.play_media(
            queue_id,
            media=media_uris,
            option=self._convert_queueoption_to_media_player_enqueue(enqueue),
            radio_mode=radio_mode or False,
        )

    @catch_musicassistant_error
    async def _async_handle_play_announcement(
        self,
        url: str,
        use_pre_announce: bool | None = None,
        announce_volume: int | None = None,
    ) -> None:
        """Send the play_announcement command to the media player."""
        await self.mass.players.play_announcement(
            self.player_id, url, use_pre_announce, announce_volume
        )

    async def async_browse_media(
        self,
        media_content_type: MediaType | str | None = None,
        media_content_id: str | None = None,
    ) -> BrowseMedia:
        """Implement the websocket media browsing helper.

        Browses the Home Assistant media sources, restricted to items whose
        content type starts with ``audio/``.
        """
        return await media_source.async_browse_media(
            self.hass,
            media_content_id,
            content_filter=lambda item: item.media_content_type.startswith("audio/"),
        )

    def _update_media_image_url(
        self, player: Player, queue: PlayerQueue | None
    ) -> None:
        """Update image URL for the active queue item."""
        if queue is None or queue.current_item is None:
            self._attr_media_image_url = None
            return
        image_url = self.mass.get_media_item_image_url(queue.current_item)
        if not image_url:
            self._attr_media_image_url = None
            return
        # images whose URL contains the MA server URL are flagged as not
        # remotely accessible
        self._attr_media_image_remotely_accessible = (
            self.mass.server_url not in image_url
        )
        self._attr_media_image_url = image_url

    def _update_media_attributes(
        self, player: Player, queue: PlayerQueue | None
    ) -> None:
        """Update media attributes for the active queue item.

        Handles three situations: an external source with its own media
        info (no MA queue), no queue and no media info, and an active MA
        queue (regular media items as well as radio stream titles).
        """
        # pylint: disable=too-many-statements
        # start from a clean slate; the branches below fill in what is known
        self._attr_media_artist = None
        self._attr_media_album_artist = None
        self._attr_media_album_name = None
        self._attr_media_title = None
        self._attr_media_content_id = None
        self._attr_media_duration = None
        self._attr_media_position = None
        self._attr_media_position_updated_at = None

        if queue is None and player.current_media:
            # player has some external source active
            current_media = player.current_media
            self._attr_media_content_id = current_media.uri
            self._attr_app_id = player.active_source
            self._attr_media_title = current_media.title
            self._attr_media_artist = current_media.artist
            self._attr_media_album_name = current_media.album
            self._attr_media_duration = current_media.duration
            # shuffle and repeat are not (yet) supported for external sources
            self._attr_shuffle = None
            self._attr_repeat = None
            elapsed_time = player.elapsed_time
            if elapsed_time is not None:
                # only report a position when the player actually has one;
                # int(None) would raise a TypeError
                self._attr_media_position = int(elapsed_time)
                if player.elapsed_time_last_updated:
                    self._attr_media_position_updated_at = utc_from_timestamp(
                        player.elapsed_time_last_updated
                    )
                self._prev_time = elapsed_time
            return

        if queue is None:
            # player has no MA queue active
            self._attr_source = player.active_source
            self._attr_app_id = player.active_source
            return

        # player has an MA queue active (either its own queue or some group queue)
        self._attr_app_id = DOMAIN
        self._attr_shuffle = queue.shuffle_enabled
        self._attr_repeat = queue.repeat_mode.value
        if not (cur_item := queue.current_item):
            # queue is empty
            return

        self._attr_media_content_id = cur_item.uri
        self._attr_media_duration = cur_item.duration
        self._attr_media_position = int(queue.elapsed_time)
        self._attr_media_position_updated_at = utc_from_timestamp(
            queue.elapsed_time_last_updated
        )
        self._prev_time = queue.elapsed_time

        # handle stream title (radio station icy metadata)
        if (stream_details := cur_item.streamdetails) and stream_details.stream_title:
            self._attr_media_album_name = cur_item.name
            stream_title = stream_details.stream_title
            if " - " in stream_title:
                # "<artist> - <title>"; split on the first separator only
                artist, title = stream_title.split(" - ", 1)
                self._attr_media_title = title
                self._attr_media_artist = artist
            else:
                self._attr_media_title = stream_title
            return

        if not (media_item := cur_item.media_item):
            # queue is not playing a regular media item (edge case?!)
            self._attr_media_title = cur_item.name
            return

        # queue is playing regular media item
        self._attr_media_title = media_item.name
        # for tracks we can extract more info
        if media_item.media_type == MediaType.TRACK:
            if TYPE_CHECKING:
                assert isinstance(media_item, Track)
            self._attr_media_artist = media_item.artist_str
            if media_item.version:
                self._attr_media_title += f" ({media_item.version})"
            if media_item.album:
                self._attr_media_album_name = media_item.album.name
                self._attr_media_album_artist = getattr(
                    media_item.album, "artist_str", None
                )

    def _convert_queueoption_to_media_player_enqueue(
        self, queue_option: MediaPlayerEnqueue | QueueOption | None
    ) -> QueueOption | None:
        """Convert a MediaPlayerEnqueue to the matching QueueOption.

        Values that are not a MediaPlayerEnqueue (a QueueOption or None) are
        returned unchanged.
        """
        if isinstance(queue_option, MediaPlayerEnqueue):
            return QUEUE_OPTION_MAP.get(queue_option)
        return queue_option