core/homeassistant/components/radio_browser/media_source.py

295 lines
9.7 KiB
Python

"""Expose Radio Browser as a media source."""
from __future__ import annotations
import mimetypes
import pycountry
from radios import FilterBy, Order, RadioBrowser, Station
from homeassistant.components.media_player import MediaClass, MediaType
from homeassistant.components.media_source import (
BrowseMediaSource,
MediaSource,
MediaSourceItem,
PlayMedia,
Unresolvable,
)
from homeassistant.core import HomeAssistant, callback
from . import RadioBrowserConfigEntry
from .const import DOMAIN
# Maps the codec name reported on a station (``station.codec``) to the MIME
# type used for its stream. Codecs missing from this table are not rejected
# outright: the MIME type is then guessed from the station URL instead.
CODEC_TO_MIMETYPE = {
"MP3": "audio/mpeg",
"AAC": "audio/aac",
"AAC+": "audio/aac",
"OGG": "application/ogg",
}
async def async_get_media_source(hass: HomeAssistant) -> RadioMediaSource:
    """Create the Radio Browser media source for this Home Assistant instance."""
    # The integration allows just one config entry, so the first entry
    # registered for the domain is the one backing the media source.
    config_entries = hass.config_entries.async_entries(DOMAIN)
    return RadioMediaSource(hass, config_entries[0])
class RadioMediaSource(MediaSource):
    """Provide Radio stations as media sources.

    Browse identifiers handled by this source:

    - ``None`` (root): the "Popular", "By Category" and "By Language"
      directories, followed by one directory per country.
    - ``popular``: up to 250 stations ranked by click count.
    - ``tag`` / ``tag/<name>``: the tag list / the stations with that tag.
    - ``language`` / ``language/<code>``: the language list / its stations.
    - ``country`` / ``country/<code>``: the country list / its stations.

    Playable items use the station UUID as their identifier.
    """

    name = "Radio Browser"

    def __init__(self, hass: HomeAssistant, entry: RadioBrowserConfigEntry) -> None:
        """Initialize RadioMediaSource."""
        super().__init__(DOMAIN)
        self.hass = hass
        self.entry = entry

    @property
    def radios(self) -> RadioBrowser:
        """Return the radio browser client stored on the config entry."""
        return self.entry.runtime_data

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve selected Radio station to a streaming URL.

        ``item.identifier`` is looked up as a station UUID.

        Raises:
            Unresolvable: If the station cannot be found, or if no MIME type
                can be determined for its stream.
        """
        radios = self.radios

        station = await radios.station(uuid=item.identifier)
        if not station:
            raise Unresolvable("Radio station is no longer available")

        if not (mime_type := self._async_get_station_mime_type(station)):
            raise Unresolvable("Could not determine stream type of radio station")

        # Register "click" with Radio Browser
        await radios.station_click(uuid=station.uuid)

        return PlayMedia(station.url, mime_type)

    async def async_browse_media(
        self,
        item: MediaSourceItem,
    ) -> BrowseMediaSource:
        """Return the browse node for ``item`` with its children.

        Every builder is asked for children and answers with an empty list
        when the identifier does not belong to it, so the concatenation only
        holds the entries of the requested level (or all top-level entries
        when there is no identifier).
        """
        radios = self.radios

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=None,
            media_class=MediaClass.CHANNEL,
            media_content_type=MediaType.MUSIC,
            title=self.entry.title,
            can_play=False,
            can_expand=True,
            children_media_class=MediaClass.DIRECTORY,
            children=[
                *await self._async_build_popular(radios, item),
                *await self._async_build_by_tag(radios, item),
                *await self._async_build_by_language(radios, item),
                *await self._async_build_by_country(radios, item),
            ],
        )

    @callback
    @staticmethod
    def _async_get_station_mime_type(station: Station) -> str | None:
        """Determine mime type of a radio station.

        The station codec is looked up in ``CODEC_TO_MIMETYPE`` first; when
        that yields nothing, the type is guessed from the station URL.
        Returns ``None`` when neither approach produces a MIME type.
        """
        mime_type = CODEC_TO_MIMETYPE.get(station.codec)
        if not mime_type:
            mime_type, _ = mimetypes.guess_type(station.url)
        return mime_type

    @callback
    def _async_build_stations(
        self, radios: RadioBrowser, stations: list[Station]
    ) -> list[BrowseMediaSource]:
        """Build list of media sources from radio stations.

        Stations whose codec is "UNKNOWN", or for which no MIME type can be
        determined, are left out. ``radios`` is not used by this method.
        """
        items: list[BrowseMediaSource] = []

        for station in stations:
            if station.codec == "UNKNOWN" or not (
                mime_type := self._async_get_station_mime_type(station)
            ):
                continue

            items.append(
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier=station.uuid,
                    media_class=MediaClass.MUSIC,
                    media_content_type=mime_type,
                    title=station.name,
                    can_play=True,
                    can_expand=False,
                    thumbnail=station.favicon,
                )
            )

        return items

    async def _async_build_by_country(
        self, radios: RadioBrowser, item: MediaSourceItem
    ) -> list[BrowseMediaSource]:
        """Handle browsing radio stations by country.

        Returns the stations of a country for ``country/<code>``, the list of
        country directories for the root or ``country``, and an empty list
        for any other identifier.
        """
        category, _, country_code = (item.identifier or "").partition("/")

        # Require the "country" category, as the tag and language builders do
        # for theirs. Without it, the suffix of identifiers such as
        # "tag/jazz" or "language/de" would be queried as a country code.
        if category == "country" and country_code:
            stations = await radios.stations(
                filter_by=FilterBy.COUNTRY_CODE_EXACT,
                filter_term=country_code,
                hide_broken=True,
                order=Order.NAME,
                reverse=False,
            )
            return self._async_build_stations(radios, stations)

        # We show country in the root additionally, when there is no item
        if not item.identifier or category == "country":
            # Trigger the lazy loading of the country database to happen
            # inside the executor
            await self.hass.async_add_executor_job(lambda: len(pycountry.countries))
            countries = await radios.countries(order=Order.NAME)
            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier=f"country/{country.code}",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title=country.name,
                    can_play=False,
                    can_expand=True,
                    thumbnail=country.favicon,
                )
                for country in countries
            ]

        return []

    async def _async_build_by_language(
        self, radios: RadioBrowser, item: MediaSourceItem
    ) -> list[BrowseMediaSource]:
        """Handle browsing radio stations by language.

        Returns the stations for ``language/<code>``, the list of language
        directories for ``language``, the single "By Language" directory for
        the root, and an empty list for any other identifier.
        """
        category, _, language = (item.identifier or "").partition("/")

        # NOTE(review): directories below are keyed by ``language.code`` and
        # that value is passed here as the LANGUAGE_EXACT filter term.
        # Confirm the Radio Browser API matches on the code, not the name.
        if category == "language" and language:
            stations = await radios.stations(
                filter_by=FilterBy.LANGUAGE_EXACT,
                filter_term=language,
                hide_broken=True,
                order=Order.NAME,
                reverse=False,
            )
            return self._async_build_stations(radios, stations)

        if category == "language":
            languages = await radios.languages(order=Order.NAME, hide_broken=True)
            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier=f"language/{language.code}",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title=language.name,
                    can_play=False,
                    can_expand=True,
                    thumbnail=language.favicon,
                )
                for language in languages
            ]

        if not item.identifier:
            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier="language",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title="By Language",
                    can_play=False,
                    can_expand=True,
                )
            ]

        return []

    async def _async_build_popular(
        self, radios: RadioBrowser, item: MediaSourceItem
    ) -> list[BrowseMediaSource]:
        """Handle browsing popular radio stations.

        Returns up to 250 stations ordered by click count for ``popular``,
        the single "Popular" directory for the root, and an empty list for
        any other identifier.
        """
        if item.identifier == "popular":
            stations = await radios.stations(
                hide_broken=True,
                limit=250,
                order=Order.CLICK_COUNT,
                reverse=True,
            )
            return self._async_build_stations(radios, stations)

        if not item.identifier:
            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier="popular",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title="Popular",
                    can_play=False,
                    can_expand=True,
                )
            ]

        return []

    async def _async_build_by_tag(
        self, radios: RadioBrowser, item: MediaSourceItem
    ) -> list[BrowseMediaSource]:
        """Handle browsing radio stations by tags.

        Returns the stations for ``tag/<name>``, the list of tag directories
        for ``tag``, the single "By Category" directory for the root, and an
        empty list for any other identifier.
        """
        category, _, tag = (item.identifier or "").partition("/")

        if category == "tag" and tag:
            stations = await radios.stations(
                filter_by=FilterBy.TAG_EXACT,
                filter_term=tag,
                hide_broken=True,
                order=Order.NAME,
                reverse=False,
            )
            return self._async_build_stations(radios, stations)

        if category == "tag":
            # Fetch the 100 tags with the most stations ...
            tags = await radios.tags(
                hide_broken=True,
                limit=100,
                order=Order.STATION_COUNT,
                reverse=True,
            )

            # ... then, now we have the top tags, reorder them by name
            tags.sort(key=lambda entry: entry.name)

            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier=f"tag/{tag.name}",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title=tag.name.title(),
                    can_play=False,
                    can_expand=True,
                )
                for tag in tags
            ]

        if not item.identifier:
            return [
                BrowseMediaSource(
                    domain=DOMAIN,
                    identifier="tag",
                    media_class=MediaClass.DIRECTORY,
                    media_content_type=MediaType.MUSIC,
                    title="By Category",
                    can_play=False,
                    can_expand=True,
                )
            ]

        return []