Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3645.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `SmallWebRTCTransport` not respecting `TransportParams.audio_out_10ms_chunks` parameter. The transport was hardcoded to produce 10ms audio frames regardless of the configured chunk size.
35 changes: 23 additions & 12 deletions src/pipecat/transports/smallwebrtc/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,24 @@ class RawAudioTrack(AudioStreamTrack):
supporting queued audio data with proper synchronization.
"""

def __init__(self, sample_rate):
def __init__(self, sample_rate: int, num_10ms_chunks: int = 1):
"""Initialize the raw audio track.

Args:
sample_rate: The audio sample rate in Hz.
num_10ms_chunks: Number of 10ms chunks per output frame (default 1).
"""
super().__init__()
self._sample_rate = sample_rate
self._num_10ms_chunks = num_10ms_chunks
self._samples_per_10ms = sample_rate * 10 // 1000
self._bytes_per_10ms = self._samples_per_10ms * 2 # 16-bit (2 bytes per sample)
# Calculate chunk size based on num_10ms_chunks
self._samples_per_chunk = self._samples_per_10ms * num_10ms_chunks
self._bytes_per_chunk = self._bytes_per_10ms * num_10ms_chunks
self._timestamp = 0
self._start = time.time()
# Queue of (bytes, future), broken into 10ms sub chunks as needed
# Queue of (bytes, future), broken into configured chunk sizes as needed
self._chunk_queue = deque()

def add_audio_bytes(self, audio_bytes: bytes):
Expand All @@ -103,17 +108,20 @@ def add_audio_bytes(self, audio_bytes: bytes):
A Future that completes when the data is processed.

Raises:
ValueError: If audio bytes are not a multiple of 10ms size.
ValueError: If audio bytes are not a multiple of the configured chunk size.
"""
if len(audio_bytes) % self._bytes_per_10ms != 0:
raise ValueError("Audio bytes must be a multiple of 10ms size.")
if len(audio_bytes) % self._bytes_per_chunk != 0:
raise ValueError(
f"Audio bytes must be a multiple of {self._num_10ms_chunks * 10}ms size "
f"({self._bytes_per_chunk} bytes)."
)
future = asyncio.get_running_loop().create_future()

# Break input into 10ms chunks
for i in range(0, len(audio_bytes), self._bytes_per_10ms):
chunk = audio_bytes[i : i + self._bytes_per_10ms]
# Break input into configured chunk sizes
for i in range(0, len(audio_bytes), self._bytes_per_chunk):
chunk = audio_bytes[i : i + self._bytes_per_chunk]
# Only the last chunk carries the future to be resolved once fully consumed
fut = future if i + self._bytes_per_10ms >= len(audio_bytes) else None
fut = future if i + self._bytes_per_chunk >= len(audio_bytes) else None
self._chunk_queue.append((chunk, fut))

return future
Expand All @@ -135,7 +143,7 @@ async def recv(self):
if future and not future.done():
future.set_result(True)
else:
chunk = bytes(self._bytes_per_10ms) # silence
chunk = bytes(self._bytes_per_chunk) # silence

# Convert the byte data to an ndarray of int16 samples
samples = np.frombuffer(chunk, dtype=np.int16)
Expand All @@ -145,7 +153,7 @@ async def recv(self):
frame.sample_rate = self._sample_rate
frame.pts = self._timestamp
frame.time_base = fractions.Fraction(1, self._sample_rate)
self._timestamp += self._samples_per_10ms
self._timestamp += self._samples_per_chunk
return frame


Expand Down Expand Up @@ -493,7 +501,10 @@ async def _handle_client_connected(self):
self._video_input_track = self._webrtc_connection.video_input_track()
self._screen_video_track = self._webrtc_connection.screen_video_input_track()
if self._params.audio_out_enabled:
self._audio_output_track = RawAudioTrack(sample_rate=self._out_sample_rate)
self._audio_output_track = RawAudioTrack(
sample_rate=self._out_sample_rate,
num_10ms_chunks=self._params.audio_out_10ms_chunks,
)
self._webrtc_connection.replace_audio_track(self._audio_output_track)

if self._params.video_out_enabled:
Expand Down
Loading