Skip to content

Commit d2135f8

Browse files
committed
video_stream
1 parent 3a7fe96 commit d2135f8

File tree

9 files changed

+414
-34
lines changed

9 files changed

+414
-34
lines changed

examples/basic_room/room.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,18 @@ async def main():
1111

1212
@room.on("participant_connected")
1313
def on_participant_connected(participant: livekit.RemoteParticipant):
14-
print("Participant connected: " + participant.identity)
14+
print("participant connected: " + participant.identity)
15+
16+
@room.on("track_subscribed")
17+
def on_track_subscribed(track: livekit.Track, publication: livekit.RemoteTrackPublication, participant: livekit.RemoteParticipant):
18+
if track.kind == livekit.TrackKind.KIND_VIDEO:
19+
video_stream = livekit.VideoStream(track)
20+
21+
@video_stream.on("frame_received")
22+
def on_video_frame(frame: livekit.VideoFrame, buffer: livekit.VideoFrameBuffer):
23+
argb = livekit.ArgbFrame(
24+
livekit.VideoFormatType.FORMAT_ABGR, buffer.width, buffer.height)
25+
buffer.to_argb(argb)
1526

1627
print("Connected to room with sid: " + room.sid)
1728
await room.run()

livekit/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@
33

44
__version__ = "0.0.1"
55

6-
from _proto.track_pb2 import (TrackKind, TrackSource, StreamState)
6+
from ._proto.video_frame_pb2 import (
7+
VideoRotation, VideoFormatType, VideoFrameBufferType)
8+
from ._proto.track_pb2 import (TrackKind, TrackSource, StreamState)
79

810
from .room import Room
911
from .participant import (Participant, LocalParticipant, RemoteParticipant)
1012
from .track import (Track, LocalAudioTrack, LocalVideoTrack,
1113
RemoteAudioTrack, RemoteVideoTrack)
1214
from .track_publication import (
1315
TrackPublication, LocalTrackPublication, RemoteTrackPublication)
16+
17+
from .video_frame import (ArgbFrame, VideoFrame, VideoFrameBuffer, NativeVideoFrameBuffer, PlanarYuvBuffer,
18+
PlanarYuv8Buffer, PlanarYuv16Buffer, I420Buffer, I420ABuffer, I422Buffer, I010Buffer, NV12Buffer)
19+
from .video_stream import VideoStream

livekit/_ffi_client.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,30 @@
66
import pkg_resources
77
import asyncio
88
import threading
9-
import logging
9+
import logging
1010

1111
if sys.platform == "win32":
1212
libfile = 'livekit_ffi.dll'
1313
elif sys.platform == "darwin":
1414
libfile = 'liblivekit_ffi.dylib'
1515
else:
16-
libfile = 'liblivekit_ffi.so'
16+
libfile = 'liblivekit_ffi.so'
1717

1818
libpath = pkg_resources.resource_filename('livekit', libfile)
1919

2020
ffi_lib = CDLL(libpath)
2121

2222
# C function types
23-
ffi_lib.livekit_ffi_request.argtypes = [POINTER(c_ubyte), c_size_t, POINTER(POINTER(c_ubyte)), POINTER(c_size_t)]
23+
ffi_lib.livekit_ffi_request.argtypes = [
24+
POINTER(c_ubyte), c_size_t, POINTER(POINTER(c_ubyte)), POINTER(c_size_t)]
2425
ffi_lib.livekit_ffi_request.restype = c_size_t
2526

2627
ffi_lib.livekit_ffi_drop_handle.argtypes = [c_size_t]
2728
ffi_lib.livekit_ffi_drop_handle.restype = c_bool
2829

2930
INVALID_HANDLE = 0
3031

32+
3133
@CFUNCTYPE(c_void_p, POINTER(c_uint8), c_size_t)
3234
def ffi_event_callback(data_ptr: POINTER(c_uint8), data_len: c_size_t):
3335
event_data = bytes(data_ptr[:data_len])
@@ -40,46 +42,51 @@ def ffi_event_callback(data_ptr: POINTER(c_uint8), data_len: c_size_t):
4042

4143
loop.call_soon_threadsafe(dispatch_event, event)
4244

45+
4346
def dispatch_event(event: proto_ffi.FfiEvent):
4447
ffi_client = FfiClient()
4548
which = event.WhichOneof('message')
46-
if which == 'connect':
47-
ffi_client.emit('connect', event.connect)
48-
elif which == 'room_event':
49-
ffi_client.emit('room', event.room_event)
49+
ffi_client.emit(which, getattr(event, which))
50+
5051

5152
class Singleton(type):
5253
_instances = {}
54+
5355
def __call__(cls, *args, **kwargs):
5456
if cls not in cls._instances:
55-
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
57+
cls._instances[cls] = super(
58+
Singleton, cls).__call__(*args, **kwargs)
5659
return cls._instances[cls]
5760

61+
5862
class FfiClient(EventEmitter, metaclass=Singleton):
5963
def __init__(self):
6064
super().__init__()
6165
self._lock = threading.Lock()
6266
self._event_loop = None
6367

6468
req = proto_ffi.FfiRequest()
65-
req.initialize.event_callback_ptr = cast(ffi_event_callback, c_void_p).value
69+
req.initialize.event_callback_ptr = cast(
70+
ffi_event_callback, c_void_p).value
6671
self.request(req)
6772

6873
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
6974
with self._lock:
7075
if self._event_loop is not None and self._event_loop != loop:
71-
logging.warning("FfiClient is now using a different asyncio event_loop")
76+
logging.warning(
77+
"FfiClient is now using a different asyncio event_loop")
7278

7379
self._event_loop = loop
7480

75-
def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:
81+
def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:
7682
data = req.SerializeToString()
7783
data_len = len(data)
7884
data = (c_ubyte * data_len)(*data)
7985

8086
resp_ptr = POINTER(c_ubyte)()
8187
resp_len = c_size_t()
82-
handle = ffi_lib.livekit_ffi_request(data, data_len, byref(resp_ptr), byref(resp_len))
88+
handle = ffi_lib.livekit_ffi_request(
89+
data, data_len, byref(resp_ptr), byref(resp_len))
8390

8491
resp_data = bytes(resp_ptr[:resp_len.value])
8592
resp = proto_ffi.FfiResponse()
@@ -88,6 +95,7 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:
8895
FfiHandle(handle)
8996
return resp
9097

98+
9199
class FfiHandle:
92100
handle = INVALID_HANDLE
93101

@@ -96,4 +104,4 @@ def __init__(self, handle: int):
96104

97105
def __del__(self):
98106
if self.handle != INVALID_HANDLE:
99-
assert(ffi_lib.livekit_ffi_drop_handle(c_size_t(self.handle)))
107+
assert (ffi_lib.livekit_ffi_drop_handle(c_size_t(self.handle)))

livekit/participant.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44

55
class Participant():
6-
76
def __init__(self, info: proto_participant.ParticipantInfo):
87
self._info = info
98
self._tracks: dict[str, TrackPublication] = {}

livekit/room.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
from ctypes import *
21
import asyncio
32
from pyee.asyncio import AsyncIOEventEmitter
43
from ._ffi_client import (FfiClient, FfiHandle)
5-
from .participant import (Participant, LocalParticipant, RemoteParticipant)
6-
from .track_publication import (RemoteTrackPublication, LocalTrackPublication)
74
from ._proto import ffi_pb2 as proto_ffi
85
from ._proto import room_pb2 as proto_room
96
from ._proto import participant_pb2 as proto_participant
7+
from .participant import (Participant, LocalParticipant, RemoteParticipant)
8+
from .track_publication import (RemoteTrackPublication, LocalTrackPublication)
9+
from .track import (RemoteAudioTrack, RemoteVideoTrack)
10+
from livekit import TrackKind
11+
import weakref
1012

1113

1214
class ConnectError(Exception):
@@ -21,6 +23,10 @@ def __init__(self):
2123
self._room_info: proto_room.RoomInfo = None
2224
self._participants: dict[str, RemoteParticipant] = {}
2325

26+
def __del__(self):
27+
ffi_client = FfiClient()
28+
ffi_client.remove_listener('room_event', self._on_room_event)
29+
2430
async def connect(self, url: str, token: str):
2531
# TODO(theomonnom): We should be more flexible about the event loop
2632
ffi_client = FfiClient()
@@ -37,6 +43,10 @@ async def connect(self, url: str, token: str):
3743

3844
def on_connect_callback(cb: proto_room.ConnectCallback):
3945
if cb.async_id == async_id:
46+
# add existing participants
47+
for participant_info in cb.room.participants:
48+
self._create_remote_participant(participant_info)
49+
4050
future.set_result(cb)
4151
ffi_client.remove_listener('connect', on_connect_callback)
4252

@@ -49,7 +59,7 @@ def on_connect_callback(cb: proto_room.ConnectCallback):
4959
self._ffi_handle = FfiHandle(resp.room.handle.id)
5060
self._room_info = resp.room
5161
self._close_future = asyncio.Future()
52-
ffi_client.add_listener('room', self._on_room_event)
62+
ffi_client.add_listener('room_event', self._on_room_event)
5363

5464
async def close(self):
5565
self._ffi_handle = None
@@ -74,15 +84,47 @@ def _on_room_event(self, event: proto_room.RoomEvent):
7484
publication = RemoteTrackPublication(
7585
event.track_published.publication)
7686
participant._tracks[publication.sid] = publication
77-
self.emit('track_published', publication)
87+
self.emit('track_published', publication, participant)
7888
elif which == 'track_unpublished':
7989
participant = self._participants[event.track_unpublished.participant_sid]
90+
publication = participant._tracks.pop(
91+
event.track_unpublished.publication_sid)
92+
self.emit('track_unpublished', publication, participant)
93+
elif which == 'track_subscribed':
94+
track_info = event.track_subscribed.track
95+
participant = self._participants[event.track_subscribed.participant_sid]
96+
publication = participant._tracks[track_info.sid]
97+
98+
if track_info.kind == TrackKind.KIND_VIDEO:
99+
video_track = RemoteVideoTrack(
100+
track_info, weakref.ref(self), weakref.ref(participant))
101+
publication._track = video_track
102+
self.emit('track_subscribed', video_track,
103+
publication, participant)
104+
elif track_info.kind == TrackKind.KIND_AUDIO:
105+
audio_track = RemoteAudioTrack(
106+
track_info, weakref.ref(self), weakref.ref(participant))
107+
publication._track = audio_track
108+
self.emit('track_subscribed', audio_track,
109+
publication, participant)
110+
elif which == 'track_unsubscribed':
111+
participant = self._participants[event.track_unsubscribed.participant_sid]
112+
publication = participant._tracks[event.track_unsubscribed.track_sid]
113+
track = publication._track
114+
publication._track = None
115+
self.emit('track_unsubscribed', track, publication, participant)
80116

81117
def _create_remote_participant(self, info: proto_participant.ParticipantInfo) -> RemoteParticipant:
118+
if info.sid in self._participants:
119+
raise Exception('participant already exists')
120+
82121
participant = RemoteParticipant(info)
83122
self._participants[participant.sid] = participant
84123

85-
# TODO(publications)
124+
# add existing track publications
125+
for publication_info in info.publications:
126+
publication = RemoteTrackPublication(publication_info)
127+
participant._tracks[publication.sid] = publication
86128

87129
return participant
88130

livekit/track.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
from ._proto import track_pb2 as proto_track
2-
from typing import Optional
32
from ._ffi_client import FfiHandle
3+
from typing import (Optional, TYPE_CHECKING)
4+
from weakref import ref
5+
6+
if TYPE_CHECKING:
7+
from livekit import (Room, Participant)
48

59

610
class Track():
7-
def __init__(self, info: proto_track.TrackInfo, handle: Optional[FfiHandle]):
11+
def __init__(self, handle: Optional[FfiHandle], info: proto_track.TrackInfo, room: ref['Room'], participant: ref['Participant']):
812
self._info = info
913
self._ffi_handle = handle
1014

15+
# TODO(theomonnom): Simplify that and use a FfiHandleId?
16+
# the weak references are needed because when we need to communicate with the
17+
# ffi_server which track we are referring to, we also need to provide the room
18+
# and the participant.
19+
self._room = room
20+
self._participant = participant
21+
1122
@property
1223
def sid(self) -> str:
1324
return self._info.sid
@@ -33,20 +44,20 @@ def update_info(self, info: proto_track.TrackInfo):
3344

3445

3546
class LocalAudioTrack(Track):
36-
def __init__(self, info: proto_track.TrackInfo, handle: FfiHandle):
37-
super().__init__(info, handle)
47+
def __init__(self, ffi_handle: FfiHandle, info: proto_track.TrackInfo, room: ref['Room'], participant: ref['Participant']):
48+
super().__init__(ffi_handle, info, room, participant)
3849

3950

4051
class LocalVideoTrack(Track):
41-
def __init__(self, info: proto_track.TrackInfo, handle: FfiHandle):
42-
super().__init__(info, handle)
52+
def __init__(self, ffi_handle: FfiHandle, info: proto_track.TrackInfo, room: ref['Room'], participant: ref['Participant']):
53+
super().__init__(ffi_handle, info, room, participant)
4354

4455

4556
class RemoteAudioTrack(Track):
46-
def __init__(self, info: proto_track.TrackInfo):
47-
super().__init__(info, None)
57+
def __init__(self, info: proto_track.TrackInfo, room: ref['Room'], participant: ref['Participant']):
58+
super().__init__(None, info, room, participant)
4859

4960

5061
class RemoteVideoTrack(Track):
51-
def __init__(self, info: proto_track.TrackInfo):
52-
super().__init__(info, None)
62+
def __init__(self, info: proto_track.TrackInfo, room: ref['Room'], participant: ref['Participant']):
63+
super().__init__(None, info, room, participant)

livekit/track_publication.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from livekit._proto import track_pb2 as proto_track
22
from ._proto import track_pb2 as proto_track
3+
from .track import Track
34

45

56
class TrackPublication():
67
def __init__(self, info: proto_track.TrackPublicationInfo):
78
self._info = info
9+
self._track: Track = None
810

911
@property
1012
def sid(self) -> str:
@@ -23,8 +25,8 @@ def source(self) -> proto_track.TrackSource:
2325
return self._info.source
2426

2527
@property
26-
def simulcast(self) -> bool:
27-
return self._info.simulcast
28+
def simulcasted(self) -> bool:
29+
return self._info.simulcasted
2830

2931
@property
3032
def width(self) -> int:

0 commit comments

Comments
 (0)