mirror of https://github.com/home-assistant/core
117 lines
3.0 KiB
Python
117 lines
3.0 KiB
Python
"""Support for Automation Device Specification (ADS)."""
|
|
|
|
import logging
|
|
|
|
import pyads
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.const import (
|
|
CONF_DEVICE,
|
|
CONF_IP_ADDRESS,
|
|
CONF_PORT,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
)
|
|
from homeassistant.core import HomeAssistant, ServiceCall
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
from .const import CONF_ADS_VAR, DATA_ADS, DOMAIN, AdsType
|
|
from .hub import AdsHub
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
ADS_TYPEMAP = {
|
|
AdsType.BOOL: pyads.PLCTYPE_BOOL,
|
|
AdsType.BYTE: pyads.PLCTYPE_BYTE,
|
|
AdsType.INT: pyads.PLCTYPE_INT,
|
|
AdsType.UINT: pyads.PLCTYPE_UINT,
|
|
AdsType.SINT: pyads.PLCTYPE_SINT,
|
|
AdsType.USINT: pyads.PLCTYPE_USINT,
|
|
AdsType.DINT: pyads.PLCTYPE_DINT,
|
|
AdsType.UDINT: pyads.PLCTYPE_UDINT,
|
|
AdsType.WORD: pyads.PLCTYPE_WORD,
|
|
AdsType.DWORD: pyads.PLCTYPE_DWORD,
|
|
AdsType.REAL: pyads.PLCTYPE_REAL,
|
|
AdsType.LREAL: pyads.PLCTYPE_LREAL,
|
|
AdsType.STRING: pyads.PLCTYPE_STRING,
|
|
AdsType.TIME: pyads.PLCTYPE_TIME,
|
|
AdsType.DATE: pyads.PLCTYPE_DATE,
|
|
AdsType.DATE_AND_TIME: pyads.PLCTYPE_DT,
|
|
AdsType.TOD: pyads.PLCTYPE_TOD,
|
|
}
|
|
|
|
CONF_ADS_FACTOR = "factor"
|
|
CONF_ADS_TYPE = "adstype"
|
|
CONF_ADS_VALUE = "value"
|
|
|
|
|
|
SERVICE_WRITE_DATA_BY_NAME = "write_data_by_name"
|
|
|
|
CONFIG_SCHEMA = vol.Schema(
|
|
{
|
|
DOMAIN: vol.Schema(
|
|
{
|
|
vol.Required(CONF_DEVICE): cv.string,
|
|
vol.Required(CONF_PORT): cv.port,
|
|
vol.Optional(CONF_IP_ADDRESS): cv.string,
|
|
}
|
|
)
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SCHEMA_SERVICE_WRITE_DATA_BY_NAME = vol.Schema(
|
|
{
|
|
vol.Required(CONF_ADS_TYPE): vol.Coerce(AdsType),
|
|
vol.Required(CONF_ADS_VALUE): vol.Coerce(int),
|
|
vol.Required(CONF_ADS_VAR): cv.string,
|
|
}
|
|
)
|
|
|
|
|
|
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|
"""Set up the ADS component."""
|
|
|
|
conf = config[DOMAIN]
|
|
|
|
net_id = conf[CONF_DEVICE]
|
|
ip_address = conf.get(CONF_IP_ADDRESS)
|
|
port = conf[CONF_PORT]
|
|
|
|
client = pyads.Connection(net_id, port, ip_address)
|
|
|
|
try:
|
|
ads = AdsHub(client)
|
|
except pyads.ADSError:
|
|
_LOGGER.error(
|
|
"Could not connect to ADS host (netid=%s, ip=%s, port=%s)",
|
|
net_id,
|
|
ip_address,
|
|
port,
|
|
)
|
|
return False
|
|
|
|
hass.data[DATA_ADS] = ads
|
|
hass.bus.listen(EVENT_HOMEASSISTANT_STOP, ads.shutdown)
|
|
|
|
def handle_write_data_by_name(call: ServiceCall) -> None:
|
|
"""Write a value to the connected ADS device."""
|
|
ads_var: str = call.data[CONF_ADS_VAR]
|
|
ads_type: AdsType = call.data[CONF_ADS_TYPE]
|
|
value: int = call.data[CONF_ADS_VALUE]
|
|
|
|
try:
|
|
ads.write_by_name(ads_var, value, ADS_TYPEMAP[ads_type])
|
|
except pyads.ADSError as err:
|
|
_LOGGER.error(err)
|
|
|
|
hass.services.register(
|
|
DOMAIN,
|
|
SERVICE_WRITE_DATA_BY_NAME,
|
|
handle_write_data_by_name,
|
|
schema=SCHEMA_SERVICE_WRITE_DATA_BY_NAME,
|
|
)
|
|
|
|
return True
|