diff --git a/eff_word_net/engine.py b/eff_word_net/engine.py index d25f444..e8d0118 100755 --- a/eff_word_net/engine.py +++ b/eff_word_net/engine.py @@ -97,57 +97,34 @@ def scoreVector(self,inp_vec:np.array) -> float : self.__last_activation_time = current_time return score - + + ## EDIT 2025.03.10 + ## RMS calculation ADD def scoreFrame( self, - inp_audio_frame:np.array, - unsafe:bool = False) -> float : + inp_audio_frame: np.array, + unsafe: bool = False) -> Union[dict, None]: """ Converts given audio frame to embedding and checks for similarity - with given reference file - - Inp Parameters: - - inp_audio_frame : np.array of 1channel 1 sec 16000Hz sampled audio - frame - unsafe : bool value, set to False by default to prevent engine - processing continuous speech or silence, to minimalize false positives - - **Note : change unsafe to True only if you know what you are doing** - - Out Parameters: - - { - "match":True or False, - "confidence":float value - } - or - None when no voice activity is identified + with given reference file. It now also returns the RMS value of the input frame.
""" - - if(not unsafe): - upperPoint = max( - ( - inp_audio_frame/inp_audio_frame.max() - )[:RATE//10] - ) - if(upperPoint > 0.2): + # 입력 프레임의 RMS 계산 (전체 프레임 기준) + rms_value = np.sqrt(np.mean(np.square(inp_audio_frame.astype(np.float32)))) + + if not unsafe: + upperPoint = max((inp_audio_frame/inp_audio_frame.max())[:RATE//10]) + if upperPoint > 0.2: return None - #assert inp_audio_frame.shape == (RATE,), \ - # f"Audio frame needs to be a 1 sec {RATE}Hz sampled vector" - - score = self.scoreVector( - self.model.audioToVector( - inp_audio_frame - ) - ) + score = self.scoreVector(self.model.audioToVector(inp_audio_frame)) return { - "match":score >= self.threshold, - "confidence":score + "match": score >= self.threshold, + "confidence": score, + "rms": rms_value } + HotwordDetectorArray = List[HotwordDetector] MatchInfo = Tuple[HotwordDetector,float] MatchInfoArray = List[MatchInfo] diff --git a/eff_word_net/streams.py b/eff_word_net/streams.py index cb94450..9cede41 100755 --- a/eff_word_net/streams.py +++ b/eff_word_net/streams.py @@ -1,42 +1,37 @@ import pyaudio -from typing import Tuple , Callable +from typing import Callable import numpy as np -from eff_word_net.engine import HotwordDetector -from eff_word_net import RATE +from scipy.signal import resample # For downsampling -NoParameterFunction = Callable[[],None] -AudioFrameFunction = Callable[[],np.array] +NoParameterFunction = Callable[[], None] +AudioFrameFunction = Callable[[], np.array] - -class CustomAudioStream : +class CustomAudioStream: """ - CustomAudioStream implementation allows developers to use - any 16000Hz sampled audio streams with inference engine - - It tries to add sliding window to audio streams + CustomAudioStream applies a sliding window to an audio stream. 
""" def __init__( self, - open_stream:Callable[[],None], - close_stream:Callable[[],None], - get_next_frame:Callable[[],np.array], - window_length_secs = 1, - sliding_window_secs:float = 1/8 - ): - + open_stream: Callable[[], None], + close_stream: Callable[[], None], + get_next_frame: Callable[[], np.array], + window_length_secs=1, + sliding_window_secs: float = 1/8, + sample_rate=16000 # Target sample rate for the engine + ): self._open_stream = open_stream self._close_stream = close_stream self._get_next_frame = get_next_frame - self._window_size = int(window_length_secs * RATE) - self._sliding_window_size = int(sliding_window_secs * RATE) - - self._out_audio = np.zeros(self._window_size) #blank 1 sec audio - print("Initial S",self._out_audio.shape) + self._sample_rate = sample_rate + self._window_size = int(window_length_secs * sample_rate) + self._sliding_window_size = int(sliding_window_secs * sample_rate) + self._out_audio = np.zeros(self._window_size) # Initialize audio buffer + print("Initial output buffer shape:", self._out_audio.shape) def start_stream(self): self._out_audio = np.zeros(self._window_size) self._open_stream() - for i in range(RATE//self._sliding_window_size -1): + for i in range(self._sample_rate // self._sliding_window_size - 1): self.getFrame() def close_stream(self): @@ -45,54 +40,61 @@ def close_stream(self): def getFrame(self): """ - Returns a 1 sec audio frame with sliding window of 1/8 sec with - sampling frequency 16000Hz + Returns a 1-second audio frame with a sliding window of length (sliding_window_secs) + using the target sample rate. 
""" - new_frame = self._get_next_frame() - - #print("Prior:", self._out_audio.shape, new_frame.shape ) assert new_frame.shape == (self._sliding_window_size,), \ - "audio frame size from src doesnt match sliding_window_secs" - - + "audio frame size from src doesn't match sliding_window_secs" self._out_audio = np.append( - self._out_audio[self._sliding_window_size:], + self._out_audio[self._sliding_window_size:], new_frame ) - - #print(self._out_audio.shape) - return self._out_audio -class SimpleMicStream(CustomAudioStream) : - - """ - Implements mic stream with sliding window, - implemented by inheriting CustomAudioStream - """ - def __init__(self,window_length_secs=1, sliding_window_secs:float=1/8): - p=pyaudio.PyAudio() - - CHUNK = int(sliding_window_secs*RATE) - print("Chunk size", CHUNK) - mic_stream=p.open( +class SimpleMicStream(CustomAudioStream): + def __init__(self, window_length_secs=1, sliding_window_secs: float = 1/8, + custom_channels=2, custom_rate=48000, custom_device_index=None): + p = pyaudio.PyAudio() + # Calculate CHUNK based on sliding window seconds and capture rate (custom_rate) + CHUNK = int(sliding_window_secs * custom_rate) + print("Chunk size (captured at {}Hz): {}".format(custom_rate, CHUNK)) + + mic_stream = p.open( format=pyaudio.paInt16, - channels=1, - rate=16000, + channels=custom_channels, + rate=custom_rate, input=True, - frames_per_buffer=CHUNK + frames_per_buffer=CHUNK, + input_device_index=custom_device_index ) - mic_stream.stop_stream() + def get_next_frame(): + try: + # Use exception_on_overflow=False to avoid overflow errors + data = mic_stream.read(CHUNK, exception_on_overflow=False) + except Exception as e: + print("Input overflow:", e) + # Return a silent frame if overflow occurs + data = b'\x00' * CHUNK * 2 * custom_channels # 2 bytes per sample + arr = np.frombuffer(data, dtype=np.int16) + if custom_channels > 1: + # Convert stereo to mono by averaging channels + arr = np.mean(arr.reshape(-1, custom_channels), 
axis=1).astype(np.int16) + # Downsample from custom_rate (48000Hz) to target rate (16000Hz) + target_rate = 16000 + new_length = int(len(arr) * target_rate / custom_rate) + arr_down = resample(arr, new_length).astype(np.int16) + return arr_down + + # Initialize the CustomAudioStream with the target sample rate for the engine CustomAudioStream.__init__( self, - open_stream = mic_stream.start_stream, - close_stream = mic_stream.stop_stream, - get_next_frame = lambda : ( - np.frombuffer(mic_stream.read(CHUNK,exception_on_overflow = False),dtype=np.int16) - ), - window_length_secs=window_length_secs, - sliding_window_secs=sliding_window_secs + open_stream=mic_stream.start_stream, + close_stream=mic_stream.stop_stream, + get_next_frame=get_next_frame, + window_length_secs=window_length_secs, + sliding_window_secs=sliding_window_secs, + sample_rate=16000 # Engine expects 16000 Hz )