Skip to content

Commit 846538f

Browse files
committed
move audio mixer inside of MediaDevices for ease of playback
1 parent ca27e5f commit 846538f

File tree

2 files changed

+140
-100
lines changed

2 files changed

+140
-100
lines changed

examples/local_audio/full_duplex.py

Lines changed: 10 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from dotenv import load_dotenv, find_dotenv
77

88
from livekit import api, rtc
9-
from db_meter import calculate_db_level, display_dual_db_meters
9+
from db_meter import calculate_db_level, display_single_db_meter
1010

1111

1212
async def main() -> None:
@@ -29,46 +29,16 @@ async def main() -> None:
2929
mic = devices.open_input()
3030
player = devices.open_output()
3131

32-
# Mixer for all remote audio streams
33-
mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1)
34-
35-
# dB level monitoring
32+
# dB level monitoring (mic only)
3633
mic_db_queue = queue.Queue()
37-
room_db_queue = queue.Queue()
38-
39-
# Track stream bookkeeping for cleanup
40-
streams_by_pub: dict[str, rtc.AudioStream] = {}
41-
streams_by_participant: dict[str, set[rtc.AudioStream]] = {}
42-
43-
# remove stream from mixer and close it
44-
async def _remove_stream(
45-
stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None
46-
) -> None:
47-
try:
48-
mixer.remove_stream(stream)
49-
except Exception:
50-
pass
51-
try:
52-
await stream.aclose()
53-
except Exception:
54-
pass
55-
if participant_sid and participant_sid in streams_by_participant:
56-
streams_by_participant.get(participant_sid, set()).discard(stream)
57-
if not streams_by_participant.get(participant_sid):
58-
streams_by_participant.pop(participant_sid, None)
59-
if pub_sid is not None:
60-
streams_by_pub.pop(pub_sid, None)
6134

6235
def on_track_subscribed(
6336
track: rtc.Track,
6437
publication: rtc.RemoteTrackPublication,
6538
participant: rtc.RemoteParticipant,
6639
):
6740
if track.kind == rtc.TrackKind.KIND_AUDIO:
68-
stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1)
69-
streams_by_pub[publication.sid] = stream
70-
streams_by_participant.setdefault(participant.sid, set()).add(stream)
71-
mixer.add_stream(stream)
41+
player.add_track(track)
7242
logging.info("subscribed to audio from %s", participant.identity)
7343

7444
room.on("track_subscribed", on_track_subscribed)
@@ -78,37 +48,11 @@ def on_track_unsubscribed(
7848
publication: rtc.RemoteTrackPublication,
7949
participant: rtc.RemoteParticipant,
8050
):
81-
stream = streams_by_pub.get(publication.sid)
82-
if stream is not None:
83-
asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid))
84-
logging.info("unsubscribed from audio of %s", participant.identity)
51+
asyncio.create_task(player.remove_track(track))
52+
logging.info("unsubscribed from audio of %s", participant.identity)
8553

8654
room.on("track_unsubscribed", on_track_unsubscribed)
8755

88-
def on_track_unpublished(
89-
publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
90-
):
91-
stream = streams_by_pub.get(publication.sid)
92-
if stream is not None:
93-
asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid))
94-
logging.info("track unpublished: %s from %s", publication.sid, participant.identity)
95-
96-
room.on("track_unpublished", on_track_unpublished)
97-
98-
def on_participant_disconnected(participant: rtc.RemoteParticipant):
99-
streams = list(streams_by_participant.pop(participant.sid, set()))
100-
for stream in streams:
101-
# Best-effort discover publication sid
102-
pub_sid = None
103-
for k, v in list(streams_by_pub.items()):
104-
if v is stream:
105-
pub_sid = k
106-
break
107-
asyncio.create_task(_remove_stream(stream, participant.sid, pub_sid))
108-
logging.info("participant disconnected: %s", participant.identity)
109-
110-
room.on("participant_disconnected", on_participant_disconnected)
111-
11256
token = (
11357
api.AccessToken(api_key, api_secret)
11458
.with_identity("local-audio")
@@ -135,31 +79,15 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant):
13579

13680
# Start dB meter display in a separate thread
13781
meter_thread = threading.Thread(
138-
target=display_dual_db_meters,
139-
args=(mic_db_queue, room_db_queue, room.name),
82+
target=display_single_db_meter,
83+
args=(mic_db_queue,),
84+
kwargs={"label": "Mic Level: "},
14085
daemon=True
14186
)
14287
meter_thread.start()
14388

144-
# Create a monitoring wrapper for the mixer that calculates dB levels
145-
# while passing frames through to the player
146-
async def monitored_mixer():
147-
try:
148-
async for frame in mixer:
149-
# Calculate dB level for room audio
150-
samples = list(frame.data)
151-
db_level = calculate_db_level(samples)
152-
try:
153-
room_db_queue.put_nowait(db_level)
154-
except queue.Full:
155-
pass # Drop if queue is full
156-
# Yield the frame for playback
157-
yield frame
158-
except Exception:
159-
pass
160-
161-
# Start playing mixed remote audio with monitoring
162-
asyncio.create_task(player.play(monitored_mixer()))
89+
# Start playing mixed remote audio (tracks added via event handlers)
90+
await player.start()
16391

16492
# Monitor microphone dB levels
16593
async def monitor_mic_db():
@@ -191,7 +119,6 @@ async def monitor_mic_db():
191119
pass
192120
finally:
193121
await mic.aclose()
194-
await mixer.aclose()
195122
await player.aclose()
196123
try:
197124
await room.disconnect()

livekit-rtc/livekit/rtc/media_devices.py

Lines changed: 130 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
from . import AudioSource
2626
from .audio_frame import AudioFrame
2727
from .apm import AudioProcessingModule
28+
from .audio_mixer import AudioMixer
29+
from .audio_stream import AudioStream
30+
from .track import Track
2831

2932
"""
3033
Media device helpers built on top of the `sounddevice` library.
@@ -121,6 +124,10 @@ class OutputPlayer:
121124
When `apm_for_reverse` is provided, this player will feed the same PCM it
122125
renders (in 10 ms frames) into the APM reverse path so that echo
123126
cancellation can correlate mic input with speaker output.
127+
128+
The OutputPlayer includes an internal `AudioMixer` for convenient multi-track
129+
playback. Use `add_track()` and `remove_track()` to dynamically manage tracks,
130+
then call `start()` to begin playback.
124131
"""
125132

126133
def __init__(
@@ -142,6 +149,10 @@ def __init__(
142149
self._play_task: Optional[asyncio.Task] = None
143150
self._running = False
144151
self._delay_estimator = delay_estimator
152+
153+
# Internal mixer for add_track/remove_track API
154+
self._mixer: Optional[AudioMixer] = None
155+
self._track_streams: dict[str, AudioStream] = {} # track.sid -> AudioStream
145156

146157
def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None:
147158
# Pull PCM int16 from buffer; zero if not enough
@@ -197,31 +208,133 @@ def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any
197208
blocksize=blocksize,
198209
)
199210

200-
async def play(self, stream: AsyncIterator[AudioFrame]) -> None:
201-
"""Render an async iterator of `AudioFrame` to the output device.
211+
def add_track(self, track: Track) -> None:
    """Add an audio track to the internal mixer for playback.

    Wraps the track in an `AudioStream` configured with this player's sample
    rate and channel count, and adds that stream to the internal `AudioMixer`.
    The mixer is created lazily on the first call. Call `start()` to begin
    playback of the mixed output.

    Args:
        track: The track to add (typically from a remote participant). The
            track kind is not validated here; callers are expected to pass
            audio tracks only.

    Raises:
        ValueError: If a track with the same `sid` has already been added.
    """
    sid = track.sid
    if sid in self._track_streams:
        raise ValueError(f"Track {sid} already added to player")

    # Create mixer on first track addition
    if self._mixer is None:
        self._mixer = AudioMixer(
            sample_rate=self._sample_rate,
            num_channels=self._num_channels,
        )

    # Create audio stream for this track
    stream = AudioStream(
        track,
        sample_rate=self._sample_rate,
        num_channels=self._num_channels,
    )

    # Register the stream only after the mixer has accepted it, so a failing
    # add_stream() cannot leave a phantom entry that would make every later
    # add_track() for the same track raise "already added".
    self._mixer.add_stream(stream)
    self._track_streams[sid] = stream
243+
244+
async def remove_track(self, track: Track) -> None:
    """Remove an audio track from the internal mixer.

    Detaches the track's `AudioStream` from the mixer (if one exists) and
    closes the stream. A track that was never added, or was already removed,
    is ignored. Cleanup is best-effort: a failure in either step is logged
    and does not propagate to the caller.

    Args:
        track: The audio track to remove.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    stream = self._track_streams.pop(track.sid, None)
    if stream is None:
        return

    log = logging.getLogger(__name__)

    if self._mixer is not None:
        try:
            self._mixer.remove_stream(stream)
        except Exception:
            # Keep going: the stream must still be closed below.
            log.debug(
                "failed to remove stream of track %s from mixer",
                track.sid,
                exc_info=True,
            )

    try:
        await stream.aclose()
    except Exception:
        log.debug(
            "failed to close stream of track %s",
            track.sid,
            exc_info=True,
        )
266+
267+
async def start(self) -> None:
    """Start playback of the internal mixer's output.

    Schedules a background task that pulls frames from the internal mixer and
    appends their raw PCM bytes to the buffer consumed by the output callback.
    If no mixer exists yet (no track was added), an empty one is created so
    tracks can be added later with `add_track()` and removed with
    `remove_track()`.

    When the loop ends, the output stream is stopped and closed.
    NOTE(review): a later `start()` would reuse that closed stream — confirm
    whether restarting a player is meant to be supported.

    Raises:
        RuntimeError: If a playback task is already running.
    """
    if self._play_task is not None and not self._play_task.done():
        raise RuntimeError("Playback already started")

    if self._mixer is None:
        self._mixer = AudioMixer(
            sample_rate=self._sample_rate,
            num_channels=self._num_channels,
        )

    # Bind the mixer locally so the loop keeps iterating this mixer even if
    # the attribute is reassigned while the task is running.
    mixer = self._mixer

    # Mark the player as running before the task is scheduled: the flag is
    # then accurate as soon as start() returns, and a stop request made
    # before the task's first step is honored instead of being overwritten.
    self._running = True

    async def _playback_loop() -> None:
        """Internal playback loop that consumes frames from the mixer."""
        try:
            # Inside the try so a failing device start still resets _running.
            self._stream.start()
            async for frame in mixer:
                if not self._running:
                    break
                # Append raw PCM bytes for callback consumption
                self._buffer.extend(frame.data.tobytes())
        finally:
            self._running = False
            try:
                self._stream.stop()
                self._stream.close()
            except Exception:
                # Best-effort teardown; the device may already be closed.
                pass

    self._play_task = asyncio.create_task(_playback_loop())
221305

222306
async def aclose(self) -> None:
223-
"""Stop playback and close the output stream."""
307+
"""Stop playback and close the output stream.
308+
309+
This also cleans up all added tracks and the internal mixer.
310+
"""
224311
self._running = False
312+
313+
# Cancel playback task if running
314+
if self._play_task is not None and not self._play_task.done():
315+
self._play_task.cancel()
316+
try:
317+
await self._play_task
318+
except asyncio.CancelledError:
319+
pass
320+
321+
# Clean up all track streams
322+
for stream in list(self._track_streams.values()):
323+
try:
324+
await stream.aclose()
325+
except Exception:
326+
pass
327+
self._track_streams.clear()
328+
329+
# Close mixer
330+
if self._mixer is not None:
331+
try:
332+
await self._mixer.aclose()
333+
except Exception:
334+
pass
335+
self._mixer = None
336+
337+
# Close output stream
225338
try:
226339
self._stream.stop()
227340
self._stream.close()

0 commit comments

Comments
 (0)