mirror of https://github.com/home-assistant/core
419 lines
13 KiB
Python
419 lines
13 KiB
Python
"""Config flow for EZVIZ."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from pyezviz.client import EzvizClient
|
|
from pyezviz.exceptions import (
|
|
AuthTestResultFailed,
|
|
EzvizAuthVerificationCode,
|
|
InvalidHost,
|
|
InvalidURL,
|
|
PyEzvizError,
|
|
)
|
|
from pyezviz.test_cam_rtsp import TestRTSPAuth
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.config_entries import (
|
|
ConfigEntry,
|
|
ConfigFlow,
|
|
ConfigFlowResult,
|
|
OptionsFlow,
|
|
)
|
|
from homeassistant.const import (
|
|
CONF_CUSTOMIZE,
|
|
CONF_IP_ADDRESS,
|
|
CONF_PASSWORD,
|
|
CONF_TIMEOUT,
|
|
CONF_TYPE,
|
|
CONF_URL,
|
|
CONF_USERNAME,
|
|
)
|
|
from homeassistant.core import callback
|
|
|
|
from .const import (
|
|
ATTR_SERIAL,
|
|
ATTR_TYPE_CAMERA,
|
|
ATTR_TYPE_CLOUD,
|
|
CONF_FFMPEG_ARGUMENTS,
|
|
CONF_RFSESSION_ID,
|
|
CONF_SESSION_ID,
|
|
DEFAULT_CAMERA_USERNAME,
|
|
DEFAULT_FFMPEG_ARGUMENTS,
|
|
DEFAULT_TIMEOUT,
|
|
DOMAIN,
|
|
EU_URL,
|
|
RUSSIA_URL,
|
|
)
|
|
|
|
# Module-level logger used for unexpected errors raised inside the flow steps.
_LOGGER = logging.getLogger(__name__)

# Options attached to every config entry this flow creates; the options flow
# below lets the user override both values afterwards.
DEFAULT_OPTIONS: dict[str, Any] = {
    CONF_FFMPEG_ARGUMENTS: DEFAULT_FFMPEG_ARGUMENTS,
    CONF_TIMEOUT: DEFAULT_TIMEOUT,
}
|
|
|
|
|
|
def _validate_and_create_auth(data: dict) -> dict[str, Any]:
    """Log in to the EZVIZ cloud and return the data to store for the account.

    The credentials in ``data`` (username, password, API URL and an optional
    timeout) are verified by performing a real login request. Any exception
    raised by the client propagates to the caller.

    Returns a mapping with the session id, the refresh-session id, the API
    URL reported by the login response, and the cloud entry type marker.
    """
    # Fall back to the integration default when the caller supplies no timeout.
    timeout = data.get(CONF_TIMEOUT, DEFAULT_TIMEOUT)
    client = EzvizClient(
        data[CONF_USERNAME],
        data[CONF_PASSWORD],
        data[CONF_URL],
        timeout,
    )

    token = client.login()

    auth_data: dict[str, Any] = {
        CONF_SESSION_ID: token[CONF_SESSION_ID],
        CONF_RFSESSION_ID: token[CONF_RFSESSION_ID],
        CONF_URL: token["api_url"],
        CONF_TYPE: ATTR_TYPE_CLOUD,
    }
    return auth_data
|
|
|
|
|
|
def _test_camera_rtsp_creds(data: dict) -> None:
    """Run the RTSP credential check against the camera described by ``data``.

    ``data`` must provide the camera IP address, username and password.
    Nothing is returned; exceptions raised by the check propagate to the
    caller.
    """
    host = data[CONF_IP_ADDRESS]
    user = data[CONF_USERNAME]
    secret = data[CONF_PASSWORD]

    TestRTSPAuth(host, user, secret).main()
|
|
|
|
|
|
class EzvizConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for EZVIZ.

    Two kinds of entries are created by this flow:

    * a single cloud account entry (``CONF_TYPE == ATTR_TYPE_CLOUD``) created
      from the user step, and
    * one camera entry per discovered camera (``CONF_TYPE ==
      ATTR_TYPE_CAMERA``) holding the camera's RTSP credentials.
    """

    VERSION = 1

    # Set by the discovery step and consumed by the confirm step.
    ip_address: str
    # Stashed by the user step when the user picks a custom region URL.
    username: str | None
    password: str | None
    unique_id: str

    async def _validate_and_create_camera_rtsp(self, data: dict) -> ConfigFlowResult:
        """Validate camera RTSP credentials and create the camera entry.

        ``data`` must contain the camera serial, IP address, username and
        password. The flow aborts when no cloud account entry with a session
        id exists. Exceptions raised by the EZVIZ client or the RTSP test
        propagate to the caller, which maps them to form errors.
        """

        # Get EZVIZ cloud credentials from config entry
        ezviz_token = {
            CONF_SESSION_ID: None,
            CONF_RFSESSION_ID: None,
            "api_url": None,
        }
        ezviz_timeout = DEFAULT_TIMEOUT

        for item in self._async_current_entries():
            if item.data.get(CONF_TYPE) == ATTR_TYPE_CLOUD:
                ezviz_token = {
                    CONF_SESSION_ID: item.data.get(CONF_SESSION_ID),
                    CONF_RFSESSION_ID: item.data.get(CONF_RFSESSION_ID),
                    "api_url": item.data.get(CONF_URL),
                }
                # The timeout is stored in the entry *options* (see
                # DEFAULT_OPTIONS and the options flow); entry data is only
                # consulted as a fallback.
                ezviz_timeout = item.options.get(
                    CONF_TIMEOUT, item.data.get(CONF_TIMEOUT, DEFAULT_TIMEOUT)
                )

        # Abort flow if user removed cloud account before adding camera.
        if ezviz_token.get(CONF_SESSION_ID) is None:
            return self.async_abort(reason="ezviz_cloud_account_missing")

        ezviz_client = EzvizClient(token=ezviz_token, timeout=ezviz_timeout)

        # We need to wake hibernating cameras.
        # First create EZVIZ API instance.
        await self.hass.async_add_executor_job(ezviz_client.login)

        # Secondly try to wake hibernating camera.
        await self.hass.async_add_executor_job(
            ezviz_client.get_detection_sensibility, data[ATTR_SERIAL]
        )

        # Thirdly attempts an authenticated RTSP DESCRIBE request.
        await self.hass.async_add_executor_job(_test_camera_rtsp_creds, data)

        return self.async_create_entry(
            title=data[ATTR_SERIAL],
            data={
                CONF_USERNAME: data[CONF_USERNAME],
                CONF_PASSWORD: data[CONF_PASSWORD],
                CONF_TYPE: ATTR_TYPE_CAMERA,
            },
            options=DEFAULT_OPTIONS,
        )

    async def _async_login_and_create_entry(
        self, user_input: dict[str, Any], errors: dict[str, str]
    ) -> ConfigFlowResult | None:
        """Log in to the cloud and create the account entry.

        Shared by the user and custom-URL steps. Returns the create-entry
        result on success and an abort result on an unexpected exception.
        For known login failures it records the error key in ``errors`` and
        returns ``None`` so the calling step can show its form again.
        """
        try:
            auth_data = await self.hass.async_add_executor_job(
                _validate_and_create_auth, user_input
            )

        except InvalidURL:
            errors["base"] = "invalid_host"

        except InvalidHost:
            errors["base"] = "cannot_connect"

        except EzvizAuthVerificationCode:
            errors["base"] = "mfa_required"

        except PyEzvizError:
            errors["base"] = "invalid_auth"

        except Exception:
            _LOGGER.exception("Unexpected exception")
            return self.async_abort(reason="unknown")

        else:
            return self.async_create_entry(
                title=user_input[CONF_USERNAME],
                data=auth_data,
                options=DEFAULT_OPTIONS,
            )

        return None

    @staticmethod
    @callback
    def async_get_options_flow(config_entry: ConfigEntry) -> EzvizOptionsFlowHandler:
        """Get the options flow for this handler."""
        return EzvizOptionsFlowHandler()

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initiated by the user.

        Collects cloud username, password and region URL. Choosing the
        "customize" URL option hands over to the custom-URL step.
        """

        # Check if EZVIZ cloud account is present in entry config,
        # abort if already configured.
        for item in self._async_current_entries():
            if item.data.get(CONF_TYPE) == ATTR_TYPE_CLOUD:
                return self.async_abort(reason="already_configured_account")

        errors: dict[str, str] = {}

        if user_input is not None:
            await self.async_set_unique_id(user_input[CONF_USERNAME])
            self._abort_if_unique_id_configured()

            if user_input[CONF_URL] == CONF_CUSTOMIZE:
                # Remember the credentials; the next step only asks for a URL.
                self.username = user_input[CONF_USERNAME]
                self.password = user_input[CONF_PASSWORD]
                return await self.async_step_user_custom_url()

            result = await self._async_login_and_create_entry(user_input, errors)
            if result is not None:
                return result

        data_schema = vol.Schema(
            {
                vol.Required(CONF_USERNAME): str,
                vol.Required(CONF_PASSWORD): str,
                vol.Required(CONF_URL, default=EU_URL): vol.In(
                    [EU_URL, RUSSIA_URL, CONF_CUSTOMIZE]
                ),
            }
        )

        return self.async_show_form(
            step_id="user", data_schema=data_schema, errors=errors
        )

    async def async_step_user_custom_url(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initiated by the user for custom region url."""
        errors: dict[str, str] = {}

        if user_input is not None:
            # The form only carries the URL; restore the credentials stashed
            # by the user step.
            user_input[CONF_USERNAME] = self.username
            user_input[CONF_PASSWORD] = self.password

            result = await self._async_login_and_create_entry(user_input, errors)
            if result is not None:
                return result

        data_schema_custom_url = vol.Schema(
            {
                vol.Required(CONF_URL, default=EU_URL): str,
            }
        )

        return self.async_show_form(
            step_id="user_custom_url", data_schema=data_schema_custom_url, errors=errors
        )

    async def async_step_integration_discovery(
        self, discovery_info: dict[str, Any]
    ) -> ConfigFlowResult:
        """Handle a flow for discovered camera without rtsp config entry."""

        await self.async_set_unique_id(discovery_info[ATTR_SERIAL])
        self._abort_if_unique_id_configured()

        if TYPE_CHECKING:
            # A unique ID is passed in via the discovery info
            assert self.unique_id is not None
        self.context["title_placeholders"] = {ATTR_SERIAL: self.unique_id}
        self.ip_address = discovery_info[CONF_IP_ADDRESS]

        return await self.async_step_confirm()

    async def async_step_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm and create entry from discovery step."""
        errors: dict[str, str] = {}

        if user_input is not None:
            # Add the values captured during discovery to the submitted
            # credentials before validating.
            user_input[ATTR_SERIAL] = self.unique_id
            user_input[CONF_IP_ADDRESS] = self.ip_address
            try:
                return await self._validate_and_create_camera_rtsp(user_input)

            except (InvalidHost, InvalidURL):
                errors["base"] = "invalid_host"

            except EzvizAuthVerificationCode:
                errors["base"] = "mfa_required"

            except (PyEzvizError, AuthTestResultFailed):
                errors["base"] = "invalid_auth"

            except Exception:
                _LOGGER.exception("Unexpected exception")
                return self.async_abort(reason="unknown")

        discovered_camera_schema = vol.Schema(
            {
                vol.Required(CONF_USERNAME, default=DEFAULT_CAMERA_USERNAME): str,
                vol.Required(CONF_PASSWORD): str,
            }
        )

        return self.async_show_form(
            step_id="confirm",
            data_schema=discovered_camera_schema,
            errors=errors,
            description_placeholders={
                ATTR_SERIAL: self.unique_id,
                CONF_IP_ADDRESS: self.ip_address,
            },
        )

    async def async_step_reauth(
        self, entry_data: Mapping[str, Any]
    ) -> ConfigFlowResult:
        """Handle a flow for reauthentication with password."""

        # entry_data is not used; the confirm step looks up the cloud entry.
        return await self.async_step_reauth_confirm()

    async def async_step_reauth_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a Confirm flow for reauthentication with password.

        Aborts when no cloud account entry can be found. On a successful
        login the cloud entry's data is replaced with the new token data and
        the entry is reloaded.
        """
        errors: dict[str, str] = {}
        entry = None

        for item in self._async_current_entries():
            if item.data.get(CONF_TYPE) == ATTR_TYPE_CLOUD:
                self.context["title_placeholders"] = {ATTR_SERIAL: item.title}
                entry = await self.async_set_unique_id(item.title)

        if not entry:
            return self.async_abort(reason="ezviz_cloud_account_missing")

        if user_input is not None:
            # Re-use the API URL already stored on the cloud entry.
            user_input[CONF_URL] = entry.data[CONF_URL]

            try:
                auth_data = await self.hass.async_add_executor_job(
                    _validate_and_create_auth, user_input
                )

            except (InvalidHost, InvalidURL):
                errors["base"] = "invalid_host"

            except EzvizAuthVerificationCode:
                errors["base"] = "mfa_required"

            except (PyEzvizError, AuthTestResultFailed):
                errors["base"] = "invalid_auth"

            except Exception:
                _LOGGER.exception("Unexpected exception")
                return self.async_abort(reason="unknown")

            else:
                return self.async_update_reload_and_abort(
                    entry,
                    data=auth_data,
                )

        data_schema = vol.Schema(
            {
                # The username is fixed to the existing account's title.
                vol.Required(CONF_USERNAME, default=entry.title): vol.In([entry.title]),
                vol.Required(CONF_PASSWORD): str,
            }
        )

        return self.async_show_form(
            step_id="reauth_confirm",
            data_schema=data_schema,
            errors=errors,
        )
|
|
|
|
|
|
class EzvizOptionsFlowHandler(OptionsFlow):
    """Options flow for tuning the EZVIZ client settings of an entry."""

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Show the options form, or store the options once submitted.

        The form offers the request timeout and the ffmpeg arguments, each
        pre-filled with the entry's current option or the integration default.
        """
        if user_input is None:
            current = self.config_entry.options

            timeout_field = vol.Optional(
                CONF_TIMEOUT,
                default=current.get(CONF_TIMEOUT, DEFAULT_TIMEOUT),
            )
            ffmpeg_field = vol.Optional(
                CONF_FFMPEG_ARGUMENTS,
                default=current.get(CONF_FFMPEG_ARGUMENTS, DEFAULT_FFMPEG_ARGUMENTS),
            )
            schema = vol.Schema({timeout_field: int, ffmpeg_field: str})

            return self.async_show_form(step_id="init", data_schema=schema)

        return self.async_create_entry(title="", data=user_input)
|