mirror of https://github.com/home-assistant/core
143 lines
4.9 KiB
Python
143 lines
4.9 KiB
Python
"""Backup platform for the Google Drive integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncIterator, Callable, Coroutine
|
|
import logging
|
|
from typing import Any
|
|
|
|
from google_drive_api.exceptions import GoogleDriveApiError
|
|
|
|
from homeassistant.components.backup import AgentBackup, BackupAgent, BackupAgentError
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers.aiohttp_client import ChunkAsyncStreamIterator
|
|
from homeassistant.util import slugify
|
|
|
|
from . import DATA_BACKUP_AGENT_LISTENERS, GoogleDriveConfigEntry
|
|
from .const import DOMAIN
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_get_backup_agents(
|
|
hass: HomeAssistant,
|
|
**kwargs: Any,
|
|
) -> list[BackupAgent]:
|
|
"""Return a list of backup agents."""
|
|
entries = hass.config_entries.async_loaded_entries(DOMAIN)
|
|
return [GoogleDriveBackupAgent(entry) for entry in entries]
|
|
|
|
|
|
@callback
|
|
def async_register_backup_agents_listener(
|
|
hass: HomeAssistant,
|
|
*,
|
|
listener: Callable[[], None],
|
|
**kwargs: Any,
|
|
) -> Callable[[], None]:
|
|
"""Register a listener to be called when agents are added or removed.
|
|
|
|
:return: A function to unregister the listener.
|
|
"""
|
|
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
|
|
|
|
@callback
|
|
def remove_listener() -> None:
|
|
"""Remove the listener."""
|
|
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
|
|
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
|
|
del hass.data[DATA_BACKUP_AGENT_LISTENERS]
|
|
|
|
return remove_listener
|
|
|
|
|
|
class GoogleDriveBackupAgent(BackupAgent):
|
|
"""Google Drive backup agent."""
|
|
|
|
domain = DOMAIN
|
|
|
|
def __init__(self, config_entry: GoogleDriveConfigEntry) -> None:
|
|
"""Initialize the cloud backup sync agent."""
|
|
super().__init__()
|
|
assert config_entry.unique_id
|
|
self.name = config_entry.title
|
|
self.unique_id = slugify(config_entry.unique_id)
|
|
self._client = config_entry.runtime_data
|
|
|
|
async def async_upload_backup(
|
|
self,
|
|
*,
|
|
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
|
|
backup: AgentBackup,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""Upload a backup.
|
|
|
|
:param open_stream: A function returning an async iterator that yields bytes.
|
|
:param backup: Metadata about the backup that should be uploaded.
|
|
"""
|
|
try:
|
|
await self._client.async_upload_backup(open_stream, backup)
|
|
except (GoogleDriveApiError, HomeAssistantError, TimeoutError) as err:
|
|
raise BackupAgentError(f"Failed to upload backup: {err}") from err
|
|
|
|
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
|
|
"""List backups."""
|
|
try:
|
|
return await self._client.async_list_backups()
|
|
except (GoogleDriveApiError, HomeAssistantError, TimeoutError) as err:
|
|
raise BackupAgentError(f"Failed to list backups: {err}") from err
|
|
|
|
async def async_get_backup(
|
|
self,
|
|
backup_id: str,
|
|
**kwargs: Any,
|
|
) -> AgentBackup | None:
|
|
"""Return a backup."""
|
|
backups = await self.async_list_backups()
|
|
for backup in backups:
|
|
if backup.backup_id == backup_id:
|
|
return backup
|
|
return None
|
|
|
|
async def async_download_backup(
|
|
self,
|
|
backup_id: str,
|
|
**kwargs: Any,
|
|
) -> AsyncIterator[bytes]:
|
|
"""Download a backup file.
|
|
|
|
:param backup_id: The ID of the backup that was returned in async_list_backups.
|
|
:return: An async iterator that yields bytes.
|
|
"""
|
|
_LOGGER.debug("Downloading backup_id: %s", backup_id)
|
|
try:
|
|
file_id = await self._client.async_get_backup_file_id(backup_id)
|
|
if file_id:
|
|
_LOGGER.debug("Downloading file_id: %s", file_id)
|
|
stream = await self._client.async_download(file_id)
|
|
return ChunkAsyncStreamIterator(stream)
|
|
except (GoogleDriveApiError, HomeAssistantError, TimeoutError) as err:
|
|
raise BackupAgentError(f"Failed to download backup: {err}") from err
|
|
raise BackupAgentError("Backup not found")
|
|
|
|
async def async_delete_backup(
|
|
self,
|
|
backup_id: str,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""Delete a backup file.
|
|
|
|
:param backup_id: The ID of the backup that was returned in async_list_backups.
|
|
"""
|
|
_LOGGER.debug("Deleting backup_id: %s", backup_id)
|
|
try:
|
|
file_id = await self._client.async_get_backup_file_id(backup_id)
|
|
if file_id:
|
|
_LOGGER.debug("Deleting file_id: %s", file_id)
|
|
await self._client.async_delete(file_id)
|
|
_LOGGER.debug("Deleted backup_id: %s", backup_id)
|
|
except (GoogleDriveApiError, HomeAssistantError, TimeoutError) as err:
|
|
raise BackupAgentError(f"Failed to delete backup: {err}") from err
|