@@ -426,21 +426,27 @@ async def disconnect(self) -> None:
426426 await self ._task
427427 FfiClient .instance .queue .unsubscribe (self ._ffi_queue )
428428
429- def set_byte_stream_handler (
430- self , handler : ByteStreamHandler | None , topic : str = ""
431- ):
432- if handler is None :
429+ def set_byte_stream_handler (self , handler : ByteStreamHandler , topic : str = "" ):
430+ existing_handler = self ._byte_stream_handlers .get (topic )
431+ if existing_handler is None :
432+ self ._byte_stream_handlers [topic ] = handler
433+ else :
434+ raise TypeError ("byte stream handler for topic '%s' already set" % topic )
435+
436+ def remove_byte_stream_handler (self , topic : str = "" ):
437+ if self ._byte_stream_handlers .get (topic ):
433438 self ._byte_stream_handlers .pop (topic )
439+
440+ def set_text_stream_handler (self , handler : TextStreamHandler , topic : str = "" ):
441+ existing_handler = self ._text_stream_handlers .get (topic )
442+ if existing_handler is None :
443+ self ._text_stream_handlers [topic ] = handler
434444 else :
435- self . _byte_stream_handlers [ topic ] = handler
445+ raise TypeError ( "text stream handler for topic '%s' already set" % topic )
436446
437- def set_text_stream_handler (
438- self , handler : TextStreamHandler | None , topic : str = ""
439- ):
440- if handler is None :
447+ def remove_text_stream_handler (self , topic : str = "" ):
448+ if self ._text_stream_handlers .get (topic ):
441449 self ._text_stream_handlers .pop (topic )
442- else :
443- self ._text_stream_handlers [topic ] = handler
444450
445451 async def _listen_task (self ) -> None :
446452 # listen to incoming room events
0 commit comments