core/homeassistant/components/lifx/util.py

242 lines
7.5 KiB
Python

"""Support for LIFX."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from functools import partial
from typing import Any
from aiolifx import products
from aiolifx.aiolifx import Light
from aiolifx.message import Message
from awesomeversion import AwesomeVersion
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_BRIGHTNESS_PCT,
ATTR_COLOR_NAME,
ATTR_COLOR_TEMP,
ATTR_COLOR_TEMP_KELVIN,
ATTR_HS_COLOR,
ATTR_KELVIN,
ATTR_RGB_COLOR,
ATTR_XY_COLOR,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
import homeassistant.util.color as color_util
from .const import (
_LOGGER,
DEFAULT_ATTEMPTS,
DOMAIN,
INFRARED_BRIGHTNESS_VALUES_MAP,
OVERALL_TIMEOUT,
)
FIX_MAC_FW = AwesomeVersion("3.70")
@callback
def async_entry_is_legacy(entry: ConfigEntry) -> bool:
"""Check if a config entry is the legacy shared one."""
return entry.unique_id is None or entry.unique_id == DOMAIN
@callback
def async_get_legacy_entry(hass: HomeAssistant) -> ConfigEntry | None:
"""Get the legacy config entry."""
for entry in hass.config_entries.async_entries(DOMAIN):
if async_entry_is_legacy(entry):
return entry
return None
def infrared_brightness_value_to_option(value: int) -> str | None:
"""Convert infrared brightness from value to option."""
return INFRARED_BRIGHTNESS_VALUES_MAP.get(value, None)
def infrared_brightness_option_to_value(option: str) -> int | None:
"""Convert infrared brightness option to value."""
option_values = {v: k for k, v in INFRARED_BRIGHTNESS_VALUES_MAP.items()}
return option_values.get(option)
def convert_8_to_16(value: int) -> int:
"""Scale an 8 bit level into 16 bits."""
return (value << 8) | value
def convert_16_to_8(value: int) -> int:
"""Scale a 16 bit level into 8 bits."""
return value >> 8
def lifx_features(bulb: Light) -> dict[str, Any]:
"""Return a feature map for this bulb, or a default map if unknown."""
features: dict[str, Any] = (
products.features_map.get(bulb.product) or products.features_map[1]
)
return features
def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None:
"""Find the desired color from a number of possible inputs.
Hue, Saturation, Brightness, Kelvin
"""
hue, saturation, brightness, kelvin = [None] * 4
if (color_name := kwargs.get(ATTR_COLOR_NAME)) is not None:
try:
hue, saturation = color_util.color_RGB_to_hs(
*color_util.color_name_to_rgb(color_name)
)
except ValueError:
_LOGGER.warning(
"Got unknown color %s, falling back to neutral white", color_name
)
hue, saturation = (0, 0)
if ATTR_HS_COLOR in kwargs:
hue, saturation = kwargs[ATTR_HS_COLOR]
elif ATTR_RGB_COLOR in kwargs:
hue, saturation = color_util.color_RGB_to_hs(*kwargs[ATTR_RGB_COLOR])
elif ATTR_XY_COLOR in kwargs:
hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR])
if hue is not None:
assert saturation is not None
hue = int(hue / 360 * 65535)
saturation = int(saturation / 100 * 65535)
kelvin = 3500
if ATTR_KELVIN in kwargs:
_LOGGER.warning(
"The 'kelvin' parameter is deprecated. Please use 'color_temp_kelvin' for"
" all service calls"
)
kelvin = kwargs.pop(ATTR_KELVIN)
saturation = 0
if ATTR_COLOR_TEMP in kwargs:
kelvin = color_util.color_temperature_mired_to_kelvin(
kwargs.pop(ATTR_COLOR_TEMP)
)
saturation = 0
if ATTR_COLOR_TEMP_KELVIN in kwargs:
kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN)
saturation = 0
if ATTR_BRIGHTNESS in kwargs:
brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])
if ATTR_BRIGHTNESS_PCT in kwargs:
brightness = convert_8_to_16(round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100))
hsbk = [hue, saturation, brightness, kelvin]
return None if hsbk == [None] * 4 else hsbk
def merge_hsbk(
base: list[float | int | None], change: list[float | int | None]
) -> list[float | int | None]:
"""Copy change on top of base, except when None.
Hue, Saturation, Brightness, Kelvin
"""
return [b if c is None else c for b, c in zip(base, change, strict=False)]
def _get_mac_offset(mac_addr: str, offset: int) -> str:
octets = [int(octet, 16) for octet in mac_addr.split(":")]
octets[5] = (octets[5] + offset) % 256
return ":".join(f"{octet:02x}" for octet in octets)
def _off_by_one_mac(firmware: str) -> bool:
"""Check if the firmware version has the off by one mac."""
return bool(firmware and AwesomeVersion(firmware) >= FIX_MAC_FW)
def get_real_mac_addr(mac_addr: str, firmware: str) -> str:
"""Increment the last byte of the mac address by one for FW>3.70."""
return _get_mac_offset(mac_addr, 1) if _off_by_one_mac(firmware) else mac_addr
def formatted_serial(serial_number: str) -> str:
"""Format the serial number to match the HA device registry."""
return dr.format_mac(serial_number)
def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
"""Check if a mac address matches the serial number."""
formatted_mac = dr.format_mac(mac_addr)
return bool(
formatted_serial(serial_number) == formatted_mac
or _get_mac_offset(serial_number, 1) == formatted_mac
)
async def async_execute_lifx(method: Callable) -> Message:
"""Execute a lifx callback method and wait for a response."""
return (
await async_multi_execute_lifx_with_retries(
[method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
)
)[0]
async def async_multi_execute_lifx_with_retries(
methods: list[Callable], attempts: int, overall_timeout: int
) -> list[Message]:
"""Execute multiple lifx callback methods with retries and wait for a response.
This functional will the overall timeout by the number of attempts and
wait for each method to return a result. If we don't get a result
within the split timeout, we will send all methods that did not generate
a response again.
If we don't get a result after all attempts, we will raise an
TimeoutError exception.
"""
loop = asyncio.get_running_loop()
futures: list[asyncio.Future] = [loop.create_future() for _ in methods]
def _callback(
bulb: Light, message: Message | None, future: asyncio.Future[Message]
) -> None:
if message and not future.done():
future.set_result(message)
timeout_per_attempt = overall_timeout / attempts
for _ in range(attempts):
for idx, method in enumerate(methods):
future = futures[idx]
if not future.done():
method(callb=partial(_callback, future=future))
_, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
if not pending:
break
results: list[Message] = []
failed: list[str] = []
for idx, future in enumerate(futures):
if not future.done() or not (result := future.result()):
method = methods[idx]
failed.append(str(getattr(method, "__name__", method)))
else:
results.append(result)
if failed:
failed_methods = ", ".join(failed)
raise TimeoutError(f"{failed_methods} timed out after {attempts} attempts")
return results