core/homeassistant/components/mobile_app/notify.py

205 lines
6.5 KiB
Python

"""Support for mobile_app push notifications."""
from __future__ import annotations
import asyncio
from functools import partial
from http import HTTPStatus
import logging
import aiohttp
from homeassistant.components.notify import (
ATTR_DATA,
ATTR_MESSAGE,
ATTR_TARGET,
ATTR_TITLE,
ATTR_TITLE_DEFAULT,
BaseNotificationService,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import homeassistant.util.dt as dt_util
from .const import (
ATTR_APP_DATA,
ATTR_APP_ID,
ATTR_APP_VERSION,
ATTR_DEVICE_NAME,
ATTR_OS_VERSION,
ATTR_PUSH_RATE_LIMITS,
ATTR_PUSH_RATE_LIMITS_ERRORS,
ATTR_PUSH_RATE_LIMITS_MAXIMUM,
ATTR_PUSH_RATE_LIMITS_RESETS_AT,
ATTR_PUSH_RATE_LIMITS_SUCCESSFUL,
ATTR_PUSH_TOKEN,
ATTR_PUSH_URL,
ATTR_WEBHOOK_ID,
DATA_CONFIG_ENTRIES,
DATA_NOTIFY,
DATA_PUSH_CHANNEL,
DOMAIN,
)
from .util import supports_push
_LOGGER = logging.getLogger(__name__)
def push_registrations(hass):
    """Map device names to webhook IDs for registrations that support push.

    Iterates the mobile_app config entries stored in ``hass.data`` and keeps
    only those for which ``supports_push`` is truthy. If two entries share a
    device name, the later one wins.
    """
    config_entries = hass.data[DOMAIN][DATA_CONFIG_ENTRIES]
    return {
        config_entry.data[ATTR_DEVICE_NAME]: hook_id
        for hook_id, config_entry in config_entries.items()
        if supports_push(hass, hook_id)
    }
def log_rate_limits(hass, device_name, resp, level=logging.INFO):
    """Log the push rate-limit counters contained in a push service response.

    Does nothing when ``resp`` has no rate-limit section.

    Args:
        hass: Not used by this function; kept so existing callers keep working.
        device_name: Device name inserted into the log line.
        resp: Decoded response mapping returned by the push service.
        level: Logging level to emit the line at (defaults to INFO).
    """
    if ATTR_PUSH_RATE_LIMITS not in resp:
        return

    rate_limits = resp[ATTR_PUSH_RATE_LIMITS]
    resets_at = dt_util.parse_datetime(rate_limits[ATTR_PUSH_RATE_LIMITS_RESETS_AT])
    if resets_at is None:
        # parse_datetime yields None for a timestamp it cannot parse; a
        # logging helper must not raise TypeError on the subtraction below.
        resets_in = "unknown"
    else:
        # Drop the fractional seconds from the timedelta's string form.
        resets_in = str(resets_at - dt_util.utcnow()).split(".", maxsplit=1)[0]

    rate_limit_msg = (
        "mobile_app push notification rate limits for %s: "
        "%d sent, %d allowed, %d errors, "
        "resets in %s"
    )
    _LOGGER.log(
        level,
        rate_limit_msg,
        device_name,
        rate_limits[ATTR_PUSH_RATE_LIMITS_SUCCESSFUL],
        rate_limits[ATTR_PUSH_RATE_LIMITS_MAXIMUM],
        rate_limits[ATTR_PUSH_RATE_LIMITS_ERRORS],
        resets_in,
    )
async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> MobileAppNotificationService:
    """Create the mobile_app notification service and register it in hass.data.

    The new service instance is stored under ``hass.data[DOMAIN][DATA_NOTIFY]``
    and returned. ``config`` and ``discovery_info`` are not used here.
    """
    notify_service = MobileAppNotificationService(hass)
    hass.data[DOMAIN][DATA_NOTIFY] = notify_service
    return notify_service
class MobileAppNotificationService(BaseNotificationService):
    """Implement the notification service for mobile_app."""

    # NOTE(review): methods below read ``self.hass`` although only
    # ``self._hass`` is assigned in this class; presumably ``hass`` is provided
    # by BaseNotificationService / the notify platform setup — confirm.

    def __init__(self, hass):
        """Initialize the service.

        Args:
            hass: Home Assistant instance, stored on ``self._hass``.
        """
        self._hass = hass

    @property
    def targets(self):
        """Return a dictionary of registered targets (device name -> webhook ID)."""
        return push_registrations(self.hass)

    async def async_send_message(self, message="", **kwargs):
        """Send a notification to every requested target.

        For each target, the local push channel is used when one is
        registered (with the remote sender passed along as a callback);
        otherwise the message is posted to the registration's push URL.

        Args:
            message: Notification body.
            **kwargs: Optional ``ATTR_TITLE``, ``ATTR_TARGET`` and
                ``ATTR_DATA`` entries. Without targets, all push-enabled
                registrations are used.

        Raises:
            HomeAssistantError: A target has neither a local push channel nor
                a push URL. Raising stops processing of the remaining targets.
        """
        data = {ATTR_MESSAGE: message}

        # Remove default title from notifications.
        title = kwargs.get(ATTR_TITLE)
        if title is not None and title != ATTR_TITLE_DEFAULT:
            data[ATTR_TITLE] = title

        if not (targets := kwargs.get(ATTR_TARGET)):
            targets = push_registrations(self.hass).values()

        extra_data = kwargs.get(ATTR_DATA)
        if extra_data is not None:
            data[ATTR_DATA] = extra_data

        local_push_channels = self.hass.data[DOMAIN][DATA_PUSH_CHANNEL]

        for target in targets:
            registration = self.hass.data[DOMAIN][DATA_CONFIG_ENTRIES][target].data

            if target in local_push_channels:
                local_push_channels[target].async_send_notification(
                    data,
                    partial(
                        self._async_send_remote_message_target, target, registration
                    ),
                )
                continue

            # Test if local push only.
            if ATTR_PUSH_URL not in registration[ATTR_APP_DATA]:
                raise HomeAssistantError(
                    "Device not connected to local push notifications"
                )

            await self._async_send_remote_message_target(target, registration, data)

    async def _async_send_remote_message_target(self, target, registration, data):
        """Post a notification for one target to its remote push URL.

        The request is given 10 seconds. Success responses (200/201/202) log
        the rate limits at INFO. Any other status logs the returned error
        message: as a warning together with the rate limits for 429, and as
        an error otherwise. Timeouts and aiohttp client errors are logged,
        not raised.

        Args:
            target: Webhook ID of the receiving registration.
            registration: Registration data mapping for the target.
            data: Notification payload; copied before per-target fields are added.
        """
        app_data = registration[ATTR_APP_DATA]
        push_token = app_data[ATTR_PUSH_TOKEN]
        push_url = app_data[ATTR_PUSH_URL]

        # Copy so per-target fields never leak into the caller's payload.
        target_data = dict(data)
        target_data[ATTR_PUSH_TOKEN] = push_token

        reg_info = {
            ATTR_APP_ID: registration[ATTR_APP_ID],
            ATTR_APP_VERSION: registration[ATTR_APP_VERSION],
            ATTR_WEBHOOK_ID: target,
        }
        if ATTR_OS_VERSION in registration:
            reg_info[ATTR_OS_VERSION] = registration[ATTR_OS_VERSION]

        target_data["registration_info"] = reg_info

        try:
            async with asyncio.timeout(10):
                response = await async_get_clientsession(self._hass).post(
                    push_url, json=target_data
                )
                result = await response.json()

            if response.status in (
                HTTPStatus.OK,
                HTTPStatus.CREATED,
                HTTPStatus.ACCEPTED,
            ):
                log_rate_limits(self.hass, registration[ATTR_DEVICE_NAME], result)
                return

            fallback_error = result.get("errorMessage", "Unknown error")
            fallback_message = (
                f"Internal server error, please try again later: {fallback_error}"
            )
            message = result.get("message", fallback_message)

            if "message" in result:
                # Guard against an empty message: indexing message[-1] on ""
                # would raise IndexError and hide the actual push failure.
                if message and not message.endswith((".", "?", "!")):
                    message += "."
                message += " This message is generated externally to Home Assistant."

            if response.status == HTTPStatus.TOO_MANY_REQUESTS:
                _LOGGER.warning(message)
                log_rate_limits(
                    self.hass, registration[ATTR_DEVICE_NAME], result, logging.WARNING
                )
            else:
                _LOGGER.error(message)

        except TimeoutError:
            _LOGGER.error("Timeout sending notification to %s", push_url)
        except aiohttp.ClientError as err:
            _LOGGER.error("Error sending notification to %s: %r", push_url, err)