core/homeassistant/components/konnected/__init__.py

444 lines
14 KiB
Python

"""Support for Konnected devices."""
import copy
import hmac
from http import HTTPStatus
import json
import logging
from aiohttp.hdrs import AUTHORIZATION
from aiohttp.web import Request, Response
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.binary_sensor import DEVICE_CLASSES_SCHEMA
from homeassistant.components.http import KEY_HASS, HomeAssistantView
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_ACCESS_TOKEN,
CONF_BINARY_SENSORS,
CONF_DEVICES,
CONF_DISCOVERY,
CONF_HOST,
CONF_ID,
CONF_NAME,
CONF_PIN,
CONF_PORT,
CONF_REPEAT,
CONF_SENSORS,
CONF_SWITCHES,
CONF_TYPE,
CONF_ZONE,
STATE_OFF,
STATE_ON,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .config_flow import ( # Loading the config flow file will register the flow
CONF_DEFAULT_OPTIONS,
CONF_IO,
CONF_IO_BIN,
CONF_IO_DIG,
CONF_IO_SWI,
OPTIONS_SCHEMA,
)
from .const import (
CONF_ACTIVATION,
CONF_API_HOST,
CONF_BLINK,
CONF_INVERSE,
CONF_MOMENTARY,
CONF_PAUSE,
CONF_POLL_INTERVAL,
DOMAIN,
PIN_TO_ZONE,
STATE_HIGH,
STATE_LOW,
UNDO_UPDATE_LISTENER,
UPDATE_ENDPOINT,
ZONE_TO_PIN,
ZONES,
)
from .handlers import HANDLERS
from .panel import AlarmPanel
_LOGGER = logging.getLogger(__name__)
def ensure_pin(value):
"""Check if valid pin and coerce to string."""
if value is None:
raise vol.Invalid("pin value is None")
if PIN_TO_ZONE.get(str(value)) is None:
raise vol.Invalid("pin not valid")
return str(value)
def ensure_zone(value):
"""Check if valid zone and coerce to string."""
if value is None:
raise vol.Invalid("zone value is None")
if str(value) not in ZONES:
raise vol.Invalid("zone not valid")
return str(value)
def import_device_validator(config):
"""Validate zones and reformat for import."""
config = copy.deepcopy(config)
io_cfgs = {}
# Replace pins with zones
for conf_platform, conf_io in (
(CONF_BINARY_SENSORS, CONF_IO_BIN),
(CONF_SENSORS, CONF_IO_DIG),
(CONF_SWITCHES, CONF_IO_SWI),
):
for zone in config.get(conf_platform, []):
if zone.get(CONF_PIN):
zone[CONF_ZONE] = PIN_TO_ZONE[zone[CONF_PIN]]
del zone[CONF_PIN]
io_cfgs[zone[CONF_ZONE]] = conf_io
# Migrate config_entry data into default_options structure
config[CONF_IO] = io_cfgs
config[CONF_DEFAULT_OPTIONS] = OPTIONS_SCHEMA(config)
# clean up fields migrated to options
config.pop(CONF_BINARY_SENSORS, None)
config.pop(CONF_SENSORS, None)
config.pop(CONF_SWITCHES, None)
config.pop(CONF_BLINK, None)
config.pop(CONF_DISCOVERY, None)
config.pop(CONF_API_HOST, None)
config.pop(CONF_IO, None)
return config
def import_validator(config):
"""Reformat for import."""
config = copy.deepcopy(config)
# push api_host into device configs
for device in config.get(CONF_DEVICES, []):
device[CONF_API_HOST] = config.get(CONF_API_HOST, "")
return config
# configuration.yaml schemas (legacy)
BINARY_SENSOR_SCHEMA_YAML = vol.All(
vol.Schema(
{
vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone,
vol.Exclusive(CONF_PIN, "s_io"): ensure_pin,
vol.Required(CONF_TYPE): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_INVERSE, default=False): cv.boolean,
}
),
cv.has_at_least_one_key(CONF_PIN, CONF_ZONE),
)
SENSOR_SCHEMA_YAML = vol.All(
vol.Schema(
{
vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone,
vol.Exclusive(CONF_PIN, "s_io"): ensure_pin,
vol.Required(CONF_TYPE): vol.All(vol.Lower, vol.In(["dht", "ds18b20"])),
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_POLL_INTERVAL, default=3): vol.All(
vol.Coerce(int), vol.Range(min=1)
),
}
),
cv.has_at_least_one_key(CONF_PIN, CONF_ZONE),
)
SWITCH_SCHEMA_YAML = vol.All(
vol.Schema(
{
vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone,
vol.Exclusive(CONF_PIN, "s_io"): ensure_pin,
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_ACTIVATION, default=STATE_HIGH): vol.All(
vol.Lower, vol.Any(STATE_HIGH, STATE_LOW)
),
vol.Optional(CONF_MOMENTARY): vol.All(vol.Coerce(int), vol.Range(min=10)),
vol.Optional(CONF_PAUSE): vol.All(vol.Coerce(int), vol.Range(min=10)),
vol.Optional(CONF_REPEAT): vol.All(vol.Coerce(int), vol.Range(min=-1)),
}
),
cv.has_at_least_one_key(CONF_PIN, CONF_ZONE),
)
DEVICE_SCHEMA_YAML = vol.All(
vol.Schema(
{
vol.Required(CONF_ID): cv.matches_regex("[0-9a-f]{12}"),
vol.Optional(CONF_BINARY_SENSORS): vol.All(
cv.ensure_list, [BINARY_SENSOR_SCHEMA_YAML]
),
vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [SENSOR_SCHEMA_YAML]),
vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA_YAML]),
vol.Inclusive(CONF_HOST, "host_info"): cv.string,
vol.Inclusive(CONF_PORT, "host_info"): cv.port,
vol.Optional(CONF_BLINK, default=True): cv.boolean,
vol.Optional(CONF_API_HOST, default=""): vol.Any("", cv.url),
vol.Optional(CONF_DISCOVERY, default=True): cv.boolean,
}
),
import_device_validator,
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
import_validator,
vol.Schema(
{
vol.Required(CONF_ACCESS_TOKEN): cv.string,
vol.Optional(CONF_API_HOST): vol.Url(),
vol.Optional(CONF_DEVICES): vol.All(
cv.ensure_list, [DEVICE_SCHEMA_YAML]
),
}
),
)
},
extra=vol.ALLOW_EXTRA,
)
YAML_CONFIGS = "yaml_configs"
PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR, Platform.SWITCH]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Konnected platform."""
if (cfg := config.get(DOMAIN)) is None:
cfg = {}
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {
CONF_ACCESS_TOKEN: cfg.get(CONF_ACCESS_TOKEN),
CONF_API_HOST: cfg.get(CONF_API_HOST),
CONF_DEVICES: {},
}
hass.http.register_view(KonnectedView)
# Check if they have yaml configured devices
if CONF_DEVICES not in cfg:
return True
for device in cfg.get(CONF_DEVICES, []):
# Attempt to importing the cfg. Use
# hass.async_add_job to avoid a deadlock.
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=device
)
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up panel from a config entry."""
client = AlarmPanel(hass, entry)
# creates a panel data store in hass.data[DOMAIN][CONF_DEVICES]
await client.async_save_data()
# if the cfg entry was created we know we could connect to the panel at some point
# async_connect will handle retries until it establishes a connection
await client.async_connect()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# config entry specific data to enable unload
hass.data[DOMAIN][entry.entry_id] = {
UNDO_UPDATE_LISTENER: entry.add_update_listener(async_entry_updated)
}
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
hass.data[DOMAIN][entry.entry_id][UNDO_UPDATE_LISTENER]()
if unload_ok:
hass.data[DOMAIN][CONF_DEVICES].pop(entry.data[CONF_ID])
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def async_entry_updated(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload the config entry when options change."""
await hass.config_entries.async_reload(entry.entry_id)
class KonnectedView(HomeAssistantView):
"""View creates an endpoint to receive push updates from the device."""
url = UPDATE_ENDPOINT
name = "api:konnected"
requires_auth = False # Uses access token from configuration
def __init__(self) -> None:
"""Initialize the view."""
@staticmethod
def binary_value(state, activation):
"""Return binary value for GPIO based on state and activation."""
if activation == STATE_HIGH:
return 1 if state == STATE_ON else 0
return 0 if state == STATE_ON else 1
async def update_sensor(self, request: Request, device_id) -> Response:
"""Process a put or post."""
hass = request.app[KEY_HASS]
data = hass.data[DOMAIN]
auth = request.headers.get(AUTHORIZATION)
tokens = []
if hass.data[DOMAIN].get(CONF_ACCESS_TOKEN):
tokens.extend([hass.data[DOMAIN][CONF_ACCESS_TOKEN]])
tokens.extend(
[
entry.data[CONF_ACCESS_TOKEN]
for entry in hass.config_entries.async_entries(DOMAIN)
if entry.data.get(CONF_ACCESS_TOKEN)
]
)
if auth is None or not next(
(True for token in tokens if hmac.compare_digest(f"Bearer {token}", auth)),
False,
):
return self.json_message(
"unauthorized", status_code=HTTPStatus.UNAUTHORIZED
)
try: # Konnected 2.2.0 and above supports JSON payloads
payload = await request.json()
except json.decoder.JSONDecodeError:
_LOGGER.error(
"Your Konnected device software may be out of "
"date. Visit https://help.konnected.io for "
"updating instructions"
)
if (device := data[CONF_DEVICES].get(device_id)) is None:
return self.json_message(
"unregistered device", status_code=HTTPStatus.BAD_REQUEST
)
if (panel := device.get("panel")) is not None:
# connect if we haven't already
hass.async_create_task(panel.async_connect())
try:
zone_num = str(payload.get(CONF_ZONE) or PIN_TO_ZONE[payload[CONF_PIN]])
payload[CONF_ZONE] = zone_num
zone_data = (
device[CONF_BINARY_SENSORS].get(zone_num)
or next(
(s for s in device[CONF_SWITCHES] if s[CONF_ZONE] == zone_num), None
)
or next(
(s for s in device[CONF_SENSORS] if s[CONF_ZONE] == zone_num), None
)
)
except KeyError:
zone_data = None
if zone_data is None:
return self.json_message(
"unregistered sensor/actuator", status_code=HTTPStatus.BAD_REQUEST
)
zone_data["device_id"] = device_id
for attr in ("state", "temp", "humi", "addr"):
value = payload.get(attr)
handler = HANDLERS.get(attr)
if value is not None and handler:
hass.async_create_task(handler(hass, zone_data, payload))
return self.json_message("ok")
async def get(self, request: Request, device_id) -> Response:
"""Return the current binary state of a switch."""
hass = request.app[KEY_HASS]
data = hass.data[DOMAIN]
if not (device := data[CONF_DEVICES].get(device_id)):
return self.json_message(
f"Device {device_id} not configured", status_code=HTTPStatus.NOT_FOUND
)
if (panel := device.get("panel")) is not None:
# connect if we haven't already
hass.async_create_task(panel.async_connect())
# Our data model is based on zone ids but we convert from/to pin ids
# based on whether they are specified in the request
try:
zone_num = str(
request.query.get(CONF_ZONE) or PIN_TO_ZONE[request.query[CONF_PIN]]
)
zone = next(
switch
for switch in device[CONF_SWITCHES]
if switch[CONF_ZONE] == zone_num
)
except StopIteration:
zone = None
except KeyError:
zone = None
zone_num = None
if not zone:
target = request.query.get(
CONF_ZONE, request.query.get(CONF_PIN, "unknown")
)
return self.json_message(
f"Switch on zone or pin {target} not configured",
status_code=HTTPStatus.NOT_FOUND,
)
resp = {}
if request.query.get(CONF_ZONE):
resp[CONF_ZONE] = zone_num
elif zone_num:
resp[CONF_PIN] = ZONE_TO_PIN[zone_num]
# Make sure entity is setup
if zone_entity_id := zone.get(ATTR_ENTITY_ID):
resp["state"] = self.binary_value(
hass.states.get(zone_entity_id).state, # type: ignore[union-attr]
zone[CONF_ACTIVATION],
)
return self.json(resp)
_LOGGER.warning("Konnected entity not yet setup, returning default")
resp["state"] = self.binary_value(STATE_OFF, zone[CONF_ACTIVATION])
return self.json(resp)
async def put(self, request: Request, device_id) -> Response:
"""Receive a sensor update via PUT request and async set state."""
return await self.update_sensor(request, device_id)
async def post(self, request: Request, device_id) -> Response:
"""Receive a sensor update via POST request and async set state."""
return await self.update_sensor(request, device_id)