core/homeassistant/components/arcam_fmj/media_player.py

375 lines
12 KiB
Python

"""Arcam media player."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import functools
import logging
from typing import Any
from arcam.fmj import ConnectionFailed, SourceCodes
from arcam.fmj.state import State
from homeassistant.components.media_player import (
BrowseError,
BrowseMedia,
MediaClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ArcamFmjConfigEntry
from .const import (
DOMAIN,
EVENT_TURN_ON,
SIGNAL_CLIENT_DATA,
SIGNAL_CLIENT_STARTED,
SIGNAL_CLIENT_STOPPED,
)
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ArcamFmjConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Create the media player entities for a config entry.

    One ``ArcamFmj`` entity is built for each of zones 1 and 2, all sharing
    the client stored in ``config_entry.runtime_data``. The entry's unique id
    is used as the device identifier, falling back to the entry id when the
    unique id is empty.
    """
    client = config_entry.runtime_data
    device_uuid = config_entry.unique_id or config_entry.entry_id

    entities = []
    for zone in (1, 2):
        zone_state = State(client, zone)
        entities.append(ArcamFmj(config_entry.title, zone_state, device_uuid))

    # The second positional argument is passed as True, exactly as before
    # (presumably Home Assistant's "update before add" flag - see HA docs).
    async_add_entities(entities, True)
def convert_exception[**_P, _R](
func: Callable[_P, Coroutine[Any, Any, _R]],
) -> Callable[_P, Coroutine[Any, Any, _R]]:
"""Return decorator to convert a connection error into a home assistant error."""
@functools.wraps(func)
async def _convert_exception(*args: _P.args, **kwargs: _P.kwargs) -> _R:
try:
return await func(*args, **kwargs)
except ConnectionFailed as exception:
raise HomeAssistantError(
f"Connection failed to device during {func}"
) from exception
return _convert_exception
class ArcamFmj(MediaPlayerEntity):
    """Media player entity for one zone of an Arcam FMJ receiver.

    All device reads and commands go through the ``State`` object given at
    construction. The entity is not polled; it rewrites or refreshes its
    state when dispatcher signals carrying its client's host are received.
    """

    _attr_should_poll = False
    _attr_has_entity_name = True

    def __init__(
        self,
        device_name: str,
        state: State,
        uuid: str,
    ) -> None:
        """Initialize the entity for the zone described by ``state``.

        Args:
            device_name: Name used for the device registry entry.
            state: Zone state object; ``state.zn`` is the zone number.
            uuid: Identifier shared by all zones of the same device.
        """
        self._state = state
        self._attr_name = f"Zone {state.zn}"
        self._attr_supported_features = (
            MediaPlayerEntityFeature.SELECT_SOURCE
            | MediaPlayerEntityFeature.PLAY_MEDIA
            | MediaPlayerEntityFeature.BROWSE_MEDIA
            | MediaPlayerEntityFeature.VOLUME_SET
            | MediaPlayerEntityFeature.VOLUME_MUTE
            | MediaPlayerEntityFeature.VOLUME_STEP
            | MediaPlayerEntityFeature.TURN_OFF
            | MediaPlayerEntityFeature.TURN_ON
        )
        # Sound mode selection is only advertised for zone 1.
        if state.zn == 1:
            self._attr_supported_features |= MediaPlayerEntityFeature.SELECT_SOUND_MODE
        self._attr_unique_id = f"{uuid}-{state.zn}"
        # Only the zone 1 entity is enabled by default in the entity registry.
        self._attr_entity_registry_enabled_default = state.zn == 1
        self._attr_device_info = DeviceInfo(
            identifiers={
                (DOMAIN, uuid),
            },
            manufacturer="Arcam",
            model="Arcam FMJ AVR",
            name=device_name,
        )

    @property
    def state(self) -> MediaPlayerState:
        """Return ON when the zone reports a truthy power state, else OFF."""
        if self._state.get_power():
            return MediaPlayerState.ON
        return MediaPlayerState.OFF

    async def async_added_to_hass(self) -> None:
        """Start the zone state and subscribe to client dispatcher signals."""
        await self._state.start()
        try:
            await self._state.update()
        except ConnectionFailed as connection:
            # Best effort: a failed initial update is only logged.
            _LOGGER.debug("Connection lost during addition: %s", connection)

        @callback
        def _data(host: str) -> None:
            # New data for our client: write the current state as-is.
            if host == self._state.client.host:
                self.async_write_ha_state()

        @callback
        def _started(host: str) -> None:
            # Client started: schedule a forced refresh of our state.
            if host == self._state.client.host:
                self.async_schedule_update_ha_state(force_refresh=True)

        @callback
        def _stopped(host: str) -> None:
            # Client stopped: schedule a forced refresh of our state.
            if host == self._state.client.host:
                self.async_schedule_update_ha_state(force_refresh=True)

        # Each subscription is removed again when the entity is removed.
        for signal, handler in (
            (SIGNAL_CLIENT_DATA, _data),
            (SIGNAL_CLIENT_STARTED, _started),
            (SIGNAL_CLIENT_STOPPED, _stopped),
        ):
            self.async_on_remove(
                async_dispatcher_connect(self.hass, signal, handler)
            )

    async def async_update(self) -> None:
        """Refresh the zone state; connection failures are only logged."""
        _LOGGER.debug("Update state %s", self.name)
        try:
            await self._state.update()
        except ConnectionFailed as connection:
            _LOGGER.debug("Connection lost during update: %s", connection)

    @convert_exception
    async def async_mute_volume(self, mute: bool) -> None:
        """Send the mute command and write the new state."""
        await self._state.set_mute(mute)
        self.async_write_ha_state()

    @convert_exception
    async def async_select_source(self, source: str) -> None:
        """Select an input source by its ``SourceCodes`` member name.

        An unknown name is logged as an error and otherwise ignored.
        """
        try:
            value = SourceCodes[source]
        except KeyError:
            _LOGGER.error("Unsupported source %s", source)
            return

        await self._state.set_source(value)
        self.async_write_ha_state()

    @convert_exception
    async def async_select_sound_mode(self, sound_mode: str) -> None:
        """Select a sound (decode) mode.

        Raises:
            HomeAssistantError: If setting the decode mode raises
                ``KeyError`` or ``ValueError``.
        """
        try:
            await self._state.set_decode_mode(sound_mode)
        except (KeyError, ValueError) as exception:
            raise HomeAssistantError(
                f"Unsupported sound_mode {sound_mode}"
            ) from exception

        self.async_write_ha_state()

    @convert_exception
    async def async_set_volume_level(self, volume: float) -> None:
        """Set volume level, range 0..1 (scaled by 99 and rounded for the device)."""
        await self._state.set_volume(round(volume * 99.0))
        self.async_write_ha_state()

    @convert_exception
    async def async_volume_up(self) -> None:
        """Turn volume up for media player."""
        await self._state.inc_volume()
        self.async_write_ha_state()

    @convert_exception
    async def async_volume_down(self) -> None:
        """Turn volume down for media player."""
        await self._state.dec_volume()
        self.async_write_ha_state()

    @convert_exception
    async def async_turn_on(self) -> None:
        """Turn the media player on.

        When a power state is known the command is sent over the connection.
        Otherwise (power state is None - presumably no usable connection) an
        ``EVENT_TURN_ON`` event carrying this entity's id is fired instead.
        """
        if self._state.get_power() is not None:
            _LOGGER.debug("Turning on device using connection")
            await self._state.set_power(True)
        else:
            _LOGGER.debug("Firing event to turn on device")
            self.hass.bus.async_fire(EVENT_TURN_ON, {ATTR_ENTITY_ID: self.entity_id})

    @convert_exception
    async def async_turn_off(self) -> None:
        """Turn the media player off."""
        await self._state.set_power(False)

    async def async_browse_media(
        self,
        media_content_type: MediaType | str | None = None,
        media_content_id: str | None = None,
    ) -> BrowseMedia:
        """Return the browsable root listing the tuner presets.

        Only the root (``None`` or ``"root"``) can be browsed.

        Raises:
            BrowseError: If any other ``media_content_id`` is requested.
        """
        if media_content_id not in (None, "root"):
            raise BrowseError(
                f"Media not found: {media_content_type} / {media_content_id}"
            )

        presets = self._state.get_preset_details()

        # One playable, non-expandable entry per preset.
        radio = [
            BrowseMedia(
                title=preset.name,
                media_class=MediaClass.MUSIC,
                media_content_id=f"preset:{preset.index}",
                media_content_type=MediaType.MUSIC,
                can_play=True,
                can_expand=False,
            )
            for preset in presets.values()
        ]

        return BrowseMedia(
            title="Arcam FMJ Receiver",
            media_class=MediaClass.DIRECTORY,
            media_content_id="root",
            media_content_type="library",
            can_play=False,
            can_expand=True,
            children=radio,
        )

    @convert_exception
    async def async_play_media(
        self, media_type: MediaType | str, media_id: str, **kwargs: Any
    ) -> None:
        """Play a tuner preset given as ``preset:<number>``.

        Any other ``media_id`` is logged as an error and ignored.

        Raises:
            HomeAssistantError: If the part after ``preset:`` is not an
                integer.
        """
        if not media_id.startswith("preset:"):
            _LOGGER.error("Media %s is not supported", media_id)
            return

        try:
            preset = int(media_id.removeprefix("preset:"))
        except ValueError as exception:
            # Report a malformed id as a Home Assistant error instead of
            # letting a bare ValueError escape.
            raise HomeAssistantError(
                f"Invalid tuner preset in media id {media_id}"
            ) from exception

        await self._state.set_tuner_preset(preset)

    @property
    def source(self) -> str | None:
        """Return the name of the current input source, or None if unknown."""
        if (value := self._state.get_source()) is None:
            return None
        return value.name

    @property
    def source_list(self) -> list[str]:
        """Return the names of the available input sources."""
        return [x.name for x in self._state.get_source_list()]

    @property
    def sound_mode(self) -> str | None:
        """Return the name of the current sound mode, or None if unknown."""
        if (value := self._state.get_decode_mode()) is None:
            return None
        return value.name

    @property
    def sound_mode_list(self) -> list[str] | None:
        """Return the names of the available sound modes, or None if unknown."""
        if (values := self._state.get_decode_modes()) is None:
            return None
        return [x.name for x in values]

    @property
    def is_volume_muted(self) -> bool | None:
        """Return the mute state as reported by the zone (None if unknown)."""
        return self._state.get_mute()

    @property
    def volume_level(self) -> float | None:
        """Return the volume as the device value divided by 99, or None if unknown."""
        if (value := self._state.get_volume()) is None:
            return None
        return value / 99.0

    @property
    def media_content_type(self) -> MediaType | None:
        """Return MUSIC while the source is DAB or FM, otherwise None."""
        if self._state.get_source() in (SourceCodes.DAB, SourceCodes.FM):
            return MediaType.MUSIC
        return None

    @property
    def media_content_id(self) -> str | None:
        """Return ``preset:<n>`` for the current tuner preset, if any.

        Only set while the source is DAB or FM and the tuner preset is truthy.
        """
        if self._state.get_source() not in (SourceCodes.DAB, SourceCodes.FM):
            return None
        # NOTE(review): a falsy preset (None, or 0) yields no content id -
        # confirm that presets are never numbered 0.
        if preset := self._state.get_tuner_preset():
            return f"preset:{preset}"
        return None

    @property
    def media_channel(self) -> str | None:
        """Return the DAB station or FM RDS information for the current source."""
        source = self._state.get_source()
        if source == SourceCodes.DAB:
            return self._state.get_dab_station()
        if source == SourceCodes.FM:
            return self._state.get_rds_information()
        return None

    @property
    def media_artist(self) -> str | None:
        """Return the DAB DLS/PDT text as the artist; None for other sources."""
        if self._state.get_source() == SourceCodes.DAB:
            return self._state.get_dls_pdt()
        return None

    @property
    def media_title(self) -> str | None:
        """Return the source name, suffixed with the channel when one is known."""
        if (source := self._state.get_source()) is None:
            return None

        if channel := self.media_channel:
            return f"{source.name} - {channel}"
        return source.name