core/homeassistant/components/opensky/coordinator.py

120 lines
4.0 KiB
Python

"""DataUpdateCoordinator for the OpenSky integration."""
from __future__ import annotations
from datetime import timedelta
from python_opensky import OpenSky, OpenSkyError, StateVector
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ATTR_ALTITUDE,
ATTR_CALLSIGN,
ATTR_ICAO24,
ATTR_SENSOR,
CONF_ALTITUDE,
DEFAULT_ALTITUDE,
DOMAIN,
EVENT_OPENSKY_ENTRY,
EVENT_OPENSKY_EXIT,
LOGGER,
)
class OpenSkyDataUpdateCoordinator(DataUpdateCoordinator[int]):
    """Coordinator that polls OpenSky and reports how many flights are tracked.

    The coordinator's data is the number of flights that passed the position,
    ground and altitude filters on the latest poll. Entry and exit events are
    fired on the Home Assistant bus when the tracked set changes between polls.
    """

    config_entry: ConfigEntry

    def __init__(self, hass: HomeAssistant, opensky: OpenSky) -> None:
        """Set up polling and cache the configured search area."""
        # Looked up with the client's authentication flag: 90 seconds when
        # authenticated, 15 minutes otherwise.
        poll_intervals = {
            True: timedelta(seconds=90),
            False: timedelta(minutes=15),
        }
        super().__init__(
            hass,
            LOGGER,
            name=DOMAIN,
            update_interval=poll_intervals.get(opensky.is_authenticated),
        )
        self._opensky = opensky
        # Stays None until the first successful poll, so that poll only seeds
        # the tracked set and fires no entry/exit events.
        self._previously_tracked: set[str] | None = None
        entry = self.config_entry
        self._bounding_box = OpenSky.get_bounding_box(
            entry.data[CONF_LATITUDE],
            entry.data[CONF_LONGITUDE],
            entry.options[CONF_RADIUS],
        )
        self._altitude = entry.options.get(CONF_ALTITUDE, DEFAULT_ALTITUDE)

    async def _async_update_data(self) -> int:
        """Fetch states inside the bounding box and return the tracked count.

        Raises UpdateFailed when the OpenSky client raises OpenSkyError.
        """
        try:
            response = await self._opensky.get_states(bounding_box=self._bounding_box)
        except OpenSkyError as exc:
            raise UpdateFailed from exc

        tracked_now: set[str] = set()
        seen: dict[str, StateVector] = {}
        for state in response.states:
            # Flights without a usable (non-blank) callsign are ignored.
            callsign = state.callsign.strip() if state.callsign else ""
            if not callsign:
                continue

            # Record every named flight before filtering, so boundary events
            # can still report details for flights that are filtered out below.
            seen[callsign] = state

            airborne_with_position = (
                state.longitude is not None
                and state.latitude is not None
                and not state.on_ground
                and state.barometric_altitude is not None
            )
            if not airborne_with_position:
                continue

            # A configured altitude of 0 disables the ceiling check.
            above_ceiling = (
                state.barometric_altitude > self._altitude and self._altitude != 0
            )
            if above_ceiling:
                continue

            tracked_now.add(callsign)

        tracked_before = self._previously_tracked
        if tracked_before is not None:
            arrived = tracked_now - tracked_before
            departed = tracked_before - tracked_now
            self._handle_boundary(arrived, EVENT_OPENSKY_ENTRY, seen)
            self._handle_boundary(departed, EVENT_OPENSKY_EXIT, seen)
        self._previously_tracked = tracked_now
        return len(tracked_now)

    def _handle_boundary(
        self, flights: set[str], event: str, metadata: dict[str, StateVector]
    ) -> None:
        """Fire ``event`` on the bus once for every callsign in ``flights``."""
        for callsign in flights:
            vector = metadata.get(callsign)
            if vector is None:
                # The callsign is absent from the latest response; the flight
                # is assumed to have landed.
                altitude, longitude, latitude, icao24 = 0, None, None, None
            else:
                altitude = vector.barometric_altitude
                longitude = vector.longitude
                latitude = vector.latitude
                icao24 = vector.icao24

            payload = {
                ATTR_CALLSIGN: callsign,
                ATTR_ALTITUDE: altitude,
                ATTR_SENSOR: self.config_entry.title,
                ATTR_LONGITUDE: longitude,
                ATTR_LATITUDE: latitude,
                ATTR_ICAO24: icao24,
            }
            # NOTE(review): this is reached from the async update path; confirm
            # that the synchronous bus.fire is intended rather than async_fire.
            self.hass.bus.fire(event, payload)