mirror of https://github.com/home-assistant/core
205 lines
6.9 KiB
Python
205 lines
6.9 KiB
Python
"""Teslemetry Data Coordinator."""
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
|
|
from tesla_fleet_api import EnergySpecific, VehicleSpecific
|
|
from tesla_fleet_api.const import TeslaEnergyPeriod, VehicleDataEndpoint
|
|
from tesla_fleet_api.exceptions import (
|
|
Forbidden,
|
|
InvalidToken,
|
|
SubscriptionRequired,
|
|
TeslaFleetError,
|
|
VehicleOffline,
|
|
)
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState
|
|
from .helpers import flatten
|
|
|
|
VEHICLE_INTERVAL = timedelta(seconds=30)
|
|
VEHICLE_WAIT = timedelta(minutes=15)
|
|
ENERGY_LIVE_INTERVAL = timedelta(seconds=30)
|
|
ENERGY_INFO_INTERVAL = timedelta(seconds=30)
|
|
ENERGY_HISTORY_INTERVAL = timedelta(seconds=60)
|
|
|
|
ENDPOINTS = [
|
|
VehicleDataEndpoint.CHARGE_STATE,
|
|
VehicleDataEndpoint.CLIMATE_STATE,
|
|
VehicleDataEndpoint.DRIVE_STATE,
|
|
VehicleDataEndpoint.LOCATION_DATA,
|
|
VehicleDataEndpoint.VEHICLE_STATE,
|
|
VehicleDataEndpoint.VEHICLE_CONFIG,
|
|
]
|
|
|
|
|
|
class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
|
"""Class to manage fetching data from the Teslemetry API."""
|
|
|
|
updated_once: bool
|
|
last_active: datetime
|
|
|
|
def __init__(
|
|
self, hass: HomeAssistant, api: VehicleSpecific, product: dict
|
|
) -> None:
|
|
"""Initialize Teslemetry Vehicle Update Coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
LOGGER,
|
|
name="Teslemetry Vehicle",
|
|
update_interval=VEHICLE_INTERVAL,
|
|
)
|
|
self.api = api
|
|
self.data = flatten(product)
|
|
self.updated_once = False
|
|
self.last_active = datetime.now()
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Update vehicle data using Teslemetry API."""
|
|
|
|
self.update_interval = VEHICLE_INTERVAL
|
|
|
|
try:
|
|
if self.data["state"] != TeslemetryState.ONLINE:
|
|
response = await self.api.vehicle()
|
|
self.data["state"] = response["response"]["state"]
|
|
|
|
if self.data["state"] != TeslemetryState.ONLINE:
|
|
return self.data
|
|
|
|
response = await self.api.vehicle_data(endpoints=ENDPOINTS)
|
|
data = response["response"]
|
|
|
|
except VehicleOffline:
|
|
self.data["state"] = TeslemetryState.OFFLINE
|
|
return self.data
|
|
except InvalidToken as e:
|
|
raise ConfigEntryAuthFailed from e
|
|
except SubscriptionRequired as e:
|
|
raise ConfigEntryAuthFailed from e
|
|
except TeslaFleetError as e:
|
|
raise UpdateFailed(e.message) from e
|
|
|
|
self.updated_once = True
|
|
|
|
if self.api.pre2021 and data["state"] == TeslemetryState.ONLINE:
|
|
# Handle pre-2021 vehicles which cannot sleep by themselves
|
|
if (
|
|
data["charge_state"].get("charging_state") == "Charging"
|
|
or data["vehicle_state"].get("is_user_present")
|
|
or data["vehicle_state"].get("sentry_mode")
|
|
):
|
|
# Vehicle is active, reset timer
|
|
self.last_active = datetime.now()
|
|
else:
|
|
elapsed = datetime.now() - self.last_active
|
|
if elapsed > timedelta(minutes=20):
|
|
# Vehicle didn't sleep, try again in 15 minutes
|
|
self.last_active = datetime.now()
|
|
elif elapsed > timedelta(minutes=15):
|
|
# Let vehicle go to sleep now
|
|
self.update_interval = VEHICLE_WAIT
|
|
|
|
return flatten(data)
|
|
|
|
|
|
class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
|
"""Class to manage fetching energy site live status from the Teslemetry API."""
|
|
|
|
updated_once: bool
|
|
|
|
def __init__(self, hass: HomeAssistant, api: EnergySpecific) -> None:
|
|
"""Initialize Teslemetry Energy Site Live coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
LOGGER,
|
|
name="Teslemetry Energy Site Live",
|
|
update_interval=ENERGY_LIVE_INTERVAL,
|
|
)
|
|
self.api = api
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Update energy site data using Teslemetry API."""
|
|
|
|
try:
|
|
data = (await self.api.live_status())["response"]
|
|
except (InvalidToken, Forbidden, SubscriptionRequired) as e:
|
|
raise ConfigEntryAuthFailed from e
|
|
except TeslaFleetError as e:
|
|
raise UpdateFailed(e.message) from e
|
|
|
|
# Convert Wall Connectors from array to dict
|
|
data["wall_connectors"] = {
|
|
wc["din"]: wc for wc in (data.get("wall_connectors") or [])
|
|
}
|
|
|
|
return data
|
|
|
|
|
|
class TeslemetryEnergySiteInfoCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
|
"""Class to manage fetching energy site info from the Teslemetry API."""
|
|
|
|
updated_once: bool
|
|
|
|
def __init__(self, hass: HomeAssistant, api: EnergySpecific, product: dict) -> None:
|
|
"""Initialize Teslemetry Energy Info coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
LOGGER,
|
|
name="Teslemetry Energy Site Info",
|
|
update_interval=ENERGY_INFO_INTERVAL,
|
|
)
|
|
self.api = api
|
|
self.data = product
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Update energy site data using Teslemetry API."""
|
|
|
|
try:
|
|
data = (await self.api.site_info())["response"]
|
|
except (InvalidToken, Forbidden, SubscriptionRequired) as e:
|
|
raise ConfigEntryAuthFailed from e
|
|
except TeslaFleetError as e:
|
|
raise UpdateFailed(e.message) from e
|
|
|
|
return flatten(data)
|
|
|
|
|
|
class TeslemetryEnergyHistoryCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
|
"""Class to manage fetching energy site info from the Teslemetry API."""
|
|
|
|
updated_once: bool
|
|
|
|
def __init__(self, hass: HomeAssistant, api: EnergySpecific) -> None:
|
|
"""Initialize Teslemetry Energy Info coordinator."""
|
|
super().__init__(
|
|
hass,
|
|
LOGGER,
|
|
name=f"Teslemetry Energy History {api.energy_site_id}",
|
|
update_interval=ENERGY_HISTORY_INTERVAL,
|
|
)
|
|
self.api = api
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Update energy site data using Teslemetry API."""
|
|
|
|
try:
|
|
data = (await self.api.energy_history(TeslaEnergyPeriod.DAY))["response"]
|
|
except (InvalidToken, Forbidden, SubscriptionRequired) as e:
|
|
raise ConfigEntryAuthFailed from e
|
|
except TeslaFleetError as e:
|
|
raise UpdateFailed(e.message) from e
|
|
|
|
self.updated_once = True
|
|
|
|
# Add all time periods together
|
|
output = {key: 0 for key in ENERGY_HISTORY_FIELDS}
|
|
for period in data.get("time_series", []):
|
|
for key in ENERGY_HISTORY_FIELDS:
|
|
output[key] += period.get(key, 0)
|
|
|
|
return output
|