core/homeassistant/helpers/frame.py

377 lines
12 KiB
Python

"""Provide frame helper for finding the current frame context."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
import enum
import functools
import linecache
import logging
import sys
from types import FrameType
from typing import Any, cast
from propcache import cached_property
from homeassistant.core import HomeAssistant, async_get_hass_or_none
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import (
Integration,
async_get_issue_integration,
async_suggest_report_issue,
)
_LOGGER = logging.getLogger(__name__)
# Keep track of integrations already reported to prevent flooding
_REPORTED_INTEGRATIONS: set[str] = set()
@dataclass(kw_only=True)
class IntegrationFrame:
"""Integration frame container."""
custom_integration: bool
integration: str
module: str | None
relative_filename: str
frame: FrameType
@cached_property
def line_number(self) -> int:
"""Return the line number of the frame."""
return self.frame.f_lineno
@cached_property
def filename(self) -> str:
"""Return the filename of the frame."""
return self.frame.f_code.co_filename
@cached_property
def line(self) -> str:
"""Return the line of the frame."""
return (linecache.getline(self.filename, self.line_number) or "?").strip()
def get_integration_logger(fallback_name: str) -> logging.Logger:
"""Return a logger by checking the current integration frame.
If Python is unable to access the sources files, the call stack frame
will be missing information, so let's guard by requiring a fallback name.
https://github.com/home-assistant/core/issues/24982
"""
try:
integration_frame = get_integration_frame()
except MissingIntegrationFrame:
return logging.getLogger(fallback_name)
if integration_frame.custom_integration:
logger_name = f"custom_components.{integration_frame.integration}"
else:
logger_name = f"homeassistant.components.{integration_frame.integration}"
return logging.getLogger(logger_name)
def get_current_frame(depth: int = 0) -> FrameType:
"""Return the current frame."""
# Add one to depth since get_current_frame is included
return sys._getframe(depth + 1) # noqa: SLF001
def get_integration_frame(exclude_integrations: set | None = None) -> IntegrationFrame:
"""Return the frame, integration and integration path of the current stack frame."""
found_frame = None
if not exclude_integrations:
exclude_integrations = set()
frame: FrameType | None = get_current_frame()
while frame is not None:
filename = frame.f_code.co_filename
for path in ("custom_components/", "homeassistant/components/"):
try:
index = filename.index(path)
start = index + len(path)
end = filename.index("/", start)
integration = filename[start:end]
if integration not in exclude_integrations:
found_frame = frame
break
except ValueError:
continue
if found_frame is not None:
break
frame = frame.f_back
if found_frame is None:
raise MissingIntegrationFrame
found_module: str | None = None
for module, module_obj in dict(sys.modules).items():
if not hasattr(module_obj, "__file__"):
continue
if module_obj.__file__ == found_frame.f_code.co_filename:
found_module = module
break
return IntegrationFrame(
custom_integration=path == "custom_components/",
integration=integration,
module=found_module,
relative_filename=found_frame.f_code.co_filename[index:],
frame=found_frame,
)
class MissingIntegrationFrame(HomeAssistantError):
"""Raised when no integration is found in the frame."""
def report(
what: str,
*,
exclude_integrations: set[str] | None = None,
error_if_core: bool = True,
error_if_integration: bool = False,
level: int = logging.WARNING,
log_custom_component_only: bool = False,
) -> None:
"""Report incorrect usage.
If error_if_core is True, raise instead of log if an integration is not found
when unwinding the stack frame.
If error_if_integration is True, raise instead of log if an integration is found
when unwinding the stack frame.
"""
core_behavior = ReportBehavior.ERROR if error_if_core else ReportBehavior.LOG
core_integration_behavior = (
ReportBehavior.ERROR if error_if_integration else ReportBehavior.LOG
)
custom_integration_behavior = core_integration_behavior
if log_custom_component_only:
if core_behavior is ReportBehavior.LOG:
core_behavior = ReportBehavior.IGNORE
if core_integration_behavior is ReportBehavior.LOG:
core_integration_behavior = ReportBehavior.IGNORE
report_usage(
what,
core_behavior=core_behavior,
core_integration_behavior=core_integration_behavior,
custom_integration_behavior=custom_integration_behavior,
exclude_integrations=exclude_integrations,
level=level,
)
class ReportBehavior(enum.Enum):
"""Enum for behavior on code usage."""
IGNORE = enum.auto()
"""Ignore the code usage."""
LOG = enum.auto()
"""Log the code usage."""
ERROR = enum.auto()
"""Raise an error on code usage."""
def report_usage(
what: str,
*,
breaks_in_ha_version: str | None = None,
core_behavior: ReportBehavior = ReportBehavior.ERROR,
core_integration_behavior: ReportBehavior = ReportBehavior.LOG,
custom_integration_behavior: ReportBehavior = ReportBehavior.LOG,
exclude_integrations: set[str] | None = None,
integration_domain: str | None = None,
level: int = logging.WARNING,
) -> None:
"""Report incorrect code usage.
:param what: will be wrapped with "Detected that integration 'integration' {what}.
Please create a bug report at https://..."
:param breaks_in_ha_version: if set, the report will be adjusted to specify the
breaking version
:param exclude_integrations: skip specified integration when reviewing the stack.
If no integration is found, the core behavior will be applied
:param integration_domain: fallback for identifying the integration if the
frame is not found
"""
try:
integration_frame = get_integration_frame(
exclude_integrations=exclude_integrations
)
except MissingIntegrationFrame as err:
if integration := async_get_issue_integration(
hass := async_get_hass_or_none(), integration_domain
):
_report_integration_domain(
hass,
what,
breaks_in_ha_version,
integration,
core_integration_behavior,
custom_integration_behavior,
level,
)
return
msg = f"Detected code that {what}. Please report this issue"
if core_behavior is ReportBehavior.ERROR:
raise RuntimeError(msg) from err
if core_behavior is ReportBehavior.LOG:
if breaks_in_ha_version:
msg = (
f"Detected code that {what}. This will stop working in Home "
f"Assistant {breaks_in_ha_version}, please report this issue"
)
_LOGGER.warning(msg, stack_info=True)
return
integration_behavior = core_integration_behavior
if integration_frame.custom_integration:
integration_behavior = custom_integration_behavior
if integration_behavior is not ReportBehavior.IGNORE:
_report_integration_frame(
what,
breaks_in_ha_version,
integration_frame,
level,
integration_behavior is ReportBehavior.ERROR,
)
def _report_integration_domain(
hass: HomeAssistant | None,
what: str,
breaks_in_ha_version: str | None,
integration: Integration,
core_integration_behavior: ReportBehavior,
custom_integration_behavior: ReportBehavior,
level: int,
) -> None:
"""Report incorrect usage in an integration (identified via domain).
Async friendly.
"""
integration_behavior = core_integration_behavior
if not integration.is_built_in:
integration_behavior = custom_integration_behavior
if integration_behavior is ReportBehavior.IGNORE:
return
# Keep track of integrations already reported to prevent flooding
key = f"{integration.domain}:{what}"
if (
integration_behavior is not ReportBehavior.ERROR
and key in _REPORTED_INTEGRATIONS
):
return
_REPORTED_INTEGRATIONS.add(key)
report_issue = async_suggest_report_issue(hass, integration=integration)
integration_type = "" if integration.is_built_in else "custom "
_LOGGER.log(
level,
"Detected that %sintegration '%s' %s. %s %s",
integration_type,
integration.domain,
what,
f"This will stop working in Home Assistant {breaks_in_ha_version}, please"
if breaks_in_ha_version
else "Please",
report_issue,
)
if integration_behavior is ReportBehavior.ERROR:
raise RuntimeError(
f"Detected that {integration_type}integration "
f"'{integration.domain}' {what}. Please {report_issue}"
)
def _report_integration_frame(
what: str,
breaks_in_ha_version: str | None,
integration_frame: IntegrationFrame,
level: int = logging.WARNING,
error: bool = False,
) -> None:
"""Report incorrect usage in an integration (identified via frame).
Async friendly.
"""
# Keep track of integrations already reported to prevent flooding
key = f"{integration_frame.filename}:{integration_frame.line_number}"
if not error and key in _REPORTED_INTEGRATIONS:
return
_REPORTED_INTEGRATIONS.add(key)
report_issue = async_suggest_report_issue(
async_get_hass_or_none(),
integration_domain=integration_frame.integration,
module=integration_frame.module,
)
integration_type = "custom " if integration_frame.custom_integration else ""
_LOGGER.log(
level,
"Detected that %sintegration '%s' %s at %s, line %s: %s. %s %s",
integration_type,
integration_frame.integration,
what,
integration_frame.relative_filename,
integration_frame.line_number,
integration_frame.line,
f"This will stop working in Home Assistant {breaks_in_ha_version}, please"
if breaks_in_ha_version
else "Please",
report_issue,
)
if not error:
return
raise RuntimeError(
f"Detected that {integration_type}integration "
f"'{integration_frame.integration}' {what} at "
f"{integration_frame.relative_filename}, line "
f"{integration_frame.line_number}: {integration_frame.line}. "
f"Please {report_issue}"
)
def warn_use[_CallableT: Callable](func: _CallableT, what: str) -> _CallableT:
"""Mock a function to warn when it was about to be used."""
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def report_use(*args: Any, **kwargs: Any) -> None:
report(what)
else:
@functools.wraps(func)
def report_use(*args: Any, **kwargs: Any) -> None:
report(what)
return cast(_CallableT, report_use)
def report_non_thread_safe_operation(what: str) -> None:
"""Report a non-thread safe operation."""
report(
f"calls {what} from a thread other than the event loop, "
"which may cause Home Assistant to crash or data to corrupt. "
"For more information, see "
"https://developers.home-assistant.io/docs/asyncio_thread_safety/"
f"#{what.replace('.', '')}",
error_if_core=True,
error_if_integration=True,
)