mirror of https://github.com/home-assistant/core
273 lines
10 KiB
Python
273 lines
10 KiB
Python
"""The Fronius integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
from typing import Final
|
|
|
|
from pyfronius import Fronius, FroniusError
|
|
|
|
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
|
|
from homeassistant.const import ATTR_MODEL, ATTR_SW_VERSION, CONF_HOST, Platform
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
|
from homeassistant.helpers.event import async_track_time_interval
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
SOLAR_NET_DISCOVERY_NEW,
|
|
SOLAR_NET_ID_SYSTEM,
|
|
SOLAR_NET_RESCAN_TIMER,
|
|
FroniusDeviceInfo,
|
|
)
|
|
from .coordinator import (
|
|
FroniusCoordinatorBase,
|
|
FroniusInverterUpdateCoordinator,
|
|
FroniusLoggerUpdateCoordinator,
|
|
FroniusMeterUpdateCoordinator,
|
|
FroniusOhmpilotUpdateCoordinator,
|
|
FroniusPowerFlowUpdateCoordinator,
|
|
FroniusStorageUpdateCoordinator,
|
|
)
|
|
|
|
_LOGGER: Final = logging.getLogger(__name__)
|
|
PLATFORMS: Final = [Platform.SENSOR]
|
|
|
|
type FroniusConfigEntry = ConfigEntry[FroniusSolarNet]
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: FroniusConfigEntry) -> bool:
|
|
"""Set up fronius from a config entry."""
|
|
host = entry.data[CONF_HOST]
|
|
fronius = Fronius(async_get_clientsession(hass), host)
|
|
solar_net = FroniusSolarNet(hass, entry, fronius)
|
|
await solar_net.init_devices()
|
|
|
|
entry.runtime_data = solar_net
|
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
|
return True
|
|
|
|
|
|
async def async_unload_entry(hass: HomeAssistant, entry: FroniusConfigEntry) -> bool:
|
|
"""Unload a config entry."""
|
|
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
|
|
|
|
|
async def async_remove_config_entry_device(
|
|
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
|
|
) -> bool:
|
|
"""Remove a config entry from a device."""
|
|
return True
|
|
|
|
|
|
class FroniusSolarNet:
|
|
"""The FroniusSolarNet class routes received values to sensor entities."""
|
|
|
|
def __init__(
|
|
self, hass: HomeAssistant, entry: ConfigEntry, fronius: Fronius
|
|
) -> None:
|
|
"""Initialize FroniusSolarNet class."""
|
|
self.hass = hass
|
|
self.config_entry = entry
|
|
self.coordinator_lock = asyncio.Lock()
|
|
self.fronius = fronius
|
|
self.host: str = entry.data[CONF_HOST]
|
|
# entry.unique_id is either logger uid or first inverter uid if no logger available
|
|
# prepended by "solar_net_" to have individual device for whole system (power_flow)
|
|
self.solar_net_device_id = f"solar_net_{entry.unique_id}"
|
|
self.system_device_info: DeviceInfo | None = None
|
|
|
|
self.inverter_coordinators: list[FroniusInverterUpdateCoordinator] = []
|
|
self.logger_coordinator: FroniusLoggerUpdateCoordinator | None = None
|
|
self.meter_coordinator: FroniusMeterUpdateCoordinator | None = None
|
|
self.ohmpilot_coordinator: FroniusOhmpilotUpdateCoordinator | None = None
|
|
self.power_flow_coordinator: FroniusPowerFlowUpdateCoordinator | None = None
|
|
self.storage_coordinator: FroniusStorageUpdateCoordinator | None = None
|
|
|
|
async def init_devices(self) -> None:
|
|
"""Initialize DataUpdateCoordinators for SolarNet devices."""
|
|
if self.config_entry.data["is_logger"]:
|
|
self.logger_coordinator = FroniusLoggerUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=f"{DOMAIN}_logger_{self.host}",
|
|
)
|
|
await self.logger_coordinator.async_config_entry_first_refresh()
|
|
|
|
# _create_solar_net_device uses data from self.logger_coordinator when available
|
|
self.system_device_info = await self._create_solar_net_device()
|
|
|
|
await self._init_devices_inverter()
|
|
|
|
self.meter_coordinator = await self._init_optional_coordinator(
|
|
FroniusMeterUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=f"{DOMAIN}_meters_{self.host}",
|
|
)
|
|
)
|
|
|
|
self.ohmpilot_coordinator = await self._init_optional_coordinator(
|
|
FroniusOhmpilotUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=f"{DOMAIN}_ohmpilot_{self.host}",
|
|
)
|
|
)
|
|
|
|
self.power_flow_coordinator = await self._init_optional_coordinator(
|
|
FroniusPowerFlowUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=f"{DOMAIN}_power_flow_{self.host}",
|
|
)
|
|
)
|
|
|
|
self.storage_coordinator = await self._init_optional_coordinator(
|
|
FroniusStorageUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=f"{DOMAIN}_storages_{self.host}",
|
|
)
|
|
)
|
|
|
|
# Setup periodic re-scan
|
|
self.config_entry.async_on_unload(
|
|
async_track_time_interval(
|
|
self.hass,
|
|
self._init_devices_inverter,
|
|
timedelta(minutes=SOLAR_NET_RESCAN_TIMER),
|
|
)
|
|
)
|
|
|
|
async def _create_solar_net_device(self) -> DeviceInfo:
|
|
"""Create a device for the Fronius SolarNet system."""
|
|
solar_net_device: DeviceInfo = DeviceInfo(
|
|
configuration_url=self.fronius.url,
|
|
identifiers={(DOMAIN, self.solar_net_device_id)},
|
|
manufacturer="Fronius",
|
|
name="SolarNet",
|
|
)
|
|
if self.logger_coordinator:
|
|
_logger_info = self.logger_coordinator.data[SOLAR_NET_ID_SYSTEM]
|
|
# API v0 doesn't provide product_type
|
|
solar_net_device[ATTR_MODEL] = _logger_info.get("product_type", {}).get(
|
|
"value", "Datalogger Web"
|
|
)
|
|
solar_net_device[ATTR_SW_VERSION] = _logger_info["software_version"][
|
|
"value"
|
|
]
|
|
|
|
device_registry = dr.async_get(self.hass)
|
|
device_registry.async_get_or_create(
|
|
config_entry_id=self.config_entry.entry_id,
|
|
**solar_net_device,
|
|
)
|
|
return solar_net_device
|
|
|
|
async def _init_devices_inverter(self, _now: datetime | None = None) -> None:
|
|
"""Get available inverters and set up coordinators for new found devices."""
|
|
_inverter_infos = await self._get_inverter_infos()
|
|
|
|
_LOGGER.debug("Processing inverters for: %s", _inverter_infos)
|
|
for _inverter_info in _inverter_infos:
|
|
_inverter_name = (
|
|
f"{DOMAIN}_inverter_{_inverter_info.solar_net_id}_{self.host}"
|
|
)
|
|
|
|
# Add found inverter only not already existing
|
|
if _inverter_info.solar_net_id in [
|
|
inv.inverter_info.solar_net_id for inv in self.inverter_coordinators
|
|
]:
|
|
continue
|
|
|
|
_coordinator = FroniusInverterUpdateCoordinator(
|
|
hass=self.hass,
|
|
solar_net=self,
|
|
logger=_LOGGER,
|
|
name=_inverter_name,
|
|
inverter_info=_inverter_info,
|
|
)
|
|
if self.config_entry.state == ConfigEntryState.LOADED:
|
|
await _coordinator.async_refresh()
|
|
else:
|
|
await _coordinator.async_config_entry_first_refresh()
|
|
self.inverter_coordinators.append(_coordinator)
|
|
|
|
# Only for re-scans. Initial setup adds entities through sensor.async_setup_entry
|
|
if self.config_entry.state == ConfigEntryState.LOADED:
|
|
async_dispatcher_send(self.hass, SOLAR_NET_DISCOVERY_NEW, _coordinator)
|
|
|
|
_LOGGER.debug(
|
|
"New inverter added (UID: %s)",
|
|
_inverter_info.unique_id,
|
|
)
|
|
|
|
async def _get_inverter_infos(self) -> list[FroniusDeviceInfo]:
|
|
"""Get information about the inverters in the SolarNet system."""
|
|
inverter_infos: list[FroniusDeviceInfo] = []
|
|
|
|
try:
|
|
_inverter_info = await self.fronius.inverter_info()
|
|
except FroniusError as err:
|
|
if self.config_entry.state == ConfigEntryState.LOADED:
|
|
# During a re-scan we will attempt again as per schedule.
|
|
_LOGGER.debug("Re-scan failed for %s", self.host)
|
|
return inverter_infos
|
|
|
|
raise ConfigEntryNotReady from err
|
|
|
|
for inverter in _inverter_info["inverters"]:
|
|
solar_net_id = inverter["device_id"]["value"]
|
|
unique_id = inverter["unique_id"]["value"]
|
|
device_info = DeviceInfo(
|
|
identifiers={(DOMAIN, unique_id)},
|
|
manufacturer=inverter["device_type"].get("manufacturer", "Fronius"),
|
|
model=inverter["device_type"].get(
|
|
"model", inverter["device_type"]["value"]
|
|
),
|
|
name=inverter.get("custom_name", {}).get("value"),
|
|
via_device=(DOMAIN, self.solar_net_device_id),
|
|
)
|
|
inverter_infos.append(
|
|
FroniusDeviceInfo(
|
|
device_info=device_info,
|
|
solar_net_id=solar_net_id,
|
|
unique_id=unique_id,
|
|
)
|
|
)
|
|
_LOGGER.debug(
|
|
"Inverter found at %s (Device ID: %s, UID: %s)",
|
|
self.host,
|
|
solar_net_id,
|
|
unique_id,
|
|
)
|
|
return inverter_infos
|
|
|
|
@staticmethod
|
|
async def _init_optional_coordinator[_FroniusCoordinatorT: FroniusCoordinatorBase](
|
|
coordinator: _FroniusCoordinatorT,
|
|
) -> _FroniusCoordinatorT | None:
|
|
"""Initialize an update coordinator and return it if devices are found."""
|
|
try:
|
|
await coordinator.async_config_entry_first_refresh()
|
|
except ConfigEntryNotReady:
|
|
# ConfigEntryNotReady raised form FroniusError / KeyError in
|
|
# DataUpdateCoordinator if request not supported by the Fronius device
|
|
return None
|
|
# if no device for the request is installed an empty dict is returned
|
|
if not coordinator.data:
|
|
return None
|
|
return coordinator
|