danVnest-home-assistant/custom_components/apparent_temperature/sensor.py

356 lines
11 KiB
Python

"""Sensor platform for apparent_temperature."""
import logging
import math
from collections.abc import Mapping
from typing import Any
import voluptuous as vol
from homeassistant.components.climate import (
ATTR_CURRENT_HUMIDITY,
ATTR_CURRENT_TEMPERATURE,
)
from homeassistant.components.climate import (
DOMAIN as CLIMATE_DOMAIN,
)
from homeassistant.components.group import expand_entity_ids
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.components.weather import (
ATTR_WEATHER_HUMIDITY,
ATTR_WEATHER_TEMPERATURE,
ATTR_WEATHER_TEMPERATURE_UNIT,
ATTR_WEATHER_WIND_SPEED,
ATTR_WEATHER_WIND_SPEED_UNIT,
)
from homeassistant.components.weather import (
DOMAIN as WEATHER_DOMAIN,
)
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
CONF_NAME,
CONF_SOURCE,
CONF_UNIQUE_ID,
EVENT_HOMEASSISTANT_START,
PERCENTAGE,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import (
Event,
HomeAssistant,
State,
callback,
split_entity_id,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, UndefinedType
from homeassistant.util.unit_conversion import SpeedConverter, TemperatureConverter
from .const import (
ATTR_HUMIDITY_SOURCE,
ATTR_HUMIDITY_SOURCE_VALUE,
ATTR_TEMPERATURE_SOURCE,
ATTR_TEMPERATURE_SOURCE_VALUE,
ATTR_WIND_SPEED_SOURCE,
ATTR_WIND_SPEED_SOURCE_VALUE,
STARTUP_MESSAGE,
)
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_SOURCE): cv.entity_ids,
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_UNIQUE_ID): cv.string,
},
)
# pylint: disable=unused-argument
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None, # noqa: ARG001
) -> None:
"""Set up the Car Wash sensor."""
# Print startup message
_LOGGER.info(STARTUP_MESSAGE)
async_add_entities(
[
ApparentTemperatureSensor(
config.get(CONF_UNIQUE_ID),
config.get(CONF_NAME),
expand_entity_ids(hass, config.get(CONF_SOURCE)),
),
],
)
class ApparentTemperatureSensor(SensorEntity):
"""Apparent Temperature Sensor class."""
_attr_has_entity_name = True
_attr_icon = "mdi:thermometer-lines"
_attr_device_class = SensorDeviceClass.TEMPERATURE
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_should_poll = False
_attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
_attr_suggested_display_precision = 1
def __init__(
self,
unique_id: str | None,
name: str | None,
sources: list[str],
) -> None:
"""Class initialization."""
self._attr_unique_id = unique_id
self._attr_native_value = None
self._name = name
self._sources = sources
self._temp = None
self._humd = None
self._wind = None
self._temp_val = None
self._humd_val = None
self._wind_val = None
@staticmethod
def _compose_name(source_name: str) -> str:
"""Compose entity name based on source entity name."""
tpos = source_name.rfind("temperature")
return (
source_name + " Apparent Temperature"
if tpos < 0
else source_name[:tpos] + "Apparent " + source_name[tpos:]
)
@property
def name(self) -> str | UndefinedType | None:
"""Return the name of the sensor."""
if self._name:
return self._name
return self._compose_name(split_entity_id(self._sources[0])[1])
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return entity specific state attributes."""
return {
ATTR_TEMPERATURE_SOURCE: self._temp,
ATTR_TEMPERATURE_SOURCE_VALUE: self._temp_val,
ATTR_HUMIDITY_SOURCE: self._humd,
ATTR_HUMIDITY_SOURCE_VALUE: self._humd_val,
ATTR_WIND_SPEED_SOURCE: self._wind,
ATTR_WIND_SPEED_SOURCE_VALUE: self._wind_val,
}
def _setup_sources(self) -> list[str]:
"""Set sources for entity and return list of sources to track."""
entities = set()
for entity_id in self._sources:
state: State = self.hass.states.get(entity_id)
domain = split_entity_id(state.entity_id)[0]
device_class = state.attributes.get(ATTR_DEVICE_CLASS)
unit_of_measurement = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if domain == WEATHER_DOMAIN:
self._temp = entity_id
self._humd = entity_id
self._wind = entity_id
entities.add(entity_id)
elif domain == CLIMATE_DOMAIN:
self._temp = entity_id
self._humd = entity_id
entities.add(entity_id)
elif (
device_class == SensorDeviceClass.TEMPERATURE
or unit_of_measurement in UnitOfTemperature
):
self._temp = entity_id
entities.add(entity_id)
elif (
device_class == SensorDeviceClass.HUMIDITY
or unit_of_measurement == PERCENTAGE
):
self._humd = entity_id
entities.add(entity_id)
elif unit_of_measurement in UnitOfSpeed:
self._wind = entity_id
entities.add(entity_id)
elif entity_id.find("temperature") >= 0:
self._temp = entity_id
entities.add(entity_id)
elif entity_id.find("humidity") >= 0:
self._humd = entity_id
entities.add(entity_id)
elif entity_id.find("wind") >= 0:
self._wind = entity_id
entities.add(entity_id)
return list(entities)
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
# pylint: disable=unused-argument
@callback
def sensor_state_listener(event: Event) -> None: # noqa: ARG001
"""Handle device state changes."""
self.async_schedule_update_ha_state(force_refresh=True)
# pylint: disable=unused-argument
@callback
def sensor_startup(event: Event) -> None: # noqa: ARG001
"""Update entity on startup."""
async_track_state_change_event(
self.hass,
self._setup_sources(),
sensor_state_listener,
)
self.async_schedule_update_ha_state(
force_refresh=True,
) # Force first update
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, sensor_startup)
@staticmethod
def _has_state(state: str | None) -> bool:
"""Return True if state has any value."""
return state is not None and state not in [
STATE_UNKNOWN,
STATE_UNAVAILABLE,
"None",
"",
]
def _get_temperature(self, entity_id: str | None) -> float | None:
"""Get temperature value (in °C) from entity."""
if entity_id is None:
return None
state: State = self.hass.states.get(entity_id)
if state is None:
return None
domain = split_entity_id(state.entity_id)[0]
if domain == WEATHER_DOMAIN:
temperature = state.attributes.get(ATTR_WEATHER_TEMPERATURE)
entity_unit = state.attributes.get(ATTR_WEATHER_TEMPERATURE_UNIT)
elif domain == CLIMATE_DOMAIN:
temperature = state.attributes.get(ATTR_CURRENT_TEMPERATURE)
entity_unit = state.attributes.get(ATTR_WEATHER_TEMPERATURE_UNIT)
else:
temperature = state.state
entity_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if not self._has_state(temperature):
return None
try:
temperature = TemperatureConverter.convert(
float(temperature),
entity_unit,
UnitOfTemperature.CELSIUS,
)
except ValueError:
_LOGGER.exception('Could not convert value "%s" to float', state)
return None
return float(temperature)
def _get_humidity(self, entity_id: str | None) -> float | None:
"""Get humidity value from entity."""
if entity_id is None:
return None
state: State = self.hass.states.get(entity_id)
if state is None:
return None
domain = split_entity_id(state.entity_id)[0]
if domain == WEATHER_DOMAIN:
humidity = state.attributes.get(ATTR_WEATHER_HUMIDITY)
elif domain == CLIMATE_DOMAIN:
humidity = state.attributes.get(ATTR_CURRENT_HUMIDITY)
else:
humidity = state.state
if not self._has_state(humidity):
return None
return float(humidity)
def _get_wind_speed(self, entity_id: str | None) -> float | None:
"""Get wind speed value from entity."""
if entity_id is None:
return 0.0
state: State = self.hass.states.get(entity_id)
if state is None:
return 0.0
domain = split_entity_id(state.entity_id)[0]
if domain == WEATHER_DOMAIN:
wind_speed = state.attributes.get(ATTR_WEATHER_WIND_SPEED)
entity_unit = state.attributes.get(ATTR_WEATHER_WIND_SPEED_UNIT)
else:
wind_speed = state.state
entity_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if not self._has_state(wind_speed):
return None
try:
wind_speed = SpeedConverter.convert(
float(wind_speed),
entity_unit,
UnitOfSpeed.METERS_PER_SECOND,
)
except ValueError:
_LOGGER.exception('Could not convert value "%s" to float', state)
return None
return float(wind_speed)
async def async_update(self) -> None:
"""Update sensor state."""
self._temp_val = temp = self._get_temperature(self._temp) # °C
self._humd_val = humd = self._get_humidity(self._humd) # %
self._wind_val = wind = self._get_wind_speed(self._wind) # m/s
_LOGGER.debug("Temp: %s °C Hum: %s %% Wind: %s m/s", temp, humd, wind)
if temp is None or humd is None:
_LOGGER.warning(
"Can't calculate sensor value: some sources are unavailable.",
)
self._attr_native_value = None
return
if wind is None:
_LOGGER.warning(
"Can't get wind speed value. Wind speed will be ignored in calculation.",
)
wind = 0
e_value = humd * 0.06105 * math.exp((17.27 * temp) / (237.7 + temp))
self._attr_native_value = temp + 0.33 * e_value - 0.7 * wind - 4
_LOGGER.debug(
"New sensor state is %s %s",
self._attr_native_value,
self._attr_native_unit_of_measurement,
)