mirror of https://github.com/home-assistant/core
147 lines
5.5 KiB
Python
147 lines
5.5 KiB
Python
"""Amber Electric Coordinator."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
import amberelectric
|
|
from amberelectric.models.actual_interval import ActualInterval
|
|
from amberelectric.models.channel import ChannelType
|
|
from amberelectric.models.current_interval import CurrentInterval
|
|
from amberelectric.models.forecast_interval import ForecastInterval
|
|
from amberelectric.models.price_descriptor import PriceDescriptor
|
|
from amberelectric.rest import ApiException
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import LOGGER
|
|
|
|
|
|
def is_current(interval: ActualInterval | CurrentInterval | ForecastInterval) -> bool:
|
|
"""Return true if the supplied interval is a CurrentInterval."""
|
|
return isinstance(interval, CurrentInterval)
|
|
|
|
|
|
def is_forecast(interval: ActualInterval | CurrentInterval | ForecastInterval) -> bool:
|
|
"""Return true if the supplied interval is a ForecastInterval."""
|
|
return isinstance(interval, ForecastInterval)
|
|
|
|
|
|
def is_general(interval: ActualInterval | CurrentInterval | ForecastInterval) -> bool:
|
|
"""Return true if the supplied interval is on the general channel."""
|
|
return interval.channel_type == ChannelType.GENERAL
|
|
|
|
|
|
def is_controlled_load(
|
|
interval: ActualInterval | CurrentInterval | ForecastInterval,
|
|
) -> bool:
|
|
"""Return true if the supplied interval is on the controlled load channel."""
|
|
return interval.channel_type == ChannelType.CONTROLLEDLOAD
|
|
|
|
|
|
def is_feed_in(interval: ActualInterval | CurrentInterval | ForecastInterval) -> bool:
|
|
"""Return true if the supplied interval is on the feed in channel."""
|
|
return interval.channel_type == ChannelType.FEEDIN
|
|
|
|
|
|
def normalize_descriptor(descriptor: PriceDescriptor | None) -> str | None:
|
|
"""Return the snake case versions of descriptor names. Returns None if the name is not recognized."""
|
|
if descriptor is None:
|
|
return None
|
|
if descriptor.value == "spike":
|
|
return "spike"
|
|
if descriptor.value == "high":
|
|
return "high"
|
|
if descriptor.value == "neutral":
|
|
return "neutral"
|
|
if descriptor.value == "low":
|
|
return "low"
|
|
if descriptor.value == "veryLow":
|
|
return "very_low"
|
|
if descriptor.value == "extremelyLow":
|
|
return "extremely_low"
|
|
if descriptor.value == "negative":
|
|
return "negative"
|
|
return None
|
|
|
|
|
|
class AmberUpdateCoordinator(DataUpdateCoordinator):
|
|
"""AmberUpdateCoordinator - In charge of downloading the data for a site, which all the sensors read."""
|
|
|
|
def __init__(
|
|
self, hass: HomeAssistant, api: amberelectric.AmberApi, site_id: str
|
|
) -> None:
|
|
"""Initialise the data service."""
|
|
super().__init__(
|
|
hass,
|
|
LOGGER,
|
|
name="amberelectric",
|
|
update_interval=timedelta(minutes=1),
|
|
)
|
|
self._api = api
|
|
self.site_id = site_id
|
|
|
|
def update_price_data(self) -> dict[str, dict[str, Any]]:
|
|
"""Update callback."""
|
|
|
|
result: dict[str, dict[str, Any]] = {
|
|
"current": {},
|
|
"descriptors": {},
|
|
"forecasts": {},
|
|
"grid": {},
|
|
}
|
|
try:
|
|
data = self._api.get_current_prices(self.site_id, next=48)
|
|
intervals = [interval.actual_instance for interval in data]
|
|
except ApiException as api_exception:
|
|
raise UpdateFailed("Missing price data, skipping update") from api_exception
|
|
|
|
current = [interval for interval in intervals if is_current(interval)]
|
|
forecasts = [interval for interval in intervals if is_forecast(interval)]
|
|
general = [interval for interval in current if is_general(interval)]
|
|
|
|
if len(general) == 0:
|
|
raise UpdateFailed("No general channel configured")
|
|
|
|
result["current"]["general"] = general[0]
|
|
result["descriptors"]["general"] = normalize_descriptor(general[0].descriptor)
|
|
result["forecasts"]["general"] = [
|
|
interval for interval in forecasts if is_general(interval)
|
|
]
|
|
result["grid"]["renewables"] = round(general[0].renewables)
|
|
result["grid"]["price_spike"] = general[0].spike_status.value
|
|
tariff_information = general[0].tariff_information
|
|
if tariff_information:
|
|
result["grid"]["demand_window"] = tariff_information.demand_window
|
|
|
|
controlled_load = [
|
|
interval for interval in current if is_controlled_load(interval)
|
|
]
|
|
if controlled_load:
|
|
result["current"]["controlled_load"] = controlled_load[0]
|
|
result["descriptors"]["controlled_load"] = normalize_descriptor(
|
|
controlled_load[0].descriptor
|
|
)
|
|
result["forecasts"]["controlled_load"] = [
|
|
interval for interval in forecasts if is_controlled_load(interval)
|
|
]
|
|
|
|
feed_in = [interval for interval in current if is_feed_in(interval)]
|
|
if feed_in:
|
|
result["current"]["feed_in"] = feed_in[0]
|
|
result["descriptors"]["feed_in"] = normalize_descriptor(
|
|
feed_in[0].descriptor
|
|
)
|
|
result["forecasts"]["feed_in"] = [
|
|
interval for interval in forecasts if is_feed_in(interval)
|
|
]
|
|
|
|
LOGGER.debug("Fetched new Amber data: %s", intervals)
|
|
return result
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Async update wrapper."""
|
|
return await self.hass.async_add_executor_job(self.update_price_data)
|