core/homeassistant/components/lastfm/config_flow.py

213 lines
6.8 KiB
Python

"""Config flow for LastFm."""
from __future__ import annotations
from typing import Any
from pylast import LastFMNetwork, PyLastError, User, WSError
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
)
from homeassistant.const import CONF_API_KEY
from homeassistant.core import callback
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
)
from .const import CONF_MAIN_USER, CONF_USERS, DOMAIN
PLACEHOLDERS = {"api_account_url": "https://www.last.fm/api/account/create"}
CONFIG_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(CONF_API_KEY): str,
vol.Required(CONF_MAIN_USER): str,
}
)
def get_lastfm_user(api_key: str, username: str) -> tuple[User, dict[str, str]]:
"""Get and validate lastFM User."""
user = LastFMNetwork(api_key=api_key).get_user(username)
errors = {}
try:
user.get_playcount()
except WSError as error:
if error.details == "User not found":
errors["base"] = "invalid_account"
elif (
error.details
== "Invalid API key - You must be granted a valid key by last.fm"
):
errors["base"] = "invalid_auth"
else:
errors["base"] = "unknown"
except Exception: # noqa: BLE001
errors["base"] = "unknown"
return user, errors
def validate_lastfm_users(
api_key: str, usernames: list[str]
) -> tuple[list[str], dict[str, str]]:
"""Validate list of users. Return tuple of valid users and errors."""
valid_users = []
errors = {}
for username in usernames:
_, lastfm_errors = get_lastfm_user(api_key, username)
if lastfm_errors:
errors = lastfm_errors
else:
valid_users.append(username)
return valid_users, errors
class LastFmConfigFlowHandler(ConfigFlow, domain=DOMAIN):
"""Config flow handler for LastFm."""
data: dict[str, Any] = {}
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry,
) -> LastFmOptionsFlowHandler:
"""Get the options flow for this handler."""
return LastFmOptionsFlowHandler()
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Initialize user input."""
errors: dict[str, str] = {}
if user_input is not None:
self.data = user_input.copy()
_, errors = get_lastfm_user(
self.data[CONF_API_KEY], self.data[CONF_MAIN_USER]
)
if not errors:
return await self.async_step_friends()
return self.async_show_form(
step_id="user",
errors=errors,
description_placeholders=PLACEHOLDERS,
data_schema=self.add_suggested_values_to_schema(CONFIG_SCHEMA, user_input),
)
async def async_step_friends(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Form to select other users and friends."""
errors: dict[str, str] = {}
if user_input is not None:
users, errors = validate_lastfm_users(
self.data[CONF_API_KEY], user_input[CONF_USERS]
)
user_input[CONF_USERS] = users
if not errors:
return self.async_create_entry(
title="LastFM",
data={},
options={
CONF_API_KEY: self.data[CONF_API_KEY],
CONF_MAIN_USER: self.data[CONF_MAIN_USER],
CONF_USERS: [
self.data[CONF_MAIN_USER],
*user_input[CONF_USERS],
],
},
)
try:
main_user, _ = get_lastfm_user(
self.data[CONF_API_KEY], self.data[CONF_MAIN_USER]
)
friends_response = await self.hass.async_add_executor_job(
main_user.get_friends
)
friends = [
SelectOptionDict(value=friend.name, label=friend.get_name(True))
for friend in friends_response
]
except PyLastError:
friends = []
return self.async_show_form(
step_id="friends",
errors=errors,
data_schema=self.add_suggested_values_to_schema(
vol.Schema(
{
vol.Required(CONF_USERS): SelectSelector(
SelectSelectorConfig(
options=friends, custom_value=True, multiple=True
)
),
}
),
user_input or {CONF_USERS: []},
),
)
class LastFmOptionsFlowHandler(OptionsFlow):
"""LastFm Options flow handler."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Initialize form."""
errors: dict[str, str] = {}
options = self.config_entry.options
if user_input is not None:
users, errors = validate_lastfm_users(
options[CONF_API_KEY], user_input[CONF_USERS]
)
user_input[CONF_USERS] = users
if not errors:
return self.async_create_entry(
title="LastFM",
data={
**options,
CONF_USERS: user_input[CONF_USERS],
},
)
if options[CONF_MAIN_USER]:
try:
main_user, _ = get_lastfm_user(
options[CONF_API_KEY],
options[CONF_MAIN_USER],
)
friends_response = await self.hass.async_add_executor_job(
main_user.get_friends
)
friends = [
SelectOptionDict(value=friend.name, label=friend.get_name(True))
for friend in friends_response
]
except PyLastError:
friends = []
else:
friends = []
return self.async_show_form(
step_id="init",
errors=errors,
data_schema=self.add_suggested_values_to_schema(
vol.Schema(
{
vol.Required(CONF_USERS): SelectSelector(
SelectSelectorConfig(
options=friends, custom_value=True, multiple=True
)
),
}
),
user_input or options,
),
)