diff --git a/getstream/audio/__init__.py b/getstream/audio/__init__.py deleted file mode 100644 index 89cfb557..00000000 --- a/getstream/audio/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Audio utilities for the getstream package.""" - -from .pcm_utils import ( - pcm_to_numpy_array, - numpy_array_to_bytes, - validate_sample_rate_compatibility, - log_audio_processing_info, -) -from .utils import resample_audio - -__all__ = [ - "pcm_to_numpy_array", - "numpy_array_to_bytes", - "validate_sample_rate_compatibility", - "log_audio_processing_info", - "resample_audio", -] diff --git a/getstream/audio/pcm_utils.py b/getstream/audio/pcm_utils.py deleted file mode 100644 index 161f8cdd..00000000 --- a/getstream/audio/pcm_utils.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -Common PCM audio processing utilities for plugins. - -This module provides shared utilities for audio format conversion and validation -to eliminate code duplication between STT, TTS, and VAD plugins. -""" - -import logging - -import numpy as np - -from getstream.video.rtc.track_util import PcmData - -logger = logging.getLogger(__name__) - - -def pcm_to_numpy_array(pcm_data: PcmData) -> np.ndarray: - """ - Convert PcmData samples to numpy array. - - Handles both bytes and numpy array inputs, ensuring consistent int16 output. - - Args: - pcm_data: The PCM audio data to convert - - Returns: - numpy array of int16 audio samples - - Raises: - ValueError: If the input format is not supported - """ - if isinstance(pcm_data.samples, bytes): - # Convert bytes to numpy array - return np.frombuffer(pcm_data.samples, dtype=np.int16) - elif isinstance(pcm_data.samples, np.ndarray): - # Ensure it's int16 format - return pcm_data.samples.astype(np.int16) - else: - raise ValueError( - f"Unsupported samples type: {type(pcm_data.samples)}. " - "Expected bytes or numpy.ndarray" - ) - - -def numpy_array_to_bytes(audio_array: np.ndarray) -> bytes: - """ - Convert numpy audio array to bytes. - - Args: - audio_array: numpy array of audio samples - - Returns: - bytes representation of the audio data - """ - # Ensure int16 format before converting to bytes - if audio_array.dtype != np.int16: - audio_array = audio_array.astype(np.int16) - - return audio_array.tobytes() - - -def validate_sample_rate_compatibility( - input_rate: int, target_rate: int, plugin_name: str -) -> None: - """ - Validate sample rate compatibility and log appropriate messages. - - Args: - input_rate: Input sample rate in Hz - target_rate: Target sample rate in Hz - plugin_name: Name of the plugin for logging context - - Raises: - ValueError: If sample rates are invalid (zero or negative) - """ - if input_rate <= 0: - raise ValueError(f"Invalid input sample rate: {input_rate}Hz") - - if target_rate <= 0: - raise ValueError(f"Invalid target sample rate: {target_rate}Hz") - - if input_rate != target_rate: - if input_rate == 48000 and target_rate == 16000: - # This is the expected WebRTC -> plugin conversion - logger.debug( - f"Converting WebRTC audio (48kHz) to {plugin_name} format (16kHz)" - ) - else: - logger.warning( - f"Unexpected sample rate conversion in {plugin_name}: " - f"{input_rate}Hz -> {target_rate}Hz" - ) - else: - logger.debug("No resampling needed - sample rates match") - - -def log_audio_processing_info( - pcm_data: PcmData, target_rate: int, plugin_name: str -) -> None: - """ - Log detailed audio processing information for debugging. 
- - Args: - pcm_data: The PCM audio data being processed - target_rate: Target sample rate for the plugin - plugin_name: Name of the plugin for logging context - """ - # Calculate duration safely - duration_ms = 0.0 - if pcm_data.sample_rate > 0: - if isinstance(pcm_data.samples, bytes): - # For bytes, calculate based on int16 format (2 bytes per sample) - num_samples = len(pcm_data.samples) // 2 - else: - num_samples = len(pcm_data.samples) - duration_ms = (num_samples / pcm_data.sample_rate) * 1000 - - logger.debug( - f"Processing audio chunk in {plugin_name}", - extra={ - "plugin": plugin_name, - "input_sample_rate": pcm_data.sample_rate, - "target_sample_rate": target_rate, - "input_format": pcm_data.format, - "samples_type": type(pcm_data.samples).__name__, - "samples_length": len(pcm_data.samples), - "duration_ms": duration_ms, - }, - ) diff --git a/getstream/audio/utils.py b/getstream/audio/utils.py deleted file mode 100644 index a4fd9953..00000000 --- a/getstream/audio/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -Shared helpers for audio manipulation used by VAD and STT plugins. -Currently contains: - * resample_audio() – high-quality multi-backend resampler. -""" - -from __future__ import annotations -import logging -import numpy as np - -# Optional back-ends -try: - import scipy.signal - - _has_scipy = True -except ImportError: - _has_scipy = False - -try: - import torchaudio - import torch - - _has_torchaudio = True -except ImportError: - _has_torchaudio = False - -log = logging.getLogger(__name__) - - -def resample_audio(frame: np.ndarray, from_sr: int, to_sr: int) -> np.ndarray: - """ - High-quality resampling used across the codebase. - - – Prefers scipy.signal.resample_poly (best SNR & speed). - – Falls back to torchaudio.transforms.Resample if SciPy missing. - – Final fallback is a simple average-pool loop (keeps tests green). - - Returns float32 or int16 array matching input dtype. 
- """ - if from_sr == to_sr: - return frame - - # ------------------------------------------------ SciPy path - if _has_scipy: - try: - return scipy.signal.resample_poly(frame, to_sr, from_sr, axis=-1) - except Exception as e: - log.warning("scipy resample failed: %s – trying fallback", e) - - # ------------------------------------------------ torchaudio path - if _has_torchaudio: - try: - tensor = torch.tensor(frame, dtype=torch.float32) - resampler = torchaudio.transforms.Resample( - orig_freq=from_sr, new_freq=to_sr - ) - return resampler(tensor).numpy() - except Exception as e: - log.warning("torchaudio resample failed: %s – using simple avg-pool", e) - - # ------------------------------------------------ naïve fallback - ratio = from_sr / to_sr - out_len = int(len(frame) / ratio) - out = np.zeros(out_len, dtype=np.float32) - for i in range(out_len): - start = int(i * ratio) - end = int((i + 1) * ratio) - chunk = frame[start:end] - if chunk.size: - out[i] = float(chunk.mean()) - - return out diff --git a/getstream/video/rtc/track_util.py b/getstream/video/rtc/track_util.py index 01f8cd1e..47ce54d7 100644 --- a/getstream/video/rtc/track_util.py +++ b/getstream/video/rtc/track_util.py @@ -1,9 +1,20 @@ import asyncio +import io +import wave import av import numpy as np import re -from typing import Dict, Any, NamedTuple, Callable, Optional +from typing import ( + Dict, + Any, + NamedTuple, + Callable, + Optional, + Union, + Iterator, + AsyncIterator, +) import logging import aiortc @@ -29,10 +40,15 @@ class PcmData(NamedTuple): format: str sample_rate: int - samples: NDArray + samples: NDArray = np.array([], dtype=np.int16) pts: Optional[int] = None # Presentation timestamp dts: Optional[int] = None # Decode timestamp time_base: Optional[float] = None # Time base for converting timestamps to seconds + channels: int = 1 # Number of channels (1=mono, 2=stereo) + + @property + def stereo(self) -> bool: + return self.channels == 2 @property def duration(self) -> float: @@ -47,16 +63,36 @@ def duration(self) -> float: # For f32 format, each element in the array is one sample (float32) if isinstance(self.samples, np.ndarray): - # Direct count of samples in the numpy array - num_samples = len(self.samples) + # If array has shape (channels, samples) or (samples, channels), duration uses the samples dimension + if self.samples.ndim == 2: + # Determine which dimension is samples vs channels + # Standard format is (channels, samples), but we need to handle both + ch = self.channels if self.channels else 1 + if self.samples.shape[0] == ch: + # Shape is (channels, samples) - correct format + num_samples = self.samples.shape[1] + elif self.samples.shape[1] == ch: + # Shape is (samples, channels) - transposed format + num_samples = self.samples.shape[0] + else: + # Ambiguous or unknown - assume (channels, samples) and pick larger dimension + # This handles edge cases like (2, 2) arrays + num_samples = max(self.samples.shape[0], self.samples.shape[1]) + else: + num_samples = len(self.samples) elif isinstance(self.samples, bytes): # If samples is bytes, calculate based on format if self.format == "s16": # For s16 format, each sample is 2 bytes (16 bits) - num_samples = len(self.samples) // 2 + # For multi-channel, divide by channels to get sample count + num_samples = len(self.samples) // ( + 2 * (self.channels if self.channels else 1) + ) elif self.format == "f32": # For f32 format, each sample is 4 bytes (32 bits) - num_samples = len(self.samples) // 4 + num_samples = len(self.samples) // ( + 4 * 
(self.channels if self.channels else 1) + ) else: # Default assumption for other formats (treat as raw bytes) num_samples = len(self.samples) @@ -73,6 +109,11 @@ def duration(self) -> float: # Calculate duration based on sample rate return num_samples / self.sample_rate + @property + def duration_ms(self) -> float: + """Duration in milliseconds computed from samples and sample rate.""" + return self.duration * 1000.0 + @property def pts_seconds(self) -> Optional[float]: if self.pts is not None and self.time_base is not None: @@ -85,6 +126,709 @@ def dts_seconds(self) -> Optional[float]: return self.dts * self.time_base return None + @classmethod + def from_bytes( + cls, + audio_bytes: bytes, + sample_rate: int = 16000, + format: str = "s16", + channels: int = 1, + ) -> "PcmData": + """Build from raw PCM bytes (interleaved). + + Example: + >>> import numpy as np + >>> b = np.array([1, -1, 2, -2], dtype=np.int16).tobytes() + >>> pcm = PcmData.from_bytes(b, sample_rate=16000, format="s16", channels=2) + >>> pcm.samples.shape[0] # channels-first + 2 + """ + # Determine dtype and bytes per sample + dtype: Any + width: int + if format == "s16": + dtype = np.int16 + width = 2 + elif format == "f32": + dtype = np.float32 + width = 4 + else: + dtype = np.int16 + width = 2 + + # Ensure buffer aligns to whole samples + if len(audio_bytes) % width != 0: + trimmed = len(audio_bytes) - (len(audio_bytes) % width) + if trimmed <= 0: + return cls( + samples=np.array([], dtype=dtype), + sample_rate=sample_rate, + format=format, + channels=channels, + ) + logger.debug( + "Trimming non-aligned PCM buffer: %d -> %d bytes", + len(audio_bytes), + trimmed, + ) + audio_bytes = audio_bytes[:trimmed] + + arr = np.frombuffer(audio_bytes, dtype=dtype) + if channels > 1 and arr.size > 0: + # Convert interleaved [L,R,L,R,...] to shape (channels, samples) + total_frames = (arr.size // channels) * channels + if total_frames != arr.size: + logger.debug( + "Trimming interleaved frames to channel multiple: %d -> %d elements", + arr.size, + total_frames, + ) + arr = arr[:total_frames] + try: + frames = arr.reshape(-1, channels) + arr = frames.T + except Exception: + logger.warning( + f"Unable to reshape audio buffer to {channels} channels; falling back to 1D" + ) + return cls( + samples=arr, sample_rate=sample_rate, format=format, channels=channels + ) + + @classmethod + def from_data( + cls, + data: Union[bytes, bytearray, memoryview, NDArray], + sample_rate: int = 16000, + format: str = "s16", + channels: int = 1, + ) -> "PcmData": + """Build from bytes or numpy arrays. 
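+
+        Bytes-like input is delegated to from_bytes(); numpy arrays are
+        coerced to the requested format's dtype and normalized to
+        (channels, samples) for multi-channel data.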
+ + Example: + >>> import numpy as np + >>> PcmData.from_data(np.array([1, 2], np.int16), sample_rate=16000, format="s16", channels=1).channels + 1 + """ + if isinstance(data, (bytes, bytearray, memoryview)): + return cls.from_bytes( + bytes(data), sample_rate=sample_rate, format=format, channels=channels + ) + + if isinstance(data, np.ndarray): + arr = data + # Ensure dtype aligns with format + if format == "s16" and arr.dtype != np.int16: + arr = arr.astype(np.int16) + elif format == "f32" and arr.dtype != np.float32: + arr = arr.astype(np.float32) + + # Normalize shape to (channels, samples) for multi-channel + if arr.ndim == 2: + if arr.shape[0] == channels: + samples_arr = arr + elif arr.shape[1] == channels: + samples_arr = arr.T + else: + # Assume first dimension is channels if ambiguous + samples_arr = arr + elif arr.ndim == 1: + if channels > 1: + try: + frames = arr.reshape(-1, channels) + samples_arr = frames.T + except Exception: + logger.warning( + f"Could not reshape 1D array to {channels} channels; keeping mono" + ) + channels = 1 + samples_arr = arr + else: + samples_arr = arr + else: + # Fallback + samples_arr = arr.reshape(-1) + channels = 1 + + return cls( + samples=samples_arr, + sample_rate=sample_rate, + format=format, + channels=channels, + ) + + # Unsupported type + raise TypeError(f"Unsupported data type for PcmData: {type(data)}") + + def resample( + self, + target_sample_rate: int, + target_channels: Optional[int] = None, + resampler: Optional[Any] = None, + ) -> "PcmData": + """Resample to target sample rate/channels. + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.arange(8, dtype=np.int16), sample_rate=16000, format="s16", channels=1) + >>> pcm.resample(16000, target_channels=2).channels + 2 + """ + if target_channels is None: + target_channels = self.channels + if self.sample_rate == target_sample_rate and target_channels == self.channels: + return self + + # Prepare ndarray shape for AV input frame. + # Use planar input (s16p) with shape (channels, samples). 
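+        # (In PyAV/FFmpeg, "s16p" is planar, one plane per channel, while "s16" is packed/interleaved.)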
+ in_layout = "mono" if self.channels == 1 else "stereo" + cmaj = self.samples + if isinstance(cmaj, np.ndarray): + if cmaj.ndim == 1: + # (samples,) -> (channels, samples) + if self.channels > 1: + cmaj = np.tile(cmaj, (self.channels, 1)) + else: + cmaj = cmaj.reshape(1, -1) + elif cmaj.ndim == 2: + # Normalize to (channels, samples) + ch = self.channels if self.channels else 1 + if cmaj.shape[0] == ch: + # Already (channels, samples) + pass + elif cmaj.shape[1] == ch: + # (samples, channels) -> transpose + cmaj = cmaj.T + else: + # Ambiguous - assume larger dim is samples + if cmaj.shape[1] > cmaj.shape[0]: + # Likely (channels, samples) + pass + else: + # Likely (samples, channels) + cmaj = cmaj.T + cmaj = np.ascontiguousarray(cmaj) + frame = av.AudioFrame.from_ndarray(cmaj, format="s16p", layout=in_layout) + frame.sample_rate = self.sample_rate + + # Use provided resampler or create a new one + if resampler is None: + # Create new resampler for one-off use + out_layout = "mono" if target_channels == 1 else "stereo" + resampler = av.AudioResampler( + format="s16", layout=out_layout, rate=target_sample_rate + ) + + # Resample the frame + resampled_frames = resampler.resample(frame) + if resampled_frames: + resampled_frame = resampled_frames[0] + # PyAV's to_ndarray() for packed format returns flattened interleaved data + # For stereo s16 (packed), it returns shape (1, num_values) where num_values = samples * channels + raw_array = resampled_frame.to_ndarray() + num_frames = resampled_frame.samples # Actual number of sample frames + + # Normalize output to (channels, samples) format + ch = int(target_channels) + + # Handle PyAV's packed format quirk: returns (1, num_values) for stereo + if raw_array.ndim == 2 and raw_array.shape[0] == 1 and ch > 1: + # Flatten and deinterleave packed stereo data + # Shape (1, 32000) -> (32000,) -> deinterleave to (2, 16000) + flat = raw_array.reshape(-1) + if len(flat) == num_frames * ch: + # Deinterleave: [L0,R0,L1,R1,...] 
-> [[L0,L1,...], [R0,R1,...]] + resampled_samples = flat.reshape(-1, ch).T + else: + logger.warning( + "Unexpected array size %d for %d frames x %d channels", + len(flat), + num_frames, + ch, + ) + resampled_samples = flat.reshape(ch, -1) + elif raw_array.ndim == 2: + # Standard case: (samples, channels) or already (channels, samples) + if raw_array.shape[1] == ch: + # (samples, channels) -> transpose to (channels, samples) + resampled_samples = raw_array.T + elif raw_array.shape[0] == ch: + # Already (channels, samples) + resampled_samples = raw_array + else: + # Ambiguous - assume time-major + resampled_samples = raw_array.T + elif raw_array.ndim == 1: + # 1D output (mono) + if ch == 1: + # Keep as 1D for mono + resampled_samples = raw_array + elif ch > 1: + # Shouldn't happen if we requested stereo, but handle it + logger.warning( + "Got 1D array but requested %d channels, duplicating", ch + ) + resampled_samples = np.tile(raw_array, (ch, 1)) + else: + resampled_samples = raw_array + else: + # Unexpected dimensionality + logger.warning( + "Unexpected ndim %d from PyAV, reshaping", raw_array.ndim + ) + resampled_samples = raw_array.reshape(ch, -1) + + # Flatten mono arrays to 1D for consistency + if ( + ch == 1 + and isinstance(resampled_samples, np.ndarray) + and resampled_samples.ndim > 1 + ): + resampled_samples = resampled_samples.flatten() + + # Ensure int16 dtype for s16 + if ( + isinstance(resampled_samples, np.ndarray) + and resampled_samples.dtype != np.int16 + ): + resampled_samples = resampled_samples.astype(np.int16) + + return PcmData( + samples=resampled_samples, + sample_rate=target_sample_rate, + format="s16", + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=target_channels, + ) + else: + # If resampling failed, return original data + return self + + def to_bytes(self) -> bytes: + """Return interleaved PCM bytes. + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.array([[1, -1]], np.int16), sample_rate=16000, format="s16", channels=1) + >>> len(pcm.to_bytes()) > 0 + True + """ + arr = self.samples + if isinstance(arr, np.ndarray): + if arr.ndim == 2: + channels = int(self.channels or arr.shape[0]) + # Normalize to (channels, samples) + if arr.shape[0] == channels: + cmaj = arr + elif arr.shape[1] == channels: + cmaj = arr.T + else: + logger.warning( + "to_bytes: ambiguous array shape %s for channels=%d; assuming time-major", + arr.shape, + channels, + ) + cmaj = arr.T + samples_count = cmaj.shape[1] + # Interleave channels explicitly to avoid any stride-related surprises + out = np.empty(samples_count * channels, dtype=cmaj.dtype) + for i in range(channels): + out[i::channels] = cmaj[i] + return out.tobytes() + return arr.tobytes() + # Fallback + if isinstance(arr, (bytes, bytearray)): + return bytes(arr) + try: + return bytes(arr) + except Exception: + logger.warning("Cannot convert samples to bytes; returning empty") + return b"" + + def to_wav_bytes(self) -> bytes: + """Return WAV bytes (header + frames). + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.array([0, 0], np.int16), sample_rate=16000, format="s16", channels=1) + >>> with open("out.wav", "wb") as f: # write to disk + ... 
_ = f.write(pcm.to_wav_bytes()) + """ + # Ensure s16 frames + if self.format != "s16": + arr = self.samples + if isinstance(arr, np.ndarray): + if arr.dtype != np.int16: + # Convert floats to int16 range + if arr.dtype != np.float32: + arr = arr.astype(np.float32) + arr = (np.clip(arr, -1.0, 1.0) * 32767.0).astype(np.int16) + frames = PcmData( + samples=arr, + sample_rate=self.sample_rate, + format="s16", + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=self.channels, + ).to_bytes() + else: + frames = self.to_bytes() + width = 2 + else: + frames = self.to_bytes() + width = 2 + + buf = io.BytesIO() + with wave.open(buf, "wb") as wf: + wf.setnchannels(self.channels or 1) + wf.setsampwidth(width) + wf.setframerate(self.sample_rate) + wf.writeframes(frames) + return buf.getvalue() + + def to_float32(self) -> "PcmData": + """Convert samples to float32 in [-1, 1]. + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.array([0, 1], np.int16), sample_rate=16000, format="s16", channels=1) + >>> pcm.to_float32().samples.dtype == np.float32 + True + """ + arr = self.samples + + # Normalize to a numpy array for conversion + if not isinstance(arr, np.ndarray): + try: + # Round-trip through bytes to reconstruct canonical ndarray shape + arr = PcmData.from_bytes( + self.to_bytes(), + sample_rate=self.sample_rate, + format=self.format, + channels=self.channels, + ).samples + except Exception: + # Fallback to from_data for robustness + arr = PcmData.from_data( + self.samples, + sample_rate=self.sample_rate, + format=self.format, + channels=self.channels, + ).samples + + # Convert to float32 and scale if needed + fmt = (self.format or "").lower() + if fmt in ("s16", "int16") or ( + isinstance(arr, np.ndarray) and arr.dtype == np.int16 + ): + arr_f32 = arr.astype(np.float32) / 32768.0 + else: + # Ensure dtype float32; values assumed already in [-1, 1] + arr_f32 = arr.astype(np.float32, copy=False) + + return PcmData( + samples=arr_f32, + sample_rate=self.sample_rate, + format="f32", + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=self.channels, + ) + + def append(self, other: "PcmData") -> "PcmData": + """Append another chunk after adjusting it to match self. 
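+
+        The other chunk is resampled and channel-converted to match this
+        chunk's sample rate and channel count, then converted to this chunk's
+        sample format before the samples are concatenated.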
+ + Example: + >>> import numpy as np + >>> a = PcmData(samples=np.array([1, 2], np.int16), sample_rate=16000, format="s16", channels=1) + >>> b = PcmData(samples=np.array([3, 4], np.int16), sample_rate=16000, format="s16", channels=1) + >>> a.append(b).samples.tolist() + [1, 2, 3, 4] + """ + + # Early exits for empty cases + def _is_empty(arr: Any) -> bool: + try: + return isinstance(arr, np.ndarray) and arr.size == 0 + except Exception: + return False + + # Normalize numpy arrays from bytes-like if needed + def _ensure_ndarray(pcm: "PcmData") -> np.ndarray: + if isinstance(pcm.samples, np.ndarray): + return pcm.samples + return PcmData.from_bytes( + pcm.to_bytes(), + sample_rate=pcm.sample_rate, + format=pcm.format, + channels=pcm.channels, + ).samples + + # Adjust other to match sample rate and channels first + other_adj = other + if ( + other_adj.sample_rate != self.sample_rate + or other_adj.channels != self.channels + ): + other_adj = other_adj.resample( + self.sample_rate, target_channels=self.channels + ) + + # Then adjust format to match + fmt = (self.format or "").lower() + if fmt in ("f32", "float32"): + other_adj = other_adj.to_float32() + elif fmt in ("s16", "int16"): + # Ensure int16 dtype and mark as s16 + arr = _ensure_ndarray(other_adj) + if arr.dtype != np.int16: + if other_adj.format == "f32": + arr = (np.clip(arr.astype(np.float32), -1.0, 1.0) * 32767.0).astype( + np.int16 + ) + else: + arr = arr.astype(np.int16) + other_adj = PcmData( + samples=arr, + sample_rate=other_adj.sample_rate, + format="s16", + pts=other_adj.pts, + dts=other_adj.dts, + time_base=other_adj.time_base, + channels=other_adj.channels, + ) + else: + # For unknown formats, fallback to bytes round-trip in self's format + other_adj = PcmData.from_bytes( + other_adj.to_bytes(), + sample_rate=self.sample_rate, + format=self.format, + channels=self.channels, + ) + + # Ensure ndarrays for concatenation + self_arr = _ensure_ndarray(self) + other_arr = _ensure_ndarray(other_adj) + + # If either is empty, return the other while preserving self's metadata + if _is_empty(self_arr): + # Conform shape to target channels semantics and dtype + if isinstance(other_arr, np.ndarray): + if (self.channels or 1) == 1 and other_arr.ndim > 1: + other_arr = other_arr.reshape(-1) + target_dtype = ( + np.float32 + if (self.format or "").lower() in ("f32", "float32") + else np.int16 + ) + other_arr = other_arr.astype(target_dtype, copy=False) + return PcmData( + samples=other_arr, + sample_rate=self.sample_rate, + format=self.format, + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=self.channels, + ) + if _is_empty(other_arr): + return self + + ch = max(1, int(self.channels or 1)) + + # Concatenate respecting shape conventions + if ch == 1: + # Mono: keep 1D shape + if self_arr.ndim > 1: + self_arr = self_arr.reshape(-1) + if other_arr.ndim > 1: + other_arr = other_arr.reshape(-1) + out = np.concatenate([self_arr, other_arr]) + # Enforce dtype based on format + if (self.format or "").lower() in ( + "f32", + "float32", + ) and out.dtype != np.float32: + out = out.astype(np.float32) + elif (self.format or "").lower() in ( + "s16", + "int16", + ) and out.dtype != np.int16: + out = out.astype(np.int16) + return PcmData( + samples=out, + sample_rate=self.sample_rate, + format=self.format, + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=self.channels, + ) + else: + # Multi-channel: normalize to (channels, samples) + def _to_cmaj(arr: np.ndarray, channels: int) -> np.ndarray: + if arr.ndim 
== 2: + if arr.shape[0] == channels: + return arr + if arr.shape[1] == channels: + return arr.T + # Ambiguous; assume time-major and transpose + return arr.T + # 1D input: replicate across channels + return np.tile(arr.reshape(1, -1), (channels, 1)) + + self_cmaj = _to_cmaj(self_arr, ch) + other_cmaj = _to_cmaj(other_arr, ch) + out = np.concatenate([self_cmaj, other_cmaj], axis=1) + # Enforce dtype based on format + if (self.format or "").lower() in ( + "f32", + "float32", + ) and out.dtype != np.float32: + out = out.astype(np.float32) + elif (self.format or "").lower() in ( + "s16", + "int16", + ) and out.dtype != np.int16: + out = out.astype(np.int16) + + return PcmData( + samples=out, + sample_rate=self.sample_rate, + format=self.format, + pts=self.pts, + dts=self.dts, + time_base=self.time_base, + channels=self.channels, + ) + + @classmethod + def from_response( + cls, + response: Any, + *, + sample_rate: int = 16000, + channels: int = 1, + format: str = "s16", + ) -> Union["PcmData", Iterator["PcmData"], AsyncIterator["PcmData"]]: + """Normalize provider response to PcmData or iterators of it. + + Example: + >>> PcmData.from_response(bytes([0, 0]), sample_rate=16000, format="s16").sample_rate + 16000 + """ + + # bytes-like returns a single PcmData + if isinstance(response, (bytes, bytearray, memoryview)): + return cls.from_bytes( + bytes(response), + sample_rate=sample_rate, + channels=channels, + format=format, + ) + + # Already a PcmData + if isinstance(response, PcmData): + return response + + # Async iterator + if hasattr(response, "__aiter__"): + + async def _agen(): + width = 2 if format == "s16" else 4 if format == "f32" else 2 + frame_width = width * max(1, channels) + buf = bytearray() + async for item in response: + if isinstance(item, PcmData): + yield item + continue + data = getattr(item, "data", item) + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError("Async iterator yielded unsupported item type") + buf.extend(bytes(data)) + aligned = (len(buf) // frame_width) * frame_width + if aligned: + chunk = bytes(buf[:aligned]) + del buf[:aligned] + yield cls.from_bytes( + chunk, + sample_rate=sample_rate, + channels=channels, + format=format, + ) + # pad remainder, if any + if buf: + pad_len = (-len(buf)) % frame_width + if pad_len: + buf.extend(b"\x00" * pad_len) + yield cls.from_bytes( + bytes(buf), + sample_rate=sample_rate, + channels=channels, + format=format, + ) + + return _agen() + + # Sync iterator (but skip treating bytes as iterable of ints) + if hasattr(response, "__iter__") and not isinstance( + response, (str, bytes, bytearray, memoryview) + ): + + def _gen(): + width = 2 if format == "s16" else 4 if format == "f32" else 2 + frame_width = width * max(1, channels) + buf = bytearray() + for item in response: + if isinstance(item, PcmData): + yield item + continue + data = getattr(item, "data", item) + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError("Iterator yielded unsupported item type") + buf.extend(bytes(data)) + aligned = (len(buf) // frame_width) * frame_width + if aligned: + chunk = bytes(buf[:aligned]) + del buf[:aligned] + yield cls.from_bytes( + chunk, + sample_rate=sample_rate, + channels=channels, + format=format, + ) + if buf: + pad_len = (-len(buf)) % frame_width + if pad_len: + buf.extend(b"\x00" * pad_len) + yield cls.from_bytes( + bytes(buf), + sample_rate=sample_rate, + channels=channels, + format=format, + ) + + return _gen() + + # Single object with .data + if hasattr(response, "data"): + data = 
getattr(response, "data") + if isinstance(data, (bytes, bytearray, memoryview)): + return cls.from_bytes( + bytes(data), + sample_rate=sample_rate, + channels=channels, + format=format, + ) + + raise TypeError( + f"Unsupported response type for PcmData.from_response: {type(response)}" + ) + def patch_sdp_offer(sdp: str) -> str: """ diff --git a/tests/rtc/test_pcm_data.py b/tests/rtc/test_pcm_data.py new file mode 100644 index 00000000..fa436f83 --- /dev/null +++ b/tests/rtc/test_pcm_data.py @@ -0,0 +1,378 @@ +import numpy as np + +from getstream.video.rtc.track_util import PcmData + + +def _i16_list_from_bytes(b: bytes): + return list(np.frombuffer(b, dtype=np.int16)) + + +def test_to_bytes_interleaves_from_channel_major(): + # Create (channels, samples) data: L=[1,2,3,4], R=[-1,-2,-3,-4] + samples = np.array( + [ + [1, 2, 3, 4], + [-1, -2, -3, -4], + ], + dtype=np.int16, + ) + pcm = PcmData(samples=samples, sample_rate=16000, format="s16", channels=2) + out = _i16_list_from_bytes(pcm.to_bytes()) + assert out == [1, -1, 2, -2, 3, -3, 4, -4] + + +def test_to_bytes_interleaves_from_time_major(): + # Create (samples, channels) data: time-major + time_major = np.array( + [ + [1, -1], + [2, -2], + [3, -3], + [4, -4], + ], + dtype=np.int16, + ) + pcm = PcmData(samples=time_major, sample_rate=16000, format="s16", channels=2) + out = _i16_list_from_bytes(pcm.to_bytes()) + assert out == [1, -1, 2, -2, 3, -3, 4, -4] + + +def test_resample_upmix_produces_channel_major_and_interleaved_bytes(): + # Mono ramp 1..10 + mono = np.arange(1, 11, dtype=np.int16) + pcm_mono = PcmData(samples=mono, sample_rate=16000, format="s16", channels=1) + + # Upmix to stereo (same sample rate) + pcm_stereo = pcm_mono.resample(16000, target_channels=2) + assert pcm_stereo.channels == 2 + assert hasattr(pcm_stereo, "samples") + assert isinstance(pcm_stereo.samples, np.ndarray) + assert pcm_stereo.samples.ndim == 2 + # Expect (channels, samples) shape + assert pcm_stereo.samples.shape[0] == 2 + # Sample count may be >= input due to resampler buffering; check prefix + assert pcm_stereo.samples.shape[1] >= mono.shape[0] + # Both channels should be identical after upmix + assert np.array_equal(pcm_stereo.samples[0], pcm_stereo.samples[1]) + + # Bytes should be interleaved L0,R0,L1,R1,... + out_bytes = pcm_stereo.to_bytes() + # Verify interleaving pattern: L[i] == R[i] for a prefix + out_i16 = _i16_list_from_bytes(out_bytes) + # take first 2 * N pairs (N from input) + pairs = min(len(mono), len(out_i16) // 2) + left = out_i16[0 : 2 * pairs : 2] + right = out_i16[1 : 2 * pairs : 2] + assert left == right + + +def test_resample_rate_and_stereo_size_scaling(): + # 200 mono samples @16kHz -> expect ~3x samples at 48kHz and x2 for stereo + mono = np.arange(200, dtype=np.int16) + pcm_mono = PcmData(samples=mono, sample_rate=16000, format="s16", channels=1) + + pcm_48k_stereo = pcm_mono.resample(48000, target_channels=2) + out = pcm_48k_stereo.to_bytes() + + # 16-bit stereo -> 4 bytes per sample frame + # 20ms at 48k is 960 frames = 3840 bytes; our total depends on input size + # Sanity: output length should be >= input_bytes * 6 - small tolerance + input_bytes = mono.nbytes + assert len(out) >= input_bytes * 5 # conservative lower bound + + +# ===== Bug reproduction tests ===== + + +def test_bug_mono_to_stereo_duration_preserved(): + """ + BUG REPRODUCTION: Converting mono to stereo should preserve duration. + If duration changes, playback will be slowed down or sped up. 
+ """ + # Create 1 second of mono audio at 16kHz + sample_rate = 16000 + duration_sec = 1.0 + num_samples = int(sample_rate * duration_sec) + + # Generate a simple sine wave + t = np.linspace(0, duration_sec, num_samples, dtype=np.float32) + audio = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16) + + pcm_mono = PcmData(samples=audio, sample_rate=sample_rate, format="s16", channels=1) + + # Check initial duration + mono_duration = pcm_mono.duration + print(f"\nMono duration: {mono_duration}s (expected ~1.0s)") + assert abs(mono_duration - duration_sec) < 0.01, ( + f"Mono duration should be ~{duration_sec}s, got {mono_duration}s" + ) + + # Convert to stereo (no resampling, just channel conversion) + pcm_stereo = pcm_mono.resample(sample_rate, target_channels=2) + + # Duration should be EXACTLY the same + stereo_duration = pcm_stereo.duration + print(f"Stereo duration: {stereo_duration}s (expected ~1.0s)") + print(f"Stereo shape: {pcm_stereo.samples.shape} (expected (2, {num_samples}))") + + assert abs(stereo_duration - duration_sec) < 0.01, ( + f"Stereo duration should be ~{duration_sec}s, got {stereo_duration}s (BUG: playback will be wrong!)" + ) + + # Verify shape is correct (channels, samples) + assert pcm_stereo.samples.shape[0] == 2, ( + f"First dimension should be channels (2), got shape {pcm_stereo.samples.shape}" + ) + assert pcm_stereo.samples.shape[1] >= num_samples - 10, ( + f"Second dimension should be ~samples ({num_samples}), got shape {pcm_stereo.samples.shape}" + ) + + +def test_bug_resample_16khz_to_48khz_quality(): + """ + BUG REPRODUCTION: Resampling 16kHz to 48kHz should produce correct sample count. + If sample count is wrong, quality will be bad. + """ + # Create 1 second of mono audio at 16kHz + sample_rate_in = 16000 + sample_rate_out = 48000 + duration_sec = 1.0 + num_samples_in = int(sample_rate_in * duration_sec) + + # Generate a simple sine wave + t = np.linspace(0, duration_sec, num_samples_in, dtype=np.float32) + audio = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16) + + pcm_16k = PcmData( + samples=audio, sample_rate=sample_rate_in, format="s16", channels=1 + ) + + # Resample to 48kHz + pcm_48k = pcm_16k.resample(sample_rate_out, target_channels=1) + + # Check that sample count increased by 3x (48k/16k = 3) + expected_samples = num_samples_in * 3 + actual_samples = ( + len(pcm_48k.samples) if pcm_48k.samples.ndim == 1 else pcm_48k.samples.shape[-1] + ) + + print(f"\n16kHz samples: {num_samples_in}") + print(f"48kHz samples: {actual_samples} (expected ~{expected_samples})") + print(f"48kHz shape: {pcm_48k.samples.shape}") + print(f"48kHz duration: {pcm_48k.duration}s (expected ~1.0s)") + + # Allow some tolerance for resampler edge effects + assert abs(actual_samples - expected_samples) < 100, ( + f"Expected ~{expected_samples} samples at 48kHz, got {actual_samples} (BUG: quality will be bad!)" + ) + + # Duration should remain the same + assert abs(pcm_48k.duration - duration_sec) < 0.01, ( + f"Duration should remain ~{duration_sec}s, got {pcm_48k.duration}s" + ) + + +def test_bug_resample_16khz_to_48khz_stereo_combined(): + """ + BUG REPRODUCTION: The worst case - 16kHz mono to 48kHz stereo. + This combines both bugs: resampling quality AND duration preservation. 
+ """ + # Create 1 second of mono audio at 16kHz + sample_rate_in = 16000 + sample_rate_out = 48000 + duration_sec = 1.0 + num_samples_in = int(sample_rate_in * duration_sec) + + # Generate a simple sine wave + t = np.linspace(0, duration_sec, num_samples_in, dtype=np.float32) + audio = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16) + + pcm_16k = PcmData( + samples=audio, sample_rate=sample_rate_in, format="s16", channels=1 + ) + + # Resample to 48kHz stereo (the problematic case!) + pcm_48k_stereo = pcm_16k.resample(sample_rate_out, target_channels=2) + + print(f"\n16kHz mono shape: {pcm_16k.samples.shape}") + print(f"48kHz stereo shape: {pcm_48k_stereo.samples.shape}") + print(f"48kHz stereo duration: {pcm_48k_stereo.duration}s (expected ~1.0s)") + + # Check shape is correct (channels, samples) + assert pcm_48k_stereo.samples.ndim == 2, "Should be 2D array" + assert pcm_48k_stereo.samples.shape[0] == 2, ( + f"First dimension should be channels (2), got shape {pcm_48k_stereo.samples.shape}" + ) + + # Check that sample count increased by 3x (48k/16k = 3) + expected_samples = num_samples_in * 3 + actual_samples = pcm_48k_stereo.samples.shape[1] + + print(f"Expected ~{expected_samples} samples, got {actual_samples}") + + # Allow some tolerance for resampler edge effects + assert abs(actual_samples - expected_samples) < 100, ( + f"Expected ~{expected_samples} samples at 48kHz, got {actual_samples}" + ) + + # Duration should remain the same (THIS IS THE CRITICAL BUG) + assert abs(pcm_48k_stereo.duration - duration_sec) < 0.01, ( + f"Duration should remain ~{duration_sec}s, got {pcm_48k_stereo.duration}s (BUG: causes slow playback!)" + ) + + +def test_bug_duration_with_different_array_shapes(): + """ + BUG REPRODUCTION: Duration calculation should work with any array shape. + The bug is that shape[-1] is used, which gives wrong results for (samples, channels) arrays. + """ + sample_rate = 16000 + num_samples = 16000 # 1 second + expected_duration = 1.0 + + # Test 1: 1D array (mono) - should work + samples_1d = np.zeros(num_samples, dtype=np.int16) + pcm_1d = PcmData( + samples=samples_1d, sample_rate=sample_rate, format="s16", channels=1 + ) + print(f"\n1D mono: shape={pcm_1d.samples.shape}, duration={pcm_1d.duration}s") + assert abs(pcm_1d.duration - expected_duration) < 0.01 + + # Test 2: 2D array (channels, samples) - CORRECT format, should work + samples_2d_correct = np.zeros((2, num_samples), dtype=np.int16) + pcm_2d_correct = PcmData( + samples=samples_2d_correct, sample_rate=sample_rate, format="s16", channels=2 + ) + print( + f"2D (channels, samples): shape={pcm_2d_correct.samples.shape}, duration={pcm_2d_correct.duration}s" + ) + assert abs(pcm_2d_correct.duration - expected_duration) < 0.01 + + # Test 3: 2D array (samples, channels) - WRONG format but might happen from PyAV + # This is where the bug manifests! 
+ samples_2d_wrong = np.zeros((num_samples, 2), dtype=np.int16) + pcm_2d_wrong = PcmData( + samples=samples_2d_wrong, sample_rate=sample_rate, format="s16", channels=2 + ) + wrong_duration = pcm_2d_wrong.duration + print( + f"2D (samples, channels): shape={pcm_2d_wrong.samples.shape}, duration={wrong_duration}s" + ) + + # With current buggy code using shape[-1], this will give duration = 2/16000 = 0.000125s + # But we want it to be 1.0s + # This assertion will FAIL with the bug + assert abs(wrong_duration - expected_duration) < 0.01, ( + f"Duration with (samples, channels) shape is WRONG: {wrong_duration}s (expected {expected_duration}s)" + ) + + +def test_to_float32_converts_int16_and_preserves_metadata(): + # Prepare a small mono int16 buffer + i16 = np.array([-32768, -16384, 0, 16384, 32767], dtype=np.int16) + sr = 16000 + pcm = PcmData(samples=i16, sample_rate=sr, format="s16", channels=1) + + f32 = pcm.to_float32() + + # Check metadata preserved + assert f32.sample_rate == sr + assert f32.channels == 1 + assert f32.format == "f32" + + # Check dtype and shape preserved (mono stays 1D) + assert isinstance(f32.samples, np.ndarray) + assert f32.samples.dtype == np.float32 + assert f32.samples.ndim == 1 + + # Check value scaling to [-1, 1] + expected = np.array([-1.0, -0.5, 0.0, 0.5, 32767 / 32768.0], dtype=np.float32) + assert np.allclose(f32.samples, expected, atol=1e-6) + + # Idempotency when already float32 + f32_2 = f32.to_float32() + assert f32_2.samples.dtype == np.float32 + assert np.allclose(f32_2.samples, f32.samples, atol=1e-7) + + +def test_append_mono_s16_concatenates_and_preserves_format(): + sr = 16000 + a = np.array([1, 2, 3, 4], dtype=np.int16) + b = np.array([5, 6], dtype=np.int16) + + pcm_a = PcmData(samples=a, sample_rate=sr, format="s16", channels=1) + pcm_b = PcmData(samples=b, sample_rate=sr, format="s16", channels=1) + + out = pcm_a.append(pcm_b) + + assert out.format == "s16" + assert out.channels == 1 + assert isinstance(out.samples, np.ndarray) + assert out.samples.dtype == np.int16 + assert out.samples.ndim == 1 + assert out.sample_rate == sr + assert np.array_equal(out.samples, np.array([1, 2, 3, 4, 5, 6], dtype=np.int16)) + + +def test_append_resamples_and_converts_to_match_target_format(): + # Target is float32 stereo 48kHz + base = np.array([[0.0, 0.1, -0.1], [0.0, 0.1, -0.1]], dtype=np.float32) + pcm_target = PcmData(samples=base, sample_rate=48000, format="f32", channels=2) + + # Other is s16 mono 16kHz + other_raw = np.array([1000, -1000, 1000, -1000, 1000, -1000], dtype=np.int16) + pcm_other = PcmData(samples=other_raw, sample_rate=16000, format="s16", channels=1) + + # Pre-compute expected resampled length by using the same resample pipeline + other_resampled = pcm_other.resample(48000, target_channels=2).to_float32() + if other_resampled.samples.ndim == 2: + expected_added = other_resampled.samples.shape[1] + else: + expected_added = other_resampled.samples.shape[0] + + out = pcm_target.append(pcm_other) + + # Check format/channels preserved and dtype matches + assert out.format == "f32" + assert out.channels == 2 + assert isinstance(out.samples, np.ndarray) and out.samples.dtype == np.float32 + assert out.samples.shape[0] == 2 + + # First part must equal the original base (append should not alter original) + assert np.allclose(out.samples[:, : base.shape[1]], base) + + # Total length should be base + resampled other + assert out.samples.shape[1] == base.shape[1] + expected_added + + +def test_append_empty_buffer_float32_adjusts_other_and_keeps_meta(): 
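+    """Appending into an empty buffer should adopt the buffer's sample rate, channels, and format."""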
+ # Create an empty buffer specifying desired output meta using alternate format name + buffer = PcmData(format="float32", sample_rate=16000, channels=1) + + # Other is int16 stereo at 48kHz, small ramp + other = np.array( + [[1000, -1000, 500, -500], [-1000, 1000, -500, 500]], dtype=np.int16 + ) + pcm_other = PcmData(samples=other, sample_rate=48000, format="s16", channels=2) + + # Expected result if we first resample/downmix then convert to float32 + expected_pcm = pcm_other.resample(16000, target_channels=1).to_float32() + + # Append to the empty buffer + out = buffer.append(pcm_other) + + # Metadata should be preserved from buffer + assert out.format in ("f32", "float32") + assert out.sample_rate == 16000 + assert out.channels == 1 + + # Data should match expected (mono float32) + assert isinstance(out.samples, np.ndarray) + assert out.samples.dtype == np.float32 + assert out.samples.ndim == 1 + # Normalize expected to 1D if needed + if isinstance(expected_pcm.samples, np.ndarray) and expected_pcm.samples.ndim == 2: + expected_samples = expected_pcm.samples.reshape(-1) + else: + expected_samples = expected_pcm.samples + assert np.allclose(out.samples[-expected_samples.shape[0] :], expected_samples)
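+
+
+# Illustrative usage sketch (not part of the original change): exercises the
+# PcmData helpers introduced in this diff end-to-end, assuming the
+# from_bytes / resample / to_wav_bytes signatures shown above.
+def test_usage_sketch_from_bytes_resample_to_wav():
+    sr = 16000
+    # One second of a 440 Hz tone as 16-bit mono PCM bytes
+    tone = (np.sin(2 * np.pi * 440 * np.arange(sr) / sr) * 32767).astype(np.int16)
+    pcm = PcmData.from_bytes(tone.tobytes(), sample_rate=sr, format="s16", channels=1)
+    assert abs(pcm.duration - 1.0) < 0.01
+
+    # Upsample to 48kHz stereo and serialize to WAV bytes
+    out = pcm.resample(48000, target_channels=2)
+    wav = out.to_wav_bytes()
+    assert wav[:4] == b"RIFF" and wav[8:12] == b"WAVE"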