core/tests/components/nest/test_config_flow.py

1179 lines
40 KiB
Python

"""Test the Google Nest Device Access config flow."""
from __future__ import annotations
from http import HTTPStatus
from typing import Any
from unittest.mock import patch
from google_nest_sdm.exceptions import AuthException
from google_nest_sdm.structure import Structure
import pytest
from homeassistant import config_entries
from homeassistant.components import dhcp
from homeassistant.components.nest.const import DOMAIN, OAUTH2_AUTHORIZE, OAUTH2_TOKEN
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult, FlowResultType
from homeassistant.helpers import config_entry_oauth2_flow
from .common import (
CLIENT_ID,
CLOUD_PROJECT_ID,
PROJECT_ID,
SUBSCRIBER_ID,
TEST_CONFIG_APP_CREDS,
TEST_CONFIGFLOW_APP_CREDS,
FakeSubscriber,
NestTestConfig,
)
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import ClientSessionGenerator
WEB_REDIRECT_URL = "https://example.com/auth/external/callback"
APP_REDIRECT_URL = "urn:ietf:wg:oauth:2.0:oob"
RAND_SUBSCRIBER_SUFFIX = "ABCDEF"
FAKE_DHCP_DATA = dhcp.DhcpServiceInfo(
ip="127.0.0.2", macaddress="001122334455", hostname="fake_hostname"
)
@pytest.fixture
def nest_test_config() -> NestTestConfig:
"""Fixture with empty configuration and no existing config entry."""
return TEST_CONFIGFLOW_APP_CREDS
@pytest.fixture(autouse=True)
def mock_rand_topic_name_fixture() -> None:
"""Set the topic name random string to a constant."""
with patch(
"homeassistant.components.nest.config_flow.get_random_string",
return_value=RAND_SUBSCRIBER_SUFFIX,
):
yield
class OAuthFixture:
"""Simulate the oauth flow used by the config flow."""
def __init__(
self,
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Initialize OAuthFixture."""
self.hass = hass
self.hass_client = hass_client_no_auth
self.aioclient_mock = aioclient_mock
async def async_app_creds_flow(
self,
result: dict,
cloud_project_id: str = CLOUD_PROJECT_ID,
project_id: str = PROJECT_ID,
) -> None:
"""Invoke multiple steps in the app credentials based flow."""
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "cloud_project"
result = await self.async_configure(
result, {"cloud_project_id": CLOUD_PROJECT_ID}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "device_project"
result = await self.async_configure(result, {"project_id": project_id})
await self.async_oauth_web_flow(result, project_id=project_id)
async def async_oauth_web_flow(self, result: dict, project_id=PROJECT_ID) -> None:
"""Invoke the oauth flow for Web Auth with fake responses."""
state = self.create_state(result, WEB_REDIRECT_URL)
assert result["type"] is FlowResultType.EXTERNAL_STEP
assert result["url"] == self.authorize_url(
state,
WEB_REDIRECT_URL,
CLIENT_ID,
project_id,
)
# Simulate user redirect back with auth code
client = await self.hass_client()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.headers["content-type"] == "text/html; charset=utf-8"
async def async_reauth(self, config_entry: ConfigEntry) -> dict:
"""Initiate a reuath flow."""
config_entry.async_start_reauth(self.hass)
await self.hass.async_block_till_done()
# Advance through the reauth flow
result = self.async_progress()
assert result["step_id"] == "reauth_confirm"
# Advance to the oauth flow
return await self.hass.config_entries.flow.async_configure(
result["flow_id"], {}
)
def async_progress(self) -> FlowResult:
"""Return the current step of the config flow."""
flows = self.hass.config_entries.flow.async_progress()
assert len(flows) == 1
return flows[0]
def create_state(self, result: dict, redirect_url: str) -> str:
"""Create state object based on redirect url."""
return config_entry_oauth2_flow._encode_jwt(
self.hass,
{
"flow_id": result["flow_id"],
"redirect_uri": redirect_url,
},
)
def authorize_url(
self, state: str, redirect_url: str, client_id: str, project_id: str
) -> str:
"""Generate the expected authorization url."""
oauth_authorize = OAUTH2_AUTHORIZE.format(project_id=project_id)
return (
f"{oauth_authorize}?response_type=code&client_id={client_id}"
f"&redirect_uri={redirect_url}"
f"&state={state}&scope=https://www.googleapis.com/auth/sdm.service"
"+https://www.googleapis.com/auth/pubsub"
"&access_type=offline&prompt=consent"
)
def async_mock_refresh(self) -> None:
"""Finish the OAuth flow exchanging auth token for refresh token."""
self.aioclient_mock.post(
OAUTH2_TOKEN,
json={
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
},
)
async def async_complete_pubsub_flow(
self,
result: dict,
selected_topic: str,
selected_subscription: str = "create_new_subscription",
user_input: dict | None = None,
) -> ConfigEntry:
"""Fixture to walk through the Pub/Sub topic and subscription steps.
This picks a simple set of steps that are reusable for most flows without
exercising the corner cases.
"""
# Validate Pub/Sub topics are shown
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert not result.get("errors")
# Select Pub/Sub topic the show available subscriptions (none)
result = await self.async_configure(
result,
{
"topic_name": selected_topic,
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert not result.get("errors")
# Create the subscription and end the flow
return await self.async_finish_setup(
result,
{
"subscription_name": selected_subscription,
},
)
async def async_finish_setup(
self, result: dict, user_input: dict | None = None
) -> ConfigEntry:
"""Finish the OAuth flow exchanging auth token for refresh token."""
with patch(
"homeassistant.components.nest.async_setup_entry", return_value=True
) as mock_setup:
await self.async_configure(result, user_input)
assert len(mock_setup.mock_calls) == 1
await self.hass.async_block_till_done()
return self.get_config_entry()
async def async_configure(
self, result: dict[str, Any], user_input: dict[str, Any]
) -> dict:
"""Advance to the next step in the config flow."""
return await self.hass.config_entries.flow.async_configure(
result["flow_id"],
user_input,
)
def get_config_entry(self) -> ConfigEntry:
"""Get the config entry."""
entries = self.hass.config_entries.async_entries(DOMAIN)
assert len(entries) >= 1
return entries[0]
@pytest.fixture
async def oauth(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
current_request_with_host: None,
) -> OAuthFixture:
"""Create the simulated oauth flow."""
return OAuthFixture(hass, hass_client_no_auth, aioclient_mock)
@pytest.fixture(name="sdm_managed_topic")
def mock_sdm_managed_topic() -> bool:
"""Fixture to configure fake server responses for SDM owend Pub/Sub topics."""
return False
@pytest.fixture(name="user_managed_topics")
def mock_user_managed_topics() -> list[str]:
"""Fixture to configure fake server response for user owned Pub/Sub topics."""
return []
@pytest.fixture(name="subscriptions")
def mock_subscriptions() -> list[tuple[str, str]]:
"""Fixture to configure fake server response for user subscriptions that exist."""
return []
@pytest.fixture(name="device_access_project_id")
def mock_device_access_project_id() -> str:
"""Fixture to configure the device access console project id used in tests."""
return PROJECT_ID
@pytest.fixture(name="cloud_project_id")
def mock_cloud_project_id() -> str:
"""Fixture to configure the cloud console project id used in tests."""
return CLOUD_PROJECT_ID
@pytest.fixture(name="create_subscription_status")
def mock_create_subscription_status() -> str:
"""Fixture to configure the return code when creating the subscription."""
return HTTPStatus.OK
@pytest.fixture(name="list_topics_status")
def mock_list_topics_status() -> str:
"""Fixture to configure the return code when listing topics."""
return HTTPStatus.OK
@pytest.fixture(name="list_subscriptions_status")
def mock_list_subscriptions_status() -> str:
"""Fixture to configure the return code when listing subscriptions."""
return HTTPStatus.OK
@pytest.fixture(autouse=True)
def mock_pubsub_api_responses(
aioclient_mock: AiohttpClientMocker,
sdm_managed_topic: bool,
user_managed_topics: list[str],
subscriptions: list[tuple[str, str]],
device_access_project_id: str,
cloud_project_id: str,
create_subscription_status: HTTPStatus,
list_topics_status: HTTPStatus,
list_subscriptions_status: HTTPStatus,
) -> None:
"""Configure a server response for an SDM managed Pub/Sub topic.
We check for a topic created by the SDM Device Access Console (but note we don't have permission to read it)
or the user has created one themselves in the Google Cloud Project.
"""
aioclient_mock.get(
f"https://pubsub.googleapis.com/v1/projects/sdm-prod/topics/enterprise-{device_access_project_id}",
status=HTTPStatus.FORBIDDEN if sdm_managed_topic else HTTPStatus.NOT_FOUND,
)
aioclient_mock.get(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/topics",
json={
"topics": [
{
"name": topic_name,
}
for topic_name in user_managed_topics or ()
]
},
status=list_topics_status,
)
# We check for a topic created by the SDM Device Access Console (but note we don't have permission to read it)
# or the user has created one themselves in the Google Cloud Project.
aioclient_mock.get(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions",
json={
"subscriptions": [
{
"name": subscription_name,
"topic": topic,
"pushConfig": {},
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "604800s",
"expirationPolicy": {"ttl": "2678400s"},
"state": "ACTIVE",
}
for (subscription_name, topic) in subscriptions or ()
]
},
status=list_subscriptions_status,
)
aioclient_mock.put(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
json={},
status=create_subscription_status,
)
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_app_credentials(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Check full flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(
("sdm_managed_topic", "device_access_project_id", "cloud_project_id"),
[(True, "new-project-id", "new-cloud-project-id")],
)
async def test_config_flow_restart(hass: HomeAssistant, oauth, subscriber) -> None:
"""Check with auth implementation is re-initialized when aborting the flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
# At this point, we should have a valid auth implementation configured.
# Simulate aborting the flow and starting over to ensure we get prompted
# again to configure everything.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "cloud_project"
# Change the values to show they are reflected below
result = await oauth.async_configure(
result, {"cloud_project_id": "new-cloud-project-id"}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "device_project"
result = await oauth.async_configure(result, {"project_id": "new-project-id"})
await oauth.async_oauth_web_flow(result, "new-project-id")
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic="projects/sdm-prod/topics/enterprise-new-project-id"
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": "new-cloud-project-id",
"project_id": "new-project-id",
"subscription_name": "projects/new-cloud-project-id/subscriptions/home-assistant-ABCDEF",
"topic_name": "projects/sdm-prod/topics/enterprise-new-project-id",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_config_flow_wrong_project_id(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Check the case where the wrong project ids are entered."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "cloud_project"
result = await oauth.async_configure(result, {"cloud_project_id": CLOUD_PROJECT_ID})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "device_project"
# Enter the cloud project id instead of device access project id (really we just check
# they are the same value which is never correct)
result = await oauth.async_configure(result, {"project_id": CLOUD_PROJECT_ID})
assert result["type"] is FlowResultType.FORM
assert "errors" in result
assert "project_id" in result["errors"]
assert result["errors"]["project_id"] == "wrong_project_id"
# Fix with a correct value and complete the rest of the flow
result = await oauth.async_configure(result, {"project_id": PROJECT_ID})
await oauth.async_oauth_web_flow(result)
await hass.async_block_till_done()
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic="projects/sdm-prod/topics/enterprise-some-project-id"
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": "projects/cloud-id-9876/subscriptions/home-assistant-ABCDEF",
"topic_name": "projects/sdm-prod/topics/enterprise-some-project-id",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(
("sdm_managed_topic", "create_subscription_status"), [(True, HTTPStatus.NOT_FOUND)]
)
async def test_config_flow_pubsub_configuration_error(
hass: HomeAssistant,
oauth,
mock_subscriber,
) -> None:
"""Check full flow fails with configuration error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert result.get("data_schema")({}) == {
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
}
# Select Pub/Sub topic the show available subscriptions (none)
result = await oauth.async_configure(
result,
{
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
}
# Failure when creating the subscription
result = await oauth.async_configure(
result,
{
"subscription_name": "create_new_subscription",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "pubsub_api_error"}
@pytest.mark.parametrize(
("sdm_managed_topic", "create_subscription_status"),
[(True, HTTPStatus.INTERNAL_SERVER_ERROR)],
)
async def test_config_flow_pubsub_subscriber_error(
hass: HomeAssistant, oauth, mock_subscriber
) -> None:
"""Check full flow with a subscriber error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert result.get("data_schema")({}) == {
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
}
# Select Pub/Sub topic the show available subscriptions (none)
result = await oauth.async_configure(
result,
{
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
}
# Failure when creating the subscription
result = await oauth.async_configure(
result,
{
"subscription_name": "create_new_subscription",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "pubsub_api_error"}
@pytest.mark.parametrize(
("nest_test_config", "sdm_managed_topic", "device_access_project_id"),
[(TEST_CONFIG_APP_CREDS, True, "project-id-2")],
)
async def test_multiple_config_entries(
hass: HomeAssistant, oauth, setup_platform
) -> None:
"""Verify config flow can be started when existing config entry exists."""
await setup_platform()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result, project_id="project-id-2")
oauth.async_mock_refresh()
result = await oauth.async_configure(result, user_input={})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic="projects/sdm-prod/topics/enterprise-project-id-2"
)
assert entry.title == "Mock Title"
assert "token" in entry.data
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 2
@pytest.mark.parametrize(
("nest_test_config", "sdm_managed_topic"), [(TEST_CONFIG_APP_CREDS, True)]
)
async def test_duplicate_config_entries(
hass: HomeAssistant, oauth, setup_platform
) -> None:
"""Verify that config entries must be for unique projects."""
await setup_platform()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "cloud_project"
result = await oauth.async_configure(result, {"cloud_project_id": CLOUD_PROJECT_ID})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "device_project"
result = await oauth.async_configure(result, {"project_id": PROJECT_ID})
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
@pytest.mark.parametrize(
("nest_test_config", "sdm_managed_topic"), [(TEST_CONFIG_APP_CREDS, True)]
)
async def test_reauth_multiple_config_entries(
hass: HomeAssistant, oauth, setup_platform, config_entry
) -> None:
"""Test Nest reauthentication with multiple existing config entries."""
await setup_platform()
old_entry = MockConfigEntry(
domain=DOMAIN,
data={
**config_entry.data,
"extra_data": True,
},
)
old_entry.add_to_hass(hass)
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 2
orig_subscriber_id = config_entry.data.get("subscriber_id")
# Invoke the reauth flow
result = await oauth.async_reauth(config_entry)
await oauth.async_oauth_web_flow(result)
oauth.async_mock_refresh()
await oauth.async_finish_setup(result)
# Only reauth entry was updated, the other entry is preserved
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 2
entry = entries[0]
assert entry.unique_id == PROJECT_ID
entry.data["token"].pop("expires_at")
assert entry.data["token"] == {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
}
assert entry.data.get("subscriber_id") == orig_subscriber_id # Not updated
assert not entry.data.get("extra_data")
# Other entry was not refreshed
entry = entries[1]
entry.data["token"].pop("expires_at")
assert entry.data.get("token", {}).get("access_token") == "some-token"
assert entry.data.get("extra_data")
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_pubsub_subscription_strip_whitespace(
hass: HomeAssistant, oauth, subscriber
) -> None:
"""Check that project id has whitespace stripped on entry."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(
result, cloud_project_id=" " + CLOUD_PROJECT_ID + " "
)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic="projects/sdm-prod/topics/enterprise-some-project-id"
)
assert entry.title == "Import from configuration.yaml"
assert "token" in entry.data
entry.data["token"].pop("expires_at")
assert entry.unique_id == PROJECT_ID
assert entry.data["token"] == {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
}
assert "subscription_name" in entry.data
assert entry.data["cloud_project_id"] == CLOUD_PROJECT_ID
@pytest.mark.parametrize(
("sdm_managed_topic", "create_subscription_status"),
[(True, HTTPStatus.UNAUTHORIZED)],
)
async def test_pubsub_subscription_auth_failure(
hass: HomeAssistant, oauth, mock_subscriber
) -> None:
"""Check flow that creates a pub/sub subscription."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert result.get("data_schema")({}) == {
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
}
# Select Pub/Sub topic the show available subscriptions (none)
result = await oauth.async_configure(
result,
{
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
}
# Failure when creating the subscription
result = await oauth.async_configure(
result,
{
"subscription_name": "create_new_subscription",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("errors") == {"base": "pubsub_api_error"}
@pytest.mark.parametrize(
("nest_test_config", "sdm_managed_topic"), [(TEST_CONFIG_APP_CREDS, True)]
)
async def test_pubsub_subscriber_config_entry_reauth(
hass: HomeAssistant,
oauth,
setup_platform,
subscriber,
config_entry,
) -> None:
"""Test the pubsub subscriber id is preserved during reauth."""
await setup_platform()
result = await oauth.async_reauth(config_entry)
await oauth.async_oauth_web_flow(result)
oauth.async_mock_refresh()
# Entering an updated access token refreshes the config entry.
entry = await oauth.async_finish_setup(result, {"code": "1234"})
entry.data["token"].pop("expires_at")
assert entry.unique_id == PROJECT_ID
assert entry.data["token"] == {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
}
assert entry.data["auth_implementation"] == "imported-cred"
assert entry.data["subscriber_id"] == SUBSCRIBER_ID
assert entry.data["cloud_project_id"] == CLOUD_PROJECT_ID
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_config_entry_title_from_home(
hass: HomeAssistant, oauth, subscriber
) -> None:
"""Test that the Google Home name is used for the config entry title."""
device_manager = await subscriber.async_get_device_manager()
device_manager.add_structure(
Structure.MakeStructure(
{
"name": f"enterprise/{PROJECT_ID}/structures/some-structure-id",
"traits": {
"sdm.structures.traits.Info": {
"customName": "Example Home",
},
},
}
)
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
assert entry.title == "Example Home"
assert "token" in entry.data
assert entry.data.get("cloud_project_id") == CLOUD_PROJECT_ID
assert (
entry.data.get("subscription_name")
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}"
)
assert (
entry.data.get("topic_name")
== f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_config_entry_title_multiple_homes(
hass: HomeAssistant, oauth, subscriber
) -> None:
"""Test handling of multiple Google Homes authorized."""
device_manager = await subscriber.async_get_device_manager()
device_manager.add_structure(
Structure.MakeStructure(
{
"name": f"enterprise/{PROJECT_ID}/structures/id-1",
"traits": {
"sdm.structures.traits.Info": {
"customName": "Example Home #1",
},
},
}
)
)
device_manager.add_structure(
Structure.MakeStructure(
{
"name": f"enterprise/{PROJECT_ID}/structures/id-2",
"traits": {
"sdm.structures.traits.Info": {
"customName": "Example Home #2",
},
},
}
)
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
assert entry.title == "Example Home #1, Example Home #2"
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_title_failure_fallback(
hass: HomeAssistant, oauth, mock_subscriber
) -> None:
"""Test exception handling when determining the structure names."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
mock_subscriber.async_get_device_manager.side_effect = AuthException()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
assert entry.title == "Import from configuration.yaml"
assert "token" in entry.data
assert entry.data.get("cloud_project_id") == CLOUD_PROJECT_ID
assert (
entry.data.get("subscription_name")
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}"
)
assert (
entry.data.get("topic_name")
== f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_structure_missing_trait(hass: HomeAssistant, oauth, subscriber) -> None:
"""Test handling the case where a structure has no name set."""
device_manager = await subscriber.async_get_device_manager()
device_manager.add_structure(
Structure.MakeStructure(
{
"name": f"enterprise/{PROJECT_ID}/structures/id-1",
# Missing Info trait
"traits": {},
}
)
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
# Fallback to default name
assert entry.title == "Import from configuration.yaml"
@pytest.mark.parametrize("nest_test_config", [NestTestConfig()])
async def test_dhcp_discovery(
hass: HomeAssistant, oauth: OAuthFixture, nest_test_config: NestTestConfig
) -> None:
"""Exercise discovery dhcp starts the config flow and kicks user to frontend creds flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=FAKE_DHCP_DATA,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "create_cloud_project"
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "missing_credentials"
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_dhcp_discovery_with_creds(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Exercise discovery dhcp with no config present (can't run)."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=FAKE_DHCP_DATA,
)
await hass.async_block_till_done()
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "cloud_project"
result = await oauth.async_configure(result, {"cloud_project_id": CLOUD_PROJECT_ID})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "device_project"
result = await oauth.async_configure(result, {"project_id": PROJECT_ID})
await oauth.async_oauth_web_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic=f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}"
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(
("status_code", "error_reason"),
[
(HTTPStatus.UNAUTHORIZED, "oauth_unauthorized"),
(HTTPStatus.NOT_FOUND, "oauth_failed"),
(HTTPStatus.INTERNAL_SERVER_ERROR, "oauth_failed"),
],
)
async def test_token_error(
hass: HomeAssistant,
oauth: OAuthFixture,
subscriber: FakeSubscriber,
status_code: HTTPStatus,
error_reason: str,
) -> None:
"""Check full flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.aioclient_mock.post(
OAUTH2_TOKEN,
status=status_code,
)
result = await oauth.async_configure(result, user_input=None)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == error_reason
@pytest.mark.parametrize(
("user_managed_topics", "subscriptions"),
[
(
[f"projects/{CLOUD_PROJECT_ID}/topics/some-topic-id"],
[
(
f"projects/{CLOUD_PROJECT_ID}/subscriptions/some-subscription-id",
f"projects/{CLOUD_PROJECT_ID}/topics/some-topic-id",
)
],
)
],
)
async def test_existing_topic_and_subscription(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Test selecting existing user managed topic and subscription."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
entry = await oauth.async_complete_pubsub_flow(
result,
selected_topic=f"projects/{CLOUD_PROJECT_ID}/topics/some-topic-id",
selected_subscription=f"projects/{CLOUD_PROJECT_ID}/subscriptions/some-subscription-id",
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/some-subscription-id",
"subscriber_id_imported": True,
"topic_name": f"projects/{CLOUD_PROJECT_ID}/topics/some-topic-id",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
async def test_no_eligible_topics(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Test the case where there are no eligible pub/sub topics."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub"
assert result.get("errors") == {"base": "no_pubsub_topics"}
@pytest.mark.parametrize(
("list_topics_status"),
[
(HTTPStatus.INTERNAL_SERVER_ERROR),
],
)
async def test_list_topics_failure(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Test selecting existing user managed topic and subscription."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub"
assert result.get("errors") == {"base": "pubsub_api_error"}
@pytest.mark.parametrize(
("sdm_managed_topic", "list_subscriptions_status"),
[
(True, HTTPStatus.INTERNAL_SERVER_ERROR),
],
)
async def test_list_subscriptions_failure(
hass: HomeAssistant,
oauth,
subscriber,
) -> None:
"""Test selecting existing user managed topic and subscription."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert not result.get("errors")
# Select Pub/Sub topic the show available subscriptions (none)
result = await oauth.async_configure(
result,
{
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("errors") == {"base": "pubsub_api_error"}