mirror of https://github.com/home-assistant/core
152 lines
4.3 KiB
Python
152 lines
4.3 KiB
Python
"""Onkyo receiver."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable, Iterable
|
|
import contextlib
|
|
from dataclasses import dataclass, field
|
|
import logging
|
|
from typing import Any
|
|
|
|
import pyeiscp
|
|
|
|
from .const import DEVICE_DISCOVERY_TIMEOUT, DEVICE_INTERVIEW_TIMEOUT, ZONES
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Callbacks:
|
|
"""Onkyo Receiver Callbacks."""
|
|
|
|
connect: list[Callable[[Receiver], None]] = field(default_factory=list)
|
|
update: list[Callable[[Receiver, tuple[str, str, Any]], None]] = field(
|
|
default_factory=list
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class Receiver:
|
|
"""Onkyo receiver."""
|
|
|
|
conn: pyeiscp.Connection
|
|
model_name: str
|
|
identifier: str
|
|
host: str
|
|
first_connect: bool = True
|
|
callbacks: Callbacks = field(default_factory=Callbacks)
|
|
|
|
@classmethod
|
|
async def async_create(cls, info: ReceiverInfo) -> Receiver:
|
|
"""Set up Onkyo Receiver."""
|
|
|
|
receiver: Receiver | None = None
|
|
|
|
def on_connect(_origin: str) -> None:
|
|
assert receiver is not None
|
|
receiver.on_connect()
|
|
|
|
def on_update(message: tuple[str, str, Any], _origin: str) -> None:
|
|
assert receiver is not None
|
|
receiver.on_update(message)
|
|
|
|
_LOGGER.debug("Creating receiver: %s (%s)", info.model_name, info.host)
|
|
|
|
connection = await pyeiscp.Connection.create(
|
|
host=info.host,
|
|
port=info.port,
|
|
connect_callback=on_connect,
|
|
update_callback=on_update,
|
|
auto_connect=False,
|
|
)
|
|
|
|
return (
|
|
receiver := cls(
|
|
conn=connection,
|
|
model_name=info.model_name,
|
|
identifier=info.identifier,
|
|
host=info.host,
|
|
)
|
|
)
|
|
|
|
def on_connect(self) -> None:
|
|
"""Receiver (re)connected."""
|
|
_LOGGER.debug("Receiver (re)connected: %s (%s)", self.model_name, self.host)
|
|
|
|
# Discover what zones are available for the receiver by querying the power.
|
|
# If we get a response for the specific zone, it means it is available.
|
|
for zone in ZONES:
|
|
self.conn.query_property(zone, "power")
|
|
|
|
for callback in self.callbacks.connect:
|
|
callback(self)
|
|
|
|
self.first_connect = False
|
|
|
|
def on_update(self, message: tuple[str, str, Any]) -> None:
|
|
"""Process new message from the receiver."""
|
|
_LOGGER.debug("Received update callback from %s: %s", self.model_name, message)
|
|
for callback in self.callbacks.update:
|
|
callback(self, message)
|
|
|
|
|
|
@dataclass
|
|
class ReceiverInfo:
|
|
"""Onkyo receiver information."""
|
|
|
|
host: str
|
|
port: int
|
|
model_name: str
|
|
identifier: str
|
|
|
|
|
|
async def async_interview(host: str) -> ReceiverInfo | None:
|
|
"""Interview Onkyo Receiver."""
|
|
_LOGGER.debug("Interviewing receiver: %s", host)
|
|
|
|
receiver_info: ReceiverInfo | None = None
|
|
|
|
event = asyncio.Event()
|
|
|
|
async def _callback(conn: pyeiscp.Connection) -> None:
|
|
"""Receiver interviewed, connection not yet active."""
|
|
nonlocal receiver_info
|
|
if receiver_info is None:
|
|
info = ReceiverInfo(host, conn.port, conn.name, conn.identifier)
|
|
_LOGGER.debug("Receiver interviewed: %s (%s)", info.model_name, info.host)
|
|
receiver_info = info
|
|
event.set()
|
|
|
|
timeout = DEVICE_INTERVIEW_TIMEOUT
|
|
|
|
await pyeiscp.Connection.discover(
|
|
host=host, discovery_callback=_callback, timeout=timeout
|
|
)
|
|
|
|
with contextlib.suppress(asyncio.TimeoutError):
|
|
await asyncio.wait_for(event.wait(), timeout)
|
|
|
|
return receiver_info
|
|
|
|
|
|
async def async_discover() -> Iterable[ReceiverInfo]:
|
|
"""Discover Onkyo Receivers."""
|
|
_LOGGER.debug("Discovering receivers")
|
|
|
|
receiver_infos: list[ReceiverInfo] = []
|
|
|
|
async def _callback(conn: pyeiscp.Connection) -> None:
|
|
"""Receiver discovered, connection not yet active."""
|
|
info = ReceiverInfo(conn.host, conn.port, conn.name, conn.identifier)
|
|
_LOGGER.debug("Receiver discovered: %s (%s)", info.model_name, info.host)
|
|
receiver_infos.append(info)
|
|
|
|
timeout = DEVICE_DISCOVERY_TIMEOUT
|
|
|
|
await pyeiscp.Connection.discover(discovery_callback=_callback, timeout=timeout)
|
|
|
|
await asyncio.sleep(timeout)
|
|
|
|
return receiver_infos
|