core/homeassistant/components/sonos/media_browser.py

586 lines
18 KiB
Python

"""Support for media browsing."""
from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
from functools import partial
import logging
from typing import cast
import urllib.parse
from soco.data_structures import DidlObject
from soco.ms_data_structures import MusicServiceItem
from soco.music_library import MusicLibrary
from homeassistant.components import media_source, plex, spotify
from homeassistant.components.media_player import (
BrowseError,
BrowseMedia,
MediaClass,
MediaType,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.network import is_internal_request
from .const import (
DOMAIN,
EXPANDABLE_MEDIA_TYPES,
LIBRARY_TITLES_MAPPING,
MEDIA_TYPES_TO_SONOS,
PLAYABLE_MEDIA_TYPES,
SONOS_ALBUM,
SONOS_ALBUM_ARTIST,
SONOS_GENRE,
SONOS_TO_MEDIA_CLASSES,
SONOS_TO_MEDIA_TYPES,
SONOS_TRACKS,
SONOS_TYPES_MAPPING,
)
from .exception import UnknownMediaType
from .favorites import SonosFavorites
from .speaker import SonosMedia, SonosSpeaker
_LOGGER = logging.getLogger(__name__)
type GetBrowseImageUrlType = Callable[[str, str, str | None], str]
def fix_image_url(url: str) -> str:
"""Update the image url to fully encode characters to allow image display in media_browser UI.
Images whose file path contains characters such as ',()+ are not loaded without escaping them.
"""
# Before parsing encode the plus sign; otherwise it'll be interpreted as a space.
original_url: str = urllib.parse.unquote(url).replace("+", "%2B")
parsed_url = urllib.parse.urlparse(original_url)
query_params = urllib.parse.parse_qsl(parsed_url.query)
new_url = urllib.parse.urlunsplit(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
urllib.parse.urlencode(
query_params, quote_via=urllib.parse.quote, safe="/:"
),
"",
)
)
if original_url != new_url:
_LOGGER.debug("fix_sonos_image_url original: %s new: %s", original_url, new_url)
return new_url
def get_thumbnail_url_full(
media: SonosMedia,
is_internal: bool,
get_browse_image_url: GetBrowseImageUrlType,
media_content_type: str,
media_content_id: str,
media_image_id: str | None = None,
item: MusicServiceItem | None = None,
) -> str | None:
"""Get thumbnail URL."""
if is_internal:
if not item:
item = get_media(
media.library,
media_content_id,
media_content_type,
)
return fix_image_url(getattr(item, "album_art_uri", ""))
return urllib.parse.unquote(
get_browse_image_url(
media_content_type,
media_content_id,
media_image_id,
)
)
def media_source_filter(item: BrowseMedia) -> bool:
"""Filter media sources."""
return item.media_content_type.startswith("audio/")
async def async_browse_media(
hass: HomeAssistant,
speaker: SonosSpeaker,
media: SonosMedia,
get_browse_image_url: GetBrowseImageUrlType,
media_content_id: str | None,
media_content_type: str | None,
) -> BrowseMedia:
"""Browse media."""
if media_content_id is None:
return await root_payload(
hass,
speaker,
media,
get_browse_image_url,
)
assert media_content_type is not None
if media_source.is_media_source_id(media_content_id):
return await media_source.async_browse_media(
hass, media_content_id, content_filter=media_source_filter
)
if plex.is_plex_media_id(media_content_id):
return await plex.async_browse_media(
hass, media_content_type, media_content_id, platform=DOMAIN
)
if media_content_type == "plex":
return await plex.async_browse_media(hass, None, None, platform=DOMAIN)
if spotify.is_spotify_media_type(media_content_type):
return await spotify.async_browse_media(
hass, media_content_type, media_content_id, can_play_artist=False
)
if media_content_type == "library":
return await hass.async_add_executor_job(
library_payload,
media.library,
partial(
get_thumbnail_url_full,
media,
is_internal_request(hass),
get_browse_image_url,
),
)
if media_content_type == "favorites":
return await hass.async_add_executor_job(
favorites_payload,
speaker.favorites,
)
if media_content_type == "favorites_folder":
return await hass.async_add_executor_job(
favorites_folder_payload,
speaker.favorites,
media_content_id,
)
payload = {
"search_type": media_content_type,
"idstring": media_content_id,
}
response = await hass.async_add_executor_job(
build_item_response,
media.library,
payload,
partial(
get_thumbnail_url_full,
media,
is_internal_request(hass),
get_browse_image_url,
),
)
if response is None:
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
return response
def build_item_response(
media_library: MusicLibrary, payload: dict[str, str], get_thumbnail_url=None
) -> BrowseMedia | None:
"""Create response payload for the provided media query."""
if payload["search_type"] == MediaType.ALBUM and payload["idstring"].startswith(
("A:GENRE", "A:COMPOSER")
):
payload["idstring"] = "A:ALBUMARTIST/" + "/".join(
payload["idstring"].split("/")[2:]
)
payload["idstring"] = urllib.parse.unquote(payload["idstring"])
try:
search_type = MEDIA_TYPES_TO_SONOS[payload["search_type"]]
except KeyError:
_LOGGER.debug(
"Unknown media type received when building item response: %s",
payload["search_type"],
)
return None
media = media_library.browse_by_idstring(
search_type,
payload["idstring"],
full_album_art_uri=True,
max_items=0,
)
if media is None:
return None
thumbnail = None
title = None
# Fetch album info for titles and thumbnails
# Can't be extracted from track info
if (
payload["search_type"] == MediaType.ALBUM
and media[0].item_class == "object.item.audioItem.musicTrack"
):
idstring = payload["idstring"]
if idstring.startswith("A:ALBUMARTIST/"):
search_type = SONOS_ALBUM_ARTIST
elif idstring.startswith("A:ALBUM/"):
search_type = SONOS_ALBUM
item = get_media(media_library, idstring, search_type)
title = getattr(item, "title", None)
thumbnail = get_thumbnail_url(search_type, payload["idstring"])
if not title:
try:
title = urllib.parse.unquote(payload["idstring"].split("/")[1])
except IndexError:
title = LIBRARY_TITLES_MAPPING[payload["idstring"]]
try:
media_class = SONOS_TO_MEDIA_CLASSES[
MEDIA_TYPES_TO_SONOS[payload["search_type"]]
]
except KeyError:
_LOGGER.debug("Unknown media type received %s", payload["search_type"])
return None
children = []
for item in media:
with suppress(UnknownMediaType):
children.append(item_payload(item, get_thumbnail_url))
return BrowseMedia(
title=title,
thumbnail=thumbnail,
media_class=media_class,
media_content_id=payload["idstring"],
media_content_type=payload["search_type"],
children=children,
can_play=can_play(payload["search_type"]),
can_expand=can_expand(payload["search_type"]),
)
def item_payload(item: DidlObject, get_thumbnail_url=None) -> BrowseMedia:
"""Create response payload for a single media item.
Used by async_browse_media.
"""
media_type = get_media_type(item)
try:
media_class = SONOS_TO_MEDIA_CLASSES[media_type]
except KeyError as err:
_LOGGER.debug("Unknown media type received %s", media_type)
raise UnknownMediaType from err
content_id = get_content_id(item)
thumbnail = None
if getattr(item, "album_art_uri", None):
thumbnail = get_thumbnail_url(media_class, content_id, item=item)
return BrowseMedia(
title=item.title,
thumbnail=thumbnail,
media_class=media_class,
media_content_id=content_id,
media_content_type=SONOS_TO_MEDIA_TYPES[media_type],
can_play=can_play(item.item_class),
can_expand=can_expand(item),
)
async def root_payload(
hass: HomeAssistant,
speaker: SonosSpeaker,
media: SonosMedia,
get_browse_image_url: GetBrowseImageUrlType,
) -> BrowseMedia:
"""Return root payload for Sonos."""
children: list[BrowseMedia] = []
if speaker.favorites:
children.append(
BrowseMedia(
title="Favorites",
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="favorites",
thumbnail="https://brands.home-assistant.io/_/sonos/logo.png",
can_play=False,
can_expand=True,
)
)
if await hass.async_add_executor_job(
partial(media.library.browse_by_idstring, "tracks", "", max_items=1)
):
children.append(
BrowseMedia(
title="Music Library",
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="library",
thumbnail="https://brands.home-assistant.io/_/sonos/logo.png",
can_play=False,
can_expand=True,
)
)
if "plex" in hass.config.components:
children.append(
BrowseMedia(
title="Plex",
media_class=MediaClass.APP,
media_content_id="",
media_content_type="plex",
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
can_play=False,
can_expand=True,
)
)
if "spotify" in hass.config.components:
result = await spotify.async_browse_media(hass, None, None)
if result.children:
children.extend(result.children)
try:
item = await media_source.async_browse_media(
hass, None, content_filter=media_source_filter
)
# If domain is None, it's overview of available sources
if item.domain is None and item.children is not None:
children.extend(item.children)
else:
children.append(item)
except media_source.BrowseError:
pass
if len(children) == 1:
return await async_browse_media(
hass,
speaker,
media,
get_browse_image_url,
children[0].media_content_id,
children[0].media_content_type,
)
return BrowseMedia(
title="Sonos",
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="root",
can_play=False,
can_expand=True,
children=children,
)
def library_payload(media_library: MusicLibrary, get_thumbnail_url=None) -> BrowseMedia:
"""Create response payload to describe contents of a specific library.
Used by async_browse_media.
"""
children = []
for item in media_library.browse():
with suppress(UnknownMediaType):
children.append(item_payload(item, get_thumbnail_url))
return BrowseMedia(
title="Music Library",
media_class=MediaClass.DIRECTORY,
media_content_id="library",
media_content_type="library",
can_play=False,
can_expand=True,
children=children,
)
def favorites_payload(favorites: SonosFavorites) -> BrowseMedia:
"""Create response payload to describe contents of a specific library.
Used by async_browse_media.
"""
children: list[BrowseMedia] = []
group_types: set[str] = {fav.reference.item_class for fav in favorites}
for group_type in sorted(group_types):
try:
media_content_type = SONOS_TYPES_MAPPING[group_type]
media_class = SONOS_TO_MEDIA_CLASSES[group_type]
except KeyError:
_LOGGER.debug("Unknown media type or class received %s", group_type)
continue
children.append(
BrowseMedia(
title=media_content_type.title(),
media_class=media_class,
media_content_id=group_type,
media_content_type="favorites_folder",
can_play=False,
can_expand=True,
)
)
return BrowseMedia(
title="Favorites",
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="favorites",
can_play=False,
can_expand=True,
children=children,
)
def favorites_folder_payload(
favorites: SonosFavorites, media_content_id: str
) -> BrowseMedia:
"""Create response payload to describe all items of a type of favorite.
Used by async_browse_media.
"""
children: list[BrowseMedia] = []
content_type = SONOS_TYPES_MAPPING[media_content_id]
for favorite in favorites:
if favorite.reference.item_class != media_content_id:
continue
children.append(
BrowseMedia(
title=favorite.title,
media_class=SONOS_TO_MEDIA_CLASSES[favorite.reference.item_class],
media_content_id=favorite.item_id,
media_content_type="favorite_item_id",
can_play=True,
can_expand=False,
thumbnail=getattr(favorite, "album_art_uri", None),
)
)
return BrowseMedia(
title=content_type.title(),
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="favorites",
can_play=False,
can_expand=True,
children=children,
)
def get_media_type(item: DidlObject) -> str:
"""Extract media type of item."""
if item.item_class == "object.item.audioItem.musicTrack":
return SONOS_TRACKS
if (
item.item_class == "object.container.album.musicAlbum"
and SONOS_TYPES_MAPPING.get(item.item_id.split("/")[0])
in [
SONOS_ALBUM_ARTIST,
SONOS_GENRE,
]
):
return SONOS_TYPES_MAPPING[item.item_class]
return SONOS_TYPES_MAPPING.get(item.item_id.split("/")[0], item.item_class)
def can_play(item: DidlObject) -> bool:
"""Test if playable.
Used by async_browse_media.
"""
return SONOS_TO_MEDIA_TYPES.get(item) in PLAYABLE_MEDIA_TYPES
def can_expand(item: DidlObject) -> bool:
"""Test if expandable.
Used by async_browse_media.
"""
if isinstance(item, str):
return SONOS_TYPES_MAPPING.get(item) in EXPANDABLE_MEDIA_TYPES
if SONOS_TO_MEDIA_TYPES.get(item.item_class) in EXPANDABLE_MEDIA_TYPES:
return True
return SONOS_TYPES_MAPPING.get(item.item_id) in EXPANDABLE_MEDIA_TYPES
def get_content_id(item: DidlObject) -> str:
"""Extract content id or uri."""
if item.item_class == "object.item.audioItem.musicTrack":
return cast(str, item.get_uri())
return cast(str, item.item_id)
def get_media(
media_library: MusicLibrary, item_id: str, search_type: str
) -> MusicServiceItem | None:
"""Fetch a single media/album."""
_LOGGER.debug("get_media item_id [%s], search_type [%s]", item_id, search_type)
search_type = MEDIA_TYPES_TO_SONOS.get(search_type, search_type)
if search_type == "playlists":
# Format is S:TITLE or S:ITEM_ID
splits = item_id.split(":")
title = splits[1] if len(splits) > 1 else None
return next(
(
p
for p in media_library.get_playlists()
if (item_id == p.item_id or title == p.title)
),
None,
)
if not item_id.startswith("A:ALBUM") and search_type == SONOS_ALBUM:
item_id = "A:ALBUMARTIST/" + "/".join(item_id.split("/")[2:])
if item_id.startswith("A:ALBUM/") or search_type == "tracks":
search_term = urllib.parse.unquote(item_id.split("/")[-1])
matches = media_library.get_music_library_information(
search_type, search_term=search_term, full_album_art_uri=True
)
else:
# When requesting media by album_artist, composer, genre use the browse interface
# to navigate the hierarchy. This occurs when invoked from media browser or service
# calls
# Example: A:ALBUMARTIST/Neil Young/Greatest Hits - get specific album
# Example: A:ALBUMARTIST/Neil Young - get all albums
# Others: composer, genre
# A:<topic>/<name>/<optional title>
splits = item_id.split("/")
title = urllib.parse.unquote(splits[2]) if len(splits) > 2 else None
browse_id_string = splits[0] + "/" + splits[1]
matches = media_library.browse_by_idstring(
search_type, browse_id_string, full_album_art_uri=True
)
if title:
result = next(
(item for item in matches if (title == item.title)),
None,
)
matches = [result]
_LOGGER.debug(
"get_media search_type [%s] item_id [%s] matches [%d]",
search_type,
item_id,
len(matches),
)
if len(matches) > 0:
return matches[0]
return None