57 changes: 17 additions & 40 deletions eff_word_net/engine.py
@@ -97,57 +97,34 @@ def scoreVector(self,inp_vec:np.array) -> float :
self.__last_activation_time = current_time

return score


## EDIT 2025-03-10: add RMS calculation to scoreFrame output
def scoreFrame(
self,
inp_audio_frame:np.array,
unsafe:bool = False) -> float :
inp_audio_frame: np.array,
unsafe: bool = False) -> Union[dict, None]:
"""
Converts given audio frame to embedding and checks for similarity
with given reference file

Inp Parameters:

inp_audio_frame : np.array of 1channel 1 sec 16000Hz sampled audio
frame
unsafe : bool value, set to False by default to prevent engine
processing continuous speech or silence, to minimalize false positives

**Note : change unsafe to True only if you know what you are doing**

Out Parameters:

{
"match":True or False,
"confidence":float value
}
or
None when no voice activity is identified
with given reference file. The frame's RMS value is now returned as well.
"""

if(not unsafe):
upperPoint = max(
(
inp_audio_frame/inp_audio_frame.max()
)[:RATE//10]
)
if(upperPoint > 0.2):
# Compute the RMS of the input frame (over the whole frame)
rms_value = np.sqrt(np.mean(np.square(inp_audio_frame.astype(np.float32))))

if not unsafe:
upperPoint = max((inp_audio_frame/inp_audio_frame.max())[:RATE//10])
if upperPoint > 0.2:
return None

#assert inp_audio_frame.shape == (RATE,), \
# f"Audio frame needs to be a 1 sec {RATE}Hz sampled vector"

score = self.scoreVector(
self.model.audioToVector(
inp_audio_frame
)
)
score = self.scoreVector(self.model.audioToVector(inp_audio_frame))

return {
"match":score >= self.threshold,
"confidence":score
"match": score >= self.threshold,
"confidence": score,
"rms": rms_value
}


HotwordDetectorArray = List[HotwordDetector]
MatchInfo = Tuple[HotwordDetector,float]
MatchInfoArray = List[MatchInfo]
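Review note: the new `"rms"` key is computed as `np.sqrt(np.mean(np.square(frame.astype(np.float32))))` over the full 1 s frame, and the new `Union[dict, None]` annotation requires `from typing import Union` in engine.py, which this hunk does not show. A minimal, self-contained sketch of what callers will see; the detector in the trailing comment is assumed to be configured elsewhere and is not part of this diff:

```python
import numpy as np

# Reproduce the RMS computed inside scoreFrame(): float32, whole frame.
frame = (8000 * np.sin(2 * np.pi * 440 * np.arange(16000) / 16000)).astype(np.int16)
rms = np.sqrt(np.mean(np.square(frame.astype(np.float32))))
print(round(rms))  # ~5657, i.e. 8000 / sqrt(2) for a pure sine

# With a configured HotwordDetector (assumed, not shown in this PR):
# result = detector.scoreFrame(frame)
# if result is not None and result["match"] and result["rms"] > 500.0:
#     ...  # e.g. ignore matches on very quiet audio
```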
120 changes: 61 additions & 59 deletions eff_word_net/streams.py
@@ -1,42 +1,37 @@
import pyaudio
from typing import Tuple , Callable
from typing import Callable
import numpy as np
from eff_word_net.engine import HotwordDetector
from eff_word_net import RATE
from scipy.signal import resample # For downsampling

NoParameterFunction = Callable[[],None]
AudioFrameFunction = Callable[[],np.array]
NoParameterFunction = Callable[[], None]
AudioFrameFunction = Callable[[], np.array]


class CustomAudioStream :
class CustomAudioStream:
"""
CustomAudioStream implementation allows developers to use
any 16000Hz sampled audio streams with inference engine

It tries to add sliding window to audio streams
CustomAudioStream applies a sliding window to an audio stream.
"""
def __init__(
self,
open_stream:Callable[[],None],
close_stream:Callable[[],None],
get_next_frame:Callable[[],np.array],
window_length_secs = 1,
sliding_window_secs:float = 1/8
):

open_stream: Callable[[], None],
close_stream: Callable[[], None],
get_next_frame: Callable[[], np.array],
window_length_secs=1,
sliding_window_secs: float = 1/8,
sample_rate=16000 # Target sample rate for the engine
):
self._open_stream = open_stream
self._close_stream = close_stream
self._get_next_frame = get_next_frame
self._window_size = int(window_length_secs * RATE)
self._sliding_window_size = int(sliding_window_secs * RATE)

self._out_audio = np.zeros(self._window_size) #blank 1 sec audio
print("Initial S",self._out_audio.shape)
self._sample_rate = sample_rate
self._window_size = int(window_length_secs * sample_rate)
self._sliding_window_size = int(sliding_window_secs * sample_rate)
self._out_audio = np.zeros(self._window_size) # Initialize audio buffer
print("Initial output buffer shape:", self._out_audio.shape)

def start_stream(self):
self._out_audio = np.zeros(self._window_size)
self._open_stream()
for i in range(RATE//self._sliding_window_size -1):
for i in range(self._sample_rate // self._sliding_window_size - 1):
self.getFrame()

def close_stream(self):
@@ -45,54 +40,61 @@ def close_stream(self):

def getFrame(self):
"""
Returns a 1 sec audio frame with sliding window of 1/8 sec with
sampling frequency 16000Hz
Returns an audio frame of window_length_secs duration at the target
sample rate, advanced by sliding_window_secs on each call.
"""

new_frame = self._get_next_frame()

#print("Prior:", self._out_audio.shape, new_frame.shape )
assert new_frame.shape == (self._sliding_window_size,), \
"audio frame size from src doesnt match sliding_window_secs"


"audio frame size from src doesn't match sliding_window_secs"
self._out_audio = np.append(
self._out_audio[self._sliding_window_size:],
new_frame
)

#print(self._out_audio.shape)

return self._out_audio
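The buffer update above is a plain shift-and-append over a fixed-size window; a standalone sketch with toy sizes (window of 8 samples, hop of 2) shows the oldest hop being dropped on each call:

```python
import numpy as np

window_size, hop = 8, 2  # stand-ins for window_length_secs*rate and sliding_window_secs*rate
buf = np.zeros(window_size)
for step in range(3):
    new_frame = np.full(hop, step + 1.0)   # pretend audio from _get_next_frame()
    buf = np.append(buf[hop:], new_frame)  # drop oldest hop, append newest
print(buf)  # [0. 0. 1. 1. 2. 2. 3. 3.]
```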

class SimpleMicStream(CustomAudioStream) :

"""
Implements mic stream with sliding window,
implemented by inheriting CustomAudioStream
"""
def __init__(self,window_length_secs=1, sliding_window_secs:float=1/8):
p=pyaudio.PyAudio()

CHUNK = int(sliding_window_secs*RATE)
print("Chunk size", CHUNK)
mic_stream=p.open(
class SimpleMicStream(CustomAudioStream):
def __init__(self, window_length_secs=1, sliding_window_secs: float = 1/8,
custom_channels=2, custom_rate=48000, custom_device_index=None):
p = pyaudio.PyAudio()
# Calculate CHUNK based on sliding window seconds and capture rate (custom_rate)
CHUNK = int(sliding_window_secs * custom_rate)
print("Chunk size (captured at {}Hz): {}".format(custom_rate, CHUNK))

mic_stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
channels=custom_channels,
rate=custom_rate,
input=True,
frames_per_buffer=CHUNK
frames_per_buffer=CHUNK,
input_device_index=custom_device_index
)

mic_stream.stop_stream()

def get_next_frame():
try:
# Use exception_on_overflow=False to avoid overflow errors
data = mic_stream.read(CHUNK, exception_on_overflow=False)
except Exception as e:
print("Input overflow:", e)
# Return a silent frame if overflow occurs
data = b'\x00' * CHUNK * 2 * custom_channels # 2 bytes per sample
arr = np.frombuffer(data, dtype=np.int16)
if custom_channels > 1:
# Convert stereo to mono by averaging channels
arr = np.mean(arr.reshape(-1, custom_channels), axis=1).astype(np.int16)
# Downsample from custom_rate (48000Hz) to target rate (16000Hz)
target_rate = 16000
new_length = int(len(arr) * target_rate / custom_rate)
arr_down = resample(arr, new_length).astype(np.int16)
return arr_down

# Initialize the CustomAudioStream with the target sample rate for the engine
CustomAudioStream.__init__(
self,
open_stream = mic_stream.start_stream,
close_stream = mic_stream.stop_stream,
get_next_frame = lambda : (
np.frombuffer(mic_stream.read(CHUNK,exception_on_overflow = False),dtype=np.int16)
),
window_length_secs=window_length_secs,
sliding_window_secs=sliding_window_secs
open_stream=mic_stream.start_stream,
close_stream=mic_stream.stop_stream,
get_next_frame=get_next_frame,
window_length_secs=window_length_secs,
sliding_window_secs=sliding_window_secs,
sample_rate=16000 # Engine expects 16000 Hz
)
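
The capture path now averages interleaved stereo down to mono and resamples from custom_rate to 16 kHz; a quick standalone check (mirroring the defaults above) that the resulting frame length matches the sliding-window assert in getFrame:

```python
import numpy as np
from scipy.signal import resample

custom_rate, target_rate, custom_channels = 48000, 16000, 2
sliding_window_secs = 1 / 8
chunk = int(sliding_window_secs * custom_rate)  # 6000 frames per read

# Fake interleaved int16 stereo capture, shaped like mic_stream.read() output
raw = np.random.randint(-3000, 3000, chunk * custom_channels, dtype=np.int16)

mono = np.mean(raw.reshape(-1, custom_channels), axis=1).astype(np.int16)
down = resample(mono, int(len(mono) * target_rate / custom_rate)).astype(np.int16)

assert len(down) == int(sliding_window_secs * target_rate)  # 2000 samples
```

One design caveat: `scipy.signal.resample` is FFT-based and assumes a periodic signal, so resampling each 125 ms chunk independently can introduce edge artifacts at chunk boundaries; `scipy.signal.resample_poly` (or a stateful streaming resampler) is usually the safer choice for live audio.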