11import asyncio
22import ctypes
33import weakref
4- from typing import TYPE_CHECKING
4+ from typing import TYPE_CHECKING , List , Optional , Union
5+ from weakref import ref
56
67from livekit import DataPacketKind , TrackPublishOptions
78
8- from ._ffi_client import FfiClient
9+ from ._ffi_client import ffi_client
910from ._proto import ffi_pb2 as proto_ffi
1011from ._proto import participant_pb2 as proto_participant
1112from ._proto import room_pb2 as proto_room
1213from .track import LocalAudioTrack , LocalVideoTrack , Track
13- from .track_publication import TrackPublication
14+ from .track_publication import (
15+ LocalTrackPublication ,
16+ RemoteTrackPublication ,
17+ TrackPublication ,
18+ )
1419
1520if TYPE_CHECKING :
1621 from livekit import Room
@@ -52,12 +57,12 @@ class LocalParticipant(Participant):
5257 def __init__ (self , info : proto_participant .ParticipantInfo , room : weakref .ref ['Room' ]):
5358 super ().__init__ (info )
5459 self ._room = room
60+ self .tracks : dict [str , LocalTrackPublication ] = {}
5561
5662 async def publish_data (self ,
57- # TODO(theomonnom): Allow ctypes.Array as payload?
58- payload : bytes or str ,
63+ payload : Union [bytes , str ],
5964 kind : DataPacketKind .ValueType = DataPacketKind .KIND_RELIABLE ,
60- destination_sids : list [ str ] or list ['RemoteParticipant' ] = [] ) -> None :
65+ destination_sids : Optional [ Union [ List [ str ], List ['RemoteParticipant' ]]] = None ) -> None :
6166
6267 room = self ._room ()
6368 if room is None :
@@ -70,21 +75,24 @@ async def publish_data(self,
7075
7176 cdata = (ctypes .c_byte * data_len )(* payload )
7277
73- sids = []
74- for p in destination_sids :
75- if isinstance (p , RemoteParticipant ):
76- sids .append (p .sid )
77- else :
78- sids .append (p )
78+
7979
8080 req = proto_ffi .FfiRequest ()
8181 req .publish_data .room_handle .id = room ._ffi_handle .handle
8282 req .publish_data .data_ptr = ctypes .addressof (cdata )
8383 req .publish_data .data_size = data_len
8484 req .publish_data .kind = kind
85- req .publish_data .destination_sids .extend (sids )
8685
87- ffi_client = FfiClient ()
86+ if destination_sids is not None :
87+ sids = []
88+ for p in destination_sids :
89+ if isinstance (p , RemoteParticipant ):
90+ sids .append (p .sid )
91+ else :
92+ sids .append (p )
93+
94+ req .publish_data .destination_sids .extend (sids )
95+
8896 resp = ffi_client .request (req )
8997 future : asyncio .Future [proto_room .PublishDataCallback ] = asyncio .Future (
9098 )
@@ -113,8 +121,6 @@ async def publish_track(self, track: Track, options: TrackPublishOptions) -> Tra
113121 req .publish_track .room_handle .id = room ._ffi_handle .handle
114122 req .publish_track .options .CopyFrom (options )
115123
116- ffi_client = FfiClient ()
117-
118124 resp = ffi_client .request (req )
119125
120126 future : asyncio .Future [proto_room .PublishTrackCallback ] = asyncio .Future (
@@ -132,7 +138,7 @@ def on_publish_callback(cb: proto_room.PublishTrackCallback):
132138 if cb .error :
133139 raise PublishTrackError (cb .error )
134140
135- track_publication = TrackPublication (cb .publication )
141+ track_publication = LocalTrackPublication (cb .publication , ref ( self ) )
136142 track_publication .track = track
137143 self .tracks [track_publication .sid ] = track_publication
138144 # TODO: Update track info
@@ -142,3 +148,4 @@ def on_publish_callback(cb: proto_room.PublishTrackCallback):
142148class RemoteParticipant (Participant ):
143149 def __init__ (self , info : proto_participant .ParticipantInfo ):
144150 super ().__init__ (info )
151+ self .tracks : dict [str , RemoteTrackPublication ] = {}
0 commit comments