"""Interactive audio streaming test comparing write-based vs callback-based methods.

The same three-beep sequence is played through three output strategies and,
for each one, the script records:
- Basic functionality (did the stream open and accept audio without errors?)
- Latency (time until the audio was written / first consumed by the device)
- PortAudio status flags raised while playing (underflows etc.)
- Whether the operator actually heard the beeps (asked interactively)
"""

import time
import numpy as np
import sounddevice as sd
from queue import Queue, Empty
from typing import Tuple, Dict, Any
import threading


class AudioTester:
    """Play a known test signal through several sounddevice output strategies."""

    def __init__(self):
        """Locate the reSpeaker output device and print its details.

        Raises:
            RuntimeError: if no reSpeaker output device is found.
        """
        self.sample_rate = 16000
        self.device_id = self._find_respeaker()
        print(f"Using device ID: {self.device_id}")
        print(f"Device info: {sd.query_devices(self.device_id)}")

    def _find_respeaker(self) -> int:
        """Return the index of the first output-capable device named like "respeaker".

        Raises:
            RuntimeError: if no such device exists.
        """
        for idx, dev in enumerate(sd.query_devices()):
            if "respeaker" in dev["name"].lower() and dev.get("max_output_channels", 0) > 0:
                return idx
        raise RuntimeError("reSpeaker output device not found")

    @staticmethod
    def _new_results(method: str) -> Dict[str, Any]:
        """Return an empty result record for the test called *method*."""
        return {
            "method": method,
            "works": False,
            "latency": None,
            "errors": [],
            "notes": [],
        }

    def generate_test_tone(self, duration: float = 1.0, frequency: float = 440.0) -> np.ndarray:
        """Return a float32 sine tone of *duration* seconds at *frequency* Hz (amplitude 0.3)."""
        n_samples = int(self.sample_rate * duration)
        # Sample instants are k / sample_rate. np.linspace with its default
        # endpoint=True would space them duration / (n - 1) apart and play
        # the tone slightly sharp.
        t = np.arange(n_samples) / self.sample_rate
        return (np.sin(2 * np.pi * frequency * t) * 0.3).astype(np.float32)

    def generate_test_sequence(self) -> np.ndarray:
        """Return three 0.2 s beeps (A, C#, E) separated by 0.1 s of silence."""
        silence = np.zeros(int(self.sample_rate * 0.1), dtype=np.float32)
        beep1 = self.generate_test_tone(0.2, 440)  # A
        beep2 = self.generate_test_tone(0.2, 554)  # C#
        beep3 = self.generate_test_tone(0.2, 659)  # E
        return np.concatenate([beep1, silence, beep2, silence, beep3])

    # ==================== METHOD 1: WRITE-BASED (CURRENT/BROKEN) ====================

    def test_write_based(self) -> Dict[str, Any]:
        """Play the test sequence with a blocking ``stream.write()`` call.

        Returns:
            Result record; ``latency`` holds how long ``write()`` blocked.
        """
        print("\n" + "="*70)
        print("TEST 1: WRITE-BASED STREAMING (Current Method)")
        print("="*70)

        results = self._new_results("write-based")

        try:
            # The context manager starts the stream on entry and stops/closes
            # it on exit, so the device is released even if playback raises.
            with sd.OutputStream(
                samplerate=self.sample_rate,
                device=self.device_id,
                channels=1,
            ) as stream:
                print("Stream started...")
                time.sleep(0.5)

                test_audio = self.generate_test_sequence()

                print(f"Writing {len(test_audio)} samples...")
                start_time = time.perf_counter()
                try:
                    underflowed = stream.write(test_audio)
                except Exception as e:
                    results["errors"].append(f"Write failed: {e}")
                    print(f"✗ Write failed: {e}")
                else:
                    write_time = time.perf_counter() - start_time
                    results["latency"] = write_time
                    results["works"] = True
                    if underflowed:
                        # write() reports whether the device ran dry meanwhile.
                        results["notes"].append("Output underflow reported during write")
                    print(f"✓ Write completed in {write_time:.3f}s")

                time.sleep(2)  # Wait to hear if anything plays
            print("Stream closed.")

        except Exception as e:
            results["errors"].append(f"Stream creation failed: {e}")
            print(f"✗ Stream creation failed: {e}")

        return results

    # ==================== METHOD 2: CALLBACK-BASED (PROPOSED FIX) ====================

    def test_callback_based(self) -> Dict[str, Any]:
        """Play the test sequence through a callback that drains a queue.

        Returns:
            Result record; ``latency`` is the delay between queueing the audio
            and the callback first pulling it from the queue.
        """
        print("\n" + "="*70)
        print("TEST 2: CALLBACK-BASED STREAMING (Proposed Fix)")
        print("="*70)

        results = self._new_results("callback-based")

        output_queue = Queue()
        playback_started = threading.Event()
        first_audio_time = [None]
        # Samples dequeued but not yet played. Kept here instead of being put
        # back on the queue, which would move them behind later chunks.
        pending = [np.zeros(0, dtype=np.float32)]

        def callback(outdata, frames, time_info, status):
            """Fill one output block from the queue, padding with silence."""
            if status:
                results["notes"].append(f"Status: {status}")

            out = outdata[:, 0]
            filled = 0
            while filled < frames:
                if pending[0].size == 0:
                    try:
                        pending[0] = output_queue.get_nowait()
                    except Empty:
                        break
                    # Timestamp the first *audio* block. The callback already
                    # runs (emitting silence) before anything is queued, so
                    # the first callback itself says nothing about latency.
                    if first_audio_time[0] is None:
                        first_audio_time[0] = time.perf_counter()
                        playback_started.set()
                take = min(frames - filled, pending[0].size)
                out[filled:filled + take] = pending[0][:take]
                pending[0] = pending[0][take:]
                filled += take
            out[filled:] = 0  # Output silence for whatever is left

        try:
            with sd.OutputStream(
                samplerate=self.sample_rate,
                device=self.device_id,
                channels=1,
                callback=callback,
                blocksize=1024,
            ):
                print("Stream started with callback...")
                time.sleep(0.5)

                test_audio = self.generate_test_sequence()

                print(f"Queueing {len(test_audio)} samples...")
                queue_start = time.perf_counter()
                output_queue.put(test_audio)

                # Wait for the callback to pick the audio up
                if playback_started.wait(timeout=2.0):
                    latency = first_audio_time[0] - queue_start
                    results["latency"] = latency
                    results["works"] = True
                    print(f"✓ Playback started with {latency:.3f}s latency")
                else:
                    results["errors"].append("Playback did not start")
                    print("✗ Playback did not start within 2 seconds")

                time.sleep(2)  # Wait to hear full sequence
            print("Stream closed.")

        except Exception as e:
            results["errors"].append(f"Stream creation failed: {e}")
            print(f"✗ Stream creation failed: {e}")

        return results

    # ==================== METHOD 3: CALLBACK-BASED (LIKE PLAY_SOUND) ====================

    def test_callback_file_style(self) -> Dict[str, Any]:
        """Play the test sequence with a callback that walks a fixed array.

        Returns:
            Result record; ``latency`` is the time from just before the stream
            is opened until the first callback runs.
        """
        print("\n" + "="*70)
        print("TEST 3: CALLBACK-BASED (play_sound style)")
        print("="*70)

        results = self._new_results("callback-file-style")

        test_audio = self.generate_test_sequence()
        length = len(test_audio)
        start_pos = [0]
        playback_started = threading.Event()
        finished = threading.Event()
        first_callback_time = [None]

        def callback(outdata, frames, time_info, status):
            """Copy the next slice of the array; stop once it is exhausted."""
            if status:
                results["notes"].append(f"Status: {status}")

            if first_callback_time[0] is None:
                first_callback_time[0] = time.perf_counter()
                playback_started.set()

            begin = start_pos[0]
            end = begin + frames
            if end > length:
                # Last partial block: remaining audio, then zero padding
                remaining = length - begin
                outdata[:remaining, 0] = test_audio[begin:]
                outdata[remaining:, 0] = 0
                raise sd.CallbackStop()
            outdata[:, 0] = test_audio[begin:end]
            start_pos[0] = end

        try:
            stream_start = time.perf_counter()
            with sd.OutputStream(
                samplerate=self.sample_rate,
                device=self.device_id,
                channels=1,
                callback=callback,
                finished_callback=finished.set,
            ):
                print("Stream started with file-style callback...")

                # Wait for playback to start
                if playback_started.wait(timeout=2.0):
                    latency = first_callback_time[0] - stream_start
                    results["latency"] = latency
                    results["works"] = True
                    print(f"✓ Playback started with {latency:.3f}s latency")
                else:
                    results["errors"].append("Playback did not start")
                    print("✗ Playback did not start")

                # Wait for completion
                finished.wait(timeout=5.0)
                time.sleep(0.5)
            print("Stream closed.")

        except Exception as e:
            results["errors"].append(f"Failed: {e}")
            print(f"✗ Failed: {e}")

        return results

    # ==================== COMPREHENSIVE TEST RUNNER ====================

    def run_all_tests(self) -> Dict[str, Any]:
        """Run the three tests interactively and print a comparison.

        Returns:
            Mapping of test key to its result record, each extended with the
            operator's answer under ``"user_heard_audio"``.
        """
        print("\n" + "="*70)
        print("AUDIO STREAMING METHOD COMPARISON TEST")
        print("="*70)
        print(f"Sample Rate: {self.sample_rate} Hz")
        print(f"Device: {sd.query_devices(self.device_id)['name']}")
        print("\nFor each test, you should listen for THREE BEEPS.")
        print("After each test, you'll be asked if you heard audio.")
        print("="*70)

        # (result key, banner, test method) -- executed in this order
        plan = [
            (
                "write_based",
                "\n\n[TEST 1 of 3] WRITE-BASED METHOD (current implementation)",
                self.test_write_based,
            ),
            (
                "callback_queue",
                "\n\n[TEST 2 of 3] CALLBACK-BASED WITH QUEUE (proposed fix)",
                self.test_callback_based,
            ),
            (
                "callback_file",
                "\n\n[TEST 3 of 3] CALLBACK FILE-STYLE (like play_sound)",
                self.test_callback_file_style,
            ),
        ]

        results = {}
        for key, banner, run_test in plan:
            print(banner)
            input("Press ENTER to play test audio...")
            results[key] = run_test()
            time.sleep(3)

            heard = input("\nDid you hear THREE BEEPS from the robot? (y/n): ").strip().lower()
            # Accept "y" as well as "yes"
            results[key]["user_heard_audio"] = heard.startswith('y')

        self._print_summary(results)

        return results

    def _print_summary(self, results: Dict[str, Any]):
        """Print per-test results followed by an analysis and recommendation.

        Args:
            results: Mapping holding the ``"write_based"``, ``"callback_queue"``
                and ``"callback_file"`` result records.
        """
        print("\n" + "="*70)
        print("TEST RESULTS SUMMARY")
        print("="*70)

        for result in results.values():
            print(f"\n{result['method'].upper()}:")
            print(f" User heard audio: {'✓ YES' if result.get('user_heard_audio', False) else '✗ NO'}")
            print(f" Technical success: {'✓ YES' if result['works'] else '✗ NO'}")
            # Compare against None: a measured latency of 0.0 is still a result.
            if result['latency'] is not None:
                print(f" Latency: {result['latency']:.3f}s")
            if result['errors']:
                print(f" Errors: {', '.join(result['errors'])}")

        print("\n" + "="*70)
        print("ANALYSIS:")
        print("="*70)

        # Which methods did the operator actually hear?
        heard_write = results["write_based"].get("user_heard_audio", False)
        heard_queue = results["callback_queue"].get("user_heard_audio", False)
        heard_file = results["callback_file"].get("user_heard_audio", False)

        if not heard_write and not heard_queue and not heard_file:
            print("⚠ NO AUDIO HEARD on any test!")
            print(" Possible issues:")
            print(" - Robot speaker not working")
            print(" - Wrong audio device selected")
            print(" - Volume too low")
        elif heard_write:
            print("✓ Write-based method WORKS on your system")
            print(" This is unexpected for Windows + USB audio.")
            print(" The fix may not be necessary, but callback-based is still")
            print(" more reliable and recommended for cross-platform compatibility.")
        else:
            print("✗ Write-based method DOES NOT WORK (expected on Windows)")

        if heard_queue:
            print("✓ Queue-based callback method WORKS")
            print(" ✅ RECOMMENDATION: Apply this fix to audio_sounddevice.py")
        else:
            print("✗ Queue-based callback method did not work")
            print(" This needs investigation before applying fix.")

        if heard_file:
            print("✓ File-style callback method WORKS")
            print(" This confirms the callback approach is viable.")

        print("\n" + "="*70)
        print("FINAL RECOMMENDATION:")
        print("="*70)

        if heard_queue and not heard_write:
            print("✅ APPLY THE FIX")
            print(" The queue-based callback method works while write-based doesn't.")
            print(" This will enable the conversation app audio on Windows.")
        elif heard_queue and heard_write:
            print("⚠ OPTIONAL: Apply fix for better compatibility")
            print(" Both methods work, but callback-based is more reliable.")
        elif not heard_queue and heard_file:
            print("⚠ INVESTIGATE: Queue-based needs debugging")
            print(" File-style works but queue-based doesn't.")
        else:
            print("⚠ DO NOT APPLY FIX YET")
            print(" Need to investigate why methods aren't working.")

        print("="*70)


if __name__ == "__main__":
    tester = AudioTester()
    results = tester.run_all_tests()

    print("\n✓ Testing complete! See summary above for recommendations.")
class SoundDeviceAudio(AudioBase):
    """Audio device implementation using sounddevice (callback-driven output)."""

    # NOTE(review): only the methods that are fully visible in the patch are
    # reproduced here. start_recording, _callback, get_audio_sample,
    # stop_recording, play_sound and the device-lookup helpers appear only as
    # partial hunks and are intentionally not restated.

    def __init__(
        self,
        frames_per_buffer: int = 1024,
        log_level: str = "INFO",
    ) -> None:
        """Initialize the SoundDevice audio device.

        Args:
            frames_per_buffer: Block size, in frames, used for the input and
                output streams.
            log_level: Logging level forwarded to ``AudioBase``.
        """
        super().__init__(log_level=log_level)

        self.frames_per_buffer = frames_per_buffer
        self.stream = None
        self._output_stream = None
        self._buffer: List[npt.NDArray[np.float32]] = []

        # Device ids
        self._output_device_id = self.get_output_device_id("respeaker")
        self._input_device_id = self.get_input_device_id("respeaker")

        # Streaming state shared between the caller thread (producer) and the
        # PortAudio callback thread (consumer).
        self._streaming_active = False
        self._chunk_fifo: deque[npt.NDArray[np.float32]] = deque()
        # Each counter has exactly one writer thread, so no "+=" can be lost:
        # _pushed_samples is written by push_audio_sample() only,
        # _popped_samples by _streaming_callback() only.
        self._pushed_samples: int = 0
        self._popped_samples: int = 0
        self._tail: Optional[npt.NDArray[np.float32]] = None
        self._target_buffer_ms: int = 120  # tune 80–200 for stability/latency tradeoff
        self._primed: bool = False  # True once the prebuffer has filled
        self._starved_frames: int = 0  # frames spent waiting below the watermark
        self._underflows: int = 0
        self._overflows: int = 0

    @property
    def _queued_samples(self) -> int:
        """Samples pushed but not yet taken out of the FIFO (diagnostics only)."""
        return self._pushed_samples - self._popped_samples

    # ---------- Output (streaming TTS/audio) ----------

    @staticmethod
    def _to_mono(data: Any) -> npt.NDArray[np.float32]:
        """Return a private 1-D float32 copy of *data*, down-mixed to mono.

        Accepts (N,), (1,N), (N,1), (C,N) and (N,C). For a true multi-channel
        array the smaller dimension is taken to be the channel axis.
        """
        # np.array always copies, so the caller may reuse its buffer while the
        # chunk is still waiting in the FIFO.
        a = np.array(data, dtype=np.float32)
        if a.ndim == 2 and 1 not in a.shape:
            chan_axis = 0 if a.shape[0] <= a.shape[1] else 1
            return a.mean(axis=chan_axis)
        return a.reshape(-1)

    def push_audio_sample(self, data: npt.NDArray[np.float32]) -> None:
        """Queue audio for playback by the output callback.

        Args:
            data: Samples to play; see ``_to_mono`` for the accepted shapes.
        """
        if not self._streaming_active or self._output_stream is None:
            self.logger.warning("Output stream is not active. Call start_playing() first.")
            return

        mono = self._to_mono(data)
        if mono.size == 0:
            return

        self._chunk_fifo.append(mono)
        self._pushed_samples += int(mono.shape[0])

    def _target_buffer_samples(self) -> int:
        """Watermark in samples for small prebuffer to smooth bursty input."""
        return int(self.SAMPLE_RATE * (self._target_buffer_ms / 1000.0))

    def _streaming_callback(
        self,
        outdata: npt.NDArray[np.float32],
        frames: int,
        time: Any,
        status: sd.CallbackFlags,
    ) -> None:
        """Fill one output block from the FIFO (runs on the PortAudio thread).

        Playback starts once the prebuffer watermark is reached, or once queued
        audio has waited for about one watermark's worth of time, so clips
        shorter than the watermark still play. After that the FIFO is drained
        completely; running dry re-arms the prebuffer.
        """
        # Track under/overflow for diagnostics
        if status:
            if status.output_underflow:
                self._underflows += 1
            if status.input_overflow:
                self._overflows += 1
            self.logger.debug(f"Audio status: {status} (uf={self._underflows}, of={self._overflows})")

        out = outdata[:, 0]
        out[:] = 0.0  # default to silence

        if not self._primed:
            tail_size = 0 if self._tail is None else int(self._tail.size)
            # May briefly under-count while a push is in flight; that only
            # delays priming by one block.
            available = tail_size + self._pushed_samples - self._popped_samples
            if available <= 0:
                self._starved_frames = 0
                return
            target = self._target_buffer_samples()
            if available < target and self._starved_frames < target:
                self._starved_frames += frames
                return
            self._primed = True
            self._starved_frames = 0

        # 1) Drain carry-over tail first
        written = 0
        if self._tail is not None and self._tail.size:
            take = min(frames, int(self._tail.size))
            out[:take] = self._tail[:take]
            written = take
            self._tail = self._tail[take:] if take < self._tail.size else None

        # 2) Drain FIFO; allow multiple chunks to fill the block
        while written < frames:
            try:
                chunk = self._chunk_fifo.popleft()
            except IndexError:
                break  # FIFO empty
            size = int(chunk.shape[0])
            self._popped_samples += size

            need = frames - written
            if size <= need:
                out[written:written + size] = chunk
                written += size
            else:
                out[written:frames] = chunk[:need]
                self._tail = chunk[need:]  # carry remainder to next callback
                written = frames

        if written < frames:
            # Ran dry: rebuffer before resuming, to absorb bursty producers.
            self._primed = False

    def _reset_streaming_state(self) -> None:
        """Drop all queued audio and re-arm the prebuffer."""
        self._chunk_fifo.clear()
        self._pushed_samples = 0
        self._popped_samples = 0
        self._tail = None
        self._primed = False
        self._starved_frames = 0

    def start_playing(self) -> None:
        """Open the audio output stream (callback mode with smoothing).

        Any output stream that is still open is closed first.
        """
        self.stop_playing()  # never leak a previously opened stream
        self._reset_streaming_state()
        self._underflows = 0
        self._overflows = 0

        stream = sd.OutputStream(
            samplerate=self.SAMPLE_RATE,
            device=self._output_device_id,
            channels=1,
            dtype="float32",
            callback=self._streaming_callback,
            blocksize=self.frames_per_buffer,
        )
        try:
            stream.start()
        except Exception:
            stream.close()
            raise

        # Publish the stream only once it is running, so push_audio_sample()
        # never sees a half-initialised state.
        self._output_stream = stream
        self._streaming_active = True
        self.logger.info("SoundDevice audio output stream opened (callback mode w/ smoothing).")

    def stop_playing(self) -> None:
        """Close the audio output stream, if any, and discard queued audio."""
        self._streaming_active = False
        stream, self._output_stream = self._output_stream, None
        if stream is not None:
            try:
                stream.stop()
            finally:
                stream.close()
            self.logger.info("SoundDevice audio output stream closed.")
        self._reset_streaming_state()
status: self.logger.warning(f"SoundDevice output status: {status}") end = start[0] + frames if end > length: - # Fill the output buffer with the audio data, or zeros if finished outdata[: length - start[0], 0] = data[start[0] :] outdata[length - start[0] :, 0] = 0 raise sd.CallbackStop() @@ -152,19 +257,15 @@ def callback( device=self._output_device_id, channels=1, callback=callback, - finished_callback=event.set, # release the device when done + finished_callback=event.set, ) if self._output_stream is None: raise RuntimeError("Failed to open SoundDevice audio output stream.") self._output_stream.start() def _clean_up_thread() -> None: - """Thread to clean up the output stream after playback. - - The daemon may play sound but should release the audio device. - """ event.wait() - timeout = 5 # seconds + timeout = 5 waited = 0 while ( self._output_stream is not None @@ -176,25 +277,21 @@ def _clean_up_thread() -> None: self.stop_playing() if autoclean: - threading.Thread( - target=_clean_up_thread, - daemon=True, - ).start() + threading.Thread(target=_clean_up_thread, daemon=True).start() + + # ---------- Device selection (kept compatible with head) ---------- def get_output_device_id(self, name_contains: str) -> int: """Return the output device id whose name contains the given string (case-insensitive). - If not found, return the default output device id. """ devices = sd.query_devices() - for idx, dev in enumerate(devices): if ( name_contains.lower() in dev["name"].lower() and dev["max_output_channels"] > 0 ): return idx - # Return default output device if not found self.logger.warning( f"No output device found containing '{name_contains}', using default." ) @@ -202,18 +299,15 @@ def get_output_device_id(self, name_contains: str) -> int: def get_input_device_id(self, name_contains: str) -> int: """Return the input device id whose name contains the given string (case-insensitive). - If not found, return the default input device id. 
""" devices = sd.query_devices() - for idx, dev in enumerate(devices): if ( name_contains.lower() in dev["name"].lower() and dev["max_input_channels"] > 0 ): return idx - # Return default input device if not found self.logger.warning( f"No input device found containing '{name_contains}', using default." ) @@ -223,4 +317,5 @@ def _safe_query_device(self, kind: str) -> int: try: return int(sd.query_devices(None, kind)["index"]) except sd.PortAudioError: - return int(sd.default.device[1]) + # Fallback: sd.default.device = (input, output) + return int(sd.default.device[1 if kind == "output" else 0])