Skip to content

Commit 124af3f

Browse files
lukasIOgithub-actions[bot]davidzhao
authored
Add data stream support (#347)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: David Zhao <[email protected]>
1 parent d9aac49 commit 124af3f

File tree

15 files changed

+1328
-173
lines changed

15 files changed

+1328
-173
lines changed

.gitattributes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
**/*.dll filter=lfs diff=lfs merge=lfs -text
22
**/*.so filter=lfs diff=lfs merge=lfs -text
33
**/*.dylib filter=lfs diff=lfs merge=lfs -text
4+
**/*.jpg filter=lfs diff=lfs merge=lfs -text
45
livekit-protocol/livekit/protocol/** linguist-generated=true
56
livekit-rtc/livekit/rtc/_proto/** linguist-generated=true
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import os
2+
import logging
3+
import asyncio
4+
from signal import SIGINT, SIGTERM
5+
from livekit import rtc
6+
7+
# Set the following environment variables with your own values
8+
TOKEN = os.environ.get("LIVEKIT_TOKEN")
9+
URL = os.environ.get("LIVEKIT_URL")
10+
11+
12+
async def main(room: rtc.Room):
13+
logging.basicConfig(level=logging.INFO)
14+
logger = logging.getLogger(__name__)
15+
16+
async def greetParticipant(identity: str):
17+
text_writer = await room.local_participant.stream_text(
18+
destination_identities=[identity], topic="chat"
19+
)
20+
for char in "Hi! Just a friendly message":
21+
await text_writer.write(char)
22+
await text_writer.aclose()
23+
24+
await room.local_participant.send_file(
25+
"./green_tree_python.jpg",
26+
destination_identities=[identity],
27+
topic="welcome",
28+
)
29+
30+
async def on_chat_message_received(
31+
reader: rtc.TextStreamReader, participant_identity: str
32+
):
33+
full_text = await reader.read_all()
34+
logger.info(
35+
"Received chat message from %s: '%s'", participant_identity, full_text
36+
)
37+
38+
async def on_welcome_image_received(
39+
reader: rtc.ByteStreamReader, participant_identity: str
40+
):
41+
logger.info(
42+
"Received image from %s: '%s'", participant_identity, reader.info["name"]
43+
)
44+
with open(reader.info["name"], mode="wb") as f:
45+
async for chunk in reader:
46+
f.write(chunk)
47+
48+
f.close()
49+
50+
@room.on("participant_connected")
51+
def on_participant_connected(participant: rtc.RemoteParticipant):
52+
logger.info(
53+
"participant connected: %s %s", participant.sid, participant.identity
54+
)
55+
asyncio.create_task(greetParticipant(participant.identity))
56+
57+
room.set_text_stream_handler(
58+
lambda reader, participant_identity: asyncio.create_task(
59+
on_chat_message_received(reader, participant_identity)
60+
),
61+
"chat",
62+
)
63+
64+
room.set_byte_stream_handler(
65+
lambda reader, participant_identity: asyncio.create_task(
66+
on_welcome_image_received(reader, participant_identity)
67+
),
68+
"welcome",
69+
)
70+
71+
# By default, autosubscribe is enabled. The participant will be subscribed to
72+
# all published tracks in the room
73+
await room.connect(URL, TOKEN)
74+
logger.info("connected to room %s", room.name)
75+
76+
for identity, participant in room.remote_participants.items():
77+
logger.info("Sending a welcome message to %s", identity)
78+
await greetParticipant(participant.identity)
79+
80+
81+
if __name__ == "__main__":
82+
logging.basicConfig(
83+
level=logging.INFO,
84+
handlers=[
85+
logging.FileHandler("data_stream_example.log"),
86+
logging.StreamHandler(),
87+
],
88+
)
89+
90+
loop = asyncio.get_event_loop()
91+
room = rtc.Room(loop=loop)
92+
93+
async def cleanup():
94+
await room.disconnect()
95+
loop.stop()
96+
97+
asyncio.ensure_future(main(room))
98+
for signal in [SIGINT, SIGTERM]:
99+
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))
100+
101+
try:
102+
loop.run_forever()
103+
finally:
104+
loop.close()
Lines changed: 3 additions & 0 deletions
Loading

livekit-rtc/livekit/rtc/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@
7474
from .utils import combine_audio_frames
7575
from .rpc import RpcError, RpcInvocationData
7676
from .synchronizer import AVSynchronizer
77+
from .data_stream import (
78+
TextStreamInfo,
79+
TextStreamUpdate,
80+
ByteStreamInfo,
81+
TextStreamReader,
82+
TextStreamWriter,
83+
ByteStreamWriter,
84+
ByteStreamReader,
85+
)
7786

7887
__all__ = [
7988
"ConnectionQuality",
@@ -140,5 +149,12 @@
140149
"EventEmitter",
141150
"combine_audio_frames",
142151
"AVSynchronizer",
152+
"TextStreamUpdate",
153+
"TextStreamInfo",
154+
"ByteStreamInfo",
155+
"TextStreamReader",
156+
"TextStreamWriter",
157+
"ByteStreamReader",
158+
"ByteStreamWriter",
143159
"__version__",
144160
]

livekit-rtc/livekit/rtc/_proto/ffi_pb2.py

Lines changed: 20 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-rtc/livekit/rtc/_proto/ffi_pb2.pyi

Lines changed: 61 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-rtc/livekit/rtc/_proto/room_pb2.py

Lines changed: 132 additions & 96 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-rtc/livekit/rtc/_proto/room_pb2.pyi

Lines changed: 387 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-rtc/livekit/rtc/_proto/video_frame_pb2.pyi

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-rtc/livekit/rtc/_utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,15 @@ def generate_random_base62(length=12):
130130
"""
131131
global _base62_characters
132132
return "".join(random.choice(_base62_characters) for _ in range(length))
133+
134+
135+
# adapted from https://stackoverflow.com/a/6043797
136+
def split_utf8(s: str, n: int):
137+
"""Split UTF-8 s into chunks of maximum length n."""
138+
while len(s) > n:
139+
k = n
140+
while (ord(s[k]) & 0xC0) == 0x80:
141+
k -= 1
142+
yield s[:k]
143+
s = s[k:]
144+
yield s

0 commit comments

Comments
 (0)