mirror of https://github.com/home-assistant/core
189 lines
6.1 KiB
Python
189 lines
6.1 KiB
Python
"""Support for OpenWRT (ubus) routers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
|
|
from openwrt.ubus import Ubus
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.device_tracker import (
|
|
DOMAIN as DEVICE_TRACKER_DOMAIN,
|
|
PLATFORM_SCHEMA as DEVICE_TRACKER_PLATFORM_SCHEMA,
|
|
DeviceScanner,
|
|
)
|
|
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
|
|
from homeassistant.core import HomeAssistant
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
CONF_DHCP_SOFTWARE = "dhcp_software"
|
|
DEFAULT_DHCP_SOFTWARE = "dnsmasq"
|
|
DHCP_SOFTWARES = ["dnsmasq", "odhcpd", "none"]
|
|
|
|
PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
|
|
{
|
|
vol.Required(CONF_HOST): cv.string,
|
|
vol.Required(CONF_PASSWORD): cv.string,
|
|
vol.Required(CONF_USERNAME): cv.string,
|
|
vol.Optional(CONF_DHCP_SOFTWARE, default=DEFAULT_DHCP_SOFTWARE): vol.In(
|
|
DHCP_SOFTWARES
|
|
),
|
|
}
|
|
)
|
|
|
|
|
|
def get_scanner(hass: HomeAssistant, config: ConfigType) -> DeviceScanner | None:
|
|
"""Validate the configuration and return an ubus scanner."""
|
|
config = config[DEVICE_TRACKER_DOMAIN]
|
|
|
|
dhcp_sw = config[CONF_DHCP_SOFTWARE]
|
|
scanner: DeviceScanner
|
|
if dhcp_sw == "dnsmasq":
|
|
scanner = DnsmasqUbusDeviceScanner(config)
|
|
elif dhcp_sw == "odhcpd":
|
|
scanner = OdhcpdUbusDeviceScanner(config)
|
|
else:
|
|
scanner = UbusDeviceScanner(config)
|
|
|
|
return scanner if scanner.success_init else None
|
|
|
|
|
|
def _refresh_on_access_denied(func):
|
|
"""If remove rebooted, it lost our session so rebuild one and try again."""
|
|
|
|
def decorator(self, *args, **kwargs):
|
|
"""Wrap the function to refresh session_id on PermissionError."""
|
|
try:
|
|
return func(self, *args, **kwargs)
|
|
except PermissionError:
|
|
_LOGGER.warning(
|
|
"Invalid session detected. Trying to refresh session_id and re-run RPC"
|
|
)
|
|
self.ubus.connect()
|
|
|
|
return func(self, *args, **kwargs)
|
|
|
|
return decorator
|
|
|
|
|
|
class UbusDeviceScanner(DeviceScanner):
|
|
"""Class which queries a wireless router running OpenWrt firmware.
|
|
|
|
Adapted from Tomato scanner.
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
"""Initialize the scanner."""
|
|
self.host = config[CONF_HOST]
|
|
self.username = config[CONF_USERNAME]
|
|
self.password = config[CONF_PASSWORD]
|
|
|
|
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
|
|
self.last_results = {}
|
|
self.url = f"http://{self.host}/ubus"
|
|
|
|
self.ubus = Ubus(self.url, self.username, self.password)
|
|
self.hostapd = []
|
|
self.mac2name = None
|
|
self.success_init = self.ubus.connect() is not None
|
|
|
|
def scan_devices(self):
|
|
"""Scan for new devices and return a list with found device IDs."""
|
|
self._update_info()
|
|
return self.last_results
|
|
|
|
def _generate_mac2name(self):
|
|
"""Return empty MAC to name dict. Overridden if DHCP server is set."""
|
|
self.mac2name = {}
|
|
|
|
@_refresh_on_access_denied
|
|
def get_device_name(self, device):
|
|
"""Return the name of the given device or None if we don't know."""
|
|
if self.mac2name is None:
|
|
self._generate_mac2name()
|
|
if self.mac2name is None:
|
|
# Generation of mac2name dictionary failed
|
|
return None
|
|
return self.mac2name.get(device.upper(), None)
|
|
|
|
async def async_get_extra_attributes(self, device: str) -> dict[str, str]:
|
|
"""Return the host to distinguish between multiple routers."""
|
|
return {"host": self.host}
|
|
|
|
@_refresh_on_access_denied
|
|
def _update_info(self):
|
|
"""Ensure the information from the router is up to date.
|
|
|
|
Returns boolean if scanning successful.
|
|
"""
|
|
if not self.success_init:
|
|
return False
|
|
|
|
_LOGGER.debug("Checking hostapd")
|
|
|
|
if not self.hostapd:
|
|
hostapd = self.ubus.get_hostapd()
|
|
self.hostapd.extend(hostapd.keys())
|
|
|
|
self.last_results = []
|
|
results = 0
|
|
# for each access point
|
|
for hostapd in self.hostapd:
|
|
if result := self.ubus.get_hostapd_clients(hostapd):
|
|
results = results + 1
|
|
# Check for each device is authorized (valid wpa key)
|
|
for key in result["clients"]:
|
|
device = result["clients"][key]
|
|
if device["authorized"]:
|
|
self.last_results.append(key)
|
|
|
|
return bool(results)
|
|
|
|
|
|
class DnsmasqUbusDeviceScanner(UbusDeviceScanner):
|
|
"""Implement the Ubus device scanning for the dnsmasq DHCP server."""
|
|
|
|
def __init__(self, config):
|
|
"""Initialize the scanner."""
|
|
super().__init__(config)
|
|
self.leasefile = None
|
|
|
|
def _generate_mac2name(self):
|
|
if self.leasefile is None:
|
|
if result := self.ubus.get_uci_config("dhcp", "dnsmasq"):
|
|
values = result["values"].values()
|
|
self.leasefile = next(iter(values))["leasefile"]
|
|
else:
|
|
return
|
|
|
|
result = self.ubus.file_read(self.leasefile)
|
|
if result:
|
|
self.mac2name = {}
|
|
for line in result["data"].splitlines():
|
|
hosts = line.split(" ")
|
|
self.mac2name[hosts[1].upper()] = hosts[3]
|
|
else:
|
|
# Error, handled in the ubus.file_read()
|
|
return
|
|
|
|
|
|
class OdhcpdUbusDeviceScanner(UbusDeviceScanner):
|
|
"""Implement the Ubus device scanning for the odhcp DHCP server."""
|
|
|
|
def _generate_mac2name(self):
|
|
if result := self.ubus.get_dhcp_method("ipv4leases"):
|
|
self.mac2name = {}
|
|
for device in result["device"].values():
|
|
for lease in device["leases"]:
|
|
mac = lease["mac"] # mac = aabbccddeeff
|
|
# Convert it to expected format with colon
|
|
mac = ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))
|
|
self.mac2name[mac.upper()] = lease["hostname"]
|
|
else:
|
|
# Error, handled in the ubus.get_dhcp_method()
|
|
return
|