mirror of https://github.com/home-assistant/core
184 lines
6.4 KiB
Python
184 lines
6.4 KiB
Python
"""The Overseerr integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import cast
|
|
|
|
from aiohttp.hdrs import METH_POST
|
|
from aiohttp.web_request import Request
|
|
from aiohttp.web_response import Response
|
|
from python_overseerr import OverseerrConnectionError
|
|
|
|
from homeassistant.components import cloud
|
|
from homeassistant.components.webhook import (
|
|
async_generate_url,
|
|
async_register,
|
|
async_unregister,
|
|
)
|
|
from homeassistant.const import CONF_WEBHOOK_ID, Platform
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import config_validation as cv
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
|
from homeassistant.helpers.http import HomeAssistantView
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
from .const import DOMAIN, EVENT_KEY, JSON_PAYLOAD, LOGGER, REGISTERED_NOTIFICATIONS
|
|
from .coordinator import OverseerrConfigEntry, OverseerrCoordinator
|
|
from .services import setup_services
|
|
|
|
PLATFORMS: list[Platform] = [Platform.EVENT, Platform.SENSOR]
|
|
CONF_CLOUDHOOK_URL = "cloudhook_url"
|
|
|
|
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
|
|
|
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|
"""Set up the Overseerr component."""
|
|
setup_services(hass)
|
|
return True
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: OverseerrConfigEntry) -> bool:
|
|
"""Set up Overseerr from a config entry."""
|
|
|
|
coordinator = OverseerrCoordinator(hass, entry)
|
|
|
|
await coordinator.async_config_entry_first_refresh()
|
|
|
|
entry.runtime_data = coordinator
|
|
|
|
webhook_manager = OverseerrWebhookManager(hass, entry)
|
|
|
|
try:
|
|
await webhook_manager.register_webhook()
|
|
except OverseerrConnectionError:
|
|
LOGGER.error("Failed to register Overseerr webhook")
|
|
|
|
entry.async_on_unload(webhook_manager.unregister_webhook)
|
|
|
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
|
|
|
return True
|
|
|
|
|
|
async def async_unload_entry(hass: HomeAssistant, entry: OverseerrConfigEntry) -> bool:
|
|
"""Unload a config entry."""
|
|
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
|
|
|
|
|
async def async_remove_entry(hass: HomeAssistant, entry: OverseerrConfigEntry) -> None:
|
|
"""Cleanup when entry is removed."""
|
|
if cloud.async_active_subscription(hass):
|
|
try:
|
|
LOGGER.debug(
|
|
"Removing Overseerr cloudhook (%s)", entry.data[CONF_WEBHOOK_ID]
|
|
)
|
|
await cloud.async_delete_cloudhook(hass, entry.data[CONF_WEBHOOK_ID])
|
|
except cloud.CloudNotAvailable:
|
|
pass
|
|
|
|
|
|
class OverseerrWebhookManager:
|
|
"""Overseerr webhook manager."""
|
|
|
|
def __init__(self, hass: HomeAssistant, entry: OverseerrConfigEntry) -> None:
|
|
"""Initialize Overseerr webhook manager."""
|
|
self.hass = hass
|
|
self.entry = entry
|
|
self.client = entry.runtime_data.client
|
|
|
|
@property
|
|
def webhook_urls(self) -> list[str]:
|
|
"""Return webhook URLs."""
|
|
urls = [
|
|
async_generate_url(
|
|
self.hass, self.entry.data[CONF_WEBHOOK_ID], prefer_external=external
|
|
)
|
|
for external in (False, True)
|
|
]
|
|
res = []
|
|
for url in urls:
|
|
if url not in res:
|
|
res.append(url)
|
|
if CONF_CLOUDHOOK_URL in self.entry.data:
|
|
res.append(self.entry.data[CONF_CLOUDHOOK_URL])
|
|
return res
|
|
|
|
async def register_webhook(self) -> None:
|
|
"""Register webhook."""
|
|
async_register(
|
|
self.hass,
|
|
DOMAIN,
|
|
self.entry.title,
|
|
self.entry.data[CONF_WEBHOOK_ID],
|
|
self.handle_webhook,
|
|
allowed_methods=[METH_POST],
|
|
)
|
|
if not await self.check_need_change():
|
|
return
|
|
for url in self.webhook_urls:
|
|
if await self.test_and_set_webhook(url):
|
|
return
|
|
LOGGER.info("Failed to register Overseerr webhook")
|
|
if (
|
|
cloud.async_active_subscription(self.hass)
|
|
and CONF_CLOUDHOOK_URL not in self.entry.data
|
|
):
|
|
LOGGER.info("Trying to register a cloudhook URL")
|
|
url = await _async_cloudhook_generate_url(self.hass, self.entry)
|
|
if await self.test_and_set_webhook(url):
|
|
return
|
|
LOGGER.error("Failed to register Overseerr cloudhook")
|
|
|
|
async def check_need_change(self) -> bool:
|
|
"""Check if webhook needs to be changed."""
|
|
current_config = await self.client.get_webhook_notification_config()
|
|
return (
|
|
not current_config.enabled
|
|
or current_config.options.webhook_url not in self.webhook_urls
|
|
or current_config.options.json_payload != json.loads(JSON_PAYLOAD)
|
|
or current_config.types != REGISTERED_NOTIFICATIONS
|
|
)
|
|
|
|
async def test_and_set_webhook(self, url: str) -> bool:
|
|
"""Test and set webhook."""
|
|
if await self.client.test_webhook_notification_config(url, JSON_PAYLOAD):
|
|
LOGGER.debug("Setting Overseerr webhook to %s", url)
|
|
await self.client.set_webhook_notification_config(
|
|
enabled=True,
|
|
types=REGISTERED_NOTIFICATIONS,
|
|
webhook_url=url,
|
|
json_payload=JSON_PAYLOAD,
|
|
)
|
|
return True
|
|
return False
|
|
|
|
async def handle_webhook(
|
|
self, hass: HomeAssistant, webhook_id: str, request: Request
|
|
) -> Response:
|
|
"""Handle webhook."""
|
|
data = await request.json()
|
|
LOGGER.debug("Received webhook payload: %s", data)
|
|
if data["notification_type"].startswith("MEDIA"):
|
|
await self.entry.runtime_data.async_refresh()
|
|
async_dispatcher_send(hass, EVENT_KEY, data)
|
|
return HomeAssistantView.json({"message": "ok"})
|
|
|
|
async def unregister_webhook(self) -> None:
|
|
"""Unregister webhook."""
|
|
async_unregister(self.hass, self.entry.data[CONF_WEBHOOK_ID])
|
|
|
|
|
|
async def _async_cloudhook_generate_url(
|
|
hass: HomeAssistant, entry: OverseerrConfigEntry
|
|
) -> str:
|
|
"""Generate the full URL for a webhook_id."""
|
|
if CONF_CLOUDHOOK_URL not in entry.data:
|
|
webhook_id = entry.data[CONF_WEBHOOK_ID]
|
|
webhook_url = await cloud.async_create_cloudhook(hass, webhook_id)
|
|
data = {**entry.data, CONF_CLOUDHOOK_URL: webhook_url}
|
|
hass.config_entries.async_update_entry(entry, data=data)
|
|
return webhook_url
|
|
return cast(str, entry.data[CONF_CLOUDHOOK_URL])
|