mirror of https://github.com/home-assistant/core
97 lines
2.9 KiB
Python
97 lines
2.9 KiB
Python
"""Audio enhancement for Assist."""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
import logging
|
|
|
|
from pymicro_vad import MicroVad
|
|
from pyspeex_noise import AudioProcessor
|
|
|
|
from .const import BYTES_PER_CHUNK
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class EnhancedAudioChunk:
|
|
"""Enhanced audio chunk and metadata."""
|
|
|
|
audio: bytes
|
|
"""Raw PCM audio @ 16Khz with 16-bit mono samples"""
|
|
|
|
timestamp_ms: int
|
|
"""Timestamp relative to start of audio stream (milliseconds)"""
|
|
|
|
speech_probability: float | None
|
|
"""Probability that audio chunk contains speech (0-1), None if unknown"""
|
|
|
|
|
|
class AudioEnhancer(ABC):
    """Abstract interface for audio enhancers.

    Holds the enhancement settings; concrete subclasses provide
    ``enhance_chunk``.
    """

    def __init__(
        self,
        auto_gain: int,
        noise_suppression: int,
        is_vad_enabled: bool,
    ) -> None:
        """Store the enhancement settings on the instance."""
        self.auto_gain = auto_gain
        self.noise_suppression = noise_suppression
        self.is_vad_enabled = is_vad_enabled

    @abstractmethod
    def enhance_chunk(self, audio: bytes, timestamp_ms: int) -> EnhancedAudioChunk:
        """Return the enhanced form of one chunk of 16Khz 16-bit mono PCM audio."""
|
|
|
|
|
|
class MicroVadSpeexEnhancer(AudioEnhancer):
    """Enhancer that combines microVAD speech detection with speex processing.

    The speex processor and microVAD are each created only when the
    corresponding setting requests them.
    """

    def __init__(
        self,
        auto_gain: int,
        noise_suppression: int,
        is_vad_enabled: bool,
    ) -> None:
        """Create the speex processor and microVAD according to the settings.

        After construction, ``auto_gain`` and ``noise_suppression`` on the
        instance hold the scaled values handed to speex rather than the raw
        levels that were passed in.
        """
        super().__init__(auto_gain, noise_suppression, is_vad_enabled)

        # Incoming noise suppression level is on a 0-4 scale
        suppression = noise_suppression * -15

        # Incoming auto gain level is on a 0-31 scale
        gain = auto_gain * 300

        self.audio_processor: AudioProcessor | None = None
        self.noise_suppression = suppression
        self.auto_gain = gain

        # Speex is skipped entirely when both settings are zero
        speex_requested = gain != 0 or suppression != 0
        if speex_requested:
            self.audio_processor = AudioProcessor(gain, suppression)
            _LOGGER.debug(
                "Initialized speex with auto_gain=%s, noise_suppression=%s",
                gain,
                suppression,
            )

        self.vad: MicroVad | None = None
        if is_vad_enabled:
            self.vad = MicroVad()
            _LOGGER.debug("Initialized microVAD")

    def enhance_chunk(self, audio: bytes, timestamp_ms: int) -> EnhancedAudioChunk:
        """Enhance 10ms chunk of PCM audio @ 16Khz with 16-bit mono samples.

        The chunk must be exactly BYTES_PER_CHUNK bytes long. When VAD is
        active it is run on the incoming audio, before speex modifies it;
        otherwise the speech probability is reported as None.
        """
        assert len(audio) == BYTES_PER_CHUNK

        vad = self.vad
        probability: float | None = None if vad is None else vad.Process10ms(audio)

        # Noise suppression and auto gain, only when speex was initialized
        processor = self.audio_processor
        enhanced = audio if processor is None else processor.Process10ms(audio).audio

        return EnhancedAudioChunk(
            audio=enhanced,
            timestamp_ms=timestamp_ms,
            speech_probability=probability,
        )
|