mirror of https://github.com/home-assistant/core
245 lines
7.5 KiB
Python
245 lines
7.5 KiB
Python
"""Authentication for HTTP component."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Awaitable, Callable
|
|
from datetime import timedelta
|
|
from ipaddress import ip_address
|
|
import logging
|
|
import secrets
|
|
import time
|
|
from typing import Any, Final
|
|
|
|
from aiohttp import hdrs
|
|
from aiohttp.web import Application, Request, StreamResponse, middleware
|
|
import jwt
|
|
from jwt import api_jws
|
|
from yarl import URL
|
|
|
|
from homeassistant.auth import jwt_wrapper
|
|
from homeassistant.auth.const import GROUP_ID_READ_ONLY
|
|
from homeassistant.auth.models import User
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.http import current_request
|
|
from homeassistant.helpers.json import json_bytes
|
|
from homeassistant.helpers.network import is_cloud_connection
|
|
from homeassistant.helpers.storage import Store
|
|
from homeassistant.util.network import is_local
|
|
|
|
from .const import KEY_AUTHENTICATED, KEY_HASS_REFRESH_TOKEN_ID, KEY_HASS_USER
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
DATA_API_PASSWORD: Final = "api_password"
|
|
DATA_SIGN_SECRET: Final = "http.auth.sign_secret"
|
|
SIGN_QUERY_PARAM: Final = "authSig"
|
|
SAFE_QUERY_PARAMS: Final = frozenset(("height", "width"))
|
|
|
|
STORAGE_VERSION = 1
|
|
STORAGE_KEY = "http.auth"
|
|
CONTENT_USER_NAME = "Home Assistant Content"
|
|
|
|
|
|
@callback
|
|
def async_sign_path(
|
|
hass: HomeAssistant,
|
|
path: str,
|
|
expiration: timedelta,
|
|
*,
|
|
refresh_token_id: str | None = None,
|
|
use_content_user: bool = False,
|
|
) -> str:
|
|
"""Sign a path for temporary access without auth header."""
|
|
if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
|
|
secret = hass.data[DATA_SIGN_SECRET] = secrets.token_hex()
|
|
|
|
if refresh_token_id is None:
|
|
if use_content_user:
|
|
refresh_token_id = hass.data[STORAGE_KEY]
|
|
elif connection := websocket_api.current_connection.get():
|
|
refresh_token_id = connection.refresh_token_id
|
|
elif (
|
|
request := current_request.get()
|
|
) and KEY_HASS_REFRESH_TOKEN_ID in request:
|
|
refresh_token_id = request[KEY_HASS_REFRESH_TOKEN_ID]
|
|
else:
|
|
refresh_token_id = hass.data[STORAGE_KEY]
|
|
|
|
url = URL(path)
|
|
now_timestamp = int(time.time())
|
|
expiration_timestamp = now_timestamp + int(expiration.total_seconds())
|
|
params = [itm for itm in url.query.items() if itm[0] not in SAFE_QUERY_PARAMS]
|
|
json_payload = json_bytes(
|
|
{
|
|
"iss": refresh_token_id,
|
|
"path": url.path,
|
|
"params": params,
|
|
"iat": now_timestamp,
|
|
"exp": expiration_timestamp,
|
|
}
|
|
)
|
|
encoded = api_jws.encode(json_payload, secret, "HS256")
|
|
params.append((SIGN_QUERY_PARAM, encoded))
|
|
url = url.with_query(params)
|
|
return f"{url.path}?{url.query_string}"
|
|
|
|
|
|
@callback
|
|
def async_user_not_allowed_do_auth(
|
|
hass: HomeAssistant, user: User, request: Request | None = None
|
|
) -> str | None:
|
|
"""Validate that user is not allowed to do auth things."""
|
|
if not user.is_active:
|
|
return "User is not active"
|
|
|
|
if not user.local_only:
|
|
return None
|
|
|
|
# User is marked as local only, check if they are allowed to do auth
|
|
if request is None:
|
|
request = current_request.get()
|
|
|
|
if not request:
|
|
return "No request available to validate local access"
|
|
|
|
if is_cloud_connection(hass):
|
|
return "User is local only"
|
|
|
|
try:
|
|
remote_address = ip_address(request.remote) # type: ignore[arg-type]
|
|
except ValueError:
|
|
return "Invalid remote IP"
|
|
|
|
if is_local(remote_address):
|
|
return None
|
|
|
|
return "User cannot authenticate remotely"
|
|
|
|
|
|
async def async_setup_auth(
|
|
hass: HomeAssistant,
|
|
app: Application,
|
|
) -> None:
|
|
"""Create auth middleware for the app."""
|
|
store = Store[dict[str, Any]](hass, STORAGE_VERSION, STORAGE_KEY)
|
|
if (data := await store.async_load()) is None:
|
|
data = {}
|
|
|
|
refresh_token = None
|
|
if "content_user" in data:
|
|
user = await hass.auth.async_get_user(data["content_user"])
|
|
if user and user.refresh_tokens:
|
|
refresh_token = list(user.refresh_tokens.values())[0]
|
|
|
|
if refresh_token is None:
|
|
user = await hass.auth.async_create_system_user(
|
|
CONTENT_USER_NAME, group_ids=[GROUP_ID_READ_ONLY]
|
|
)
|
|
refresh_token = await hass.auth.async_create_refresh_token(user)
|
|
data["content_user"] = user.id
|
|
await store.async_save(data)
|
|
|
|
hass.data[STORAGE_KEY] = refresh_token.id
|
|
|
|
@callback
|
|
def async_validate_auth_header(request: Request) -> bool:
|
|
"""Test authorization header against access token.
|
|
|
|
Basic auth_type is legacy code, should be removed with api_password.
|
|
"""
|
|
try:
|
|
auth_type, auth_val = request.headers.get(hdrs.AUTHORIZATION, "").split(
|
|
" ", 1
|
|
)
|
|
except ValueError:
|
|
# If no space in authorization header
|
|
return False
|
|
|
|
if auth_type != "Bearer":
|
|
return False
|
|
|
|
refresh_token = hass.auth.async_validate_access_token(auth_val)
|
|
|
|
if refresh_token is None:
|
|
return False
|
|
|
|
if async_user_not_allowed_do_auth(hass, refresh_token.user, request):
|
|
return False
|
|
|
|
request[KEY_HASS_USER] = refresh_token.user
|
|
request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
|
|
return True
|
|
|
|
@callback
|
|
def async_validate_signed_request(request: Request) -> bool:
|
|
"""Validate a signed request."""
|
|
if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
|
|
return False
|
|
|
|
if (signature := request.query.get(SIGN_QUERY_PARAM)) is None:
|
|
return False
|
|
|
|
try:
|
|
claims = jwt_wrapper.verify_and_decode(
|
|
signature, secret, algorithms=["HS256"], options={"verify_iss": False}
|
|
)
|
|
except jwt.InvalidTokenError:
|
|
return False
|
|
|
|
if claims["path"] != request.path:
|
|
return False
|
|
|
|
params = [
|
|
list(itm) # claims stores tuples as lists
|
|
for itm in request.query.items()
|
|
if itm[0] not in SAFE_QUERY_PARAMS and itm[0] != SIGN_QUERY_PARAM
|
|
]
|
|
if claims["params"] != params:
|
|
return False
|
|
|
|
refresh_token = hass.auth.async_get_refresh_token(claims["iss"])
|
|
|
|
if refresh_token is None:
|
|
return False
|
|
|
|
request[KEY_HASS_USER] = refresh_token.user
|
|
request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
|
|
return True
|
|
|
|
@middleware
|
|
async def auth_middleware(
|
|
request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
|
|
) -> StreamResponse:
|
|
"""Authenticate as middleware."""
|
|
authenticated = False
|
|
|
|
if hdrs.AUTHORIZATION in request.headers and async_validate_auth_header(
|
|
request
|
|
):
|
|
authenticated = True
|
|
auth_type = "bearer token"
|
|
|
|
# We first start with a string check to avoid parsing query params
|
|
# for every request.
|
|
elif (
|
|
request.method == "GET"
|
|
and SIGN_QUERY_PARAM in request.query_string
|
|
and async_validate_signed_request(request)
|
|
):
|
|
authenticated = True
|
|
auth_type = "signed request"
|
|
|
|
if authenticated and _LOGGER.isEnabledFor(logging.DEBUG):
|
|
_LOGGER.debug(
|
|
"Authenticated %s for %s using %s",
|
|
request.remote,
|
|
request.path,
|
|
auth_type,
|
|
)
|
|
|
|
request[KEY_AUTHENTICATED] = authenticated
|
|
return await handler(request)
|
|
|
|
app.middlewares.append(auth_middleware)
|