Skip to content

Commit 585a6c1

Browse files
Instantiate FfiClient Lazily (#125)
1 parent 5d2bff6 commit 585a6c1

File tree

13 files changed

+86
-73
lines changed

13 files changed

+86
-73
lines changed

livekit-rtc/livekit/rtc/_ffi_client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import pkg_resources
2424

2525
from ._proto import ffi_pb2 as proto_ffi
26-
from ._utils import Queue
26+
from ._utils import Queue, classproperty
2727

2828
logger = logging.getLogger("livekit")
2929

@@ -142,7 +142,7 @@ def ffi_event_callback(
142142

143143
return # no need to queue the logs
144144

145-
ffi_client.queue.put(event)
145+
FfiClient.instance.queue.put(event)
146146

147147

148148
def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:
@@ -161,6 +161,14 @@ def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:
161161

162162

163163
class FfiClient:
164+
_instance: Optional["FfiClient"] = None
165+
166+
@classproperty
167+
def instance(self):
168+
if self._instance is None:
169+
self._instance = FfiClient()
170+
return self._instance
171+
164172
def __init__(self) -> None:
165173
self._lock = threading.RLock()
166174
self._queue = FfiQueue[proto_ffi.FfiEvent]()
@@ -188,6 +196,3 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:
188196

189197
FfiHandle(handle)
190198
return resp
191-
192-
193-
ffi_client = FfiClient()

livekit-rtc/livekit/rtc/_utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77
logger = logging.getLogger("livekit")
88

99

10+
class classproperty(object):
11+
def __init__(self, f):
12+
self.f = classmethod(f)
13+
14+
def __get__(self, *a):
15+
return self.f.__get__(*a)()
16+
17+
1018
def task_done_logger(task: asyncio.Task) -> None:
1119
if task.cancelled():
1220
logger.info("task cancelled: %s", task)

livekit-rtc/livekit/rtc/audio_frame.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import ctypes
16-
from ._ffi_client import FfiHandle, ffi_client
16+
from ._ffi_client import FfiHandle, FfiClient
1717
from ._proto import audio_frame_pb2 as proto_audio
1818
from ._proto import ffi_pb2 as proto_ffi
1919
from ._utils import get_address
@@ -65,7 +65,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
6565
req = proto_ffi.FfiRequest()
6666
req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())
6767

68-
resp = ffi_client.request(req)
68+
resp = FfiClient.instance.request(req)
6969
resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id)
7070

7171
resample_req = proto_ffi.FfiRequest()
@@ -74,7 +74,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
7474
resample_req.remix_and_resample.sample_rate = sample_rate
7575
resample_req.remix_and_resample.num_channels = num_channels
7676

77-
resp = ffi_client.request(resample_req)
77+
resp = FfiClient.instance.request(resample_req)
7878
return AudioFrame._from_owned_info(resp.remix_and_resample.buffer)
7979

8080
def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:

livekit-rtc/livekit/rtc/audio_source.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from ._ffi_client import FfiHandle, ffi_client
15+
from ._ffi_client import FfiHandle, FfiClient
1616
from ._proto import audio_frame_pb2 as proto_audio_frame
1717
from ._proto import ffi_pb2 as proto_ffi
1818
from .audio_frame import AudioFrame
@@ -27,7 +27,7 @@ def __init__(self, sample_rate: int, num_channels: int) -> None:
2727
req.new_audio_source.sample_rate = sample_rate
2828
req.new_audio_source.num_channels = num_channels
2929

30-
resp = ffi_client.request(req)
30+
resp = FfiClient.instance.request(req)
3131
self._info = resp.new_audio_source.source
3232
self._ffi_handle = FfiHandle(self._info.handle.id)
3333

@@ -37,15 +37,15 @@ async def capture_frame(self, frame: AudioFrame) -> None:
3737
req.capture_audio_frame.source_handle = self._ffi_handle.handle
3838
req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())
3939

40-
queue = ffi_client.queue.subscribe()
40+
queue = FfiClient.instance.queue.subscribe()
4141
try:
42-
resp = ffi_client.request(req)
42+
resp = FfiClient.instance.request(req)
4343
cb = await queue.wait_for(
4444
lambda e: e.capture_audio_frame.async_id
4545
== resp.capture_audio_frame.async_id
4646
)
4747
finally:
48-
ffi_client.queue.unsubscribe(queue)
48+
FfiClient.instance.queue.unsubscribe(queue)
4949

5050
if cb.capture_audio_frame.error:
5151
raise Exception(cb.capture_audio_frame.error)

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import asyncio
1616
from typing import Optional
1717

18-
from ._ffi_client import FfiHandle, ffi_client
18+
from ._ffi_client import FfiHandle, FfiClient
1919
from ._proto import audio_frame_pb2 as proto_audio_frame
2020
from ._proto import ffi_pb2 as proto_ffi
2121
from ._utils import RingQueue, task_done_logger
@@ -32,14 +32,14 @@ def __init__(
3232
) -> None:
3333
self._track = track
3434
self._loop = loop or asyncio.get_event_loop()
35-
self._ffi_queue = ffi_client.queue.subscribe(self._loop)
35+
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
3636
self._queue: RingQueue[AudioFrame] = RingQueue(capacity)
3737

3838
req = proto_ffi.FfiRequest()
3939
new_audio_stream = req.new_audio_stream
4040
new_audio_stream.track_handle = track._ffi_handle.handle
4141
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
42-
resp = ffi_client.request(req)
42+
resp = FfiClient.instance.request(req)
4343

4444
stream_info = resp.new_audio_stream.stream
4545
self._ffi_handle = FfiHandle(stream_info.handle.id)
@@ -49,7 +49,7 @@ def __init__(
4949
self._task.add_done_callback(task_done_logger)
5050

5151
def __del__(self) -> None:
52-
ffi_client.queue.unsubscribe(self._ffi_queue)
52+
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
5353

5454
async def _run(self):
5555
while True:
@@ -63,7 +63,7 @@ async def _run(self):
6363
elif audio_event.HasField("eos"):
6464
break
6565

66-
ffi_client.queue.unsubscribe(self._ffi_queue)
66+
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
6767

6868
async def aclose(self):
6969
self._ffi_handle.dispose()

livekit-rtc/livekit/rtc/e2ee.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from dataclasses import dataclass, field
1616
from typing import List, Optional
1717

18-
from ._ffi_client import ffi_client
18+
from ._ffi_client import FfiClient
1919
from ._proto import e2ee_pb2 as proto_e2ee
2020
from ._proto import ffi_pb2 as proto_ffi
2121

@@ -52,13 +52,13 @@ def set_shared_key(self, key: bytes, key_index: int) -> None:
5252
req.e2ee.room_handle = self._room_handle
5353
req.e2ee.set_shared_key.key_index = key_index
5454
req.e2ee.set_shared_key.shared_key = key
55-
ffi_client.request(req)
55+
FfiClient.instance.request(req)
5656

5757
def export_shared_key(self, key_index: int) -> bytes:
5858
req = proto_ffi.FfiRequest()
5959
req.e2ee.room_handle = self._room_handle
6060
req.e2ee.get_shared_key.key_index = key_index
61-
resp = ffi_client.request(req)
61+
resp = FfiClient.instance.request(req)
6262
key = resp.e2ee.get_shared_key.key
6363
return key
6464

@@ -67,7 +67,7 @@ def ratchet_shared_key(self, key_index: int) -> bytes:
6767
req.e2ee.room_handle = self._room_handle
6868
req.e2ee.ratchet_shared_key.key_index = key_index
6969

70-
resp = ffi_client.request(req)
70+
resp = FfiClient.instance.request(req)
7171

7272
new_key = resp.e2ee.ratchet_shared_key.new_key
7373
return new_key
@@ -80,14 +80,14 @@ def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None
8080
req.e2ee.set_key.key = key
8181

8282
self.key_index = key_index
83-
ffi_client.request(req)
83+
FfiClient.instance.request(req)
8484

8585
def export_key(self, participant_identity: str, key_index: int) -> bytes:
8686
req = proto_ffi.FfiRequest()
8787
req.e2ee.room_handle = self._room_handle
8888
req.e2ee.get_key.participant_identity = participant_identity
8989
req.e2ee.get_key.key_index = key_index
90-
resp = ffi_client.request(req)
90+
resp = FfiClient.instance.request(req)
9191
key = resp.e2ee.get_key.key
9292
return key
9393

@@ -97,7 +97,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
9797
req.e2ee.ratchet_key.participant_identity = participant_identity
9898
req.e2ee.ratchet_key.key_index = key_index
9999

100-
resp = ffi_client.request(req)
100+
resp = FfiClient.instance.request(req)
101101
new_key = resp.e2ee.ratchet_key.new_key
102102
return new_key
103103

@@ -129,15 +129,15 @@ def set_enabled(self, enabled: bool) -> None:
129129
req.e2ee.room_handle = self._room_handle
130130
req.e2ee.cryptor_set_enabled.participant_identity = self._participant_identity
131131
req.e2ee.cryptor_set_enabled.enabled = enabled
132-
ffi_client.request(req)
132+
FfiClient.instance.request(req)
133133

134134
def set_key_index(self, key_index: int) -> None:
135135
self._key_index = key_index
136136
req = proto_ffi.FfiRequest()
137137
req.e2ee.room_handle = self._room_handle
138138
req.e2ee.cryptor_set_key_index.participant_identity = self._participant_identity
139139
req.e2ee.cryptor_set_key_index.key_index = key_index
140-
ffi_client.request(req)
140+
FfiClient.instance.request(req)
141141

142142

143143
class E2EEManager:
@@ -164,13 +164,13 @@ def set_enabled(self, enabled: bool) -> None:
164164
req = proto_ffi.FfiRequest()
165165
req.e2ee.room_handle = self._room_handle
166166
req.e2ee.manager_set_enabled.enabled = enabled
167-
ffi_client.request(req)
167+
FfiClient.instance.request(req)
168168

169169
def frame_cryptors(self) -> List[FrameCryptor]:
170170
req = proto_ffi.FfiRequest()
171171
req.e2ee.room_handle = self._room_handle
172172

173-
resp = ffi_client.request(req)
173+
resp = FfiClient.instance.request(req)
174174
frame_cryptors = []
175175
for frame_cryptor in resp.e2ee.manager_get_frame_cryptors.frame_cryptors:
176176
frame_cryptors.append(

livekit-rtc/livekit/rtc/participant.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import ctypes
1616
from typing import List, Union
1717

18-
from ._ffi_client import FfiHandle, ffi_client
18+
from ._ffi_client import FfiHandle, FfiClient
1919
from ._proto import ffi_pb2 as proto_ffi
2020
from ._proto import participant_pb2 as proto_participant
2121
from ._proto.room_pb2 import DataPacketKind, TrackPublishOptions
@@ -105,14 +105,14 @@ async def publish_data(
105105

106106
req.publish_data.destination_sids.extend(sids)
107107

108-
queue = ffi_client.queue.subscribe()
108+
queue = FfiClient.instance.queue.subscribe()
109109
try:
110-
resp = ffi_client.request(req)
110+
resp = FfiClient.instance.request(req)
111111
cb = await queue.wait_for(
112112
lambda e: e.publish_data.async_id == resp.publish_data.async_id
113113
)
114114
finally:
115-
ffi_client.queue.unsubscribe(queue)
115+
FfiClient.instance.queue.unsubscribe(queue)
116116

117117
if cb.publish_data.error:
118118
raise PublishDataError(cb.publish_data.error)
@@ -122,30 +122,30 @@ async def update_metadata(self, metadata: str) -> None:
122122
req.update_local_metadata.local_participant_handle = self._ffi_handle.handle
123123
req.update_local_metadata.metadata = metadata
124124

125-
queue = ffi_client.queue.subscribe()
125+
queue = FfiClient.instance.queue.subscribe()
126126
try:
127-
resp = ffi_client.request(req)
127+
resp = FfiClient.instance.request(req)
128128
await queue.wait_for(
129129
lambda e: e.update_local_metadata.async_id
130130
== resp.update_local_metadata.async_id
131131
)
132132
finally:
133-
ffi_client.queue.unsubscribe(queue)
133+
FfiClient.instance.queue.unsubscribe(queue)
134134

135135
async def update_name(self, name: str) -> None:
136136
req = proto_ffi.FfiRequest()
137137
req.update_local_name.local_participant_handle = self._ffi_handle.handle
138138
req.update_local_name.name = name
139139

140-
queue = ffi_client.queue.subscribe()
140+
queue = FfiClient.instance.queue.subscribe()
141141
try:
142-
resp = ffi_client.request(req)
142+
resp = FfiClient.instance.request(req)
143143
await queue.wait_for(
144144
lambda e: e.update_local_name.async_id
145145
== resp.update_local_name.async_id
146146
)
147147
finally:
148-
ffi_client.queue.unsubscribe(queue)
148+
FfiClient.instance.queue.unsubscribe(queue)
149149

150150
async def publish_track(
151151
self, track: LocalTrack, options: TrackPublishOptions
@@ -157,7 +157,7 @@ async def publish_track(
157157

158158
queue = self._room_queue.subscribe()
159159
try:
160-
resp = ffi_client.request(req)
160+
resp = FfiClient.instance.request(req)
161161
cb = await queue.wait_for(
162162
lambda e: e.publish_track.async_id == resp.publish_track.async_id
163163
)
@@ -181,7 +181,7 @@ async def unpublish_track(self, track_sid: str) -> None:
181181

182182
queue = self._room_queue.subscribe()
183183
try:
184-
resp = ffi_client.request(req)
184+
resp = FfiClient.instance.request(req)
185185
cb = await queue.wait_for(
186186
lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id
187187
)

0 commit comments

Comments
 (0)