mirror of https://github.com/home-assistant/core
354 lines
11 KiB
Python
354 lines
11 KiB
Python
"""InfluxDB component which allows you to get data from an Influx database."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import logging
|
|
from typing import Final
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.sensor import (
|
|
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
|
|
SensorEntity,
|
|
)
|
|
from homeassistant.const import (
|
|
CONF_API_VERSION,
|
|
CONF_LANGUAGE,
|
|
CONF_NAME,
|
|
CONF_UNIQUE_ID,
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
CONF_VALUE_TEMPLATE,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import PlatformNotReady, TemplateError
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
from homeassistant.util import Throttle
|
|
|
|
from . import create_influx_url, get_influx_connection, validate_version_specific_config
|
|
from .const import (
|
|
API_VERSION_2,
|
|
COMPONENT_CONFIG_SCHEMA_CONNECTION,
|
|
CONF_BUCKET,
|
|
CONF_DB_NAME,
|
|
CONF_FIELD,
|
|
CONF_GROUP_FUNCTION,
|
|
CONF_IMPORTS,
|
|
CONF_MEASUREMENT_NAME,
|
|
CONF_QUERIES,
|
|
CONF_QUERIES_FLUX,
|
|
CONF_QUERY,
|
|
CONF_RANGE_START,
|
|
CONF_RANGE_STOP,
|
|
CONF_WHERE,
|
|
DEFAULT_API_VERSION,
|
|
DEFAULT_FIELD,
|
|
DEFAULT_FUNCTION_FLUX,
|
|
DEFAULT_GROUP_FUNCTION,
|
|
DEFAULT_RANGE_START,
|
|
DEFAULT_RANGE_STOP,
|
|
INFLUX_CONF_VALUE,
|
|
INFLUX_CONF_VALUE_V2,
|
|
LANGUAGE_FLUX,
|
|
LANGUAGE_INFLUXQL,
|
|
MIN_TIME_BETWEEN_UPDATES,
|
|
NO_BUCKET_ERROR,
|
|
NO_DATABASE_ERROR,
|
|
QUERY_MULTIPLE_RESULTS_MESSAGE,
|
|
QUERY_NO_RESULTS_MESSAGE,
|
|
RENDERING_QUERY_ERROR_MESSAGE,
|
|
RENDERING_QUERY_MESSAGE,
|
|
RENDERING_WHERE_ERROR_MESSAGE,
|
|
RENDERING_WHERE_MESSAGE,
|
|
RUNNING_QUERY_MESSAGE,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
SCAN_INTERVAL: Final = datetime.timedelta(seconds=60)
|
|
|
|
|
|
def _merge_connection_config_into_query(conf, query):
|
|
"""Merge connection details into each configured query."""
|
|
for key in conf:
|
|
if key not in query and key not in [CONF_QUERIES, CONF_QUERIES_FLUX]:
|
|
query[key] = conf[key]
|
|
|
|
|
|
def validate_query_format_for_version(conf: dict) -> dict:
|
|
"""Ensure queries are provided in correct format based on API version."""
|
|
if conf[CONF_API_VERSION] == API_VERSION_2:
|
|
if CONF_QUERIES_FLUX not in conf:
|
|
raise vol.Invalid(
|
|
f"{CONF_QUERIES_FLUX} is required when {CONF_API_VERSION} is"
|
|
f" {API_VERSION_2}"
|
|
)
|
|
|
|
for query in conf[CONF_QUERIES_FLUX]:
|
|
_merge_connection_config_into_query(conf, query)
|
|
query[CONF_LANGUAGE] = LANGUAGE_FLUX
|
|
|
|
del conf[CONF_BUCKET]
|
|
|
|
else:
|
|
if CONF_QUERIES not in conf:
|
|
raise vol.Invalid(
|
|
f"{CONF_QUERIES} is required when {CONF_API_VERSION} is"
|
|
f" {DEFAULT_API_VERSION}"
|
|
)
|
|
|
|
for query in conf[CONF_QUERIES]:
|
|
_merge_connection_config_into_query(conf, query)
|
|
query[CONF_LANGUAGE] = LANGUAGE_INFLUXQL
|
|
|
|
del conf[CONF_DB_NAME]
|
|
|
|
return conf
|
|
|
|
|
|
_QUERY_SENSOR_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_NAME): cv.string,
|
|
vol.Optional(CONF_UNIQUE_ID): cv.string,
|
|
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
|
|
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
|
|
}
|
|
)
|
|
|
|
_QUERY_SCHEMA = {
|
|
LANGUAGE_INFLUXQL: _QUERY_SENSOR_SCHEMA.extend(
|
|
{
|
|
vol.Optional(CONF_DB_NAME): cv.string,
|
|
vol.Required(CONF_MEASUREMENT_NAME): cv.string,
|
|
vol.Optional(
|
|
CONF_GROUP_FUNCTION, default=DEFAULT_GROUP_FUNCTION
|
|
): cv.string,
|
|
vol.Optional(CONF_FIELD, default=DEFAULT_FIELD): cv.string,
|
|
vol.Required(CONF_WHERE): cv.template,
|
|
}
|
|
),
|
|
LANGUAGE_FLUX: _QUERY_SENSOR_SCHEMA.extend(
|
|
{
|
|
vol.Optional(CONF_BUCKET): cv.string,
|
|
vol.Optional(CONF_RANGE_START, default=DEFAULT_RANGE_START): cv.string,
|
|
vol.Optional(CONF_RANGE_STOP, default=DEFAULT_RANGE_STOP): cv.string,
|
|
vol.Required(CONF_QUERY): cv.template,
|
|
vol.Optional(CONF_IMPORTS): vol.All(cv.ensure_list, [cv.string]),
|
|
vol.Optional(CONF_GROUP_FUNCTION): cv.string,
|
|
}
|
|
),
|
|
}
|
|
|
|
PLATFORM_SCHEMA = vol.All(
|
|
SENSOR_PLATFORM_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION).extend(
|
|
{
|
|
vol.Exclusive(CONF_QUERIES, "queries"): [_QUERY_SCHEMA[LANGUAGE_INFLUXQL]],
|
|
vol.Exclusive(CONF_QUERIES_FLUX, "queries"): [_QUERY_SCHEMA[LANGUAGE_FLUX]],
|
|
}
|
|
),
|
|
validate_version_specific_config,
|
|
validate_query_format_for_version,
|
|
create_influx_url,
|
|
)
|
|
|
|
|
|
def setup_platform(
|
|
hass: HomeAssistant,
|
|
config: ConfigType,
|
|
add_entities: AddEntitiesCallback,
|
|
discovery_info: DiscoveryInfoType | None = None,
|
|
) -> None:
|
|
"""Set up the InfluxDB component."""
|
|
try:
|
|
influx = get_influx_connection(config, test_read=True)
|
|
except ConnectionError as exc:
|
|
_LOGGER.error(exc)
|
|
raise PlatformNotReady from exc
|
|
|
|
entities = []
|
|
if CONF_QUERIES_FLUX in config:
|
|
for query in config[CONF_QUERIES_FLUX]:
|
|
if query[CONF_BUCKET] in influx.data_repositories:
|
|
entities.append(InfluxSensor(hass, influx, query))
|
|
else:
|
|
_LOGGER.error(NO_BUCKET_ERROR, query[CONF_BUCKET])
|
|
else:
|
|
for query in config[CONF_QUERIES]:
|
|
if query[CONF_DB_NAME] in influx.data_repositories:
|
|
entities.append(InfluxSensor(hass, influx, query))
|
|
else:
|
|
_LOGGER.error(NO_DATABASE_ERROR, query[CONF_DB_NAME])
|
|
|
|
add_entities(entities, update_before_add=True)
|
|
|
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, lambda _: influx.close())
|
|
|
|
|
|
class InfluxSensor(SensorEntity):
|
|
"""Implementation of a Influxdb sensor."""
|
|
|
|
def __init__(self, hass, influx, query):
|
|
"""Initialize the sensor."""
|
|
self._name = query.get(CONF_NAME)
|
|
self._unit_of_measurement = query.get(CONF_UNIT_OF_MEASUREMENT)
|
|
self._value_template = query.get(CONF_VALUE_TEMPLATE)
|
|
self._state = None
|
|
self._hass = hass
|
|
self._attr_unique_id = query.get(CONF_UNIQUE_ID)
|
|
|
|
if query[CONF_LANGUAGE] == LANGUAGE_FLUX:
|
|
self.data = InfluxFluxSensorData(
|
|
influx,
|
|
query.get(CONF_BUCKET),
|
|
query.get(CONF_RANGE_START),
|
|
query.get(CONF_RANGE_STOP),
|
|
query.get(CONF_QUERY),
|
|
query.get(CONF_IMPORTS),
|
|
query.get(CONF_GROUP_FUNCTION),
|
|
)
|
|
|
|
else:
|
|
self.data = InfluxQLSensorData(
|
|
influx,
|
|
query.get(CONF_DB_NAME),
|
|
query.get(CONF_GROUP_FUNCTION),
|
|
query.get(CONF_FIELD),
|
|
query.get(CONF_MEASUREMENT_NAME),
|
|
query.get(CONF_WHERE),
|
|
)
|
|
|
|
@property
|
|
def name(self):
|
|
"""Return the name of the sensor."""
|
|
return self._name
|
|
|
|
@property
|
|
def native_value(self):
|
|
"""Return the state of the sensor."""
|
|
return self._state
|
|
|
|
@property
|
|
def native_unit_of_measurement(self):
|
|
"""Return the unit of measurement of this entity, if any."""
|
|
return self._unit_of_measurement
|
|
|
|
def update(self) -> None:
|
|
"""Get the latest data from Influxdb and updates the states."""
|
|
self.data.update()
|
|
if (value := self.data.value) is None:
|
|
value = None
|
|
if self._value_template is not None:
|
|
value = self._value_template.render_with_possible_json_value(
|
|
str(value), None
|
|
)
|
|
|
|
self._state = value
|
|
|
|
|
|
class InfluxFluxSensorData:
|
|
"""Class for handling the data retrieval from Influx with Flux query."""
|
|
|
|
def __init__(self, influx, bucket, range_start, range_stop, query, imports, group):
|
|
"""Initialize the data object."""
|
|
self.influx = influx
|
|
self.bucket = bucket
|
|
self.range_start = range_start
|
|
self.range_stop = range_stop
|
|
self.query = query
|
|
self.imports = imports
|
|
self.group = group
|
|
self.value = None
|
|
self.full_query = None
|
|
|
|
self.query_prefix = (
|
|
f'from(bucket:"{bucket}") |> range(start: {range_start}, stop:'
|
|
f" {range_stop}) |>"
|
|
)
|
|
if imports is not None:
|
|
for i in imports:
|
|
self.query_prefix = f'import "{i}" {self.query_prefix}'
|
|
|
|
if group is None:
|
|
self.query_postfix = DEFAULT_FUNCTION_FLUX
|
|
else:
|
|
self.query_postfix = f'|> {group}(column: "{INFLUX_CONF_VALUE_V2}")'
|
|
|
|
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
|
def update(self):
|
|
"""Get the latest data by querying influx."""
|
|
_LOGGER.debug(RENDERING_QUERY_MESSAGE, self.query)
|
|
try:
|
|
rendered_query = self.query.render(parse_result=False)
|
|
except TemplateError as ex:
|
|
_LOGGER.error(RENDERING_QUERY_ERROR_MESSAGE, ex)
|
|
return
|
|
|
|
self.full_query = f"{self.query_prefix} {rendered_query} {self.query_postfix}"
|
|
|
|
_LOGGER.debug(RUNNING_QUERY_MESSAGE, self.full_query)
|
|
|
|
try:
|
|
tables = self.influx.query(self.full_query)
|
|
except (ConnectionError, ValueError) as exc:
|
|
_LOGGER.error(exc)
|
|
self.value = None
|
|
return
|
|
|
|
if not tables:
|
|
_LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.full_query)
|
|
self.value = None
|
|
else:
|
|
if len(tables) > 1 or len(tables[0].records) > 1:
|
|
_LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.full_query)
|
|
self.value = tables[0].records[0].values[INFLUX_CONF_VALUE_V2]
|
|
|
|
|
|
class InfluxQLSensorData:
|
|
"""Class for handling the data retrieval with v1 API."""
|
|
|
|
def __init__(self, influx, db_name, group, field, measurement, where):
|
|
"""Initialize the data object."""
|
|
self.influx = influx
|
|
self.db_name = db_name
|
|
self.group = group
|
|
self.field = field
|
|
self.measurement = measurement
|
|
self.where = where
|
|
self.value = None
|
|
self.query = None
|
|
|
|
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
|
def update(self):
|
|
"""Get the latest data with a shell command."""
|
|
_LOGGER.debug(RENDERING_WHERE_MESSAGE, self.where)
|
|
try:
|
|
where_clause = self.where.render(parse_result=False)
|
|
except TemplateError as ex:
|
|
_LOGGER.error(RENDERING_WHERE_ERROR_MESSAGE, ex)
|
|
return
|
|
|
|
self.query = (
|
|
f"select {self.group}({self.field}) as {INFLUX_CONF_VALUE} from" # noqa: S608
|
|
f" {self.measurement} where {where_clause}"
|
|
)
|
|
|
|
_LOGGER.debug(RUNNING_QUERY_MESSAGE, self.query)
|
|
|
|
try:
|
|
points = self.influx.query(self.query, self.db_name)
|
|
except (ConnectionError, ValueError) as exc:
|
|
_LOGGER.error(exc)
|
|
self.value = None
|
|
return
|
|
|
|
if not points:
|
|
_LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.query)
|
|
self.value = None
|
|
else:
|
|
if len(points) > 1:
|
|
_LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.query)
|
|
self.value = points[0].get(INFLUX_CONF_VALUE)
|