core/homeassistant/components/squeezebox/coordinator.py

121 lines
4.2 KiB
Python

"""DataUpdateCoordinator for the Squeezebox integration."""
from asyncio import timeout
from collections.abc import Callable
from datetime import timedelta
import logging
import re
from typing import Any
from pysqueezebox import Player, Server
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import (
PLAYER_UPDATE_INTERVAL,
SENSOR_UPDATE_INTERVAL,
SIGNAL_PLAYER_REDISCOVERED,
STATUS_API_TIMEOUT,
STATUS_SENSOR_LASTSCAN,
STATUS_SENSOR_NEEDSRESTART,
STATUS_SENSOR_RESCAN,
)
# Module-level logger; handed to both coordinators and used for their debug output.
_LOGGER = logging.getLogger(__name__)
class LMSStatusDataUpdateCoordinator(DataUpdateCoordinator):
    """Coordinator that polls the LMS server status and shapes it for HA."""

    def __init__(self, hass: HomeAssistant, lms: Server) -> None:
        """Set up periodic polling of ``lms`` at the sensor update interval."""
        poll_interval = timedelta(seconds=SENSOR_UPDATE_INTERVAL)
        super().__init__(
            hass,
            _LOGGER,
            name=lms.name,
            update_interval=poll_interval,
            always_update=False,
        )
        self.lms = lms
        self.newversion_regex = re.compile("<.*$")

    async def _async_update_data(self) -> dict:
        """Poll the LMS status call and return the processed result.

        The request is bounded by ``STATUS_API_TIMEOUT``. The raw reply is
        logged at debug level and then passed to ``_prepare_status_data``.

        Raises:
            UpdateFailed: The status call returned a falsy (empty) result.
        """
        async with timeout(STATUS_API_TIMEOUT):
            raw_status = await self.lms.async_status()

        if raw_status:
            _LOGGER.debug("Raw serverstatus %s=%s", self.lms.name, raw_status)
            return self._prepare_status_data(raw_status)

        raise UpdateFailed("No data from status poll")

    def _prepare_status_data(self, data: dict) -> dict:
        """Rewrite selected status fields in place for HA presentation.

        ``data`` is modified and the same dict is returned.
        """
        # Binary sensors: "rescan" (a scan is running) and "needsrestart"
        # (plugin updates pending) are only present in the reply when true,
        # so the presence of the key is the value.
        for flag_key in (STATUS_SENSOR_RESCAN, STATUS_SENSOR_NEEDSRESTART):
            data[flag_key] = flag_key in data

        # "lastscan" arrives as an epoch string such as '1718431678' and is
        # not always present; convert it to a UTC datetime, else store None.
        if STATUS_SENSOR_LASTSCAN in data:
            last_scan = dt_util.utc_from_timestamp(int(data[STATUS_SENSOR_LASTSCAN]))
        else:
            last_scan = None
        data[STATUS_SENSOR_LASTSCAN] = last_scan

        _LOGGER.debug("Processed serverstatus %s=%s", self.lms.name, data)
        return data
class SqueezeBoxPlayerUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Coordinator for Squeezebox players.

    While the player is marked available, each refresh calls
    ``player.async_update()``. Once the player reports ``connected is False``
    it is marked unavailable, refreshes stop touching it, and the coordinator
    listens for ``SIGNAL_PLAYER_REDISCOVERED`` until the player comes back.
    """

    def __init__(self, hass: HomeAssistant, player: Player, server_uuid: str) -> None:
        """Initialize the coordinator."""
        super().__init__(
            hass,
            _LOGGER,
            name=player.name,
            update_interval=timedelta(seconds=PLAYER_UPDATE_INTERVAL),
            always_update=True,
        )
        self.player = player
        # False while the player is disconnected and awaiting rediscovery.
        self.available = True
        # Unsubscribe handle for the rediscovery listener; None when not listening.
        self._remove_dispatcher: Callable[[], None] | None = None
        self.server_uuid = server_uuid

    async def _async_update_data(self) -> dict[str, Any]:
        """Update Player if available, or listen for rediscovery if not.

        Always returns an empty dict; the refreshed state lives on
        ``self.player`` rather than in the coordinator data.
        """
        if self.available:
            # Only update players available at last update; unavailable
            # players are rediscovered instead.
            await self.player.async_update()

            # Explicit identity check: only a literal False marks the player
            # as gone.
            if self.player.connected is False:
                _LOGGER.debug("Player %s is not available", self.name)
                self.available = False

                # Release any listener still registered so at most one
                # subscription exists, then start listening for restored
                # players.
                self._async_remove_rediscovery_listener()
                self._remove_dispatcher = async_dispatcher_connect(
                    self.hass, SIGNAL_PLAYER_REDISCOVERED, self.rediscovered
                )
        return {}

    async def async_shutdown(self) -> None:
        """Stop listening for rediscovery, then shut the coordinator down.

        Without this, a coordinator shut down while its player is unavailable
        would leave its rediscovery listener registered.
        """
        self._async_remove_rediscovery_listener()
        await super().async_shutdown()

    @callback
    def rediscovered(self, unique_id: str, connected: bool) -> None:
        """Make a player available again.

        Signals for other players, or reporting ``connected`` as false, are
        ignored and the listener stays registered.
        """
        if unique_id == self.player.player_id and connected:
            self.available = True
            _LOGGER.debug("Player %s is available again", self.name)
            self._async_remove_rediscovery_listener()

    @callback
    def _async_remove_rediscovery_listener(self) -> None:
        """Unsubscribe from the rediscovery signal if currently subscribed."""
        if self._remove_dispatcher is not None:
            # Clear the attribute before calling so a stale handle is never
            # kept around or invoked a second time.
            remove_listener, self._remove_dispatcher = self._remove_dispatcher, None
            remove_listener()