core/homeassistant/components/command_line/sensor.py

247 lines
8.4 KiB
Python

"""Allows to configure custom shell commands to turn a value for a sensor."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import datetime, timedelta
import json
from typing import Any, cast
from jsonpath import jsonpath
from homeassistant.components.sensor import SensorDeviceClass
from homeassistant.components.sensor.helpers import async_parse_date_datetime
from homeassistant.const import (
CONF_COMMAND,
CONF_NAME,
CONF_SCAN_INTERVAL,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.template import Template
from homeassistant.helpers.trigger_template_entity import ManualTriggerSensorEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util
from .const import (
CONF_COMMAND_TIMEOUT,
CONF_JSON_ATTRIBUTES,
CONF_JSON_ATTRIBUTES_PATH,
LOGGER,
TRIGGER_ENTITY_OPTIONS,
)
from .utils import async_check_output_or_log
DEFAULT_NAME = "Command Sensor"
SCAN_INTERVAL = timedelta(seconds=60)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Command Sensor."""
if not discovery_info:
return
discovery_info = cast(DiscoveryInfoType, discovery_info)
sensor_config = discovery_info
command: str = sensor_config[CONF_COMMAND]
command_timeout: int = sensor_config[CONF_COMMAND_TIMEOUT]
json_attributes: list[str] | None = sensor_config.get(CONF_JSON_ATTRIBUTES)
json_attributes_path: str | None = sensor_config.get(CONF_JSON_ATTRIBUTES_PATH)
scan_interval: timedelta = sensor_config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
value_template: Template | None = sensor_config.get(CONF_VALUE_TEMPLATE)
data = CommandSensorData(hass, command, command_timeout)
trigger_entity_config = {
CONF_NAME: Template(sensor_config[CONF_NAME], hass),
**{k: v for k, v in sensor_config.items() if k in TRIGGER_ENTITY_OPTIONS},
}
async_add_entities(
[
CommandSensor(
data,
trigger_entity_config,
value_template,
json_attributes,
json_attributes_path,
scan_interval,
)
]
)
class CommandSensor(ManualTriggerSensorEntity):
"""Representation of a sensor that is using shell commands."""
_attr_should_poll = False
def __init__(
self,
data: CommandSensorData,
config: ConfigType,
value_template: Template | None,
json_attributes: list[str] | None,
json_attributes_path: str | None,
scan_interval: timedelta,
) -> None:
"""Initialize the sensor."""
super().__init__(self.hass, config)
self.data = data
self._attr_extra_state_attributes: dict[str, Any] = {}
self._json_attributes = json_attributes
self._json_attributes_path = json_attributes_path
self._attr_native_value = None
self._value_template = value_template
self._scan_interval = scan_interval
self._process_updates: asyncio.Lock | None = None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
return self._attr_extra_state_attributes
async def async_added_to_hass(self) -> None:
"""Call when entity about to be added to hass."""
await super().async_added_to_hass()
await self._update_entity_state()
self.async_on_remove(
async_track_time_interval(
self.hass,
self._update_entity_state,
self._scan_interval,
name=f"Command Line Sensor - {self.name}",
cancel_on_shutdown=True,
),
)
async def _update_entity_state(self, now: datetime | None = None) -> None:
"""Update the state of the entity."""
if self._process_updates is None:
self._process_updates = asyncio.Lock()
if self._process_updates.locked():
LOGGER.warning(
"Updating Command Line Sensor %s took longer than the scheduled update interval %s",
self.name,
self._scan_interval,
)
return
async with self._process_updates:
await self._async_update()
async def _async_update(self) -> None:
"""Get the latest data and updates the state."""
await self.data.async_update()
value = self.data.value
if self._json_attributes:
self._attr_extra_state_attributes = {}
if value:
try:
json_dict = json.loads(value)
if self._json_attributes_path is not None:
json_dict = jsonpath(json_dict, self._json_attributes_path)
# jsonpath will always store the result in json_dict[0]
# so the next line happens to work exactly as needed to
# find the result
if isinstance(json_dict, list):
json_dict = json_dict[0]
if isinstance(json_dict, Mapping):
self._attr_extra_state_attributes = {
k: json_dict[k]
for k in self._json_attributes
if k in json_dict
}
else:
LOGGER.warning("JSON result was not a dictionary")
except ValueError:
LOGGER.warning("Unable to parse output as JSON: %s", value)
else:
LOGGER.warning("Empty reply found when expecting JSON data")
if self._value_template is None:
self._attr_native_value = None
self._process_manual_data(value)
return
self._attr_native_value = None
if self._value_template is not None and value is not None:
value = self._value_template.async_render_with_possible_json_value(
value,
None,
)
if self.device_class not in {
SensorDeviceClass.DATE,
SensorDeviceClass.TIMESTAMP,
}:
self._attr_native_value = value
elif value is not None:
self._attr_native_value = async_parse_date_datetime(
value, self.entity_id, self.device_class
)
self._process_manual_data(value)
self.async_write_ha_state()
async def async_update(self) -> None:
"""Update the entity.
Only used by the generic entity update service.
"""
await self._update_entity_state(dt_util.now())
class CommandSensorData:
"""The class for handling the data retrieval."""
def __init__(self, hass: HomeAssistant, command: str, command_timeout: int) -> None:
"""Initialize the data object."""
self.value: str | None = None
self.hass = hass
self.command = command
self.timeout = command_timeout
async def async_update(self) -> None:
"""Get the latest data with a shell command."""
command = self.command
if " " not in command:
prog = command
args = None
args_compiled = None
else:
prog, args = command.split(" ", 1)
args_compiled = Template(args, self.hass)
if args_compiled:
try:
args_to_render = {"arguments": args}
rendered_args = args_compiled.async_render(args_to_render)
except TemplateError as ex:
LOGGER.exception("Error rendering command template: %s", ex)
return
else:
rendered_args = None
if rendered_args == args:
# No template used. default behavior
pass
else:
# Template used. Construct the string used in the shell
command = f"{prog} {rendered_args}"
LOGGER.debug("Running command: %s", command)
self.value = await async_check_output_or_log(command, self.timeout)