mirror of https://github.com/home-assistant/core
242 lines
7.5 KiB
Python
242 lines
7.5 KiB
Python
"""Support for LIFX."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable
|
|
from functools import partial
|
|
from typing import Any
|
|
|
|
from aiolifx import products
|
|
from aiolifx.aiolifx import Light
|
|
from aiolifx.message import Message
|
|
from awesomeversion import AwesomeVersion
|
|
|
|
from homeassistant.components.light import (
|
|
ATTR_BRIGHTNESS,
|
|
ATTR_BRIGHTNESS_PCT,
|
|
ATTR_COLOR_NAME,
|
|
ATTR_COLOR_TEMP,
|
|
ATTR_COLOR_TEMP_KELVIN,
|
|
ATTR_HS_COLOR,
|
|
ATTR_KELVIN,
|
|
ATTR_RGB_COLOR,
|
|
ATTR_XY_COLOR,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import device_registry as dr
|
|
import homeassistant.util.color as color_util
|
|
|
|
from .const import (
|
|
_LOGGER,
|
|
DEFAULT_ATTEMPTS,
|
|
DOMAIN,
|
|
INFRARED_BRIGHTNESS_VALUES_MAP,
|
|
OVERALL_TIMEOUT,
|
|
)
|
|
|
|
# Firmware version threshold used by _off_by_one_mac(): firmware at or above
# this version gets its MAC address shifted by one in get_real_mac_addr().
FIX_MAC_FW = AwesomeVersion("3.70")
|
|
|
|
|
|
@callback
def async_entry_is_legacy(entry: ConfigEntry) -> bool:
    """Tell whether a config entry is the legacy shared LIFX entry.

    An entry is considered legacy when its unique_id is unset or is equal
    to DOMAIN.
    """
    unique_id = entry.unique_id
    if unique_id is None:
        return True
    return unique_id == DOMAIN
|
|
|
|
|
|
@callback
def async_get_legacy_entry(hass: HomeAssistant) -> ConfigEntry | None:
    """Return the first legacy config entry for DOMAIN, or None if absent."""
    domain_entries = hass.config_entries.async_entries(DOMAIN)
    return next(
        (
            candidate
            for candidate in domain_entries
            if async_entry_is_legacy(candidate)
        ),
        None,
    )
|
|
|
|
|
|
def infrared_brightness_value_to_option(value: int) -> str | None:
    """Look up the option label for an infrared brightness value.

    Returns None when the value has no entry in
    INFRARED_BRIGHTNESS_VALUES_MAP.
    """
    # dict.get already defaults to None, so no explicit default is needed.
    return INFRARED_BRIGHTNESS_VALUES_MAP.get(value)
|
|
|
|
|
|
def infrared_brightness_option_to_value(option: str) -> int | None:
    """Look up the infrared brightness value for an option label.

    Returns None when no entry of INFRARED_BRIGHTNESS_VALUES_MAP maps to
    the given option.
    """
    # Invert the value -> option map; on duplicate options the last one wins.
    value_by_option = dict(
        zip(
            INFRARED_BRIGHTNESS_VALUES_MAP.values(),
            INFRARED_BRIGHTNESS_VALUES_MAP.keys(),
        )
    )
    return value_by_option.get(option)
|
|
|
|
|
|
def convert_8_to_16(value: int) -> int:
    """Expand an 8 bit level to 16 bits by repeating it in both bytes."""
    high_byte = value << 8
    return high_byte | value
|
|
|
|
|
|
def convert_16_to_8(value: int) -> int:
    """Reduce a 16 bit level to 8 bits by keeping only its high byte."""
    high_byte = value >> 8
    return high_byte
|
|
|
|
|
|
def lifx_features(bulb: Light) -> dict[str, Any]:
    """Return the feature map for the bulb's product.

    Falls back to the entry stored under key 1 of products.features_map
    when the bulb's product is not listed or its entry is empty.
    """
    known: dict[str, Any] | None = products.features_map.get(bulb.product)
    if known:
        return known
    fallback: dict[str, Any] = products.features_map[1]
    return fallback
|
|
|
|
|
|
def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None:
    """Derive an HSBK list from whichever color inputs were supplied.

    Hue, Saturation, Brightness, Kelvin

    A color name is resolved first; an explicit hs, rgb or xy color (checked
    in that order) then takes precedence over it. Any of the color
    temperature inputs sets kelvin and forces saturation to 0, and a
    brightness percentage takes precedence over an absolute brightness.
    Components that no input determined stay None; when none was
    determined at all, None is returned instead of a list.
    """
    hue: float | None = None
    saturation: float | None = None
    brightness: int | None = None
    kelvin: int | None = None

    named_color = kwargs.get(ATTR_COLOR_NAME)
    if named_color is not None:
        try:
            rgb = color_util.color_name_to_rgb(named_color)
            hue, saturation = color_util.color_RGB_to_hs(*rgb)
        except ValueError:
            _LOGGER.warning(
                "Got unknown color %s, falling back to neutral white", named_color
            )
            hue, saturation = (0, 0)

    # Explicit color values win over a color name; only the first match is used.
    if ATTR_HS_COLOR in kwargs:
        hue, saturation = kwargs[ATTR_HS_COLOR]
    elif ATTR_RGB_COLOR in kwargs:
        rgb_color = kwargs[ATTR_RGB_COLOR]
        hue, saturation = color_util.color_RGB_to_hs(*rgb_color)
    elif ATTR_XY_COLOR in kwargs:
        xy_color = kwargs[ATTR_XY_COLOR]
        hue, saturation = color_util.color_xy_to_hs(*xy_color)

    if hue is not None:
        assert saturation is not None
        # Rescale degrees (0-360) and percent (0-100) to the 16 bit range.
        hue = int(hue / 360 * 65535)
        saturation = int(saturation / 100 * 65535)
        kelvin = 3500

    # Color temperature inputs are applied in this order, so a later one
    # overrides an earlier one; each of them desaturates the color.
    if ATTR_KELVIN in kwargs:
        _LOGGER.warning(
            "The 'kelvin' parameter is deprecated. Please use 'color_temp_kelvin' for"
            " all service calls"
        )
        kelvin = kwargs.pop(ATTR_KELVIN)
        saturation = 0

    if ATTR_COLOR_TEMP in kwargs:
        mired = kwargs.pop(ATTR_COLOR_TEMP)
        kelvin = color_util.color_temperature_mired_to_kelvin(mired)
        saturation = 0

    if ATTR_COLOR_TEMP_KELVIN in kwargs:
        kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN)
        saturation = 0

    if ATTR_BRIGHTNESS in kwargs:
        brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])

    if ATTR_BRIGHTNESS_PCT in kwargs:
        level_8bit = round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100)
        brightness = convert_8_to_16(level_8bit)

    hsbk = [hue, saturation, brightness, kelvin]
    if hsbk == [None, None, None, None]:
        return None
    return hsbk
|
|
|
|
|
|
def merge_hsbk(
|
|
base: list[float | int | None], change: list[float | int | None]
|
|
) -> list[float | int | None]:
|
|
"""Copy change on top of base, except when None.
|
|
|
|
Hue, Saturation, Brightness, Kelvin
|
|
"""
|
|
return [b if c is None else c for b, c in zip(base, change, strict=False)]
|
|
|
|
|
|
def _get_mac_offset(mac_addr: str, offset: int) -> str:
    """Add offset to the sixth octet of a colon separated MAC address.

    The octet wraps around modulo 256 and the result is rendered as
    lowercase, zero padded hex octets joined by colons.
    """
    parts = [int(chunk, 16) for chunk in mac_addr.split(":")]
    shifted = parts[5] + offset
    parts[5] = shifted % 256
    return ":".join(format(part, "02x") for part in parts)
|
|
|
|
|
|
def _off_by_one_mac(firmware: str) -> bool:
    """Tell whether the firmware version is at or above FIX_MAC_FW.

    An empty firmware string is reported as False.
    """
    if not firmware:
        return False
    return bool(AwesomeVersion(firmware) >= FIX_MAC_FW)
|
|
|
|
|
|
def get_real_mac_addr(mac_addr: str, firmware: str) -> str:
    """Return the MAC address, shifted by one for off-by-one firmware.

    The sixth octet is incremented when the firmware version is at or
    above the threshold checked by _off_by_one_mac; otherwise the address
    is returned unchanged.
    """
    if _off_by_one_mac(firmware):
        return _get_mac_offset(mac_addr, 1)
    return mac_addr
|
|
|
|
|
|
def formatted_serial(serial_number: str) -> str:
    """Run the serial number through the device registry's MAC formatter."""
    registry_form = dr.format_mac(serial_number)
    return registry_form
|
|
|
|
|
|
def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
    """Tell whether a MAC address corresponds to the serial number.

    The formatted MAC matches when it equals either the formatted serial
    number or the serial number with its sixth octet incremented by one.
    """
    normalized_mac = dr.format_mac(mac_addr)
    if formatted_serial(serial_number) == normalized_mac:
        return True
    # Only reached when the direct comparison failed.
    return bool(_get_mac_offset(serial_number, 1) == normalized_mac)
|
|
|
|
|
|
async def async_execute_lifx(method: Callable) -> Message:
    """Run a single lifx callback method and return its response.

    Delegates to async_multi_execute_lifx_with_retries with
    DEFAULT_ATTEMPTS and OVERALL_TIMEOUT.
    """
    responses = await async_multi_execute_lifx_with_retries(
        [method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
    )
    return responses[0]
|
|
|
|
|
|
async def async_multi_execute_lifx_with_retries(
    methods: list[Callable], attempts: int, overall_timeout: int
) -> list[Message]:
    """Execute multiple lifx callback methods with retries and wait for a response.

    This function splits the overall timeout evenly across the number of
    attempts and waits for each method to return a result. If we don't get
    a result within the split timeout, we will send all methods that did
    not generate a response again.

    Each method is invoked as ``method(callb=...)``; the callback resolves
    the method's result only when it is handed a truthy message.

    Returns the responses in the same order as ``methods``. An empty
    ``methods`` list returns an empty list without waiting.

    If we don't get a result after all attempts, we will raise a
    TimeoutError exception naming the methods that did not respond.
    """
    if not methods:
        # asyncio.wait() raises ValueError when given an empty collection.
        return []

    loop = asyncio.get_running_loop()
    futures: list[asyncio.Future] = [loop.create_future() for _ in methods]

    def _callback(
        bulb: Light, message: Message | None, future: asyncio.Future[Message]
    ) -> None:
        # Ignore empty responses and responses that arrive after a retry
        # has already resolved this future.
        if message and not future.done():
            future.set_result(message)

    timeout_per_attempt = overall_timeout / attempts

    for _ in range(attempts):
        # (Re)send only the methods that have not produced a response yet.
        for method, future in zip(methods, futures):
            if not future.done():
                method(callb=partial(_callback, future=future))

        _, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
        if not pending:
            break

    results: list[Message] = []
    failed: list[str] = []
    for method, future in zip(methods, futures):
        if not future.done() or not (result := future.result()):
            failed.append(str(getattr(method, "__name__", method)))
        else:
            results.append(result)

    if failed:
        failed_methods = ", ".join(failed)
        raise TimeoutError(f"{failed_methods} timed out after {attempts} attempts")

    return results
|