core/homeassistant/components/local_calendar/calendar.py

226 lines
7.7 KiB
Python

"""Calendar platform for a Local Calendar."""
from __future__ import annotations
import asyncio
from datetime import date, datetime, timedelta
import logging
from typing import Any
from ical.calendar import Calendar
from ical.calendar_stream import IcsCalendarStream
from ical.event import Event
from ical.exceptions import CalendarParseError
from ical.store import EventStore, EventStoreError
from ical.types import Range, Recur
import voluptuous as vol
from homeassistant.components.calendar import (
EVENT_END,
EVENT_RRULE,
EVENT_START,
CalendarEntity,
CalendarEntityFeature,
CalendarEvent,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import dt as dt_util
from .const import CONF_CALENDAR_NAME, DOMAIN
from .store import LocalCalendarStore
_LOGGER = logging.getLogger(__name__)
PRODID = "-//homeassistant.io//local_calendar 1.0//EN"
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the local calendar platform."""
store = hass.data[DOMAIN][config_entry.entry_id]
ics = await store.async_load()
calendar: Calendar = await hass.async_add_executor_job(
IcsCalendarStream.calendar_from_ics, ics
)
calendar.prodid = PRODID
name = config_entry.data[CONF_CALENDAR_NAME]
entity = LocalCalendarEntity(store, calendar, name, unique_id=config_entry.entry_id)
async_add_entities([entity], True)
class LocalCalendarEntity(CalendarEntity):
"""A calendar entity backed by a local iCalendar file."""
_attr_has_entity_name = True
_attr_supported_features = (
CalendarEntityFeature.CREATE_EVENT
| CalendarEntityFeature.DELETE_EVENT
| CalendarEntityFeature.UPDATE_EVENT
)
def __init__(
self,
store: LocalCalendarStore,
calendar: Calendar,
name: str,
unique_id: str,
) -> None:
"""Initialize LocalCalendarEntity."""
self._store = store
self._calendar = calendar
self._calendar_lock = asyncio.Lock()
self._event: CalendarEvent | None = None
self._attr_name = name
self._attr_unique_id = unique_id
@property
def event(self) -> CalendarEvent | None:
"""Return the next upcoming event."""
return self._event
async def async_get_events(
self, hass: HomeAssistant, start_date: datetime, end_date: datetime
) -> list[CalendarEvent]:
"""Get all events in a specific time frame."""
events = self._calendar.timeline_tz(start_date.tzinfo).overlapping(
start_date,
end_date,
)
return [_get_calendar_event(event) for event in events]
async def async_update(self) -> None:
"""Update entity state with the next upcoming event."""
now = dt_util.now()
events = self._calendar.timeline_tz(now.tzinfo).active_after(now)
if event := next(events, None):
self._event = _get_calendar_event(event)
else:
self._event = None
async def _async_store(self) -> None:
"""Persist the calendar to disk."""
content = IcsCalendarStream.calendar_to_ics(self._calendar)
await self._store.async_store(content)
async def async_create_event(self, **kwargs: Any) -> None:
"""Add a new event to calendar."""
event = _parse_event(kwargs)
async with self._calendar_lock:
event_store = EventStore(self._calendar)
await self.hass.async_add_executor_job(event_store.add, event)
await self._async_store()
await self.async_update_ha_state(force_refresh=True)
async def async_delete_event(
self,
uid: str,
recurrence_id: str | None = None,
recurrence_range: str | None = None,
) -> None:
"""Delete an event on the calendar."""
range_value: Range = Range.NONE
if recurrence_range == Range.THIS_AND_FUTURE:
range_value = Range.THIS_AND_FUTURE
async with self._calendar_lock:
try:
EventStore(self._calendar).delete(
uid,
recurrence_id=recurrence_id,
recurrence_range=range_value,
)
except EventStoreError as err:
raise HomeAssistantError(f"Error while deleting event: {err}") from err
await self._async_store()
await self.async_update_ha_state(force_refresh=True)
async def async_update_event(
self,
uid: str,
event: dict[str, Any],
recurrence_id: str | None = None,
recurrence_range: str | None = None,
) -> None:
"""Update an existing event on the calendar."""
new_event = _parse_event(event)
range_value: Range = Range.NONE
if recurrence_range == Range.THIS_AND_FUTURE:
range_value = Range.THIS_AND_FUTURE
async with self._calendar_lock:
event_store = EventStore(self._calendar)
def apply_edit() -> None:
event_store.edit(
uid,
new_event,
recurrence_id=recurrence_id,
recurrence_range=range_value,
)
try:
await self.hass.async_add_executor_job(apply_edit)
except EventStoreError as err:
raise HomeAssistantError(f"Error while updating event: {err}") from err
await self._async_store()
await self.async_update_ha_state(force_refresh=True)
def _parse_event(event: dict[str, Any]) -> Event:
"""Parse an ical event from a home assistant event dictionary."""
if rrule := event.get(EVENT_RRULE):
event[EVENT_RRULE] = Recur.from_rrule(rrule)
# This function is called with new events created in the local timezone,
# however ical library does not properly return recurrence_ids for
# start dates with a timezone. For now, ensure any datetime is stored as a
# floating local time to ensure we still apply proper local timezone rules.
# This can be removed when ical is updated with a new recurrence_id format
# https://github.com/home-assistant/core/issues/87759
for key in (EVENT_START, EVENT_END):
if (
(value := event[key])
and isinstance(value, datetime)
and value.tzinfo is not None
):
event[key] = dt_util.as_local(value).replace(tzinfo=None)
try:
return Event(**event)
except CalendarParseError as err:
_LOGGER.debug("Error parsing event input fields: %s (%s)", event, str(err))
raise vol.Invalid("Error parsing event input fields") from err
def _get_calendar_event(event: Event) -> CalendarEvent:
"""Return a CalendarEvent from an API event."""
start: datetime | date
end: datetime | date
if isinstance(event.start, datetime) and isinstance(event.end, datetime):
start = dt_util.as_local(event.start)
end = dt_util.as_local(event.end)
if (end - start) <= timedelta(seconds=0):
end = start + timedelta(minutes=30)
else:
start = event.start
end = event.end
if (end - start) < timedelta(days=0):
end = start + timedelta(days=1)
return CalendarEvent(
summary=event.summary,
start=start,
end=end,
description=event.description,
uid=event.uid,
rrule=event.rrule.as_rrule_str() if event.rrule else None,
recurrence_id=event.recurrence_id,
location=event.location,
)