core/homeassistant/util/loop.py

203 lines
7.4 KiB
Python

"""asyncio loop utilities."""
from __future__ import annotations
from collections.abc import Callable
import functools
from functools import cache
import linecache
import logging
import threading
import traceback
from typing import Any
from homeassistant.core import async_get_hass_or_none
from homeassistant.helpers.frame import (
MissingIntegrationFrame,
get_current_frame,
get_integration_frame,
)
from homeassistant.loader import async_suggest_report_issue
_LOGGER = logging.getLogger(__name__)
def _get_line_from_cache(filename: str, lineno: int) -> str:
"""Get line from cache or read from file."""
return (linecache.getline(filename, lineno) or "?").strip()
# Set of previously reported blocking calls
# (integration, filename, lineno)
_PREVIOUSLY_REPORTED: set[tuple[str | None, str, int | Any]] = set()
def raise_for_blocking_call(
func: Callable[..., Any],
check_allowed: Callable[[dict[str, Any]], bool] | None = None,
strict: bool = True,
strict_core: bool = True,
**mapped_args: Any,
) -> None:
"""Warn if called inside the event loop. Raise if `strict` is True."""
if check_allowed is not None and check_allowed(mapped_args):
return
found_frame = None
offender_frame = get_current_frame(2)
offender_filename = offender_frame.f_code.co_filename
offender_lineno = offender_frame.f_lineno
offender_line = _get_line_from_cache(offender_filename, offender_lineno)
report_key: tuple[str | None, str, int | Any]
try:
integration_frame = get_integration_frame()
except MissingIntegrationFrame:
# Did not source from integration? Hard error.
report_key = (None, offender_filename, offender_lineno)
was_reported = report_key in _PREVIOUSLY_REPORTED
_PREVIOUSLY_REPORTED.add(report_key)
if not strict_core:
if was_reported:
_LOGGER.debug(
"Detected blocking call to %s with args %s in %s, "
"line %s: %s inside the event loop; "
"This is causing stability issues. "
"Please create a bug report at "
"https://github.com/home-assistant/core/issues?q=is%%3Aopen+is%%3Aissue\n"
"%s\n",
func.__name__,
mapped_args.get("args"),
offender_filename,
offender_lineno,
offender_line,
_dev_help_message(func.__name__),
)
else:
_LOGGER.warning(
"Detected blocking call to %s with args %s in %s, "
"line %s: %s inside the event loop; "
"This is causing stability issues. "
"Please create a bug report at "
"https://github.com/home-assistant/core/issues?q=is%%3Aopen+is%%3Aissue\n"
"%s\n"
"Traceback (most recent call last):\n%s",
func.__name__,
mapped_args.get("args"),
offender_filename,
offender_lineno,
offender_line,
_dev_help_message(func.__name__),
"".join(traceback.format_stack(f=offender_frame)),
)
return
if found_frame is None:
raise RuntimeError( # noqa: TRY200
f"Caught blocking call to {func.__name__} with args {mapped_args.get("args")} "
f"in {offender_filename}, line {offender_lineno}: {offender_line} "
"inside the event loop; "
"This is causing stability issues. "
"Please create a bug report at "
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue\n"
f"{_dev_help_message(func.__name__)}"
)
report_key = (integration_frame.integration, offender_filename, offender_lineno)
was_reported = report_key in _PREVIOUSLY_REPORTED
_PREVIOUSLY_REPORTED.add(report_key)
report_issue = async_suggest_report_issue(
async_get_hass_or_none(),
integration_domain=integration_frame.integration,
module=integration_frame.module,
)
if was_reported:
_LOGGER.debug(
"Detected blocking call to %s with args %s "
"inside the event loop by %sintegration '%s' "
"at %s, line %s: %s (offender: %s, line %s: %s), please %s\n"
"%s\n",
func.__name__,
mapped_args.get("args"),
"custom " if integration_frame.custom_integration else "",
integration_frame.integration,
integration_frame.relative_filename,
integration_frame.line_number,
integration_frame.line,
offender_filename,
offender_lineno,
offender_line,
report_issue,
_dev_help_message(func.__name__),
)
else:
_LOGGER.warning(
"Detected blocking call to %s with args %s "
"inside the event loop by %sintegration '%s' "
"at %s, line %s: %s (offender: %s, line %s: %s), please %s\n"
"%s\n"
"Traceback (most recent call last):\n%s",
func.__name__,
mapped_args.get("args"),
"custom " if integration_frame.custom_integration else "",
integration_frame.integration,
integration_frame.relative_filename,
integration_frame.line_number,
integration_frame.line,
offender_filename,
offender_lineno,
offender_line,
report_issue,
_dev_help_message(func.__name__),
"".join(traceback.format_stack(f=integration_frame.frame)),
)
if strict:
raise RuntimeError(
f"Caught blocking call to {func.__name__} with args "
f"{mapped_args.get('args')} inside the event loop by "
f"{'custom ' if integration_frame.custom_integration else ''}"
f"integration '{integration_frame.integration}' at "
f"{integration_frame.relative_filename}, line {integration_frame.line_number}:"
f" {integration_frame.line}. (offender: {offender_filename}, line "
f"{offender_lineno}: {offender_line}), please {report_issue}\n"
f"{_dev_help_message(func.__name__)}"
)
@cache
def _dev_help_message(what: str) -> str:
"""Generate help message to guide developers."""
return (
"For developers, please see "
"https://developers.home-assistant.io/docs/asyncio_blocking_operations/"
f"#{what.replace('.', '')}"
)
def protect_loop[**_P, _R](
func: Callable[_P, _R],
loop_thread_id: int,
strict: bool = True,
strict_core: bool = True,
check_allowed: Callable[[dict[str, Any]], bool] | None = None,
) -> Callable[_P, _R]:
"""Protect function from running in event loop."""
@functools.wraps(func)
def protected_loop_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
if threading.get_ident() == loop_thread_id:
raise_for_blocking_call(
func,
strict=strict,
strict_core=strict_core,
check_allowed=check_allowed,
args=args,
kwargs=kwargs,
)
return func(*args, **kwargs)
return protected_loop_func