mirror of https://github.com/home-assistant/core
183 lines
5.7 KiB
Python
183 lines
5.7 KiB
Python
"""Debounce helper."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable
|
|
from logging import Logger
|
|
|
|
from homeassistant.core import HassJob, HomeAssistant, callback
|
|
|
|
|
|
class Debouncer[_R_co]:
|
|
"""Class to rate limit calls to a specific command."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
logger: Logger,
|
|
*,
|
|
cooldown: float,
|
|
immediate: bool,
|
|
function: Callable[[], _R_co] | None = None,
|
|
background: bool = False,
|
|
) -> None:
|
|
"""Initialize debounce.
|
|
|
|
immediate: indicate if the function needs to be called right away and
|
|
wait <cooldown> until executing next invocation.
|
|
function: optional and can be instantiated later.
|
|
"""
|
|
self.hass = hass
|
|
self.logger = logger
|
|
self._function = function
|
|
self.cooldown = cooldown
|
|
self.immediate = immediate
|
|
self._timer_task: asyncio.TimerHandle | None = None
|
|
self._execute_at_end_of_timer: bool = False
|
|
self._execute_lock = asyncio.Lock()
|
|
self._background = background
|
|
self._job: HassJob[[], _R_co] | None = (
|
|
None
|
|
if function is None
|
|
else HassJob(
|
|
function, f"debouncer cooldown={cooldown}, immediate={immediate}"
|
|
)
|
|
)
|
|
self._shutdown_requested = False
|
|
|
|
@property
|
|
def function(self) -> Callable[[], _R_co] | None:
|
|
"""Return the function being wrapped by the Debouncer."""
|
|
return self._function
|
|
|
|
@function.setter
|
|
def function(self, function: Callable[[], _R_co]) -> None:
|
|
"""Update the function being wrapped by the Debouncer."""
|
|
self._function = function
|
|
if self._job is None or function != self._job.target:
|
|
self._job = HassJob(
|
|
function,
|
|
f"debouncer cooldown={self.cooldown}, immediate={self.immediate}",
|
|
)
|
|
|
|
@callback
|
|
def async_schedule_call(self) -> None:
|
|
"""Schedule a call to the function."""
|
|
if self._async_schedule_or_call_now():
|
|
self._execute_at_end_of_timer = True
|
|
self._on_debounce()
|
|
|
|
def _async_schedule_or_call_now(self) -> bool:
|
|
"""Check if a call should be scheduled.
|
|
|
|
Returns True if the function should be called immediately.
|
|
|
|
Returns False if there is nothing to do.
|
|
"""
|
|
if self._shutdown_requested:
|
|
self.logger.debug("Debouncer call ignored as shutdown has been requested.")
|
|
return False
|
|
|
|
if self._timer_task:
|
|
if not self._execute_at_end_of_timer:
|
|
self._execute_at_end_of_timer = True
|
|
|
|
return False
|
|
|
|
# Locked means a call is in progress. Any call is good, so abort.
|
|
if self._execute_lock.locked():
|
|
return False
|
|
|
|
if not self.immediate:
|
|
self._execute_at_end_of_timer = True
|
|
self._schedule_timer()
|
|
return False
|
|
|
|
return True
|
|
|
|
async def async_call(self) -> None:
|
|
"""Call the function."""
|
|
if not self._async_schedule_or_call_now():
|
|
return
|
|
|
|
async with self._execute_lock:
|
|
# Abort if timer got set while we're waiting for the lock.
|
|
if self._timer_task:
|
|
return
|
|
|
|
assert self._job is not None
|
|
try:
|
|
if task := self.hass.async_run_hass_job(
|
|
self._job, background=self._background
|
|
):
|
|
await task
|
|
finally:
|
|
self._schedule_timer()
|
|
|
|
async def _handle_timer_finish(self) -> None:
|
|
"""Handle a finished timer."""
|
|
assert self._job is not None
|
|
|
|
self._execute_at_end_of_timer = False
|
|
|
|
# Locked means a call is in progress. Any call is good, so abort.
|
|
if self._execute_lock.locked():
|
|
return
|
|
|
|
async with self._execute_lock:
|
|
# Abort if timer got set while we're waiting for the lock.
|
|
if self._timer_task:
|
|
return
|
|
|
|
try:
|
|
if task := self.hass.async_run_hass_job(
|
|
self._job, background=self._background
|
|
):
|
|
await task
|
|
except Exception:
|
|
self.logger.exception("Unexpected exception from %s", self.function)
|
|
finally:
|
|
# Schedule a new timer to prevent new runs during cooldown
|
|
self._schedule_timer()
|
|
|
|
@callback
|
|
def async_shutdown(self) -> None:
|
|
"""Cancel any scheduled call, and prevent new runs."""
|
|
self._shutdown_requested = True
|
|
self.async_cancel()
|
|
|
|
@callback
|
|
def async_cancel(self) -> None:
|
|
"""Cancel any scheduled call."""
|
|
if self._timer_task:
|
|
self._timer_task.cancel()
|
|
self._timer_task = None
|
|
|
|
self._execute_at_end_of_timer = False
|
|
|
|
@callback
|
|
def _on_debounce(self) -> None:
|
|
"""Create job task, but only if pending."""
|
|
self._timer_task = None
|
|
if not self._execute_at_end_of_timer:
|
|
return
|
|
self._execute_at_end_of_timer = False
|
|
name = f"debouncer {self._job} finish cooldown={self.cooldown}, immediate={self.immediate}"
|
|
if not self._background:
|
|
self.hass.async_create_task(
|
|
self._handle_timer_finish(), name, eager_start=True
|
|
)
|
|
return
|
|
self.hass.async_create_background_task(
|
|
self._handle_timer_finish(), name, eager_start=True
|
|
)
|
|
|
|
@callback
|
|
def _schedule_timer(self) -> None:
|
|
"""Schedule a timer."""
|
|
if not self._shutdown_requested:
|
|
self._timer_task = self.hass.loop.call_later(
|
|
self.cooldown, self._on_debounce
|
|
)
|