mirror of https://github.com/home-assistant/core
375 lines
12 KiB
Python
375 lines
12 KiB
Python
"""Arcam media player."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Coroutine
|
|
import functools
|
|
import logging
|
|
from typing import Any
|
|
|
|
from arcam.fmj import ConnectionFailed, SourceCodes
|
|
from arcam.fmj.state import State
|
|
|
|
from homeassistant.components.media_player import (
|
|
BrowseError,
|
|
BrowseMedia,
|
|
MediaClass,
|
|
MediaPlayerEntity,
|
|
MediaPlayerEntityFeature,
|
|
MediaPlayerState,
|
|
MediaType,
|
|
)
|
|
from homeassistant.const import ATTR_ENTITY_ID
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
|
|
from . import ArcamFmjConfigEntry
|
|
from .const import (
|
|
DOMAIN,
|
|
EVENT_TURN_ON,
|
|
SIGNAL_CLIENT_DATA,
|
|
SIGNAL_CLIENT_STARTED,
|
|
SIGNAL_CLIENT_STOPPED,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ArcamFmjConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Create one media player entity per zone (1 and 2) for a config entry."""
    client = config_entry.runtime_data
    # Entries without a unique id fall back to their entry id.
    base_id = config_entry.unique_id or config_entry.entry_id

    entities = [
        ArcamFmj(config_entry.title, State(client, zone), base_id)
        for zone in (1, 2)
    ]
    # NOTE(review): the positional True is presumably Home Assistant's
    # ``update_before_add`` flag -- confirm against AddEntitiesCallback.
    async_add_entities(entities, True)
|
|
|
|
|
|
def convert_exception[**_P, _R](
|
|
func: Callable[_P, Coroutine[Any, Any, _R]],
|
|
) -> Callable[_P, Coroutine[Any, Any, _R]]:
|
|
"""Return decorator to convert a connection error into a home assistant error."""
|
|
|
|
@functools.wraps(func)
|
|
async def _convert_exception(*args: _P.args, **kwargs: _P.kwargs) -> _R:
|
|
try:
|
|
return await func(*args, **kwargs)
|
|
except ConnectionFailed as exception:
|
|
raise HomeAssistantError(
|
|
f"Connection failed to device during {func}"
|
|
) from exception
|
|
|
|
return _convert_exception
|
|
|
|
|
|
class ArcamFmj(MediaPlayerEntity):
    """Media player entity for one zone of an Arcam FMJ receiver.

    The entity is push-driven (polling is disabled): it rewrites or refreshes
    its state when dispatcher signals arrive for the host of its client.
    """

    _attr_should_poll = False
    _attr_has_entity_name = True

    def __init__(
        self,
        device_name: str,
        state: State,
        uuid: str,
    ) -> None:
        """Initialize the entity.

        Args:
            device_name: Name stored on the device registry entry.
            state: Zone state object; ``state.zn`` is the zone number.
            uuid: Identifier of the device, shared by all of its zones.
        """
        self._state = state
        zone = state.zn

        self._attr_name = f"Zone {zone}"
        # The zone number makes the id unique per zone of the same device.
        self._attr_unique_id = f"{uuid}-{zone}"
        # Only zone 1 is enabled by default in the entity registry.
        self._attr_entity_registry_enabled_default = zone == 1

        self._attr_supported_features = (
            MediaPlayerEntityFeature.SELECT_SOURCE
            | MediaPlayerEntityFeature.PLAY_MEDIA
            | MediaPlayerEntityFeature.BROWSE_MEDIA
            | MediaPlayerEntityFeature.VOLUME_SET
            | MediaPlayerEntityFeature.VOLUME_MUTE
            | MediaPlayerEntityFeature.VOLUME_STEP
            | MediaPlayerEntityFeature.TURN_OFF
            | MediaPlayerEntityFeature.TURN_ON
        )
        # Sound mode selection is only offered on zone 1.
        if zone == 1:
            self._attr_supported_features |= MediaPlayerEntityFeature.SELECT_SOUND_MODE

        self._attr_device_info = DeviceInfo(
            identifiers={
                (DOMAIN, uuid),
            },
            manufacturer="Arcam",
            model="Arcam FMJ AVR",
            name=device_name,
        )

    @property
    def state(self) -> MediaPlayerState:
        """Return ON when the zone reports power, otherwise OFF."""
        if self._state.get_power():
            return MediaPlayerState.ON
        return MediaPlayerState.OFF

    async def async_added_to_hass(self) -> None:
        """Start state tracking and subscribe to client dispatcher signals."""
        await self._state.start()
        try:
            await self._state.update()
        except ConnectionFailed as connection:
            # Best effort: a failed initial update is only logged.
            _LOGGER.debug("Connection lost during addition: %s", connection)

        @callback
        def _data(host: str) -> None:
            # Signals carry a host; ignore those meant for other clients.
            if host == self._state.client.host:
                self.async_write_ha_state()

        @callback
        def _started(host: str) -> None:
            if host == self._state.client.host:
                self.async_schedule_update_ha_state(force_refresh=True)

        @callback
        def _stopped(host: str) -> None:
            if host == self._state.client.host:
                self.async_schedule_update_ha_state(force_refresh=True)

        # Each subscription is registered for cleanup on entity removal.
        for signal, handler in (
            (SIGNAL_CLIENT_DATA, _data),
            (SIGNAL_CLIENT_STARTED, _started),
            (SIGNAL_CLIENT_STOPPED, _stopped),
        ):
            self.async_on_remove(
                async_dispatcher_connect(self.hass, signal, handler)
            )

    async def async_update(self) -> None:
        """Force update of state; connection failures are only logged."""
        _LOGGER.debug("Update state %s", self.name)
        try:
            await self._state.update()
        except ConnectionFailed as connection:
            _LOGGER.debug("Connection lost during update: %s", connection)

    @convert_exception
    async def async_mute_volume(self, mute: bool) -> None:
        """Send mute command."""
        await self._state.set_mute(mute)
        self.async_write_ha_state()

    @convert_exception
    async def async_select_source(self, source: str) -> None:
        """Select a specific source by its ``SourceCodes`` member name.

        An unknown name is logged as an error and otherwise ignored.
        """
        try:
            value = SourceCodes[source]
        except KeyError:
            _LOGGER.error("Unsupported source %s", source)
            return

        await self._state.set_source(value)
        self.async_write_ha_state()

    @convert_exception
    async def async_select_sound_mode(self, sound_mode: str) -> None:
        """Select a specific sound mode.

        Raises:
            HomeAssistantError: If setting the decode mode raises
                ``KeyError`` or ``ValueError`` for ``sound_mode``.
        """
        try:
            await self._state.set_decode_mode(sound_mode)
        except (KeyError, ValueError) as exception:
            raise HomeAssistantError(
                f"Unsupported sound_mode {sound_mode}"
            ) from exception

        self.async_write_ha_state()

    @convert_exception
    async def async_set_volume_level(self, volume: float) -> None:
        """Set volume level, range 0..1."""
        # The 0..1 level is scaled to an integer on a 0..99 scale.
        await self._state.set_volume(round(volume * 99.0))
        self.async_write_ha_state()

    @convert_exception
    async def async_volume_up(self) -> None:
        """Turn volume up for media player."""
        await self._state.inc_volume()
        self.async_write_ha_state()

    @convert_exception
    async def async_volume_down(self) -> None:
        """Turn volume down for media player."""
        await self._state.dec_volume()
        self.async_write_ha_state()

    @convert_exception
    async def async_turn_on(self) -> None:
        """Turn the media player on.

        When a power state is known the command is sent through the state
        object; otherwise an ``EVENT_TURN_ON`` event carrying this entity id
        is fired on the bus instead.
        """
        if self._state.get_power() is not None:
            _LOGGER.debug("Turning on device using connection")
            await self._state.set_power(True)
        else:
            _LOGGER.debug("Firing event to turn on device")
            self.hass.bus.async_fire(EVENT_TURN_ON, {ATTR_ENTITY_ID: self.entity_id})

    @convert_exception
    async def async_turn_off(self) -> None:
        """Turn the media player off."""
        await self._state.set_power(False)

    async def async_browse_media(
        self,
        media_content_type: MediaType | str | None = None,
        media_content_id: str | None = None,
    ) -> BrowseMedia:
        """Implement the websocket media browsing helper.

        Only the root level exists; its children are the preset details
        reported by the state object, each playable as ``preset:<index>``.

        Raises:
            BrowseError: If ``media_content_id`` is neither None nor "root".
        """
        if media_content_id not in (None, "root"):
            raise BrowseError(
                f"Media not found: {media_content_type} / {media_content_id}"
            )

        presets = self._state.get_preset_details()

        radio = [
            BrowseMedia(
                title=preset.name,
                media_class=MediaClass.MUSIC,
                media_content_id=f"preset:{preset.index}",
                media_content_type=MediaType.MUSIC,
                can_play=True,
                can_expand=False,
            )
            for preset in presets.values()
        ]

        return BrowseMedia(
            title="Arcam FMJ Receiver",
            media_class=MediaClass.DIRECTORY,
            media_content_id="root",
            media_content_type="library",
            can_play=False,
            can_expand=True,
            children=radio,
        )

    @convert_exception
    async def async_play_media(
        self, media_type: MediaType | str, media_id: str, **kwargs: Any
    ) -> None:
        """Play media; only ``preset:<number>`` media ids are supported.

        Any other media id is logged as an error and ignored.

        Raises:
            HomeAssistantError: If the part after ``preset:`` is not an
                integer.
        """
        if not media_id.startswith("preset:"):
            _LOGGER.error("Media %s is not supported", media_id)
            return

        try:
            preset = int(media_id.removeprefix("preset:"))
        except ValueError as exception:
            # Report malformed input like other invalid service input
            # instead of leaking a bare ValueError to the caller.
            raise HomeAssistantError(
                f"Invalid tuner preset in media id {media_id}"
            ) from exception

        await self._state.set_tuner_preset(preset)

    @property
    def source(self) -> str | None:
        """Return the name of the current input source, if known."""
        if (value := self._state.get_source()) is None:
            return None
        return value.name

    @property
    def source_list(self) -> list[str]:
        """List of available input sources."""
        return [x.name for x in self._state.get_source_list()]

    @property
    def sound_mode(self) -> str | None:
        """Name of the current sound mode."""
        if (value := self._state.get_decode_mode()) is None:
            return None
        return value.name

    @property
    def sound_mode_list(self) -> list[str] | None:
        """List of available sound modes."""
        if (values := self._state.get_decode_modes()) is None:
            return None
        return [x.name for x in values]

    @property
    def is_volume_muted(self) -> bool | None:
        """Boolean if volume is currently muted."""
        return self._state.get_mute()

    @property
    def volume_level(self) -> float | None:
        """Volume level of device, range 0..1."""
        if (value := self._state.get_volume()) is None:
            return None
        # Inverse of the 0..99 scaling applied when setting the volume.
        return value / 99.0

    @property
    def media_content_type(self) -> MediaType | None:
        """Content type of current playing media (music for DAB and FM)."""
        if self._state.get_source() in (SourceCodes.DAB, SourceCodes.FM):
            return MediaType.MUSIC
        return None

    @property
    def media_content_id(self) -> str | None:
        """Content id of current playing media (``preset:<n>`` for tuners)."""
        if self._state.get_source() not in (SourceCodes.DAB, SourceCodes.FM):
            return None
        if preset := self._state.get_tuner_preset():
            return f"preset:{preset}"
        return None

    @property
    def media_channel(self) -> str | None:
        """Channel currently playing."""
        source = self._state.get_source()
        if source == SourceCodes.DAB:
            return self._state.get_dab_station()
        if source == SourceCodes.FM:
            return self._state.get_rds_information()
        return None

    @property
    def media_artist(self) -> str | None:
        """Artist of current playing media, music track only."""
        if self._state.get_source() == SourceCodes.DAB:
            return self._state.get_dls_pdt()
        return None

    @property
    def media_title(self) -> str | None:
        """Title of current playing media."""
        if (source := self._state.get_source()) is None:
            return None

        # Append the channel to the source name when one is available.
        if channel := self.media_channel:
            return f"{source.name} - {channel}"
        return source.name
|