File tree Expand file tree Collapse file tree 1 file changed +10
-2
lines changed
Expand file tree Collapse file tree 1 file changed +10
-2
lines changed Original file line number Diff line number Diff line change @@ -753,7 +753,11 @@ def _handle_stream_header(
753753
754754 text_reader = TextStreamReader (header )
755755 self ._text_stream_readers [header .stream_id ] = text_reader
756- text_stream_handler (text_reader , participant_identity )
756+ task = asyncio .create_task (
757+ text_stream_handler (text_reader , participant_identity , self )
758+ )
759+ self ._data_stream_tasks .add (task )
760+ task .add_done_callback (self ._data_stream_tasks .discard )
757761 elif stream_type == "byte_header" :
758762 byte_stream_handler = self ._byte_stream_handlers .get (header .topic )
759763 if byte_stream_handler is None :
@@ -765,7 +769,11 @@ def _handle_stream_header(
765769
766770 byte_reader = ByteStreamReader (header )
767771 self ._byte_stream_readers [header .stream_id ] = byte_reader
768- byte_stream_handler (byte_reader , participant_identity )
772+ task = asyncio .create_task (
773+ byte_stream_handler (byte_reader , participant_identity )
774+ )
775+ self ._data_stream_tasks .add (task )
776+ task .add_done_callback (self ._data_stream_tasks .discard )
769777 else :
770778 logging .warning ("received unknown header type, %s" , stream_type )
771779 pass
You can’t perform that action at this time.
0 commit comments