core/homeassistant/components/discord/notify.py

201 lines
6.9 KiB
Python

"""Discord platform for notify component."""
from __future__ import annotations
from io import BytesIO
import logging
import os.path
from typing import Any, cast
import aiohttp
import nextcord
from nextcord.abc import Messageable
from homeassistant.components.notify import (
ATTR_DATA,
ATTR_TARGET,
BaseNotificationService,
)
from homeassistant.const import CONF_API_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
_LOGGER = logging.getLogger(__name__)
ATTR_EMBED = "embed"
ATTR_EMBED_AUTHOR = "author"
ATTR_EMBED_COLOR = "color"
ATTR_EMBED_DESCRIPTION = "description"
ATTR_EMBED_FIELDS = "fields"
ATTR_EMBED_FOOTER = "footer"
ATTR_EMBED_TITLE = "title"
ATTR_EMBED_THUMBNAIL = "thumbnail"
ATTR_EMBED_IMAGE = "image"
ATTR_EMBED_URL = "url"
ATTR_IMAGES = "images"
ATTR_URLS = "urls"
ATTR_VERIFY_SSL = "verify_ssl"
MAX_ALLOWED_DOWNLOAD_SIZE_BYTES = 8000000
async def async_get_service(
hass: HomeAssistant,
config: ConfigType,
discovery_info: DiscoveryInfoType | None = None,
) -> DiscordNotificationService | None:
"""Get the Discord notification service."""
if discovery_info is None:
return None
return DiscordNotificationService(hass, discovery_info[CONF_API_TOKEN])
class DiscordNotificationService(BaseNotificationService):
"""Implement the notification service for Discord."""
def __init__(self, hass: HomeAssistant, token: str) -> None:
"""Initialize the service."""
self.token = token
self.hass = hass
def file_exists(self, filename: str) -> bool:
"""Check if a file exists on disk and is in authorized path."""
if not self.hass.config.is_allowed_path(filename):
_LOGGER.warning("Path not allowed: %s", filename)
return False
if not os.path.isfile(filename):
_LOGGER.warning("Not a file: %s", filename)
return False
return True
async def async_get_file_from_url(
self, url: str, verify_ssl: bool, max_file_size: int
) -> bytearray | None:
"""Retrieve file bytes from URL."""
if not self.hass.config.is_allowed_external_url(url):
_LOGGER.error("URL not allowed: %s", url)
return None
session = async_get_clientsession(self.hass)
async with session.get(
url,
ssl=verify_ssl,
timeout=aiohttp.ClientTimeout(total=30),
raise_for_status=True,
) as resp:
content_length = resp.headers.get("Content-Length")
if content_length is not None and int(content_length) > max_file_size:
_LOGGER.error(
(
"Attachment too large (Content-Length reports %s). Max size: %s"
" bytes"
),
int(content_length),
max_file_size,
)
return None
file_size = 0
byte_chunks = bytearray()
async for byte_chunk, _ in resp.content.iter_chunks():
file_size += len(byte_chunk)
if file_size > max_file_size:
_LOGGER.error(
"Attachment too large (Stream reports %s). Max size: %s bytes",
file_size,
max_file_size,
)
return None
byte_chunks.extend(byte_chunk)
return byte_chunks
async def async_send_message(self, message: str, **kwargs: Any) -> None:
"""Login to Discord, send message to channel(s) and log out."""
nextcord.VoiceClient.warn_nacl = False
discord_bot = nextcord.Client()
images = []
embedding = None
if ATTR_TARGET not in kwargs:
_LOGGER.error("No target specified")
return
data = kwargs.get(ATTR_DATA) or {}
embeds: list[nextcord.Embed] = []
if ATTR_EMBED in data:
embedding = data[ATTR_EMBED]
title = embedding.get(ATTR_EMBED_TITLE)
description = embedding.get(ATTR_EMBED_DESCRIPTION)
color = embedding.get(ATTR_EMBED_COLOR)
url = embedding.get(ATTR_EMBED_URL)
fields = embedding.get(ATTR_EMBED_FIELDS) or []
if embedding:
embed = nextcord.Embed(
title=title, description=description, color=color, url=url
)
for field in fields:
embed.add_field(**field)
if ATTR_EMBED_FOOTER in embedding:
embed.set_footer(**embedding[ATTR_EMBED_FOOTER])
if ATTR_EMBED_AUTHOR in embedding:
embed.set_author(**embedding[ATTR_EMBED_AUTHOR])
if ATTR_EMBED_THUMBNAIL in embedding:
embed.set_thumbnail(**embedding[ATTR_EMBED_THUMBNAIL])
if ATTR_EMBED_IMAGE in embedding:
embed.set_image(**embedding[ATTR_EMBED_IMAGE])
embeds.append(embed)
if ATTR_IMAGES in data:
for image in data.get(ATTR_IMAGES, []):
image_exists = await self.hass.async_add_executor_job(
self.file_exists, image
)
filename = os.path.basename(image)
if image_exists:
images.append((image, filename))
if ATTR_URLS in data:
for url in data.get(ATTR_URLS, []):
file = await self.async_get_file_from_url(
url,
data.get(ATTR_VERIFY_SSL, True),
MAX_ALLOWED_DOWNLOAD_SIZE_BYTES,
)
if file is not None:
filename = os.path.basename(url)
images.append((BytesIO(file), filename))
await discord_bot.login(self.token)
try:
for channelid in kwargs[ATTR_TARGET]:
channelid = int(channelid)
# Must create new instances of File for each channel.
files = [nextcord.File(image, filename) for image, filename in images]
try:
channel = cast(
Messageable, await discord_bot.fetch_channel(channelid)
)
except nextcord.NotFound:
try:
channel = await discord_bot.fetch_user(channelid)
except nextcord.NotFound:
_LOGGER.warning("Channel not found for ID: %s", channelid)
continue
await channel.send(message, files=files, embeds=embeds)
except (nextcord.HTTPException, nextcord.NotFound) as error:
_LOGGER.warning("Communication error: %s", error)
await discord_bot.close()