Skip to content

Commit 3b49909

Browse files
authored
DataStreamAudioOutput (#687)
1 parent 5d49bab commit 3b49909

File tree

6 files changed

+376
-2
lines changed

6 files changed

+376
-2
lines changed

.changeset/cold-cooks-smash.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/agents': patch
3+
---
4+
5+
Add avatar datastream io component

agents/src/utils.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
4-
import { AudioFrame, AudioResampler } from '@livekit/rtc-node';
4+
import type {
5+
ParticipantKind,
6+
RemoteParticipant,
7+
RemoteTrackPublication,
8+
Room,
9+
TrackKind,
10+
} from '@livekit/rtc-node';
11+
import { AudioFrame, AudioResampler, RoomEvent } from '@livekit/rtc-node';
512
import { EventEmitter, once } from 'node:events';
613
import type { ReadableStream } from 'node:stream/web';
714
import { TransformStream, type TransformStreamDefaultController } from 'node:stream/web';
@@ -697,3 +704,112 @@ export function delay(ms: number, options: DelayOptions = {}): Promise<void> {
697704
signal?.addEventListener('abort', abort, { once: true });
698705
});
699706
}
707+
708+
/**
709+
* Returns a participant that matches the given identity. If identity is None, the first
710+
* participant that joins the room will be returned.
711+
* If the participant has already joined, the function will return immediately.
712+
* @param room - The room to wait for a participant in.
713+
* @param identity - The identity of the participant to wait for.
714+
* @param kind - The kind of the participant to wait for.
715+
* @returns A promise that resolves to the participant.
716+
*/
717+
export async function waitForParticipant({
718+
room,
719+
identity,
720+
kind,
721+
}: {
722+
room: Room;
723+
identity?: string;
724+
kind?: ParticipantKind | ParticipantKind[];
725+
}): Promise<RemoteParticipant> {
726+
if (!room.isConnected) {
727+
throw new Error('Room is not connected');
728+
}
729+
730+
const fut = new Future<RemoteParticipant>();
731+
732+
const kindMatch = (participant: RemoteParticipant) => {
733+
if (kind === undefined) return true;
734+
735+
if (Array.isArray(kind)) {
736+
return kind.includes(participant.kind);
737+
}
738+
739+
return participant.kind === kind;
740+
};
741+
742+
const onParticipantConnected = (p: RemoteParticipant) => {
743+
if ((identity === undefined || p.identity === identity) && kindMatch(p)) {
744+
if (!fut.done) {
745+
fut.resolve(p);
746+
}
747+
}
748+
};
749+
750+
room.on(RoomEvent.ParticipantConnected, onParticipantConnected);
751+
752+
try {
753+
for (const p of room.remoteParticipants.values()) {
754+
onParticipantConnected(p);
755+
if (fut.done) {
756+
break;
757+
}
758+
}
759+
760+
return await fut.await;
761+
} finally {
762+
room.off(RoomEvent.ParticipantConnected, onParticipantConnected);
763+
}
764+
}
765+
766+
export async function waitForTrackPublication({
767+
room,
768+
identity,
769+
kind,
770+
}: {
771+
room: Room;
772+
identity: string;
773+
kind: TrackKind;
774+
}): Promise<RemoteTrackPublication> {
775+
if (!room.isConnected) {
776+
throw new Error('Room is not connected');
777+
}
778+
779+
const fut = new Future<RemoteTrackPublication>();
780+
781+
const kindMatch = (k: TrackKind | undefined) => {
782+
if (kind === undefined || kind === null) {
783+
return true;
784+
}
785+
return k === kind;
786+
};
787+
788+
const onTrackPublished = (
789+
publication: RemoteTrackPublication,
790+
participant: RemoteParticipant,
791+
) => {
792+
if (fut.done) return;
793+
if (
794+
(identity === undefined || participant.identity === identity) &&
795+
kindMatch(publication.kind)
796+
) {
797+
fut.resolve(publication);
798+
}
799+
};
800+
801+
room.on(RoomEvent.TrackPublished, onTrackPublished);
802+
803+
try {
804+
for (const p of room.remoteParticipants.values()) {
805+
for (const publication of p.trackPublications.values()) {
806+
onTrackPublished(publication, p);
807+
if (fut.done) break;
808+
}
809+
}
810+
811+
return await fut.await;
812+
} finally {
813+
room.off(RoomEvent.TrackPublished, onTrackPublished);
814+
}
815+
}
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
import { Mutex } from '@livekit/mutex';
5+
import {
6+
type AudioFrame,
7+
type ByteStreamWriter,
8+
type Room,
9+
RoomEvent,
10+
type RpcInvocationData,
11+
type TrackKind,
12+
} from '@livekit/rtc-node';
13+
import { log } from '../../log.js';
14+
import {
15+
Future,
16+
Task,
17+
shortuuid,
18+
waitForParticipant,
19+
waitForTrackPublication,
20+
} from '../../utils.js';
21+
import { AudioOutput, type PlaybackFinishedEvent } from '../io.js';
22+
23+
const RPC_CLEAR_BUFFER = 'lk.clear_buffer';
24+
const RPC_PLAYBACK_FINISHED = 'lk.playback_finished';
25+
const AUDIO_STREAM_TOPIC = 'lk.audio_stream';
26+
27+
export interface DataStreamAudioOutputOptions {
28+
room: Room;
29+
destinationIdentity: string;
30+
sampleRate?: number;
31+
waitRemoteTrack?: TrackKind;
32+
}
33+
34+
/**
35+
* AudioOutput implementation that streams audio to a remote avatar worker using LiveKit DataStream.
36+
*/
37+
export class DataStreamAudioOutput extends AudioOutput {
38+
static _playbackFinishedRpcRegistered: boolean = false;
39+
static _playbackFinishedHandlers: Record<string, (data: RpcInvocationData) => string> = {};
40+
41+
private room: Room;
42+
private destinationIdentity: string;
43+
private roomConnectedFuture: Future<void>;
44+
private waitRemoteTrack?: TrackKind;
45+
private streamWriter?: ByteStreamWriter;
46+
private pushedDuration: number = 0;
47+
private started: boolean = false;
48+
private lock = new Mutex();
49+
private startTask?: Task<void>;
50+
51+
#logger = log();
52+
53+
constructor(opts: DataStreamAudioOutputOptions) {
54+
super(opts.sampleRate, undefined);
55+
56+
const { room, destinationIdentity, sampleRate, waitRemoteTrack } = opts;
57+
this.room = room;
58+
this.destinationIdentity = destinationIdentity;
59+
this.sampleRate = sampleRate;
60+
this.waitRemoteTrack = waitRemoteTrack;
61+
62+
const onRoomConnected = async () => {
63+
if (this.startTask) return;
64+
65+
await this.roomConnectedFuture.await;
66+
67+
// register the rpc method right after the room is connected
68+
DataStreamAudioOutput.registerPlaybackFinishedRpc({
69+
room,
70+
callerIdentity: this.destinationIdentity,
71+
handler: (data) => this.handlePlaybackFinished(data),
72+
});
73+
74+
this.startTask = Task.from(({ signal }) => this._start(signal));
75+
};
76+
77+
this.roomConnectedFuture = new Future<void>();
78+
79+
this.room.on(RoomEvent.ConnectionStateChanged, (_) => {
80+
if (room.isConnected && !this.roomConnectedFuture.done) {
81+
this.roomConnectedFuture.resolve(undefined);
82+
}
83+
});
84+
85+
if (this.room.isConnected) {
86+
this.roomConnectedFuture.resolve(undefined);
87+
}
88+
89+
onRoomConnected();
90+
}
91+
92+
private async _start(_abortSignal: AbortSignal) {
93+
const unlock = await this.lock.lock();
94+
95+
try {
96+
if (this.started) return;
97+
98+
await this.roomConnectedFuture.await;
99+
100+
this.#logger.debug(
101+
{
102+
identity: this.destinationIdentity,
103+
},
104+
'waiting for the remote participant',
105+
);
106+
107+
await waitForParticipant({
108+
room: this.room,
109+
identity: this.destinationIdentity,
110+
});
111+
112+
if (this.waitRemoteTrack) {
113+
this.#logger.debug(
114+
{
115+
identity: this.destinationIdentity,
116+
kind: this.waitRemoteTrack,
117+
},
118+
'waiting for the remote track',
119+
);
120+
121+
await waitForTrackPublication({
122+
room: this.room,
123+
identity: this.destinationIdentity,
124+
kind: this.waitRemoteTrack,
125+
});
126+
}
127+
128+
this.#logger.debug(
129+
{
130+
identity: this.destinationIdentity,
131+
},
132+
'remote participant ready',
133+
);
134+
135+
this.started = true;
136+
} finally {
137+
unlock();
138+
}
139+
}
140+
141+
async captureFrame(frame: AudioFrame): Promise<void> {
142+
if (!this.startTask) {
143+
this.startTask = Task.from(({ signal }) => this._start(signal));
144+
}
145+
146+
await this.startTask.result;
147+
await super.captureFrame(frame);
148+
149+
if (!this.streamWriter) {
150+
this.streamWriter = await this.room.localParticipant!.streamBytes({
151+
name: shortuuid('AUDIO_'),
152+
topic: AUDIO_STREAM_TOPIC,
153+
destinationIdentities: [this.destinationIdentity],
154+
attributes: {
155+
sample_rate: frame.sampleRate.toString(),
156+
num_channels: frame.channels.toString(),
157+
},
158+
});
159+
this.pushedDuration = 0;
160+
}
161+
162+
// frame.data is a Int16Array, write accepts a Uint8Array
163+
await this.streamWriter.write(new Uint8Array(frame.data.buffer));
164+
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;
165+
}
166+
167+
flush(): void {
168+
super.flush();
169+
170+
if (this.streamWriter === undefined || !this.started) {
171+
return;
172+
}
173+
174+
this.streamWriter.close().finally(() => {
175+
this.streamWriter = undefined;
176+
});
177+
}
178+
179+
clearBuffer(): void {
180+
if (!this.started) return;
181+
182+
this.room.localParticipant!.performRpc({
183+
destinationIdentity: this.destinationIdentity,
184+
method: RPC_CLEAR_BUFFER,
185+
payload: '',
186+
});
187+
}
188+
189+
private handlePlaybackFinished(data: RpcInvocationData): string {
190+
if (data.callerIdentity !== this.destinationIdentity) {
191+
this.#logger.warn(
192+
{
193+
callerIdentity: data.callerIdentity,
194+
destinationIdentity: this.destinationIdentity,
195+
},
196+
'playback finished event received from unexpected participant',
197+
);
198+
return 'reject';
199+
}
200+
201+
this.#logger.info(
202+
{
203+
callerIdentity: data.callerIdentity,
204+
},
205+
'playback finished event received',
206+
);
207+
208+
const playbackFinishedEvent = JSON.parse(data.payload) as PlaybackFinishedEvent;
209+
this.onPlaybackFinished(playbackFinishedEvent);
210+
return 'ok';
211+
}
212+
213+
static registerPlaybackFinishedRpc({
214+
room,
215+
callerIdentity,
216+
handler,
217+
}: {
218+
room: Room;
219+
callerIdentity: string;
220+
handler: (data: RpcInvocationData) => string;
221+
}) {
222+
DataStreamAudioOutput._playbackFinishedHandlers[callerIdentity] = handler;
223+
224+
if (DataStreamAudioOutput._playbackFinishedRpcRegistered) {
225+
return;
226+
}
227+
228+
const rpcHandler = async (data: RpcInvocationData): Promise<string> => {
229+
const handler = DataStreamAudioOutput._playbackFinishedHandlers[data.callerIdentity];
230+
if (!handler) {
231+
log().warn(
232+
{
233+
callerIdentity: data.callerIdentity,
234+
expectedIdentities: Object.keys(DataStreamAudioOutput._playbackFinishedHandlers),
235+
},
236+
'playback finished event received from unexpected participant',
237+
);
238+
239+
return 'reject';
240+
}
241+
return handler(data);
242+
};
243+
244+
room.localParticipant?.registerRpcMethod(RPC_PLAYBACK_FINISHED, rpcHandler);
245+
DataStreamAudioOutput._playbackFinishedRpcRegistered = true;
246+
}
247+
}

agents/src/voice/avatar/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
export * from './datastream_io.js';

agents/src/voice/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@
33
// SPDX-License-Identifier: Apache-2.0
44
export { Agent, StopResponse, type AgentOptions, type ModelSettings } from './agent.js';
55
export { AgentSession, type AgentSessionOptions } from './agent_session.js';
6+
7+
export * from './avatar/index.js';
68
export * from './events.js';
79
export { RunContext } from './run_context.js';

0 commit comments

Comments
 (0)