mirror of https://github.com/home-assistant/core
157 lines
5.2 KiB
Python
157 lines
5.2 KiB
Python
"""Importer for the Elvia integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import TYPE_CHECKING, cast
|
|
|
|
from elvia import Elvia, error as ElviaError
|
|
|
|
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
|
|
from homeassistant.components.recorder.statistics import (
|
|
async_add_external_statistics,
|
|
get_last_statistics,
|
|
statistics_during_period,
|
|
)
|
|
from homeassistant.components.recorder.util import get_instance
|
|
from homeassistant.const import UnitOfEnergy
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import DOMAIN, LOGGER
|
|
|
|
if TYPE_CHECKING:
|
|
from elvia.types.meter_value_types import MeterValueTimeSeries
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
|
|
class ElviaImporter:
    """Import hourly meter values from Elvia as external recorder statistics."""

    def __init__(
        self,
        hass: HomeAssistant,
        api_token: str,
        metering_point_id: str,
    ) -> None:
        """Initialize the importer.

        Args:
            hass: Home Assistant instance used to reach the recorder.
            api_token: Token handed to the Elvia client as ``meter_value_token``.
            metering_point_id: Metering point whose values are imported.
        """
        self.hass = hass
        self.client = Elvia(meter_value_token=api_token).meter_value()
        self.metering_point_id = metering_point_id

    async def _fetch_hourly_data(
        self,
        since: datetime,
        until: datetime,
    ) -> list[MeterValueTimeSeries]:
        """Fetch the time series for this metering point between two instants.

        Args:
            since: Start of the requested window (sent in ISO 8601 format).
            until: End of the requested window (sent in ISO 8601 format).

        Returns:
            The ``timeSeries`` list of the first metering point in the response.
        """
        start_time = since.isoformat()
        end_time = until.isoformat()
        LOGGER.debug("Fetching hourly data %s - %s", start_time, end_time)
        all_data = await self.client.get_meter_values(
            start_time=start_time,
            end_time=end_time,
            metering_point_ids=[self.metering_point_id],
        )
        return all_data["meteringpoints"][0]["metervalue"]["timeSeries"]

    async def import_meter_values(self) -> None:
        """Import meter values into the recorder as external statistics.

        When no statistics exist yet for this metering point, three
        consecutive 365-day windows ending now are fetched and the running
        sum starts at zero. Otherwise data is fetched from the end of the
        last stored statistic until now, and the running sum continues from
        the statistic stored one hour before the first returned entry.

        Returns early without importing when the fetch fails, when no data is
        returned, when the newest returned entry is not marked ``verified``,
        or when the first entry's ``startTime`` cannot be parsed.
        """
        statistics: list[StatisticData] = []
        statistic_id = f"{DOMAIN}:{self.metering_point_id}_consumption"
        last_stats = await get_instance(self.hass).async_add_executor_job(
            get_last_statistics,
            self.hass,
            1,
            statistic_id,
            True,
            {"sum"},
        )

        if not last_stats:
            # First time we insert 3 years of data (if available)
            hourly_data: list[MeterValueTimeSeries] = []
            until = dt_util.utcnow()
            for year in (3, 2, 1):
                try:
                    year_hours = await self._fetch_hourly_data(
                        since=until - timedelta(days=365 * year),
                        until=until - timedelta(days=365 * (year - 1)),
                    )
                except ElviaError.ElviaException:
                    # This will raise if the contract has no data for the
                    # year, we can safely ignore this
                    continue
                hourly_data.extend(year_hours)

            if not hourly_data:
                LOGGER.error("No data available for the metering point")
                return
            last_stats_time = None
            _sum = 0.0
        else:
            last_stat = last_stats[statistic_id][0]
            try:
                hourly_data = await self._fetch_hourly_data(
                    since=dt_util.utc_from_timestamp(last_stat["end"]),
                    until=dt_util.utcnow(),
                )
            except ElviaError.ElviaException as err:
                LOGGER.error("Error fetching data: %s", err)
                return

            if (
                not hourly_data
                or not hourly_data[-1]["verified"]
                or (from_time := dt_util.parse_datetime(hourly_data[0]["startTime"]))
                is None
            ):
                return

            # Look up the statistic stored just before the first fetched
            # entry so the running sum continues from the right value.
            curr_stat = await get_instance(self.hass).async_add_executor_job(
                statistics_during_period,
                self.hass,
                from_time - timedelta(hours=1),
                None,
                {statistic_id},
                "hour",
                None,
                {"sum"},
            )
            stored_rows = curr_stat.get(statistic_id)
            if stored_rows:
                first_stat = stored_rows[0]
            else:
                # Nothing is stored in the looked-up window (e.g. the fetched
                # data starts after a gap). Continue from the last stored
                # statistic instead of failing on a missing key/index.
                LOGGER.debug(
                    "No stored statistics from %s, continuing from last statistic",
                    from_time - timedelta(hours=1),
                )
                first_stat = last_stat
            _sum = cast(float, first_stat["sum"])
            last_stats_time = first_stat["start"]

        last_stats_time_dt = (
            dt_util.utc_from_timestamp(last_stats_time) if last_stats_time else None
        )

        for entry in hourly_data:
            from_time = dt_util.parse_datetime(entry["startTime"])
            # Skip unparsable entries and entries at or before the statistic
            # the running sum was taken from, so they are not counted twice.
            if from_time is None or (
                last_stats_time_dt is not None and from_time <= last_stats_time_dt
            ):
                continue

            _sum += entry["value"]

            statistics.append(
                StatisticData(start=from_time, state=entry["value"], sum=_sum)
            )

        async_add_external_statistics(
            hass=self.hass,
            metadata=StatisticMetaData(
                has_mean=False,
                has_sum=True,
                name=f"{self.metering_point_id} Consumption",
                source=DOMAIN,
                statistic_id=statistic_id,
                unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
            ),
            statistics=statistics,
        )
        LOGGER.debug("Imported %s statistics", len(statistics))
|