11// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
22//
33// SPDX-License-Identifier: Apache-2.0
4- import type { AudioFrame } from '@livekit/rtc-node' ;
4+ import { type AudioFrame , FrameProcessor } from '@livekit/rtc-node' ;
55import {
66 AudioStream ,
77 type NoiseCancellationOptions ,
@@ -22,6 +22,7 @@ export class ParticipantAudioInputStream extends AudioInput {
2222 private sampleRate : number ;
2323 private numChannels : number ;
2424 private noiseCancellation ?: NoiseCancellationOptions ;
25+ private frameProcessor ?: FrameProcessor < AudioFrame > ;
2526 private publication : RemoteTrackPublication | null = null ;
2627 private participantIdentity : string | null = null ;
2728 private logger = log ( ) ;
@@ -34,16 +35,21 @@ export class ParticipantAudioInputStream extends AudioInput {
3435 room : Room ;
3536 sampleRate : number ;
3637 numChannels : number ;
37- noiseCancellation ?: NoiseCancellationOptions ;
38+ noiseCancellation ?: NoiseCancellationOptions | FrameProcessor < AudioFrame > ;
3839 } ) {
3940 super ( ) ;
4041 this . room = room ;
4142 this . sampleRate = sampleRate ;
4243 this . numChannels = numChannels ;
43- this . noiseCancellation = noiseCancellation ;
44+ if ( noiseCancellation instanceof FrameProcessor ) {
45+ this . frameProcessor = noiseCancellation ;
46+ } else {
47+ this . noiseCancellation = noiseCancellation ;
48+ }
4449
4550 this . room . on ( RoomEvent . TrackSubscribed , this . onTrackSubscribed ) ;
4651 this . room . on ( RoomEvent . TrackUnpublished , this . onTrackUnpublished ) ;
52+ this . room . on ( RoomEvent . TokenRefreshed , this . onTokenRefreshed ) ;
4753 }
4854
4955 setParticipant ( participant : RemoteParticipant | string | null ) {
@@ -116,6 +122,9 @@ export class ParticipantAudioInputStream extends AudioInput {
116122 if ( this . deferredStream . isSourceSet ) {
117123 this . deferredStream . detachSource ( ) ;
118124 }
125+
126+ this . frameProcessor ?. close ( ) ;
127+
119128 this . publication = null ;
120129 }
121130
@@ -140,21 +149,40 @@ export class ParticipantAudioInputStream extends AudioInput {
140149 outputRate : this . sampleRate ,
141150 } ) ,
142151 ) ;
152+ this . frameProcessor ?. onStreamInfoUpdated ( {
153+ participantIdentity : participant . identity ,
154+ roomName : this . room . name ! ,
155+ publicationSid : publication . sid ! ,
156+ } ) ;
157+ this . frameProcessor ?. onCredentialsUpdated ( {
158+ token : this . room . token ! ,
159+ url : this . room . serverUrl ! ,
160+ } ) ;
143161 return true ;
144162 } ;
145163
164+ private onTokenRefreshed = ( ) => {
165+ if ( this . room . token && this . room . serverUrl ) {
166+ this . frameProcessor ?. onCredentialsUpdated ( {
167+ token : this . room . token ,
168+ url : this . room . serverUrl ,
169+ } ) ;
170+ }
171+ } ;
172+
146173 private createStream ( track : RemoteTrack ) : ReadableStream < AudioFrame > {
147174 return new AudioStream ( track , {
148175 sampleRate : this . sampleRate ,
149176 numChannels : this . numChannels ,
150- noiseCancellation : this . noiseCancellation ,
177+ noiseCancellation : this . frameProcessor || this . noiseCancellation ,
151178 // TODO(AJS-269): resolve compatibility issue with node-sdk to remove the forced type casting
152179 } ) as unknown as ReadableStream < AudioFrame > ;
153180 }
154181
155182 async close ( ) {
156183 this . room . off ( RoomEvent . TrackSubscribed , this . onTrackSubscribed ) ;
157184 this . room . off ( RoomEvent . TrackUnpublished , this . onTrackUnpublished ) ;
185+ this . room . off ( RoomEvent . TokenRefreshed , this . onTokenRefreshed ) ;
158186 this . closeStream ( ) ;
159187 // Ignore errors - stream may be locked by RecorderIO or already cancelled
160188 await this . deferredStream . stream . cancel ( ) . catch ( ( ) => { } ) ;
0 commit comments