mirror of https://github.com/home-assistant/core
735 lines
24 KiB
Python
735 lines
24 KiB
Python
"""Component to allow users to login and get tokens.
|
|
|
|
# POST /auth/token
|
|
|
|
This is an OAuth2 endpoint for granting tokens. We currently support the grant
|
|
types "authorization_code" and "refresh_token". Because we follow the OAuth2
|
|
spec, data should be sent formatted as x-www-form-urlencoded. Examples will
|
|
be in JSON as it's more readable.
|
|
|
|
## Grant type authorization_code
|
|
|
|
Exchange the authorization code retrieved from the login flow for tokens.
|
|
|
|
{
|
|
"client_id": "https://hassbian.local:8123/",
|
|
"grant_type": "authorization_code",
|
|
"code": "411ee2f916e648d691e937ae9344681e"
|
|
}
|
|
|
|
Return value will be the access and refresh tokens. The access token will have
|
|
a limited expiration. New access tokens can be requested using the refresh
|
|
token. The value ha_auth_provider will contain the auth provider type that was
|
|
used to authorize the refresh token.
|
|
|
|
{
|
|
"access_token": "ABCDEFGH",
|
|
"expires_in": 1800,
|
|
"refresh_token": "IJKLMNOPQRST",
|
|
"token_type": "Bearer",
|
|
"ha_auth_provider": "homeassistant"
|
|
}
|
|
|
|
## Grant type refresh_token
|
|
|
|
Request a new access token using a refresh token.
|
|
|
|
{
|
|
"client_id": "https://hassbian.local:8123/",
|
|
"grant_type": "refresh_token",
|
|
"refresh_token": "IJKLMNOPQRST"
|
|
}
|
|
|
|
Return value will be a new access token. The access token will have
|
|
a limited expiration.
|
|
|
|
{
|
|
"access_token": "ABCDEFGH",
|
|
"expires_in": 1800,
|
|
"token_type": "Bearer"
|
|
}
|
|
|
|
## Revoking a refresh token
|
|
|
|
It is also possible to revoke a refresh token and all access tokens that have
|
|
ever been granted by that refresh token. Response code will ALWAYS be 200.
|
|
|
|
{
|
|
"token": "IJKLMNOPQRST",
|
|
"action": "revoke"
|
|
}
|
|
|
|
# Websocket API
|
|
|
|
## Get current user
|
|
|
|
Sending the websocket command `auth/current_user` will return the current user of the
|
|
active websocket connection.
|
|
|
|
{
|
|
"id": 10,
|
|
"type": "auth/current_user",
|
|
}
|
|
|
|
The result payload looks like:
|
|
|
|
    {
        "id": 10,
        "type": "result",
        "success": true,
        "result": {
            "id": "USER_ID",
            "name": "John Doe",
            "is_owner": true,
            "is_admin": true,
            "credentials": [{
                "auth_provider_type": "homeassistant",
                "auth_provider_id": null
            }],
            "mfa_modules": [{
                "id": "totp",
                "name": "TOTP",
                "enabled": true
            }]
        }
    }
|
|
|
|
## Create a long-lived access token
|
|
|
|
Sending the websocket command `auth/long_lived_access_token` will create
|
|
a long-lived access token for the current user. The access token will not be saved in
|
|
Home Assistant. The user needs to record the token in a secure place.
|
|
|
|
{
|
|
"id": 11,
|
|
"type": "auth/long_lived_access_token",
|
|
"client_name": "GPS Logger",
|
|
"lifespan": 365
|
|
}
|
|
|
|
Result will be a long-lived access token:
|
|
|
|
{
|
|
"id": 11,
|
|
"type": "result",
|
|
"success": true,
|
|
"result": "ABCDEFGH"
|
|
}
|
|
|
|
|
|
# POST /auth/external/callback
|
|
|
|
This is an endpoint for OAuth2 Authorization callbacks used by integrations
|
|
that link accounts with other cloud providers using LocalOAuth2Implementation
|
|
as part of a config flow.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable
|
|
from datetime import datetime, timedelta
|
|
from http import HTTPStatus
|
|
from logging import getLogger
|
|
from typing import Any, cast
|
|
import uuid
|
|
|
|
from aiohttp import web
|
|
from multidict import MultiDictProxy
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.auth import InvalidAuthError
|
|
from homeassistant.auth.models import (
|
|
TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
|
|
Credentials,
|
|
RefreshToken,
|
|
User,
|
|
)
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.components.http import KEY_HASS
|
|
from homeassistant.components.http.auth import (
|
|
async_sign_path,
|
|
async_user_not_allowed_do_auth,
|
|
)
|
|
from homeassistant.components.http.ban import log_invalid_auth
|
|
from homeassistant.components.http.data_validator import RequestDataValidator
|
|
from homeassistant.components.http.view import HomeAssistantView
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import config_validation as cv
|
|
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2AuthorizeCallbackView
|
|
from homeassistant.helpers.typing import ConfigType
|
|
from homeassistant.loader import bind_hass
|
|
from homeassistant.util import dt as dt_util
|
|
from homeassistant.util.hass_dict import HassKey
|
|
|
|
from . import indieauth, login_flow, mfa_setup_flow
|
|
|
|
# Domain name of this component; also used as the name of the hass.data key.
DOMAIN = "auth"

# Callable taking a client id and Credentials and returning a string code.
type StoreResultType = Callable[[str, Credentials], str]
# Callable taking a client id and a code and returning the stored
# Credentials, or None.
type RetrieveResultType = Callable[[str, str], Credentials | None]
# Key in hass.data under which async_setup keeps the store callable.
DATA_STORE: HassKey[StoreResultType] = HassKey(DOMAIN)
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)

# Delay, in seconds, slept before websocket_delete_all_refresh_tokens removes
# the refresh token of the connection that issued the command.
DELETE_CURRENT_TOKEN_DELAY = 2
|
|
|
|
|
|
@bind_hass
def create_auth_code(
    hass: HomeAssistant, client_id: str, credential: Credentials
) -> str:
    """Store the credential for a client and return the code that redeems it.

    The code is produced by the store callable kept in ``hass.data`` under
    ``DATA_STORE``.
    """
    store_credential = hass.data[DATA_STORE]
    return store_credential(client_id, credential)
|
|
|
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the auth component.

    Creates the authorization code store, registers the HTTP views and the
    websocket commands, and sets up the login and MFA setup flows.
    """
    store_result, retrieve_result = _create_auth_code_store()
    hass.data[DATA_STORE] = store_result

    register_view = hass.http.register_view
    register_view(TokenView(retrieve_result))
    register_view(RevokeTokenView())
    register_view(LinkUserView(retrieve_result))
    register_view(OAuth2AuthorizeCallbackView())

    for command in (
        websocket_current_user,
        websocket_create_long_lived_access_token,
        websocket_refresh_tokens,
        websocket_delete_refresh_token,
        websocket_delete_all_refresh_tokens,
        websocket_sign_path,
        websocket_refresh_token_set_expiry,
    ):
        websocket_api.async_register_command(hass, command)

    login_flow.async_setup(hass, store_result)
    mfa_setup_flow.async_setup(hass)

    return True
|
|
|
|
|
|
class RevokeTokenView(HomeAssistantView):
    """HTTP view that revokes refresh tokens."""

    url = "/auth/revoke"
    name = "api:auth:revocation"
    requires_auth = False
    cors_allowed = True

    async def post(self, request: web.Request) -> web.Response:
        """Remove the refresh token named in the form data, if it exists.

        The response status is 200 whether or not a token was removed.
        """
        hass = request.app[KEY_HASS]
        form = cast(MultiDictProxy[str], await request.post())

        # OAuth 2.0 Token Revocation [RFC7009]
        # 2.2 The authorization server responds with HTTP status code 200
        # if the token has been revoked successfully or if the client
        # submitted an invalid token.
        token = form.get("token")
        if token is not None:
            known_token = hass.auth.async_get_refresh_token_by_token(token)
            if known_token is not None:
                hass.auth.async_remove_refresh_token(known_token)

        return web.Response(status=HTTPStatus.OK)
|
|
|
|
|
|
class TokenView(HomeAssistantView):
|
|
"""View to issue tokens."""
|
|
|
|
url = "/auth/token"
|
|
name = "api:auth:token"
|
|
requires_auth = False
|
|
cors_allowed = True
|
|
|
|
def __init__(self, retrieve_auth: RetrieveResultType) -> None:
|
|
"""Initialize the token view."""
|
|
self._retrieve_auth = retrieve_auth
|
|
|
|
@log_invalid_auth
|
|
async def post(self, request: web.Request) -> web.Response:
|
|
"""Grant a token."""
|
|
hass = request.app[KEY_HASS]
|
|
data = cast(MultiDictProxy[str], await request.post())
|
|
|
|
grant_type = data.get("grant_type")
|
|
|
|
# IndieAuth 6.3.5
|
|
# The revocation endpoint is the same as the token endpoint.
|
|
# The revocation request includes an additional parameter,
|
|
# action=revoke.
|
|
if data.get("action") == "revoke":
|
|
# action=revoke is deprecated. Use /auth/revoke instead.
|
|
# Keep here for backwards compat
|
|
return await RevokeTokenView.post(self, request) # type: ignore[arg-type]
|
|
|
|
if grant_type == "authorization_code":
|
|
return await self._async_handle_auth_code(hass, data, request)
|
|
|
|
if grant_type == "refresh_token":
|
|
return await self._async_handle_refresh_token(hass, data, request)
|
|
|
|
return self.json(
|
|
{"error": "unsupported_grant_type"}, status_code=HTTPStatus.BAD_REQUEST
|
|
)
|
|
|
|
async def _async_handle_auth_code(
|
|
self,
|
|
hass: HomeAssistant,
|
|
data: MultiDictProxy[str],
|
|
request: web.Request,
|
|
) -> web.Response:
|
|
"""Handle authorization code request."""
|
|
client_id = data.get("client_id")
|
|
if client_id is None or not indieauth.verify_client_id(client_id):
|
|
return self.json(
|
|
{"error": "invalid_request", "error_description": "Invalid client id"},
|
|
status_code=HTTPStatus.BAD_REQUEST,
|
|
)
|
|
|
|
if (code := data.get("code")) is None:
|
|
return self.json(
|
|
{"error": "invalid_request", "error_description": "Invalid code"},
|
|
status_code=HTTPStatus.BAD_REQUEST,
|
|
)
|
|
|
|
credential = self._retrieve_auth(client_id, code)
|
|
|
|
if credential is None or not isinstance(credential, Credentials):
|
|
return self.json(
|
|
{"error": "invalid_request", "error_description": "Invalid code"},
|
|
status_code=HTTPStatus.BAD_REQUEST,
|
|
)
|
|
|
|
user = await hass.auth.async_get_or_create_user(credential)
|
|
|
|
if user_access_error := async_user_not_allowed_do_auth(hass, user):
|
|
return self.json(
|
|
{
|
|
"error": "access_denied",
|
|
"error_description": user_access_error,
|
|
},
|
|
status_code=HTTPStatus.FORBIDDEN,
|
|
)
|
|
|
|
refresh_token = await hass.auth.async_create_refresh_token(
|
|
user, client_id, credential=credential
|
|
)
|
|
try:
|
|
access_token = hass.auth.async_create_access_token(
|
|
refresh_token, request.remote
|
|
)
|
|
except InvalidAuthError as exc:
|
|
return self.json(
|
|
{"error": "access_denied", "error_description": str(exc)},
|
|
status_code=HTTPStatus.FORBIDDEN,
|
|
)
|
|
|
|
return self.json(
|
|
{
|
|
"access_token": access_token,
|
|
"token_type": "Bearer",
|
|
"refresh_token": refresh_token.token,
|
|
"expires_in": int(
|
|
refresh_token.access_token_expiration.total_seconds()
|
|
),
|
|
"ha_auth_provider": credential.auth_provider_type,
|
|
},
|
|
headers={
|
|
"Cache-Control": "no-store",
|
|
"Pragma": "no-cache",
|
|
},
|
|
)
|
|
|
|
async def _async_handle_refresh_token(
|
|
self,
|
|
hass: HomeAssistant,
|
|
data: MultiDictProxy[str],
|
|
request: web.Request,
|
|
) -> web.Response:
|
|
"""Handle refresh token request."""
|
|
client_id = data.get("client_id")
|
|
if client_id is not None and not indieauth.verify_client_id(client_id):
|
|
return self.json(
|
|
{"error": "invalid_request", "error_description": "Invalid client id"},
|
|
status_code=HTTPStatus.BAD_REQUEST,
|
|
)
|
|
|
|
if (token := data.get("refresh_token")) is None:
|
|
return self.json(
|
|
{"error": "invalid_request"}, status_code=HTTPStatus.BAD_REQUEST
|
|
)
|
|
|
|
refresh_token = hass.auth.async_get_refresh_token_by_token(token)
|
|
|
|
if refresh_token is None:
|
|
return self.json(
|
|
{"error": "invalid_grant"}, status_code=HTTPStatus.BAD_REQUEST
|
|
)
|
|
|
|
if refresh_token.client_id != client_id:
|
|
return self.json(
|
|
{"error": "invalid_request"}, status_code=HTTPStatus.BAD_REQUEST
|
|
)
|
|
|
|
if user_access_error := async_user_not_allowed_do_auth(
|
|
hass, refresh_token.user
|
|
):
|
|
return self.json(
|
|
{
|
|
"error": "access_denied",
|
|
"error_description": user_access_error,
|
|
},
|
|
status_code=HTTPStatus.FORBIDDEN,
|
|
)
|
|
|
|
try:
|
|
access_token = hass.auth.async_create_access_token(
|
|
refresh_token, request.remote
|
|
)
|
|
except InvalidAuthError as exc:
|
|
return self.json(
|
|
{"error": "access_denied", "error_description": str(exc)},
|
|
status_code=HTTPStatus.FORBIDDEN,
|
|
)
|
|
|
|
return self.json(
|
|
{
|
|
"access_token": access_token,
|
|
"token_type": "Bearer",
|
|
"expires_in": int(
|
|
refresh_token.access_token_expiration.total_seconds()
|
|
),
|
|
},
|
|
headers={
|
|
"Cache-Control": "no-store",
|
|
"Pragma": "no-cache",
|
|
},
|
|
)
|
|
|
|
|
|
class LinkUserView(HomeAssistantView):
    """HTTP view that links new credentials to the requesting user."""

    url = "/auth/link_user"
    name = "api:auth:link_user"

    def __init__(self, retrieve_credentials: RetrieveResultType) -> None:
        """Remember the callable used to redeem authorization codes."""
        self._retrieve_credentials = retrieve_credentials

    @RequestDataValidator(vol.Schema({"code": str, "client_id": str}))
    async def post(self, request: web.Request, data: dict[str, Any]) -> web.Response:
        """Link the credentials behind ``code`` to the requesting user.

        Responds with 400 when the code does not resolve to credentials or
        when the credentials already belong to a different user.
        """
        hass = request.app[KEY_HASS]
        user: User = request["hass_user"]

        credentials = self._retrieve_credentials(data["client_id"], data["code"])
        if credentials is None:
            return self.json_message("Invalid code", status_code=HTTPStatus.BAD_REQUEST)

        current_owner = await hass.auth.async_get_user_by_credentials(credentials)
        if current_owner != user:
            if current_owner is not None:
                return self.json_message(
                    "Credential already linked", status_code=HTTPStatus.BAD_REQUEST
                )
            await hass.auth.async_link_user(user, credentials)

        # Reached both after linking and when the credential was already
        # linked to this user (no-op).
        return self.json_message("User linked")
|
|
|
|
|
|
@callback
def _create_auth_code_store() -> tuple[StoreResultType, RetrieveResultType]:
    """Create an in-memory store for authorization codes.

    Returns a ``(store_result, retrieve_result)`` pair of callables sharing
    one private dictionary. A code can be retrieved at most once and only
    within ten minutes of being stored.
    """
    # OAuth 4.2.1
    # The authorization code MUST expire shortly after it is issued to
    # mitigate the risk of leaks. A maximum authorization code lifetime of
    # 10 minutes is RECOMMENDED.
    max_age = timedelta(minutes=10)
    temp_results: dict[tuple[str, str], tuple[datetime, Credentials]] = {}

    @callback
    def store_result(client_id: str, result: Credentials) -> str:
        """Store flow result and return a code to retrieve it.

        Raises TypeError when ``result`` is not a Credentials instance.
        """
        if not isinstance(result, Credentials):
            raise TypeError("result has to be a Credentials instance")

        now = dt_util.utcnow()

        # Drop codes that expired without ever being redeemed; otherwise
        # they would stay in memory for the lifetime of the store.
        expired_keys = [
            key
            for key, (created, _) in temp_results.items()
            if now - created >= max_age
        ]
        for key in expired_keys:
            del temp_results[key]

        code = uuid.uuid4().hex
        temp_results[(client_id, code)] = (now, result)
        return code

    @callback
    def retrieve_result(client_id: str, code: str) -> Credentials | None:
        """Retrieve flow result.

        Returns None when the code is unknown for this client or has expired.
        The entry is removed on the first lookup either way.
        """
        entry = temp_results.pop((client_id, code), None)
        if entry is None:
            return None

        created, result = entry
        if dt_util.utcnow() - created < max_age:
            return result

        return None

    return store_result, retrieve_result
|
|
|
|
|
|
@websocket_api.websocket_command({vol.Required("type"): "auth/current_user"})
@websocket_api.ws_require_user()
@websocket_api.async_response
async def websocket_current_user(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Send details about the user of this websocket connection.

    The result holds the user's id, name, owner/admin flags, credentials and,
    for every MFA module, whether it is enabled for the user.
    """
    user = connection.user
    enabled_modules = await hass.auth.async_get_enabled_mfa(user)

    payload = {
        "id": user.id,
        "name": user.name,
        "is_owner": user.is_owner,
        "is_admin": user.is_admin,
        "credentials": [
            {
                "auth_provider_type": cred.auth_provider_type,
                "auth_provider_id": cred.auth_provider_id,
            }
            for cred in user.credentials
        ],
        "mfa_modules": [
            {
                "id": mfa.id,
                "name": mfa.name,
                "enabled": mfa.id in enabled_modules,
            }
            for mfa in hass.auth.auth_mfa_modules
        ],
    }

    connection.send_message(websocket_api.result_message(msg["id"], payload))
|
|
|
|
|
|
@websocket_api.websocket_command(
    {
        vol.Required("type"): "auth/long_lived_access_token",
        vol.Required("lifespan"): int,  # days
        vol.Required("client_name"): str,
        vol.Optional("client_icon"): str,
    }
)
@websocket_api.ws_require_user()
@websocket_api.async_response
async def websocket_create_long_lived_access_token(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Create a long-lived access token for the connection's user.

    Sends the access token as the result, or an unauthorized error when the
    access token cannot be created.
    """
    lifespan = timedelta(days=msg["lifespan"])
    refresh_token = await hass.auth.async_create_refresh_token(
        connection.user,
        client_name=msg["client_name"],
        client_icon=msg.get("client_icon"),
        token_type=TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=lifespan,
    )

    try:
        access_token = hass.auth.async_create_access_token(refresh_token)
    except InvalidAuthError as exc:
        connection.send_error(msg["id"], websocket_api.ERR_UNAUTHORIZED, str(exc))
    else:
        connection.send_result(msg["id"], access_token)
|
|
|
|
|
|
@websocket_api.websocket_command({vol.Required("type"): "auth/refresh_tokens"})
@websocket_api.ws_require_user()
@callback
def websocket_refresh_tokens(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Send metadata about every refresh token of the connection's user.

    Each entry is flagged with ``is_current`` when it is the token this
    connection was authenticated with.
    """
    current_id = connection.refresh_token_id

    tokens: list[dict[str, Any]] = [
        {
            "auth_provider_type": (
                token.credential.auth_provider_type if token.credential else None
            ),
            "client_icon": token.client_icon,
            "client_id": token.client_id,
            "client_name": token.client_name,
            "created_at": token.created_at,
            # expire_at is converted with utc_from_timestamp when set.
            "expire_at": (
                dt_util.utc_from_timestamp(token.expire_at)
                if token.expire_at
                else None
            ),
            "id": token.id,
            "is_current": token.id == current_id,
            "last_used_at": token.last_used_at,
            "last_used_ip": token.last_used_ip,
            "type": token.token_type,
        }
        for token in connection.user.refresh_tokens.values()
    ]

    connection.send_result(msg["id"], tokens)
|
|
|
|
|
|
@callback
@websocket_api.websocket_command(
    {
        vol.Required("type"): "auth/delete_refresh_token",
        vol.Required("refresh_token_id"): str,
    }
)
@websocket_api.ws_require_user()
def websocket_delete_refresh_token(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Remove one of the connection user's refresh tokens by id.

    Sends an ``invalid_token_id`` error when the user has no such token.
    """
    token_id = msg["refresh_token_id"]
    target = connection.user.refresh_tokens.get(token_id)

    if target is not None:
        hass.auth.async_remove_refresh_token(target)
        connection.send_result(msg["id"], {})
        return

    connection.send_error(msg["id"], "invalid_token_id", "Received invalid token")
|
|
|
|
|
|
@callback
@websocket_api.websocket_command(
    {
        vol.Required("type"): "auth/delete_all_refresh_tokens",
        vol.Optional("token_type"): cv.string,
        vol.Optional("delete_current_token", default=True): bool,
    }
)
@websocket_api.ws_require_user()
def websocket_delete_all_refresh_tokens(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle delete all refresh tokens request.

    Removes the connection user's refresh tokens, optionally limited to one
    ``token_type``. The token of the current connection is skipped in the
    loop and, when ``delete_current_token`` is set and the token matches the
    type filter, removed after a short delay once the result has been sent.
    """
    # Stays None when the connection's token is not among the user's tokens.
    current_refresh_token: RefreshToken | None = None
    remove_failed = False
    token_type = msg.get("token_type")
    delete_current_token = msg.get("delete_current_token")
    limit_token_types = token_type is not None

    # Iterate over a snapshot of the tokens; removal presumably mutates
    # user.refresh_tokens — confirm against hass.auth.
    for token in list(connection.user.refresh_tokens.values()):
        if token.id == connection.refresh_token_id:
            # Skip the current refresh token as it has revoke_callback,
            # which cancels/closes the connection.
            # It will be removed after sending the result.
            current_refresh_token = token
            continue
        if limit_token_types and token_type != token.token_type:
            continue
        try:
            hass.auth.async_remove_refresh_token(token)
        except Exception:
            # Best effort: keep removing the remaining tokens and report the
            # failure to the client afterwards.
            getLogger(__name__).exception("Error during refresh token removal")
            remove_failed = True

    if remove_failed:
        connection.send_error(
            msg["id"], "token_removing_error", "During removal, an error was raised."
        )
    else:
        connection.send_result(msg["id"], {})

    if (
        current_refresh_token is None
        or not delete_current_token
        or (limit_token_types and current_refresh_token.token_type != token_type)
    ):
        # Nothing left to do: the current token was not found, must be kept,
        # or does not match the requested token type.
        return

    async def _delete_current_token_soon(refresh_token: RefreshToken) -> None:
        """Delete the current token after a delay.

        We do not want to delete the current token immediately as it will
        close the connection.

        This is implemented as a tracked task to ensure the token
        is still deleted if Home Assistant is shut down during
        the delay.

        It should not be refactored to use a call_later as that
        would not be tracked and the token would not be deleted
        if Home Assistant was shut down during the delay.
        """
        try:
            await asyncio.sleep(DELETE_CURRENT_TOKEN_DELAY)
        finally:
            # If the task is cancelled because we are shutting down, delete
            # the token right away.
            hass.auth.async_remove_refresh_token(refresh_token)

    # Deleting the token will close the connection so we need
    # to do it with a delay in a tracked task to ensure it still
    # happens if Home Assistant is shutting down.
    hass.async_create_task(_delete_current_token_soon(current_refresh_token))
|
|
|
|
|
|
@websocket_api.websocket_command(
    {
        vol.Required("type"): "auth/sign_path",
        vol.Required("path"): str,
        vol.Optional("expires", default=30): int,
    }
)
@websocket_api.ws_require_user()
@callback
def websocket_sign_path(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Sign the requested path and send it back as the result.

    ``expires`` is passed to the signer as a number of seconds.
    """
    validity = timedelta(seconds=msg["expires"])
    signed_path = async_sign_path(hass, msg["path"], validity)
    connection.send_message(
        websocket_api.result_message(msg["id"], {"path": signed_path})
    )
|
|
|
|
|
|
@callback
@websocket_api.websocket_command(
    {
        vol.Required("type"): "auth/refresh_token_set_expiry",
        vol.Required("refresh_token_id"): str,
        vol.Required("enable_expiry"): bool,
    }
)
@websocket_api.ws_require_user()
def websocket_refresh_token_set_expiry(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Enable or disable expiry on one of the connection user's refresh tokens.

    Sends an ``invalid_token_id`` error when the user has no such token.
    """
    token_id = msg["refresh_token_id"]
    target = connection.user.refresh_tokens.get(token_id)

    if target is not None:
        hass.auth.async_set_expiry(target, enable_expiry=msg["enable_expiry"])
        connection.send_result(msg["id"], {})
        return

    connection.send_error(msg["id"], "invalid_token_id", "Received invalid token")
|