mirror of https://github.com/home-assistant/core
190 lines
6.0 KiB
Python
190 lines
6.0 KiB
Python
"""Holiday Calendar."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from holidays import HolidayBase, country_holidays
|
|
|
|
from homeassistant.components.calendar import CalendarEntity, CalendarEvent
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_COUNTRY
|
|
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
|
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.event import async_track_point_in_utc_time
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import CONF_PROVINCE, DOMAIN
|
|
|
|
|
|
def _get_obj_holidays_and_language(
|
|
country: str, province: str | None, language: str
|
|
) -> tuple[HolidayBase, str]:
|
|
"""Get the object for the requested country and year."""
|
|
obj_holidays = country_holidays(
|
|
country,
|
|
subdiv=province,
|
|
years={dt_util.now().year, dt_util.now().year + 1},
|
|
language=language,
|
|
)
|
|
if language == "en":
|
|
for lang in obj_holidays.supported_languages:
|
|
if lang.startswith("en"):
|
|
obj_holidays = country_holidays(
|
|
country,
|
|
subdiv=province,
|
|
years={dt_util.now().year, dt_util.now().year + 1},
|
|
language=lang,
|
|
)
|
|
language = lang
|
|
break
|
|
if (
|
|
obj_holidays.supported_languages
|
|
and language not in obj_holidays.supported_languages
|
|
and (default_language := obj_holidays.default_language)
|
|
):
|
|
obj_holidays = country_holidays(
|
|
country,
|
|
subdiv=province,
|
|
years={dt_util.now().year, dt_util.now().year + 1},
|
|
language=default_language,
|
|
)
|
|
language = default_language
|
|
|
|
return (obj_holidays, language)
|
|
|
|
|
|
async def async_setup_entry(
|
|
hass: HomeAssistant,
|
|
config_entry: ConfigEntry,
|
|
async_add_entities: AddEntitiesCallback,
|
|
) -> None:
|
|
"""Set up the Holiday Calendar config entry."""
|
|
country: str = config_entry.data[CONF_COUNTRY]
|
|
province: str | None = config_entry.data.get(CONF_PROVINCE)
|
|
language = hass.config.language
|
|
|
|
obj_holidays, language = await hass.async_add_executor_job(
|
|
_get_obj_holidays_and_language, country, province, language
|
|
)
|
|
|
|
async_add_entities(
|
|
[
|
|
HolidayCalendarEntity(
|
|
config_entry.title,
|
|
country,
|
|
province,
|
|
language,
|
|
obj_holidays,
|
|
config_entry.entry_id,
|
|
)
|
|
],
|
|
True,
|
|
)
|
|
|
|
|
|
class HolidayCalendarEntity(CalendarEntity):
|
|
"""Representation of a Holiday Calendar element."""
|
|
|
|
_attr_has_entity_name = True
|
|
_attr_name = None
|
|
_attr_event: CalendarEvent | None = None
|
|
_attr_should_poll = False
|
|
unsub: CALLBACK_TYPE | None = None
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
country: str,
|
|
province: str | None,
|
|
language: str,
|
|
obj_holidays: HolidayBase,
|
|
unique_id: str,
|
|
) -> None:
|
|
"""Initialize HolidayCalendarEntity."""
|
|
self._country = country
|
|
self._province = province
|
|
self._location = name
|
|
self._language = language
|
|
self._attr_unique_id = unique_id
|
|
self._attr_device_info = DeviceInfo(
|
|
identifiers={(DOMAIN, unique_id)},
|
|
entry_type=DeviceEntryType.SERVICE,
|
|
name=name,
|
|
)
|
|
self._obj_holidays = obj_holidays
|
|
|
|
def get_next_interval(self, now: datetime) -> datetime:
|
|
"""Compute next time an update should occur."""
|
|
tomorrow = dt_util.as_local(now) + timedelta(days=1)
|
|
return dt_util.start_of_local_day(tomorrow)
|
|
|
|
def _update_state_and_setup_listener(self) -> None:
|
|
"""Update state and setup listener for next interval."""
|
|
now = dt_util.now()
|
|
self._attr_event = self.update_event(now)
|
|
self.unsub = async_track_point_in_utc_time(
|
|
self.hass, self.point_in_time_listener, self.get_next_interval(now)
|
|
)
|
|
|
|
@callback
|
|
def point_in_time_listener(self, time_date: datetime) -> None:
|
|
"""Get the latest data and update state."""
|
|
self._update_state_and_setup_listener()
|
|
self.async_write_ha_state()
|
|
|
|
async def async_added_to_hass(self) -> None:
|
|
"""Set up first update."""
|
|
self._update_state_and_setup_listener()
|
|
|
|
def update_event(self, now: datetime) -> CalendarEvent | None:
|
|
"""Return the next upcoming event."""
|
|
next_holiday = None
|
|
for holiday_date, holiday_name in sorted(
|
|
self._obj_holidays.items(), key=lambda x: x[0]
|
|
):
|
|
if holiday_date >= now.date():
|
|
next_holiday = (holiday_date, holiday_name)
|
|
break
|
|
|
|
if next_holiday is None:
|
|
return None
|
|
|
|
return CalendarEvent(
|
|
summary=next_holiday[1],
|
|
start=next_holiday[0],
|
|
end=next_holiday[0],
|
|
location=self._location,
|
|
)
|
|
|
|
@property
|
|
def event(self) -> CalendarEvent | None:
|
|
"""Return the next upcoming event."""
|
|
return self._attr_event
|
|
|
|
async def async_get_events(
|
|
self, hass: HomeAssistant, start_date: datetime, end_date: datetime
|
|
) -> list[CalendarEvent]:
|
|
"""Get all events in a specific time frame."""
|
|
obj_holidays = country_holidays(
|
|
self._country,
|
|
subdiv=self._province,
|
|
years=list({start_date.year, end_date.year}),
|
|
language=self._language,
|
|
)
|
|
|
|
event_list: list[CalendarEvent] = []
|
|
|
|
for holiday_date, holiday_name in obj_holidays.items():
|
|
if start_date.date() <= holiday_date <= end_date.date():
|
|
event = CalendarEvent(
|
|
summary=holiday_name,
|
|
start=holiday_date,
|
|
end=holiday_date,
|
|
location=self._location,
|
|
)
|
|
event_list.append(event)
|
|
|
|
return event_list
|