Skip to content

Commit 580ab7c

Browse files
committed
move handler registration back to room
1 parent fc0cced commit 580ab7c

File tree

2 files changed

+89
-91
lines changed

2 files changed

+89
-91
lines changed

livekit-rtc/livekit/rtc/participant.py

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
from ._ffi_client import FfiClient, FfiHandle
2828
from ._proto import ffi_pb2 as proto_ffi
29-
from ._proto import room_pb2 as proto_room
3029
from ._proto import participant_pb2 as proto_participant
3130
from ._proto.room_pb2 import (
3231
TrackPublishOptions,
@@ -52,10 +51,6 @@
5251
ByteStreamWriter,
5352
ByteStreamInfo,
5453
STREAM_CHUNK_SIZE,
55-
TextStreamReader,
56-
ByteStreamReader,
57-
TextStreamHandler,
58-
ByteStreamHandler,
5954
)
6055

6156

@@ -165,10 +160,6 @@ def __init__(
165160
self._rpc_handlers: Dict[
166161
str, Callable[[RpcInvocationData], Union[Awaitable[str], str]]
167162
] = {}
168-
self._text_stream_readers: Dict[str, TextStreamReader] = {}
169-
self._byte_stream_readers: Dict[str, ByteStreamReader] = {}
170-
self._text_stream_handlers: Dict[str, TextStreamHandler] = {}
171-
self._byte_stream_handlers: Dict[str, ByteStreamHandler] = {}
172163

173164
@property
174165
def track_publications(self) -> Mapping[str, LocalTrackPublication]:
@@ -559,59 +550,6 @@ async def set_attributes(self, attributes: dict[str, str]) -> None:
559550
finally:
560551
FfiClient.instance.queue.unsubscribe(queue)
561552

562-
def _handle_stream_header(
563-
self, header: proto_room.DataStream.Header, participant_identity: str
564-
):
565-
stream_type = header.WhichOneof("content_header")
566-
if stream_type == "text_header":
567-
text_stream_handler = self._text_stream_handlers.get(header.topic)
568-
if text_stream_handler is None:
569-
logging.info(
570-
"ignoring text stream with topic '%s', no callback attached",
571-
header.topic,
572-
)
573-
return
574-
575-
text_reader = TextStreamReader(header)
576-
self._text_stream_readers[header.stream_id] = text_reader
577-
text_stream_handler(text_reader, participant_identity)
578-
elif stream_type == "byte_header":
579-
logging.warning("received byte header, %s", header.stream_id)
580-
byte_stream_handler = self._byte_stream_handlers.get(header.topic)
581-
if byte_stream_handler is None:
582-
logging.info(
583-
"ignoring byte stream with topic '%s', no callback attached",
584-
header.topic,
585-
)
586-
return
587-
588-
byte_reader = ByteStreamReader(header)
589-
self._byte_stream_readers[header.stream_id] = byte_reader
590-
byte_stream_handler(byte_reader, participant_identity)
591-
else:
592-
logging.warning("received unknown header type, %s", stream_type)
593-
pass
594-
595-
async def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk):
596-
text_reader = self._text_stream_readers.get(chunk.stream_id)
597-
file_reader = self._byte_stream_readers.get(chunk.stream_id)
598-
599-
if text_reader:
600-
await text_reader._on_chunk_update(chunk)
601-
elif file_reader:
602-
await file_reader._on_chunk_update(chunk)
603-
604-
async def _handle_stream_trailer(self, trailer: proto_room.DataStream.Trailer):
605-
text_reader = self._text_stream_readers.get(trailer.stream_id)
606-
file_reader = self._byte_stream_readers.get(trailer.stream_id)
607-
608-
if text_reader:
609-
await text_reader._on_stream_close(trailer)
610-
self._text_stream_readers.pop(trailer.stream_id)
611-
elif file_reader:
612-
await file_reader._on_stream_close(trailer)
613-
self._byte_stream_readers.pop(trailer.stream_id)
614-
615553
async def stream_text(
616554
self,
617555
*,
@@ -726,28 +664,6 @@ async def send_file(
726664

727665
return writer.info
728666

729-
def register_byte_stream_handler(self, topic: str, handler: ByteStreamHandler):
730-
existing_handler = self._byte_stream_handlers.get(topic)
731-
if existing_handler is None:
732-
self._byte_stream_handlers[topic] = handler
733-
else:
734-
raise ValueError("byte stream handler for topic '%s' already set" % topic)
735-
736-
def unregister_byte_stream_handler(self, topic: str):
737-
if self._byte_stream_handlers.get(topic):
738-
self._byte_stream_handlers.pop(topic)
739-
740-
def register_text_stream_handler(self, topic: str, handler: TextStreamHandler):
741-
existing_handler = self._text_stream_handlers.get(topic)
742-
if existing_handler is None:
743-
self._text_stream_handlers[topic] = handler
744-
else:
745-
raise ValueError("text stream handler for topic '%s' already set" % topic)
746-
747-
def unregister_text_stream_handler(self, topic: str):
748-
if self._text_stream_handlers.get(topic):
749-
self._text_stream_handlers.pop(topic)
750-
751667
async def publish_track(
752668
self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
753669
) -> LocalTrackPublication:

livekit-rtc/livekit/rtc/room.py

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@
3333
from .track import RemoteAudioTrack, RemoteVideoTrack
3434
from .track_publication import RemoteTrackPublication, TrackPublication
3535
from .transcription import TranscriptionSegment
36+
from .data_stream import (
37+
TextStreamReader,
38+
ByteStreamReader,
39+
TextStreamHandler,
40+
ByteStreamHandler,
41+
)
3642

3743

3844
EventTypes = Literal[
@@ -140,6 +146,11 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
140146
self._first_sid_future = asyncio.Future[str]()
141147
self._local_participant: LocalParticipant | None = None
142148

149+
self._text_stream_readers: Dict[str, TextStreamReader] = {}
150+
self._byte_stream_readers: Dict[str, ByteStreamReader] = {}
151+
self._text_stream_handlers: Dict[str, TextStreamHandler] = {}
152+
self._byte_stream_handlers: Dict[str, ByteStreamHandler] = {}
153+
143154
def __del__(self) -> None:
144155
if self._ffi_handle is not None:
145156
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
@@ -398,6 +409,28 @@ def on_participant_connected(participant):
398409
# start listening to room events
399410
self._task = self._loop.create_task(self._listen_task())
400411

412+
def register_byte_stream_handler(self, topic: str, handler: ByteStreamHandler):
413+
existing_handler = self._byte_stream_handlers.get(topic)
414+
if existing_handler is None:
415+
self._byte_stream_handlers[topic] = handler
416+
else:
417+
raise ValueError("byte stream handler for topic '%s' already set" % topic)
418+
419+
def unregister_byte_stream_handler(self, topic: str):
420+
if self._byte_stream_handlers.get(topic):
421+
self._byte_stream_handlers.pop(topic)
422+
423+
def register_text_stream_handler(self, topic: str, handler: TextStreamHandler):
424+
existing_handler = self._text_stream_handlers.get(topic)
425+
if existing_handler is None:
426+
self._text_stream_handlers[topic] = handler
427+
else:
428+
raise ValueError("text stream handler for topic '%s' already set" % topic)
429+
430+
def unregister_text_stream_handler(self, topic: str):
431+
if self._text_stream_handlers.get(topic):
432+
self._text_stream_handlers.pop(topic)
433+
401434
async def disconnect(self) -> None:
402435
"""Disconnects from the room."""
403436
if not self.isconnected():
@@ -714,28 +747,77 @@ def _on_room_event(self, event: proto_room.RoomEvent):
714747
elif which == "reconnected":
715748
self.emit("reconnected")
716749
elif which == "stream_header_received":
717-
self.local_participant._handle_stream_header(
750+
self._handle_stream_header(
718751
event.stream_header_received.header,
719752
event.stream_header_received.participant_identity,
720753
)
721754
elif which == "stream_chunk_received":
722755
task = asyncio.create_task(
723-
self.local_participant._handle_stream_chunk(
724-
event.stream_chunk_received.chunk
725-
)
756+
self._handle_stream_chunk(event.stream_chunk_received.chunk)
726757
)
727758
self._data_stream_tasks.add(task)
728759
task.add_done_callback(self._data_stream_tasks.discard)
729760

730761
elif which == "stream_trailer_received":
731762
task = asyncio.create_task(
732-
self.local_participant._handle_stream_trailer(
733-
event.stream_trailer_received.trailer
734-
)
763+
self._handle_stream_trailer(event.stream_trailer_received.trailer)
735764
)
736765
self._data_stream_tasks.add(task)
737766
task.add_done_callback(self._data_stream_tasks.discard)
738767

768+
def _handle_stream_header(
769+
self, header: proto_room.DataStream.Header, participant_identity: str
770+
):
771+
stream_type = header.WhichOneof("content_header")
772+
if stream_type == "text_header":
773+
text_stream_handler = self._text_stream_handlers.get(header.topic)
774+
if text_stream_handler is None:
775+
logging.info(
776+
"ignoring text stream with topic '%s', no callback attached",
777+
header.topic,
778+
)
779+
return
780+
781+
text_reader = TextStreamReader(header)
782+
self._text_stream_readers[header.stream_id] = text_reader
783+
text_stream_handler(text_reader, participant_identity)
784+
elif stream_type == "byte_header":
785+
logging.warning("received byte header, %s", header.stream_id)
786+
byte_stream_handler = self._byte_stream_handlers.get(header.topic)
787+
if byte_stream_handler is None:
788+
logging.info(
789+
"ignoring byte stream with topic '%s', no callback attached",
790+
header.topic,
791+
)
792+
return
793+
794+
byte_reader = ByteStreamReader(header)
795+
self._byte_stream_readers[header.stream_id] = byte_reader
796+
byte_stream_handler(byte_reader, participant_identity)
797+
else:
798+
logging.warning("received unknown header type, %s", stream_type)
799+
pass
800+
801+
async def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk):
802+
text_reader = self._text_stream_readers.get(chunk.stream_id)
803+
file_reader = self._byte_stream_readers.get(chunk.stream_id)
804+
805+
if text_reader:
806+
await text_reader._on_chunk_update(chunk)
807+
elif file_reader:
808+
await file_reader._on_chunk_update(chunk)
809+
810+
async def _handle_stream_trailer(self, trailer: proto_room.DataStream.Trailer):
811+
text_reader = self._text_stream_readers.get(trailer.stream_id)
812+
file_reader = self._byte_stream_readers.get(trailer.stream_id)
813+
814+
if text_reader:
815+
await text_reader._on_stream_close(trailer)
816+
self._text_stream_readers.pop(trailer.stream_id)
817+
elif file_reader:
818+
await file_reader._on_stream_close(trailer)
819+
self._byte_stream_readers.pop(trailer.stream_id)
820+
739821
async def _drain_rpc_invocation_tasks(self) -> None:
740822
if self._rpc_invocation_tasks:
741823
for task in self._rpc_invocation_tasks:

0 commit comments

Comments
 (0)