@@ -11,14 +11,14 @@ async def main() -> None:
1111
1212 # Load environment variables from a .env file if present
1313 load_dotenv (find_dotenv ())
14-
14+
1515 url = os .getenv ("LIVEKIT_URL" )
1616 token = os .getenv ("LIVEKIT_TOKEN" )
1717 if not url or not token :
1818 raise RuntimeError ("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env" )
1919
2020 room = rtc .Room ()
21-
21+
2222 devices = rtc .MediaDevices ()
2323
2424 # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream
@@ -32,7 +32,9 @@ async def main() -> None:
3232 streams_by_pub : dict [str , rtc .AudioStream ] = {}
3333 streams_by_participant : dict [str , set [rtc .AudioStream ]] = {}
3434
35- async def _remove_stream (stream : rtc .AudioStream , participant_sid : str | None = None , pub_sid : str | None = None ) -> None :
35+ async def _remove_stream (
36+ stream : rtc .AudioStream , participant_sid : str | None = None , pub_sid : str | None = None
37+ ) -> None :
3638 try :
3739 mixer .remove_stream (stream )
3840 except Exception :
@@ -48,25 +50,50 @@ async def _remove_stream(stream: rtc.AudioStream, participant_sid: str | None =
4850 if pub_sid is not None :
4951 streams_by_pub .pop (pub_sid , None )
5052
51- def on_track_subscribed (track : rtc .Track , publication : rtc .RemoteTrackPublication , participant : rtc .RemoteParticipant ):
53+ class _FrameOnlyStream :
54+ def __init__ (self , inner : rtc .AudioStream ) -> None :
55+ self ._inner = inner
56+
57+ def __aiter__ (self ):
58+ return self
59+
60+ async def __anext__ (self ) -> rtc .AudioFrame :
61+ event = await self ._inner .__anext__ ()
62+ return event .frame
63+
64+ async def aclose (self ) -> None :
65+ await self ._inner .aclose ()
66+
67+ def on_track_subscribed (
68+ track : rtc .Track ,
69+ publication : rtc .RemoteTrackPublication ,
70+ participant : rtc .RemoteParticipant ,
71+ ):
5272 if track .kind == rtc .TrackKind .KIND_AUDIO :
53- stream = rtc .AudioStream (track , sample_rate = 48000 , num_channels = 1 )
54- streams_by_pub [publication .sid ] = stream
55- streams_by_participant .setdefault (participant .sid , set ()).add (stream )
56- mixer .add_stream (stream )
73+ event_stream = rtc .AudioStream (track , sample_rate = 48000 , num_channels = 1 )
74+ frame_stream = _FrameOnlyStream (event_stream )
75+ streams_by_pub [publication .sid ] = frame_stream
76+ streams_by_participant .setdefault (participant .sid , set ()).add (frame_stream )
77+ mixer .add_stream (frame_stream )
5778 logging .info ("subscribed to audio from %s" , participant .identity )
5879
5980 room .on ("track_subscribed" , on_track_subscribed )
6081
61- def on_track_unsubscribed (track : rtc .Track , publication : rtc .RemoteTrackPublication , participant : rtc .RemoteParticipant ):
82+ def on_track_unsubscribed (
83+ track : rtc .Track ,
84+ publication : rtc .RemoteTrackPublication ,
85+ participant : rtc .RemoteParticipant ,
86+ ):
6287 stream = streams_by_pub .get (publication .sid )
6388 if stream is not None :
6489 asyncio .create_task (_remove_stream (stream , participant .sid , publication .sid ))
6590 logging .info ("unsubscribed from audio of %s" , participant .identity )
6691
6792 room .on ("track_unsubscribed" , on_track_unsubscribed )
6893
69- def on_track_unpublished (publication : rtc .RemoteTrackPublication , participant : rtc .RemoteParticipant ):
94+ def on_track_unpublished (
95+ publication : rtc .RemoteTrackPublication , participant : rtc .RemoteParticipant
96+ ):
7097 stream = streams_by_pub .get (publication .sid )
7198 if stream is not None :
7299 asyncio .create_task (_remove_stream (stream , participant .sid , publication .sid ))
@@ -119,5 +146,3 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant):
119146
120147if __name__ == "__main__" :
121148 asyncio .run (main ())
122-
123-
0 commit comments