2121import aiofiles
2222from typing import List , Union , Callable , Dict , Awaitable , Optional , Mapping , cast
2323from abc import abstractmethod , ABC
24+ import logging
2425
2526
2627from ._ffi_client import FfiClient , FfiHandle
2728from ._proto import ffi_pb2 as proto_ffi
29+ from ._proto import room_pb2 as proto_room
2830from ._proto import participant_pb2 as proto_participant
2931from ._proto .room_pb2 import (
3032 TrackPublishOptions ,
5052 ByteStreamWriter ,
5153 ByteStreamInfo ,
5254 STREAM_CHUNK_SIZE ,
55+ TextStreamReader ,
56+ ByteStreamReader ,
57+ TextStreamHandler ,
58+ ByteStreamHandler ,
5359)
5460
5561
@@ -159,6 +165,10 @@ def __init__(
159165 self ._rpc_handlers : Dict [
160166 str , Callable [[RpcInvocationData ], Union [Awaitable [str ], str ]]
161167 ] = {}
168+ self ._text_stream_readers : Dict [str , TextStreamReader ] = {}
169+ self ._byte_stream_readers : Dict [str , ByteStreamReader ] = {}
170+ self ._text_stream_handlers : Dict [str , TextStreamHandler ] = {}
171+ self ._byte_stream_handlers : Dict [str , ByteStreamHandler ] = {}
162172
163173 @property
164174 def track_publications (self ) -> Mapping [str , LocalTrackPublication ]:
@@ -549,12 +559,65 @@ async def set_attributes(self, attributes: dict[str, str]) -> None:
549559 finally :
550560 FfiClient .instance .queue .unsubscribe (queue )
551561
562+ def _handle_stream_header (
563+ self , header : proto_room .DataStream .Header , participant_identity : str
564+ ):
565+ stream_type = header .WhichOneof ("content_header" )
566+ if stream_type == "text_header" :
567+ text_stream_handler = self ._text_stream_handlers .get (header .topic )
568+ if text_stream_handler is None :
569+ logging .info (
570+ "ignoring text stream with topic '%s', no callback attached" ,
571+ header .topic ,
572+ )
573+ return
574+
575+ text_reader = TextStreamReader (header )
576+ self ._text_stream_readers [header .stream_id ] = text_reader
577+ text_stream_handler (text_reader , participant_identity )
578+ elif stream_type == "byte_header" :
579+ logging .warning ("received byte header, %s" , header .stream_id )
580+ byte_stream_handler = self ._byte_stream_handlers .get (header .topic )
581+ if byte_stream_handler is None :
582+ logging .info (
583+ "ignoring byte stream with topic '%s', no callback attached" ,
584+ header .topic ,
585+ )
586+ return
587+
588+ byte_reader = ByteStreamReader (header )
589+ self ._byte_stream_readers [header .stream_id ] = byte_reader
590+ byte_stream_handler (byte_reader , participant_identity )
591+ else :
592+ logging .warning ("received unknown header type, %s" , stream_type )
593+ pass
594+
595+ async def _handle_stream_chunk (self , chunk : proto_room .DataStream .Chunk ):
596+ text_reader = self ._text_stream_readers .get (chunk .stream_id )
597+ file_reader = self ._byte_stream_readers .get (chunk .stream_id )
598+
599+ if text_reader :
600+ await text_reader ._on_chunk_update (chunk )
601+ elif file_reader :
602+ await file_reader ._on_chunk_update (chunk )
603+
604+ async def _handle_stream_trailer (self , trailer : proto_room .DataStream .Trailer ):
605+ text_reader = self ._text_stream_readers .get (trailer .stream_id )
606+ file_reader = self ._byte_stream_readers .get (trailer .stream_id )
607+
608+ if text_reader :
609+ await text_reader ._on_stream_close (trailer )
610+ self ._text_stream_readers .pop (trailer .stream_id )
611+ elif file_reader :
612+ await file_reader ._on_stream_close (trailer )
613+ self ._byte_stream_readers .pop (trailer .stream_id )
614+
552615 async def stream_text (
553616 self ,
554617 * ,
555618 destination_identities : Optional [List [str ]] = None ,
556619 topic : str = "" ,
557- extensions : Optional [Dict [str , str ]] = None ,
620+ attributes : Optional [Dict [str , str ]] = None ,
558621 reply_to_id : str | None = None ,
559622 total_size : int | None = None ,
560623 ) -> TextStreamWriter :
@@ -565,7 +628,7 @@ async def stream_text(
565628 writer = TextStreamWriter (
566629 self ,
567630 topic = topic ,
568- extensions = extensions ,
631+ attributes = attributes ,
569632 reply_to_id = reply_to_id ,
570633 destination_identities = destination_identities ,
571634 total_size = total_size ,
@@ -581,14 +644,14 @@ async def send_text(
581644 * ,
582645 destination_identities : Optional [List [str ]] = None ,
583646 topic : str = "" ,
584- extensions : Optional [Dict [str , str ]] = None ,
647+ attributes : Optional [Dict [str , str ]] = None ,
585648 reply_to_id : str | None = None ,
586649 ):
587650 total_size = len (text .encode ())
588651 writer = await self .stream_text (
589652 destination_identities = destination_identities ,
590653 topic = topic ,
591- extensions = extensions ,
654+ attributes = attributes ,
592655 reply_to_id = reply_to_id ,
593656 total_size = total_size ,
594657 )
@@ -605,7 +668,7 @@ async def stream_bytes(
605668 * ,
606669 total_size : int | None = None ,
607670 mime_type : str = "application/octet-stream" ,
608- extensions : Optional [Dict [str , str ]] = None ,
671+ attributes : Optional [Dict [str , str ]] = None ,
609672 stream_id : str | None = None ,
610673 destination_identities : Optional [List [str ]] = None ,
611674 topic : str = "" ,
@@ -617,7 +680,7 @@ async def stream_bytes(
617680 writer = ByteStreamWriter (
618681 self ,
619682 name = name ,
620- extensions = extensions ,
683+ attributes = attributes ,
621684 total_size = total_size ,
622685 stream_id = stream_id ,
623686 mime_type = mime_type ,
@@ -650,7 +713,7 @@ async def send_file(
650713 name = file_name ,
651714 total_size = file_size ,
652715 mime_type = mime_type ,
653- extensions = attributes ,
716+ attributes = attributes ,
654717 stream_id = stream_id ,
655718 destination_identities = destination_identities ,
656719 topic = topic ,
@@ -663,6 +726,28 @@ async def send_file(
663726
664727 return writer .info
665728
729+ def register_byte_stream_handler (self , topic : str , handler : ByteStreamHandler ):
730+ existing_handler = self ._byte_stream_handlers .get (topic )
731+ if existing_handler is None :
732+ self ._byte_stream_handlers [topic ] = handler
733+ else :
734+ raise ValueError ("byte stream handler for topic '%s' already set" % topic )
735+
736+ def unregister_byte_stream_handler (self , topic : str ):
737+ if self ._byte_stream_handlers .get (topic ):
738+ self ._byte_stream_handlers .pop (topic )
739+
740+ def register_text_stream_handler (self , topic : str , handler : TextStreamHandler ):
741+ existing_handler = self ._text_stream_handlers .get (topic )
742+ if existing_handler is None :
743+ self ._text_stream_handlers [topic ] = handler
744+ else :
745+ raise ValueError ("text stream handler for topic '%s' already set" % topic )
746+
747+ def unregister_text_stream_handler (self , topic : str ):
748+ if self ._text_stream_handlers .get (topic ):
749+ self ._text_stream_handlers .pop (topic )
750+
666751 async def publish_track (
667752 self , track : LocalTrack , options : TrackPublishOptions = TrackPublishOptions ()
668753 ) -> LocalTrackPublication :
0 commit comments