-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_vad_recorder.py
More file actions
229 lines (186 loc) · 7.81 KB
/
streaming_vad_recorder.py
File metadata and controls
229 lines (186 loc) · 7.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Streaming VAD Recorder - records audio from a continuous stream using VAD.
Subscribes to an existing continuous audio stream instead of opening the audio device itself.
"""
import logging
import threading
import wave
import numpy as np
import webrtcvad
from pathlib import Path
from typing import Optional
from collections import deque
logger = logging.getLogger(__name__)
class StreamingVADRecorder:
    """
    VAD-based recorder that works with a continuous audio stream.

    Audio chunks pushed through ``process_audio`` are buffered while a
    recording is active. WebRTC VAD is run over the most recent frames to
    detect when the user has stopped speaking, at which point the buffered
    audio is written to a 16-bit mono WAV file.

    All state transitions happen under ``recording_lock``. The lock is
    re-entrant because ``stop_recording`` / ``process_audio`` call
    ``_finalize_recording`` while already holding it.
    """

    # Number of most recent frames inspected for the "speech started" decision.
    _SPEECH_WINDOW_FRAMES = 10
    # Speech frames required inside that window to latch ``speech_detected``.
    _SPEECH_TRIGGER_FRAMES = 3

    def __init__(self,
                 sample_rate: int = 48000,
                 vad_aggressiveness: int = 2,
                 frame_duration_ms: int = 30,
                 padding_duration_ms: int = 300,
                 min_speech_duration_ms: int = 500,
                 max_duration_s: int = 30,
                 recordings_dir: Path = Path("/tmp")):
        """
        Initialize streaming VAD recorder

        Args:
            sample_rate: Audio sample rate (used for VAD and the WAV header)
            vad_aggressiveness: VAD aggressiveness (0-3, higher = more aggressive)
            frame_duration_ms: Frame duration for VAD (10, 20, or 30ms)
            padding_duration_ms: Trailing silence required to end the recording
            min_speech_duration_ms: Minimum buffered duration before the
                recording may end on silence
            max_duration_s: Maximum recording duration
            recordings_dir: Directory to save recordings (created if missing)
        """
        self.sample_rate = sample_rate
        self.vad_aggressiveness = vad_aggressiveness
        self.frame_duration_ms = frame_duration_ms
        self.padding_duration_ms = padding_duration_ms
        self.min_speech_duration_ms = min_speech_duration_ms
        self.max_duration_s = max_duration_s
        self.recordings_dir = Path(recordings_dir)

        # Create recordings directory
        self.recordings_dir.mkdir(parents=True, exist_ok=True)

        # VAD
        self.vad = webrtcvad.Vad(vad_aggressiveness)

        # Frame size in samples
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        # Number of trailing silent frames that ends a recording
        self.padding_frames = int(padding_duration_ms / frame_duration_ms)
        # Minimum number of buffered frames before a recording may end
        self.min_speech_frames = int(min_speech_duration_ms / frame_duration_ms)

        # Recording state. RLock (not Lock): _finalize_recording re-acquires
        # the lock when invoked from stop_recording / process_audio.
        self.recording = False
        self.recording_lock = threading.RLock()
        self.audio_buffer = deque()
        self.speech_detected = threading.Event()
        self.recording_complete = threading.Event()

        # Recording result
        self.recorded_file = None

        # Ensures the sample-rate mismatch warning is emitted only once.
        self._rate_mismatch_warned = False

        logger.info("StreamingVADRecorder initialized:")
        logger.info(f" Sample rate: {sample_rate}Hz")
        logger.info(f" VAD aggressiveness: {vad_aggressiveness}")
        logger.info(f" Frame duration: {frame_duration_ms}ms")
        logger.info(f" Padding: {padding_duration_ms}ms")
        logger.info(f" Min speech: {min_speech_duration_ms}ms")

    @staticmethod
    def _to_int16(audio_chunk) -> np.ndarray:
        """
        Return a flat int16 copy of ``audio_chunk``.

        The VAD frames and the WAV file are both produced with ``tobytes()``
        and declared as 16-bit samples, so every buffered chunk must really
        be int16. A copy is always returned so the buffer never aliases an
        array the caller may reuse.
        """
        samples = np.asarray(audio_chunk).reshape(-1)
        if samples.dtype == np.int16:
            return samples.copy()
        if np.issubdtype(samples.dtype, np.floating):
            # NOTE(review): float input is assumed to be normalised to
            # [-1.0, 1.0] - confirm against the stream producer.
            return (np.clip(samples, -1.0, 1.0) * 32767.0).astype(np.int16)
        # Other integer dtypes are assumed to already be on the int16 scale;
        # saturate instead of wrapping around.
        return np.clip(samples.astype(np.int64), -32768, 32767).astype(np.int16)

    def _tail_samples(self, count: int) -> np.ndarray:
        """
        Return the most recent ``count`` buffered samples as one array.

        Only the chunks that overlap the requested tail are concatenated,
        so the cost does not grow with the length of the recording. If fewer
        than ``count`` samples are buffered, everything buffered is returned.
        """
        pieces = []
        remaining = count
        for chunk in reversed(self.audio_buffer):
            if remaining <= 0:
                break
            piece = chunk[-remaining:] if len(chunk) > remaining else chunk
            pieces.append(piece)
            remaining -= len(piece)
        if not pieces:
            return np.empty(0, dtype=np.int16)
        pieces.reverse()
        return np.concatenate(pieces)

    def start_recording(self):
        """Start recording (buffers audio until VAD detects end of speech)"""
        with self.recording_lock:
            if self.recording:
                logger.warning("Already recording")
                return

            logger.info("🎤 Starting streaming recording...")
            # Reset all per-recording state before accepting audio.
            self.audio_buffer.clear()
            self.speech_detected.clear()
            self.recording_complete.clear()
            self.recorded_file = None
            self.recording = True

    def process_audio(self, audio_chunk: np.ndarray, sample_rate: int):
        """
        Process audio chunk (called by continuous stream)

        Does nothing unless a recording is active. Errors are logged rather
        than raised so a failure here cannot break the audio stream callback.

        Args:
            audio_chunk: Mono audio samples for this chunk
            sample_rate: Sample rate of the chunk; used for the maximum
                duration check (VAD and the WAV header use the rate given
                to the constructor)
        """
        with self.recording_lock:
            if not self.recording:
                return

            try:
                if sample_rate != self.sample_rate and not self._rate_mismatch_warned:
                    self._rate_mismatch_warned = True
                    logger.warning(
                        "Stream sample rate %sHz differs from recorder sample rate %sHz",
                        sample_rate, self.sample_rate)

                # Add to buffer (normalised to flat int16)
                self.audio_buffer.append(self._to_int16(audio_chunk))

                # Check max duration
                total_samples = sum(len(chunk) for chunk in self.audio_buffer)
                if total_samples / sample_rate >= self.max_duration_s:
                    logger.info("⏱️ Max duration reached (%ss)", self.max_duration_s)
                    self._finalize_recording()
                    return

                # Perform VAD analysis
                self._analyze_vad()
            except Exception as e:
                logger.exception("Error processing audio for recording: %s", e)

    def _analyze_vad(self):
        """
        Analyze the most recent buffered frames with VAD.

        Latches ``speech_detected`` once enough speech frames are seen in the
        last ``_SPEECH_WINDOW_FRAMES`` frames, and finalizes the recording
        once speech has been detected and the trailing run of silent frames
        reaches ``padding_frames``.
        """
        with self.recording_lock:
            if not self.audio_buffer:
                return

            total_samples = sum(len(chunk) for chunk in self.audio_buffer)
            num_frames = total_samples // self.frame_size
            if num_frames == 0:
                return

            # The window must be at least padding_frames long, otherwise the
            # trailing-silence count could never reach padding_frames.
            window = min(num_frames,
                         max(self._SPEECH_WINDOW_FRAMES, self.padding_frames))
            # Frames are aligned to the start of the buffer, so samples after
            # the last complete frame are fetched too and simply left unused.
            leftover = total_samples - num_frames * self.frame_size
            tail = self._tail_samples(window * self.frame_size + leftover)

            # Speech onset is still judged on the last 10 frames only.
            speech_window_start = window - min(window, self._SPEECH_WINDOW_FRAMES)
            speech_frames = 0
            silence_frames = 0
            for index in range(window):
                start = index * self.frame_size
                frame = tail[start:start + self.frame_size]
                if self.vad.is_speech(frame.tobytes(), self.sample_rate):
                    # Any speech frame restarts the trailing-silence run.
                    silence_frames = 0
                    if index >= speech_window_start:
                        speech_frames += 1
                else:
                    silence_frames += 1

            # Check if we detected speech
            if (speech_frames >= self._SPEECH_TRIGGER_FRAMES
                    and not self.speech_detected.is_set()):
                logger.debug("🗣️ Speech detected")
                self.speech_detected.set()

            # Check if speech ended (after detecting speech)
            if (self.speech_detected.is_set()
                    and silence_frames >= self.padding_frames
                    and num_frames >= self.min_speech_frames):
                logger.info("🔇 Speech ended")
                self._finalize_recording()

    def _finalize_recording(self):
        """
        Save recording and signal completion.

        Writes the buffered audio to ``recording_<unix-seconds>.wav`` in
        ``recordings_dir``. Whether or not saving succeeds, the recorder
        leaves the recording state and ``recording_complete`` is set, so
        waiters are always released; ``recorded_file`` stays ``None`` on
        failure or when no audio was captured.
        """
        import time

        with self.recording_lock:
            if not self.recording:
                return

            try:
                if not self.audio_buffer:
                    logger.warning("No audio captured; nothing to save")
                    return

                # Combine all audio
                audio_data = np.concatenate(list(self.audio_buffer))

                # Generate filename
                timestamp = int(time.time())
                filename = self.recordings_dir / f"recording_{timestamp}.wav"

                # Save as WAV
                with wave.open(str(filename), 'wb') as wav_file:
                    wav_file.setnchannels(1)
                    wav_file.setsampwidth(2)  # 16-bit
                    wav_file.setframerate(self.sample_rate)
                    wav_file.writeframes(audio_data.tobytes())

                duration = len(audio_data) / self.sample_rate
                logger.info("✅ Recording saved: %s (%.2fs)", filename, duration)
                self.recorded_file = filename
            except Exception as e:
                logger.exception("Error finalizing recording: %s", e)
            finally:
                # Always release waiters, even if saving failed.
                self.recording = False
                self.recording_complete.set()

    def wait_for_recording(self, timeout: Optional[float] = None) -> Optional[Path]:
        """
        Wait for recording to complete

        Args:
            timeout: Maximum time to wait in seconds (None waits indefinitely)

        Returns:
            Path to recorded file, or None if failed/timeout
        """
        if self.recording_complete.wait(timeout=timeout):
            return self.recorded_file
        return None

    def stop_recording(self):
        """Stop recording immediately, saving whatever has been buffered"""
        with self.recording_lock:
            if self.recording:
                logger.info("🛑 Stopping recording")
                self._finalize_recording()