# core/homeassistant/components/teslemetry/coordinator.py
#
# Source-listing metadata (from the file viewer, not part of the module):
# 205 lines, 6.9 KiB, Python

"""Teslemetry Data Coordinator."""
from datetime import datetime, timedelta
from typing import Any
from tesla_fleet_api import EnergySpecific, VehicleSpecific
from tesla_fleet_api.const import TeslaEnergyPeriod, VehicleDataEndpoint
from tesla_fleet_api.exceptions import (
Forbidden,
InvalidToken,
SubscriptionRequired,
TeslaFleetError,
VehicleOffline,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState
from .helpers import flatten
# Polling interval used by the vehicle coordinator; it is restored at the
# start of every vehicle update.
VEHICLE_INTERVAL = timedelta(seconds=30)
# Longer interval the vehicle coordinator switches to for pre-2021 vehicles
# once they have been inactive long enough, so the vehicle can go to sleep.
VEHICLE_WAIT = timedelta(minutes=15)
# Polling intervals for the energy site live status, site info and energy
# history coordinators respectively.
ENERGY_LIVE_INTERVAL = timedelta(seconds=30)
ENERGY_INFO_INTERVAL = timedelta(seconds=30)
ENERGY_HISTORY_INTERVAL = timedelta(seconds=60)
# Endpoints passed to `vehicle_data(endpoints=...)` on every vehicle refresh.
ENDPOINTS = [
VehicleDataEndpoint.CHARGE_STATE,
VehicleDataEndpoint.CLIMATE_STATE,
VehicleDataEndpoint.DRIVE_STATE,
VehicleDataEndpoint.LOCATION_DATA,
VehicleDataEndpoint.VEHICLE_STATE,
VehicleDataEndpoint.VEHICLE_CONFIG,
]
class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Class to manage fetching data from the Teslemetry API.

    The coordinator polls every ``VEHICLE_INTERVAL``. For vehicles flagged as
    ``pre2021`` by the API object it temporarily stretches the interval to
    ``VEHICLE_WAIT`` after a period of inactivity so the vehicle can sleep.
    """

    # True once a full vehicle_data fetch has succeeded at least once.
    updated_once: bool
    # Last time the vehicle was seen active (or the sleep timer was reset).
    last_active: datetime

    def __init__(
        self, hass: HomeAssistant, api: VehicleSpecific, product: dict
    ) -> None:
        """Initialize Teslemetry Vehicle Update Coordinator.

        ``product`` is flattened and used as the initial coordinator data, so
        ``self.data["state"]`` is available before the first refresh.
        """
        super().__init__(
            hass,
            LOGGER,
            name="Teslemetry Vehicle",
            update_interval=VEHICLE_INTERVAL,
        )
        self.api = api
        self.data = flatten(product)
        self.updated_once = False
        self.last_active = datetime.now()

    async def _async_update_data(self) -> dict[str, Any]:
        """Update vehicle data using Teslemetry API.

        Returns the previous data (with an updated ``state``) while the
        vehicle is not online, otherwise the flattened vehicle data.

        Raises:
            ConfigEntryAuthFailed: on InvalidToken, Forbidden or
                SubscriptionRequired.
            UpdateFailed: on any other TeslaFleetError.
        """
        # Undo a previous switch to VEHICLE_WAIT; it only applies to one cycle.
        self.update_interval = VEHICLE_INTERVAL

        try:
            if self.data["state"] != TeslemetryState.ONLINE:
                # Only ask for the vehicle summary while it is not known to be
                # online, and record whatever state it reports.
                response = await self.api.vehicle()
                self.data["state"] = response["response"]["state"]

            if self.data["state"] != TeslemetryState.ONLINE:
                return self.data

            response = await self.api.vehicle_data(endpoints=ENDPOINTS)
            data = response["response"]
        except VehicleOffline:
            self.data["state"] = TeslemetryState.OFFLINE
            return self.data
        except (InvalidToken, Forbidden, SubscriptionRequired) as e:
            # Same auth handling as the energy site coordinators in this
            # module: these errors require the user to re-authenticate.
            raise ConfigEntryAuthFailed from e
        except TeslaFleetError as e:
            raise UpdateFailed(e.message) from e

        self.updated_once = True

        if self.api.pre2021 and data["state"] == TeslemetryState.ONLINE:
            # Handle pre-2021 vehicles which cannot sleep by themselves
            now = datetime.now()
            vehicle_state = data["vehicle_state"]
            if (
                data["charge_state"].get("charging_state") == "Charging"
                or vehicle_state.get("is_user_present")
                or vehicle_state.get("sentry_mode")
            ):
                # Vehicle is active, reset timer
                self.last_active = now
            else:
                elapsed = now - self.last_active
                if elapsed > timedelta(minutes=20):
                    # Vehicle didn't sleep, try again in 15 minutes
                    self.last_active = now
                elif elapsed > timedelta(minutes=15):
                    # Let vehicle go to sleep now
                    self.update_interval = VEHICLE_WAIT

        return flatten(data)
class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Class to manage fetching energy site live status from the Teslemetry API."""

    # True once a live status fetch has succeeded at least once.
    updated_once: bool

    def __init__(self, hass: HomeAssistant, api: EnergySpecific) -> None:
        """Initialize Teslemetry Energy Site Live coordinator."""
        super().__init__(
            hass,
            LOGGER,
            name="Teslemetry Energy Site Live",
            update_interval=ENERGY_LIVE_INTERVAL,
        )
        self.api = api
        # The attribute is only annotated on the class; give it a value so
        # reading it before the first successful update does not raise
        # AttributeError.
        self.updated_once = False

    async def _async_update_data(self) -> dict[str, Any]:
        """Update energy site data using Teslemetry API.

        Returns the live status response with ``wall_connectors`` converted
        from a list into a dict keyed by each connector's ``din``.

        Raises:
            ConfigEntryAuthFailed: on InvalidToken, Forbidden or
                SubscriptionRequired.
            UpdateFailed: on any other TeslaFleetError.
        """
        try:
            data = (await self.api.live_status())["response"]
        except (InvalidToken, Forbidden, SubscriptionRequired) as e:
            raise ConfigEntryAuthFailed from e
        except TeslaFleetError as e:
            raise UpdateFailed(e.message) from e

        self.updated_once = True

        # Convert Wall Connectors from array to dict; a missing or None value
        # becomes an empty dict.
        data["wall_connectors"] = {
            wc["din"]: wc for wc in (data.get("wall_connectors") or [])
        }

        return data
class TeslemetryEnergySiteInfoCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Class to manage fetching energy site info from the Teslemetry API."""

    # True once a site info fetch has succeeded at least once.
    updated_once: bool

    def __init__(self, hass: HomeAssistant, api: EnergySpecific, product: dict) -> None:
        """Initialize Teslemetry Energy Info coordinator.

        ``product`` is used as the initial coordinator data until the first
        refresh replaces it.
        """
        super().__init__(
            hass,
            LOGGER,
            name="Teslemetry Energy Site Info",
            update_interval=ENERGY_INFO_INTERVAL,
        )
        self.api = api
        self.data = product
        # The attribute is only annotated on the class; give it a value so
        # reading it before the first successful update does not raise
        # AttributeError.
        self.updated_once = False

    async def _async_update_data(self) -> dict[str, Any]:
        """Update energy site data using Teslemetry API.

        Returns the flattened site info response.

        Raises:
            ConfigEntryAuthFailed: on InvalidToken, Forbidden or
                SubscriptionRequired.
            UpdateFailed: on any other TeslaFleetError.
        """
        try:
            data = (await self.api.site_info())["response"]
        except (InvalidToken, Forbidden, SubscriptionRequired) as e:
            raise ConfigEntryAuthFailed from e
        except TeslaFleetError as e:
            raise UpdateFailed(e.message) from e

        self.updated_once = True

        return flatten(data)
class TeslemetryEnergyHistoryCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Class to manage fetching energy site history from the Teslemetry API."""

    # True once an energy history fetch has succeeded at least once.
    updated_once: bool

    def __init__(self, hass: HomeAssistant, api: EnergySpecific) -> None:
        """Initialize Teslemetry Energy History coordinator."""
        super().__init__(
            hass,
            LOGGER,
            name=f"Teslemetry Energy History {api.energy_site_id}",
            update_interval=ENERGY_HISTORY_INTERVAL,
        )
        self.api = api
        # The attribute is only annotated on the class; give it a value so
        # reading it before the first successful update does not raise
        # AttributeError.
        self.updated_once = False

    async def _async_update_data(self) -> dict[str, Any]:
        """Update energy site data using Teslemetry API.

        Requests the ``TeslaEnergyPeriod.DAY`` history and returns one total
        per key in ``ENERGY_HISTORY_FIELDS``, summed over every entry of the
        response's ``time_series``.

        Raises:
            ConfigEntryAuthFailed: on InvalidToken, Forbidden or
                SubscriptionRequired.
            UpdateFailed: on any other TeslaFleetError, or when the response
                payload is not a dict.
        """
        try:
            data = (await self.api.energy_history(TeslaEnergyPeriod.DAY))["response"]
        except (InvalidToken, Forbidden, SubscriptionRequired) as e:
            raise ConfigEntryAuthFailed from e
        except TeslaFleetError as e:
            raise UpdateFailed(e.message) from e

        if not isinstance(data, dict):
            # Report a failed update instead of crashing on `.get` below.
            raise UpdateFailed("Received invalid data")

        self.updated_once = True

        # Add all time periods together. A missing or None time series counts
        # as empty, and a missing or None field counts as zero.
        output = {key: 0 for key in ENERGY_HISTORY_FIELDS}
        for period in data.get("time_series") or []:
            for key in ENERGY_HISTORY_FIELDS:
                output[key] += period.get(key) or 0

        return output