@@ -30,6 +30,7 @@ def __init__(
3030 room : rtc .Room ,
3131 * ,
3232 track_source : rtc .TrackSource .ValueType | list [rtc .TrackSource .ValueType ],
33+ processor : rtc .FrameProcessor [T ] | None = None ,
3334 ) -> None :
3435 self ._room = room
3536 self ._accepted_sources = (
@@ -49,6 +50,9 @@ def __init__(
4950
5051 self ._room .on ("track_subscribed" , self ._on_track_available )
5152 self ._room .on ("track_unpublished" , self ._on_track_unavailable )
53+ self ._room .on ("token_refreshed" , self ._on_token_refreshed )
54+
55+ self ._processor = processor
5256
5357 async def __anext__ (self ) -> T :
5458 return await self ._data_ch .__anext__ ()
@@ -122,6 +126,7 @@ async def aclose(self) -> None:
122126 await aio .cancel_and_wait (self ._forward_atask )
123127
124128 self ._room .off ("track_subscribed" , self ._on_track_available )
129+ self ._room .off ("token_refreshed" , self ._on_token_refreshed )
125130 self ._data_ch .close ()
126131
127132 @log_exceptions (logger = logger )
@@ -160,6 +165,8 @@ def _close_stream(self) -> None:
160165 self ._tasks .add (task )
161166 self ._stream = None
162167 self ._publication = None
168+ if self ._processor :
169+ self ._processor ._close ()
163170
164171 def _on_track_available (
165172 self ,
@@ -177,6 +184,16 @@ def _on_track_available(
177184 self ._close_stream ()
178185 self ._stream = self ._create_stream (track , participant )
179186 self ._publication = publication
187+ if self ._processor :
188+ self ._processor ._on_stream_info_updated (
189+ room_name = self ._room .name ,
190+ participant_identity = participant .identity ,
191+ publication_sid = publication .sid ,
192+ )
193+ if self ._room ._token is not None and self ._room ._server_url is not None :
194+ self ._processor ._on_credentials_updated (
195+ token = self ._room ._token , url = self ._room ._server_url
196+ )
180197 self ._forward_atask = asyncio .create_task (
181198 self ._forward_task (self ._forward_atask , self ._stream , publication , participant )
182199 )
@@ -201,6 +218,16 @@ def _on_track_unavailable(
201218 if self ._on_track_available (publication .track , publication , participant ):
202219 return
203220
221+ def _on_token_refreshed (self ) -> None :
222+ if (
223+ self ._processor is not None
224+ and self ._room ._token is not None
225+ and self ._room ._server_url is not None
226+ ):
227+ self ._processor ._on_credentials_updated (
228+ token = self ._room ._token , url = self ._room ._server_url
229+ )
230+
204231
205232class _ParticipantAudioInputStream (_ParticipantInputStream [rtc .AudioFrame ], AudioInput ):
206233 def __init__ (
@@ -209,12 +236,22 @@ def __init__(
209236 * ,
210237 sample_rate : int ,
211238 num_channels : int ,
212- noise_cancellation : rtc .NoiseCancellationOptions | NoiseCancellationSelector | None ,
239+ noise_cancellation : rtc .NoiseCancellationOptions
240+ | NoiseCancellationSelector
241+ | rtc .FrameProcessor [rtc .AudioFrame ]
242+ | None ,
213243 pre_connect_audio_handler : PreConnectAudioHandler | None ,
214244 frame_size_ms : int = 50 ,
215245 ) -> None :
246+ audio_processor : rtc .FrameProcessor [rtc .AudioFrame ] | None = None
247+ if isinstance (noise_cancellation , rtc .FrameProcessor ):
248+ audio_processor = noise_cancellation
249+
216250 _ParticipantInputStream .__init__ (
217- self , room = room , track_source = rtc .TrackSource .SOURCE_MICROPHONE
251+ self ,
252+ room = room ,
253+ track_source = rtc .TrackSource .SOURCE_MICROPHONE ,
254+ processor = audio_processor ,
218255 )
219256 AudioInput .__init__ (self , label = "RoomIO" )
220257 if frame_size_ms <= 0 :
@@ -265,7 +302,7 @@ async def _forward_task(
265302 try :
266303 duration : float = 0
267304 frames = await self ._pre_connect_audio_handler .wait_for_data (publication .track .sid )
268- for frame in self ._resample_frames (frames ):
305+ for frame in self ._resample_frames (self . _apply_audio_processor ( frames ) ):
269306 if self ._attached :
270307 await self ._data_ch .send (frame )
271308 duration += frame .duration
@@ -319,6 +356,20 @@ def _resample_frames(self, frames: Iterable[rtc.AudioFrame]) -> Iterable[rtc.Aud
319356 if resampler :
320357 yield from resampler .flush ()
321358
359+ def _apply_audio_processor (self , frames : Iterable [rtc .AudioFrame ]) -> Iterable [rtc .AudioFrame ]:
360+ for frame in frames :
361+ if self ._processor is not None :
362+ try :
363+ yield self ._processor ._process (frame )
364+ except Exception as e :
365+ logger .warning (
366+ "error pre-processing audio frame" ,
367+ exc_info = e ,
368+ )
369+ yield frame
370+ else :
371+ yield frame
372+
322373
323374class _ParticipantVideoInputStream (_ParticipantInputStream [rtc .VideoFrame ], VideoInput ):
324375 def __init__ (self , room : rtc .Room ) -> None :
0 commit comments