Skip to content

Commit ce21c84

Browse files
committed
test implementation
1 parent 297d1d0 commit ce21c84

File tree

1 file changed

+153
-0
lines changed

1 file changed

+153
-0
lines changed

examples/play_audio_stream.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import asyncio
2+
import os
3+
import numpy as np
4+
import sounddevice as sd
5+
6+
from livekit import rtc, api
7+
from livekit.plugins import noise_cancellation
8+
9+
SAMPLERATE = 48000
10+
BLOCKSIZE = 480 # 10ms chunks at 48kHz
11+
CHANNELS = 1
12+
13+
class AudioBuffer:
14+
def __init__(self, blocksize=BLOCKSIZE):
15+
self.blocksize = blocksize
16+
self.buffer = np.array([], dtype=np.int16)
17+
18+
def add_frame(self, frame_data):
19+
self.buffer = np.concatenate([self.buffer, frame_data])
20+
21+
def get_chunk(self):
22+
if len(self.buffer) >= self.blocksize:
23+
chunk = self.buffer[:self.blocksize]
24+
self.buffer = self.buffer[self.blocksize:]
25+
return chunk
26+
return None
27+
28+
def get_padded_chunk(self):
29+
if len(self.buffer) > 0:
30+
chunk = np.zeros(self.blocksize, dtype=np.int16)
31+
available = min(len(self.buffer), self.blocksize)
32+
chunk[:available] = self.buffer[:available]
33+
self.buffer = self.buffer[available:]
34+
return chunk
35+
return np.zeros(self.blocksize, dtype=np.int16)
36+
37+
async def audio_player(queue: asyncio.Queue):
38+
"""Pull from the queue and stream audio using sounddevice."""
39+
buffer = AudioBuffer(BLOCKSIZE)
40+
41+
def callback(outdata, frames, time, status):
42+
if status:
43+
print(f"Audio callback status: {status}")
44+
45+
# Try to fill buffer from queue
46+
while not queue.empty():
47+
try:
48+
data = queue.get_nowait()
49+
buffer.add_frame(data)
50+
except asyncio.QueueEmpty:
51+
break
52+
53+
# Get exactly the right amount of data
54+
chunk = buffer.get_chunk()
55+
if chunk is not None:
56+
outdata[:] = chunk.reshape(-1, 1)
57+
else:
58+
# Not enough data, use what we have padded with zeros
59+
outdata[:] = buffer.get_padded_chunk().reshape(-1, 1)
60+
61+
stream = sd.OutputStream(
62+
samplerate=SAMPLERATE,
63+
channels=CHANNELS,
64+
blocksize=BLOCKSIZE,
65+
dtype='int16',
66+
callback=callback,
67+
latency='low'
68+
)
69+
with stream:
70+
while True:
71+
await asyncio.sleep(0.1) # keep the loop alive
72+
73+
async def rtc_session(room, queue: asyncio.Queue):
74+
track: rtc.RemoteAudioTrack | None = None
75+
while not track:
76+
for participant in room.remote_participants.values():
77+
for t in participant.track_publications.values():
78+
if t.kind == rtc.TrackKind.KIND_AUDIO and t.subscribed:
79+
track = t.track
80+
break
81+
if track:
82+
break
83+
if not track:
84+
print("waiting for audio track")
85+
await asyncio.sleep(2)
86+
87+
stream = rtc.AudioStream.from_track(
88+
track=track,
89+
sample_rate=SAMPLERATE,
90+
num_channels=1,
91+
noise_cancellation=noise_cancellation.BVC(), # or NC()
92+
)
93+
94+
print("playing stream")
95+
try:
96+
# Process audio frames from the stream
97+
async for audio_frame_event in stream:
98+
frame = audio_frame_event.frame
99+
100+
audio_data = np.frombuffer(frame.data, dtype=np.int16)
101+
102+
try:
103+
await queue.put(audio_data)
104+
except asyncio.QueueFull:
105+
# Skip this frame if queue is full
106+
print("Warning: Audio queue full, dropping frame")
107+
continue
108+
109+
finally:
110+
# Clean up the stream when done
111+
await stream.aclose()
112+
113+
async def main():
114+
queue = asyncio.Queue(maxsize=50)
115+
player_task = asyncio.create_task(audio_player(queue))
116+
117+
token = (
118+
api.AccessToken()
119+
.with_identity("python-bot")
120+
.with_name("Python Bot")
121+
.with_grants(
122+
api.VideoGrants(
123+
room_join=True,
124+
room="my-room",
125+
agent=True,
126+
)
127+
)
128+
.to_jwt()
129+
)
130+
url = os.getenv("LIVEKIT_URL")
131+
132+
room = rtc.Room()
133+
await room.connect(
134+
url,
135+
token,
136+
options=rtc.RoomOptions(
137+
auto_subscribe=True,
138+
),
139+
)
140+
print(f"Connected to room: {room.name}")
141+
142+
try:
143+
await rtc_session(room, queue)
144+
finally:
145+
# Clean up
146+
await room.disconnect()
147+
player_task.cancel()
148+
try:
149+
await player_task
150+
except asyncio.CancelledError:
151+
pass
152+
153+
asyncio.run(main())

0 commit comments

Comments
 (0)