core/homeassistant/components/otbr/util.py

289 lines
9.4 KiB
Python

"""Utility functions for the Open Thread Border Router integration."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import dataclasses
from functools import wraps
import logging
import random
from typing import TYPE_CHECKING, Any, Concatenate, cast
import aiohttp
import python_otbr_api
from python_otbr_api import PENDING_DATASET_DELAY_TIMER, tlv_parser
from python_otbr_api.pskc import compute_pskc
from python_otbr_api.tlv_parser import MeshcopTLVType
from homeassistant.components.homeassistant_hardware.silabs_multiprotocol_addon import (
MultiprotocolAddonManager,
get_multiprotocol_addon_manager,
is_multiprotocol_url,
multi_pan_addon_using_device,
)
from homeassistant.components.homeassistant_yellow import RADIO_DEVICE as YELLOW_RADIO
from homeassistant.config_entries import SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from .const import DOMAIN
if TYPE_CHECKING:
from . import OTBRConfigEntry
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# "Learn more" links attached to the OTBR/ZHA channel collision repair issue;
# _warn_on_channel_collision picks one depending on whether the multiprotocol
# add-on is using the Yellow radio.
# NOTE(review): "missmatch" is part of the URL path and is runtime text -
# presumably it matches the published pages; verify before changing it.
INFO_URL_SKY_CONNECT = (
    "https://skyconnect.home-assistant.io/multiprotocol-channel-missmatch"
)
INFO_URL_YELLOW = "https://yellow.home-assistant.io/multiprotocol-channel-missmatch"

# Network keys considered insecure: a dataset whose NETWORKKEY TLV data equals
# one of these triggers the "insecure_thread_network" repair issue.
INSECURE_NETWORK_KEYS = (
    # Thread web UI default
    bytes.fromhex("00112233445566778899AABBCCDDEEFF"),
)

# Passphrases considered insecure: the dataset's PSKC is compared against the
# PSKC computed from each of these (see _warn_on_default_network_settings).
INSECURE_PASSPHRASES = (
    # Thread web UI default
    "j01Nme",
    # Thread documentation default
    "J01NME",
)
class GetBorderAgentIdNotSupported(HomeAssistantError):
    """Raised from python_otbr_api.GetBorderAgentIdNotSupportedError.

    OTBRData.get_border_agent_id raises this, chained to the library error,
    so callers can handle the condition as a HomeAssistantError subclass.
    """
def compose_default_network_name(pan_id: int) -> str:
    """Build the default Thread network name for a PAN ID.

    The result is ``ha-thread-`` followed by the PAN ID rendered as lowercase
    hexadecimal, zero-padded to at least four digits.
    """
    pan_id_hex = format(pan_id, "04x")
    return "ha-thread-" + pan_id_hex
def generate_random_pan_id() -> int:
    """Return a random PAN ID between 0x0000 and 0xFFFE inclusive."""
    # A PAN ID is 2 bytes wide; 0xffff is reserved for broadcast, so it serves
    # as the exclusive upper bound of the range drawn from.
    broadcast_pan_id = 0xFFFF
    return random.randrange(broadcast_pan_id)
def _handle_otbr_error[**_P, _R](
func: Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]],
) -> Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]]:
"""Handle OTBR errors."""
@wraps(func)
async def _func(self: OTBRData, *args: _P.args, **kwargs: _P.kwargs) -> _R:
try:
return await func(self, *args, **kwargs)
except (python_otbr_api.OTBRError, aiohttp.ClientError, TimeoutError) as exc:
raise HomeAssistantError("Failed to call OTBR API") from exc
return _func
@dataclasses.dataclass
class OTBRData:
    """Bundle an OTBR API client with the config entry it belongs to.

    Every coroutine method below is decorated with ``_handle_otbr_error``, so
    OTBR API errors, aiohttp client errors and timeouts raised by the calls
    surface as HomeAssistantError.
    """

    # URL of the router; passed to is_multiprotocol_url (via
    # get_allowed_channel) to detect whether the radio is shared.
    url: str
    # Client used for every call to the router.
    api: python_otbr_api.OTBR
    # ID of the config entry this router belongs to.
    entry_id: str

    @_handle_otbr_error
    async def factory_reset(self, hass: HomeAssistant) -> None:
        """Reset the router, then sync the config entry's unique ID.

        When the router reports that factory reset is not supported, the
        active dataset is deleted instead. Afterwards the border agent ID is
        fetched and handed to ``update_unique_id`` for this entry.
        """
        try:
            await self.api.factory_reset()
        except python_otbr_api.FactoryResetNotSupportedError:
            _LOGGER.warning(
                "OTBR does not support factory reset, attempting to delete dataset"
            )
            await self.delete_active_dataset()

        config_entry = hass.config_entries.async_get_entry(self.entry_id)
        border_agent_id = await self.get_border_agent_id()
        await update_unique_id(hass, config_entry, border_agent_id)

    @_handle_otbr_error
    async def get_border_agent_id(self) -> bytes:
        """Get the border agent ID.

        Raises GetBorderAgentIdNotSupported when the router does not support
        reporting it.
        """
        try:
            border_agent_id = await self.api.get_border_agent_id()
        except python_otbr_api.GetBorderAgentIdNotSupportedError as err:
            raise GetBorderAgentIdNotSupported from err
        return border_agent_id

    @_handle_otbr_error
    async def set_enabled(self, enabled: bool) -> None:
        """Enable the router when ``enabled`` is true, otherwise disable it."""
        return await self.api.set_enabled(enabled)

    @_handle_otbr_error
    async def get_active_dataset(self) -> python_otbr_api.ActiveDataSet | None:
        """Return the active operational dataset, or None."""
        return await self.api.get_active_dataset()

    @_handle_otbr_error
    async def get_active_dataset_tlvs(self) -> bytes | None:
        """Return the active operational dataset in TLVS format, or None."""
        return await self.api.get_active_dataset_tlvs()

    @_handle_otbr_error
    async def get_pending_dataset_tlvs(self) -> bytes | None:
        """Return the pending operational dataset in TLVS format, or None."""
        return await self.api.get_pending_dataset_tlvs()

    @_handle_otbr_error
    async def create_active_dataset(
        self, dataset: python_otbr_api.ActiveDataSet
    ) -> None:
        """Create an active operational dataset from ``dataset``."""
        return await self.api.create_active_dataset(dataset)

    @_handle_otbr_error
    async def delete_active_dataset(self) -> None:
        """Delete the active operational dataset."""
        return await self.api.delete_active_dataset()

    @_handle_otbr_error
    async def set_active_dataset_tlvs(self, dataset: bytes) -> None:
        """Set the active operational dataset from TLVS-formatted bytes."""
        await self.api.set_active_dataset_tlvs(dataset)

    @_handle_otbr_error
    async def set_channel(
        self, channel: int, delay: float = PENDING_DATASET_DELAY_TIMER / 1000
    ) -> None:
        """Set the current channel.

        ``delay`` is multiplied by 1000 and truncated to an int before being
        passed to the API, mirroring how the default is derived from
        PENDING_DATASET_DELAY_TIMER.
        """
        api_delay = int(delay * 1000)
        await self.api.set_channel(channel, delay=api_delay)

    @_handle_otbr_error
    async def get_extended_address(self) -> bytes:
        """Return the extended address (EUI-64)."""
        return await self.api.get_extended_address()
async def get_allowed_channel(hass: HomeAssistant, otbr_url: str) -> int | None:
    """Return the channel the OTBR is restricted to, or None if unrestricted.

    A restriction only applies when ``otbr_url`` is a multiprotocol URL; in
    that case the channel reported by the multiprotocol add-on manager is
    returned.
    """
    if is_multiprotocol_url(otbr_url):
        addon_manager: MultiprotocolAddonManager = (
            await get_multiprotocol_addon_manager(hass)
        )
        return addon_manager.async_get_channel()

    # The OTBR is not sharing the radio, so there is nothing to restrict.
    return None
async def _warn_on_channel_collision(
    hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
    """Create or clear the repair issue for an OTBR/ZHA channel mismatch.

    The issue is created only when a channel restriction applies, the dataset
    carries a channel TLV, and that channel differs from the allowed one. In
    every other case any existing issue for this entry is deleted.
    """
    allowed_channel = await get_allowed_channel(hass, otbrdata.url)
    if allowed_channel is not None:
        # The dataset is only parsed once a restriction is known to apply.
        channel_tlv = tlv_parser.parse_tlv(dataset_tlvs.hex()).get(
            MeshcopTLVType.CHANNEL
        )
        if channel_tlv is not None:
            otbr_channel = cast(tlv_parser.Channel, channel_tlv).channel
            if otbr_channel != allowed_channel:
                # Pick the documentation link matching the radio in use.
                uses_yellow = await multi_pan_addon_using_device(hass, YELLOW_RADIO)
                ir.async_create_issue(
                    hass,
                    DOMAIN,
                    f"otbr_zha_channel_collision_{otbrdata.entry_id}",
                    is_fixable=False,
                    is_persistent=False,
                    learn_more_url=(
                        INFO_URL_YELLOW if uses_yellow else INFO_URL_SKY_CONNECT
                    ),
                    severity=ir.IssueSeverity.WARNING,
                    translation_key="otbr_zha_channel_collision",
                    translation_placeholders={
                        "otbr_channel": str(otbr_channel),
                        "zha_channel": str(allowed_channel),
                    },
                )
                return

    # No restriction, no channel in the dataset, or the channels agree.
    ir.async_delete_issue(
        hass,
        DOMAIN,
        f"otbr_zha_channel_collision_{otbrdata.entry_id}",
    )
def _warn_on_default_network_settings(
    hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
    """Create or clear the repair issue for insecure default network settings.

    A dataset is considered insecure when its network key is one of
    INSECURE_NETWORK_KEYS, or when its PSKC equals the PSKC computed from its
    extended PAN ID, network name and one of INSECURE_PASSPHRASES.
    """
    dataset = tlv_parser.parse_tlv(dataset_tlvs.hex())

    key_tlv = dataset.get(MeshcopTLVType.NETWORKKEY)
    insecure = key_tlv is not None and key_tlv.data in INSECURE_NETWORK_KEYS

    # The PSKC comparison needs all three of these TLVs to be present.
    pskc_inputs = (
        MeshcopTLVType.EXTPANID,
        MeshcopTLVType.NETWORKNAME,
        MeshcopTLVType.PSKC,
    )
    if not insecure and all(tlv_type in dataset for tlv_type in pskc_inputs):
        ext_pan_id = dataset[MeshcopTLVType.EXTPANID]
        network_name = cast(
            tlv_parser.NetworkName, dataset[MeshcopTLVType.NETWORKNAME]
        )
        pskc = dataset[MeshcopTLVType.PSKC].data
        # any() stops at the first matching passphrase.
        insecure = any(
            pskc == compute_pskc(ext_pan_id.data, network_name.name, passphrase)
            for passphrase in INSECURE_PASSPHRASES
        )

    issue_id = f"insecure_thread_network_{otbrdata.entry_id}"
    if not insecure:
        ir.async_delete_issue(hass, DOMAIN, issue_id)
        return

    ir.async_create_issue(
        hass,
        DOMAIN,
        issue_id,
        is_fixable=False,
        is_persistent=False,
        severity=ir.IssueSeverity.WARNING,
        translation_key="insecure_thread_network",
    )
async def update_issues(
    hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
    """Raise or clear repair issues related to network settings.

    Runs both dataset checks against the same TLV-encoded dataset: the
    OTBR/ZHA channel collision check and the insecure default settings check.
    Each check creates its repair issue when its condition holds and deletes
    it otherwise.
    """
    # The channel check is a coroutine (it awaits the allowed-channel lookup);
    # the default-settings check is synchronous.
    await _warn_on_channel_collision(hass, otbrdata, dataset_tlvs)
    _warn_on_default_network_settings(hass, otbrdata, dataset_tlvs)
async def update_unique_id(
    hass: HomeAssistant, entry: OTBRConfigEntry | None, border_agent_id: bytes
) -> None:
    """Set the entry's unique_id to the hex border agent ID when it differs.

    Nothing happens when ``entry`` is missing, when the entry's source is not
    SOURCE_USER, or when the unique_id already matches.
    """
    new_unique_id = border_agent_id.hex()

    if not entry or entry.source != SOURCE_USER:
        return
    if entry.unique_id == new_unique_id:
        return

    _LOGGER.debug(
        "Updating unique_id of entry %s from %s to %s",
        entry.entry_id,
        entry.unique_id,
        new_unique_id,
    )
    hass.config_entries.async_update_entry(entry, unique_id=new_unique_id)