# Source path: core/homeassistant/components/ads/__init__.py
#
# Viewer metadata: 117 lines, 3.0 KiB, Python

"""Support for Automation Device Specification (ADS)."""
import logging
import pyads
import voluptuous as vol
from homeassistant.const import (
CONF_DEVICE,
CONF_IP_ADDRESS,
CONF_PORT,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import HomeAssistant, ServiceCall
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import CONF_ADS_VAR, DATA_ADS, DOMAIN, AdsType
from .hub import AdsHub
_LOGGER = logging.getLogger(__name__)

# Lookup table from the integration's AdsType members to the corresponding
# pyads PLCTYPE_* constants. The write service in this module uses it to
# turn the validated "adstype" field into the type passed to the hub.
ADS_TYPEMAP = {
    # Boolean / byte-sized and integer types
    AdsType.BOOL: pyads.PLCTYPE_BOOL,
    AdsType.BYTE: pyads.PLCTYPE_BYTE,
    AdsType.INT: pyads.PLCTYPE_INT,
    AdsType.UINT: pyads.PLCTYPE_UINT,
    AdsType.SINT: pyads.PLCTYPE_SINT,
    AdsType.USINT: pyads.PLCTYPE_USINT,
    AdsType.DINT: pyads.PLCTYPE_DINT,
    AdsType.UDINT: pyads.PLCTYPE_UDINT,
    AdsType.WORD: pyads.PLCTYPE_WORD,
    AdsType.DWORD: pyads.PLCTYPE_DWORD,
    # Floating point
    AdsType.REAL: pyads.PLCTYPE_REAL,
    AdsType.LREAL: pyads.PLCTYPE_LREAL,
    # Text
    AdsType.STRING: pyads.PLCTYPE_STRING,
    # Time and date (note: DATE_AND_TIME maps to pyads' PLCTYPE_DT)
    AdsType.TIME: pyads.PLCTYPE_TIME,
    AdsType.DATE: pyads.PLCTYPE_DATE,
    AdsType.DATE_AND_TIME: pyads.PLCTYPE_DT,
    AdsType.TOD: pyads.PLCTYPE_TOD,
}

# Key names. CONF_ADS_TYPE and CONF_ADS_VALUE are fields of the write
# service's data; CONF_ADS_FACTOR is not referenced in this module
# (presumably consumed by platform modules -- verify against callers).
CONF_ADS_FACTOR = "factor"
CONF_ADS_TYPE = "adstype"
CONF_ADS_VALUE = "value"

# Name under which the write service is registered for this domain.
SERVICE_WRITE_DATA_BY_NAME = "write_data_by_name"
# Validation schema for this integration's section of the configuration.
# Under the DOMAIN key, the device and port entries are mandatory and the
# IP address is optional. Extra keys at the top level are accepted
# (ALLOW_EXTRA) rather than rejected.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_DEVICE): cv.string,
                vol.Required(CONF_PORT): cv.port,
                vol.Optional(CONF_IP_ADDRESS): cv.string,
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)
# Validation schema for the data of the "write_data_by_name" service call.
# All three fields are required: the ADS type (coerced to an AdsType
# member), the value (coerced to int) and the variable name (a string).
SCHEMA_SERVICE_WRITE_DATA_BY_NAME = vol.Schema(
    {
        vol.Required(CONF_ADS_TYPE): vol.Coerce(AdsType),
        vol.Required(CONF_ADS_VALUE): vol.Coerce(int),
        vol.Required(CONF_ADS_VAR): cv.string,
    }
)
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the ADS component.

    Reads the device (net id), port and optional IP address from this
    integration's section of ``config``, builds a ``pyads.Connection`` from
    them and wraps it in an ``AdsHub``. On success the hub is stored in
    ``hass.data[DATA_ADS]``, its ``shutdown`` method is registered as a
    listener for the Home Assistant stop event, and the
    ``write_data_by_name`` service is registered.

    Args:
        hass: The Home Assistant instance.
        config: The full configuration; must contain the ``DOMAIN`` section.

    Returns:
        ``True`` when setup completed, ``False`` when creating the hub
        raised ``pyads.ADSError`` (the failure is logged).
    """
    conf = config[DOMAIN]

    net_id = conf[CONF_DEVICE]
    ip_address = conf.get(CONF_IP_ADDRESS)  # optional; None when not configured
    port = conf[CONF_PORT]

    client = pyads.Connection(net_id, port, ip_address)

    try:
        ads = AdsHub(client)
    except pyads.ADSError:
        _LOGGER.error(
            "Could not connect to ADS host (netid=%s, ip=%s, port=%s)",
            net_id,
            ip_address,
            port,
        )
        return False

    # Make the hub available to the rest of the integration and have it
    # shut down when Home Assistant stops.
    hass.data[DATA_ADS] = ads
    hass.bus.listen(EVENT_HOMEASSISTANT_STOP, ads.shutdown)

    def handle_write_data_by_name(call: ServiceCall) -> None:
        """Write a value to the connected ADS device.

        The call data has already been validated against
        SCHEMA_SERVICE_WRITE_DATA_BY_NAME. An ``pyads.ADSError`` raised by
        the write is logged and not propagated.
        """
        ads_var: str = call.data[CONF_ADS_VAR]
        ads_type: AdsType = call.data[CONF_ADS_TYPE]
        value: int = call.data[CONF_ADS_VALUE]

        try:
            ads.write_by_name(ads_var, value, ADS_TYPEMAP[ads_type])
        except pyads.ADSError as err:
            # Include the variable and type so the log entry identifies
            # which write failed, not just the raw ADS error text.
            _LOGGER.error(
                "Could not write to ADS variable %s (adstype=%s): %s",
                ads_var,
                ads_type,
                err,
            )

    hass.services.register(
        DOMAIN,
        SERVICE_WRITE_DATA_BY_NAME,
        handle_write_data_by_name,
        schema=SCHEMA_SERVICE_WRITE_DATA_BY_NAME,
    )

    return True