core/homeassistant/components/compensation/__init__.py

136 lines
4.0 KiB
Python

"""The Compensation integration."""
import logging
from operator import itemgetter
import numpy as np
import voluptuous as vol
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.const import (
CONF_ATTRIBUTE,
CONF_MAXIMUM,
CONF_MINIMUM,
CONF_SOURCE,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_COMPENSATION,
CONF_DATAPOINTS,
CONF_DEGREE,
CONF_LOWER_LIMIT,
CONF_POLYNOMIAL,
CONF_PRECISION,
CONF_UPPER_LIMIT,
DATA_COMPENSATION,
DEFAULT_DEGREE,
DEFAULT_PRECISION,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
def datapoints_greater_than_degree(value: dict) -> dict:
"""Validate data point list is greater than polynomial degrees."""
if len(value[CONF_DATAPOINTS]) <= value[CONF_DEGREE]:
raise vol.Invalid(
f"{CONF_DATAPOINTS} must have at least"
f" {value[CONF_DEGREE]+1} {CONF_DATAPOINTS}"
)
return value
COMPENSATION_SCHEMA = vol.Schema(
{
vol.Required(CONF_SOURCE): cv.entity_id,
vol.Required(CONF_DATAPOINTS): [
vol.ExactSequence([vol.Coerce(float), vol.Coerce(float)])
],
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_ATTRIBUTE): cv.string,
vol.Optional(CONF_UPPER_LIMIT, default=False): cv.boolean,
vol.Optional(CONF_LOWER_LIMIT, default=False): cv.boolean,
vol.Optional(CONF_PRECISION, default=DEFAULT_PRECISION): cv.positive_int,
vol.Optional(CONF_DEGREE, default=DEFAULT_DEGREE): vol.All(
vol.Coerce(int),
vol.Range(min=1, max=7),
),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{cv.slug: vol.All(COMPENSATION_SCHEMA, datapoints_greater_than_degree)}
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Compensation sensor."""
hass.data[DATA_COMPENSATION] = {}
for compensation, conf in config[DOMAIN].items():
_LOGGER.debug("Setup %s.%s", DOMAIN, compensation)
degree = conf[CONF_DEGREE]
initial_coefficients: list[tuple[float, float]] = conf[CONF_DATAPOINTS]
sorted_coefficients = sorted(initial_coefficients, key=itemgetter(0))
# get x values and y values from the x,y point pairs
x_values, y_values = zip(*initial_coefficients, strict=False)
# try to get valid coefficients for a polynomial
coefficients = None
with np.errstate(all="raise"):
try:
coefficients = np.polyfit(x_values, y_values, degree)
except FloatingPointError as error:
_LOGGER.error(
"Setup of %s encountered an error, %s",
compensation,
error,
)
if coefficients is not None:
data = {
k: v for k, v in conf.items() if k not in [CONF_DEGREE, CONF_DATAPOINTS]
}
data[CONF_POLYNOMIAL] = np.poly1d(coefficients)
if data[CONF_LOWER_LIMIT]:
data[CONF_MINIMUM] = sorted_coefficients[0]
else:
data[CONF_MINIMUM] = None
if data[CONF_UPPER_LIMIT]:
data[CONF_MAXIMUM] = sorted_coefficients[-1]
else:
data[CONF_MAXIMUM] = None
hass.data[DATA_COMPENSATION][compensation] = data
hass.async_create_task(
async_load_platform(
hass,
SENSOR_DOMAIN,
DOMAIN,
{CONF_COMPENSATION: compensation},
config,
)
)
return True