Skip to content

Commit 1b6f3a2

Browse files
committed
stream handlers and address comments
1 parent b68d21a commit 1b6f3a2

File tree

4 files changed

+65
-23
lines changed

4 files changed

+65
-23
lines changed

examples/data_streams.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@ async def main(room: rtc.Room):
1515

1616
async def greetParticipant(identity: str):
1717
text_writer = await room.local_participant.stream_text(
18-
destination_identities=[identity]
18+
destination_identities=[identity], topic="chat"
1919
)
2020
for char in "Hi! Just a friendly message":
2121
await text_writer.write(char)
22-
await text_writer.close()
22+
await text_writer.aclose()
2323

24-
async def on_text_received(reader: rtc.TextStreamReader):
24+
async def on_chat_message_received(
25+
reader: rtc.TextStreamReader, participant_identity: str
26+
):
2527
full_text = await reader.read_all()
26-
logger.info(full_text)
28+
logger.info(
29+
"Received chat message from %s: '%s'", participant_identity, full_text
30+
)
2731

2832
@room.on("participant_connected")
2933
def on_participant_connected(participant: rtc.RemoteParticipant):
@@ -32,26 +36,22 @@ def on_participant_connected(participant: rtc.RemoteParticipant):
3236
)
3337
asyncio.create_task(greetParticipant(participant.identity))
3438

35-
# track_subscribed is emitted whenever the local participant is subscribed to a new track
36-
@room.on("text_stream_received")
37-
def on_text_stream_received(
38-
reader: rtc.TextStreamReader,
39-
participant_identity: str,
40-
):
41-
logger.info("text stream received from: %s", participant_identity)
42-
asyncio.create_task(on_text_received(reader=reader))
39+
room.set_text_stream_handler(
40+
lambda reader, participant_identity: asyncio.create_task(
41+
on_chat_message_received(reader, participant_identity)
42+
),
43+
"chat",
44+
)
4345

4446
# By default, autosubscribe is enabled. The participant will be subscribed to
4547
# all published tracks in the room
4648
await room.connect(URL, TOKEN)
4749
logger.info("connected to room %s", room.name)
4850

4951
for identity, participant in room.remote_participants.items():
50-
print("Sending a welcome message to %s", participant.identity)
52+
logger.info("Sending a welcome message to %s", identity)
5153
await greetParticipant(participant.identity)
5254

53-
logger.info("exiting")
54-
5555

5656
if __name__ == "__main__":
5757
logging.basicConfig(

livekit-rtc/livekit/rtc/data_stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import asyncio
1818
import uuid
1919
import datetime
20+
from collections.abc import Callable
2021
from dataclasses import dataclass
2122
from typing import AsyncIterator, Optional, TypedDict, Dict, List
2223
from ._proto.room_pb2 import DataStream as proto_DataStream
@@ -241,7 +242,7 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
241242
if cb.send_stream_chunk.error:
242243
raise ConnectionError(cb.send_stream_trailer.error)
243244

244-
async def close(self):
245+
async def aclose(self):
245246
await self._send_trailer(
246247
trailer=proto_DataStream.Trailer(
247248
stream_id=self._header.stream_id, reason=""
@@ -355,3 +356,7 @@ async def write(self, data: bytes):
355356
@property
356357
def info(self) -> ByteStreamInfo:
357358
return self._info
359+
360+
361+
TextStreamHandler = Callable[[TextStreamReader, str], None]
362+
ByteStreamHandler = Callable[[ByteStreamReader, str], None]

livekit-rtc/livekit/rtc/participant.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ async def send_text(
595595

596596
for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
597597
await writer.write(chunk)
598-
await writer.close()
598+
await writer.aclose()
599599

600600
return writer.info
601601

@@ -658,7 +658,7 @@ async def send_file(
658658
async with aiofiles.open(file_path, "rb") as f:
659659
while bytes := await f.read(STREAM_CHUNK_SIZE):
660660
await writer.write(bytes)
661-
await writer.close()
661+
await writer.aclose()
662662

663663
return writer.info
664664

livekit-rtc/livekit/rtc/room.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333
from .track import RemoteAudioTrack, RemoteVideoTrack
3434
from .track_publication import RemoteTrackPublication, TrackPublication
3535
from .transcription import TranscriptionSegment
36-
from .data_stream import TextStreamReader, ByteStreamReader
36+
from .data_stream import (
37+
TextStreamReader,
38+
ByteStreamReader,
39+
TextStreamHandler,
40+
ByteStreamHandler,
41+
)
3742

3843
EventTypes = Literal[
3944
"participant_connected",
@@ -63,8 +68,6 @@
6368
"disconnected",
6469
"reconnecting",
6570
"reconnected",
66-
"text_stream_received",
67-
"file_stream_received",
6871
]
6972

7073

@@ -142,6 +145,8 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
142145
self._local_participant: LocalParticipant | None = None
143146
self._text_stream_readers: Dict[str, TextStreamReader] = {}
144147
self._byte_stream_readers: Dict[str, ByteStreamReader] = {}
148+
self._text_stream_handlers: Dict[str, TextStreamHandler] = {}
149+
self._byte_stream_handlers: Dict[str, ByteStreamHandler] = {}
145150

146151
def __del__(self) -> None:
147152
if self._ffi_handle is not None:
@@ -421,6 +426,22 @@ async def disconnect(self) -> None:
421426
await self._task
422427
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
423428

429+
def set_byte_stream_handler(
430+
self, handler: ByteStreamHandler | None, topic: str = ""
431+
):
432+
if handler is None:
433+
self._byte_stream_handlers.pop(topic)
434+
else:
435+
self._byte_stream_handlers[topic] = handler
436+
437+
def set_text_stream_handler(
438+
self, handler: TextStreamHandler | None, topic: str = ""
439+
):
440+
if handler is None:
441+
self._text_stream_handlers.pop(topic)
442+
else:
443+
self._text_stream_handlers[topic] = handler
444+
424445
async def _listen_task(self) -> None:
425446
# listen to incoming room events
426447
while True:
@@ -757,13 +778,29 @@ def _handle_stream_header(
757778
self, header: proto_room.DataStream.Header, participant_identity: str
758779
):
759780
if header.text_header:
781+
stream_handler = self._text_stream_handlers.get(header.topic)
782+
if stream_handler is None:
783+
logging.debug(
784+
"ignoring text stream with topic '%s', no callback attached",
785+
header.topic,
786+
)
787+
return
788+
760789
reader = TextStreamReader(header)
761790
self._text_stream_readers[header.stream_id] = reader
762-
self.emit("text_stream_received", reader, participant_identity)
791+
stream_handler(reader, participant_identity)
763792
elif header.byte_header:
793+
stream_handler = self._byte_stream_handlers.get(header.topic)
794+
if stream_handler is None:
795+
logging.debug(
796+
"ignoring byte stream with topic '%s', no callback attached",
797+
header.topic,
798+
)
799+
return
800+
764801
reader = ByteStreamReader(header)
765802
self._byte_stream_readers[header.stream_id] = reader
766-
self.emit("file_stream_received", reader, participant_identity)
803+
stream_handler(reader, participant_identity)
767804
pass
768805

769806
def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk):

0 commit comments

Comments
 (0)