core/homeassistant/components/google_photos/media_source.py

306 lines
10 KiB
Python

"""Media source for Google Photos."""
from __future__ import annotations
from dataclasses import dataclass
from enum import StrEnum
import logging
from typing import Self, cast
from google_photos_library_api.exceptions import GooglePhotosApiError
from google_photos_library_api.model import Album, MediaItem
from homeassistant.components.media_player import MediaClass, MediaType
from homeassistant.components.media_source import (
BrowseError,
BrowseMediaSource,
MediaSource,
MediaSourceItem,
PlayMedia,
)
from homeassistant.core import HomeAssistant
from . import GooglePhotosConfigEntry
from .const import DOMAIN, READ_SCOPE
_LOGGER = logging.getLogger(__name__)
MEDIA_ITEMS_PAGE_SIZE = 100
ALBUM_PAGE_SIZE = 50
THUMBNAIL_SIZE = 256
LARGE_IMAGE_SIZE = 2160
# The PhotosIdentifier can be in the following forms:
# config-entry-id
# config-entry-id/a/album-media-id
# config-entry-id/p/photo-media-id
#
# The album-media-id can contain special reserved folder names for use by
# this integration for virtual folders like the `recent` album.
class PhotosIdentifierType(StrEnum):
"""Type for a PhotosIdentifier."""
PHOTO = "p"
ALBUM = "a"
@classmethod
def of(cls, name: str) -> PhotosIdentifierType:
"""Parse a PhotosIdentifierType by string value."""
for enum in PhotosIdentifierType:
if enum.value == name:
return enum
raise ValueError(f"Invalid PhotosIdentifierType: {name}")
@dataclass
class PhotosIdentifier:
"""Google Photos item identifier in a media source URL."""
config_entry_id: str
"""Identifies the account for the media item."""
id_type: PhotosIdentifierType | None = None
"""Type of identifier"""
media_id: str | None = None
"""Identifies the album or photo contents to show."""
def as_string(self) -> str:
"""Serialize the identifier as a string."""
if self.id_type is None:
return self.config_entry_id
return f"{self.config_entry_id}/{self.id_type}/{self.media_id}"
@classmethod
def of(cls, identifier: str) -> Self:
"""Parse a PhotosIdentifier form a string."""
parts = identifier.split("/")
if len(parts) == 1:
return cls(parts[0])
if len(parts) != 3:
raise BrowseError(f"Invalid identifier: {identifier}")
return cls(parts[0], PhotosIdentifierType.of(parts[1]), parts[2])
@classmethod
def album(cls, config_entry_id: str, media_id: str) -> Self:
"""Create an album PhotosIdentifier."""
return cls(config_entry_id, PhotosIdentifierType.ALBUM, media_id)
@classmethod
def photo(cls, config_entry_id: str, media_id: str) -> Self:
"""Create an album PhotosIdentifier."""
return cls(config_entry_id, PhotosIdentifierType.PHOTO, media_id)
async def async_get_media_source(hass: HomeAssistant) -> MediaSource:
"""Set up Google Photos media source."""
return GooglePhotosMediaSource(hass)
class GooglePhotosMediaSource(MediaSource):
"""Provide Google Photos as media sources."""
name = "Google Photos"
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize Google Photos source."""
super().__init__(DOMAIN)
self.hass = hass
async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
"""Resolve media identifier to a url.
This will resolve a specific media item to a url for the full photo or video contents.
"""
try:
identifier = PhotosIdentifier.of(item.identifier)
except ValueError as err:
raise BrowseError(f"Could not parse identifier: {item.identifier}") from err
if (
identifier.media_id is None
or identifier.id_type != PhotosIdentifierType.PHOTO
):
raise BrowseError(
f"Could not resolve identiifer that is not a Photo: {identifier}"
)
entry = self._async_config_entry(identifier.config_entry_id)
client = entry.runtime_data.client
media_item = await client.get_media_item(media_item_id=identifier.media_id)
if not media_item.mime_type:
raise BrowseError("Could not determine mime type of media item")
if media_item.media_metadata and (media_item.media_metadata.video is not None):
url = _video_url(media_item)
else:
url = _media_url(media_item, LARGE_IMAGE_SIZE)
return PlayMedia(
url=url,
mime_type=media_item.mime_type,
)
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Return details about the media source.
This renders the multi-level album structure for an account, its albums,
or the contents of an album. This will return a BrowseMediaSource with a
single level of children at the next level of the hierarchy.
"""
if not item.identifier:
# Top level view that lists all accounts.
return BrowseMediaSource(
domain=DOMAIN,
identifier=None,
media_class=MediaClass.DIRECTORY,
media_content_type=MediaClass.IMAGE,
title="Google Photos",
can_play=False,
can_expand=True,
children_media_class=MediaClass.DIRECTORY,
children=[
_build_account(entry, PhotosIdentifier(cast(str, entry.unique_id)))
for entry in self._async_config_entries()
],
)
# Determine the configuration entry for this item
identifier = PhotosIdentifier.of(item.identifier)
entry = self._async_config_entry(identifier.config_entry_id)
coordinator = entry.runtime_data
client = coordinator.client
source = _build_account(entry, identifier)
if identifier.id_type is None:
albums = await coordinator.list_albums()
source.children = [
_build_album(
album.title,
PhotosIdentifier.album(
identifier.config_entry_id,
album.id,
),
_cover_photo_url(album, THUMBNAIL_SIZE),
)
for album in albums
]
return source
if (
identifier.id_type != PhotosIdentifierType.ALBUM
or identifier.media_id is None
):
raise BrowseError(f"Unsupported identifier: {identifier}")
media_items: list[MediaItem] = []
try:
async for media_item_result in await client.list_media_items(
album_id=identifier.media_id, page_size=MEDIA_ITEMS_PAGE_SIZE
):
media_items.extend(media_item_result.media_items)
except GooglePhotosApiError as err:
raise BrowseError(f"Error listing media items: {err}") from err
source.children = [
_build_media_item(
PhotosIdentifier.photo(identifier.config_entry_id, media_item.id),
media_item,
)
for media_item in media_items
]
return source
def _async_config_entries(self) -> list[GooglePhotosConfigEntry]:
"""Return all config entries that support photo library reads."""
entries = []
for entry in self.hass.config_entries.async_loaded_entries(DOMAIN):
scopes = entry.data["token"]["scope"].split(" ")
if READ_SCOPE in scopes:
entries.append(entry)
return entries
def _async_config_entry(self, config_entry_id: str) -> GooglePhotosConfigEntry:
"""Return a config entry with the specified id."""
entry = self.hass.config_entries.async_entry_for_domain_unique_id(
DOMAIN, config_entry_id
)
if not entry:
raise BrowseError(
f"Could not find config entry for identifier: {config_entry_id}"
)
return entry
def _build_account(
config_entry: GooglePhotosConfigEntry,
identifier: PhotosIdentifier,
) -> BrowseMediaSource:
"""Build the root node for a Google Photos account for a config entry."""
return BrowseMediaSource(
domain=DOMAIN,
identifier=identifier.as_string(),
media_class=MediaClass.DIRECTORY,
media_content_type=MediaClass.IMAGE,
title=config_entry.title,
can_play=False,
can_expand=True,
)
def _build_album(
title: str, identifier: PhotosIdentifier, thumbnail_url: str | None = None
) -> BrowseMediaSource:
"""Build an album node."""
return BrowseMediaSource(
domain=DOMAIN,
identifier=identifier.as_string(),
media_class=MediaClass.ALBUM,
media_content_type=MediaClass.ALBUM,
title=title,
can_play=False,
can_expand=True,
thumbnail=thumbnail_url,
)
def _build_media_item(
identifier: PhotosIdentifier,
media_item: MediaItem,
) -> BrowseMediaSource:
"""Build the node for an individual photo or video."""
is_video = media_item.media_metadata and (
media_item.media_metadata.video is not None
)
return BrowseMediaSource(
domain=DOMAIN,
identifier=identifier.as_string(),
media_class=MediaClass.IMAGE if not is_video else MediaClass.VIDEO,
media_content_type=MediaType.IMAGE if not is_video else MediaType.VIDEO,
title=media_item.filename,
can_play=is_video,
can_expand=False,
thumbnail=_media_url(media_item, THUMBNAIL_SIZE),
)
def _media_url(media_item: MediaItem, max_size: int) -> str:
"""Return a media item url with the specified max thumbnail size on the longest edge.
See https://developers.google.com/photos/library/guides/access-media-items#base-urls
"""
return f"{media_item.base_url}=h{max_size}"
def _video_url(media_item: MediaItem) -> str:
"""Return a video url for the item.
See https://developers.google.com/photos/library/guides/access-media-items#base-urls
"""
return f"{media_item.base_url}=dv"
def _cover_photo_url(album: Album, max_size: int) -> str:
"""Return a media item url for the cover photo of the album."""
return f"{album.cover_photo_base_url}=h{max_size}"