@@ -707,3 +707,59 @@ async def aforward(self, question, **kwargs):
707707 # There should be ~1 second delay between the tool start and end messages because we explicitly sleep for 1 second
708708 # in the tool.
709709 assert timestamps [1 ] - timestamps [0 ] >= 1
710+
711+
@pytest.mark.anyio
async def test_stream_listener_allow_reuse():
    """A StreamListener created with ``allow_reuse=True`` must emit chunks for
    every invocation of the predictor it listens to, not only the first one."""

    class MyProgram(dspy.Module):
        def __init__(self):
            super().__init__()
            self.predict = dspy.Predict("question->answer")

        def forward(self, question, **kwargs):
            # Invoke the same predictor twice so the listener has to fire twice.
            self.predict(question=question, **kwargs)
            return self.predict(question=question, **kwargs)

    program = dspy.streamify(
        MyProgram(),
        stream_listeners=[
            dspy.streaming.StreamListener(signature_field_name="answer", allow_reuse=True),
        ],
    )

    # Recorded streaming from openai/gpt-4o-mini, one entry per streamed delta.
    recorded_deltas = [
        "[[",
        " ##",
        " answer",
        " ##",
        " ]]\n\n",
        "To",
        " get",
        " to",
        " the",
        " other",
        " side",
        "!",
        "\n\n",
        "[[ ##",
        " completed",
        " ##",
        " ]]",
    ]

    async def gpt_4o_mini_stream(*args, **kwargs):
        for text in recorded_deltas:
            yield ModelResponseStream(
                model="gpt-4o-mini",
                choices=[StreamingChoices(delta=Delta(content=text))],
            )

    stream_generators = [gpt_4o_mini_stream, gpt_4o_mini_stream]

    async def completion_side_effect(*args, **kwargs):
        # Hand back a fresh async-generator instance for each LM call.
        return stream_generators.pop(0)()

    with mock.patch("litellm.acompletion", side_effect=completion_side_effect):
        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False)):
            output = program(question="why did a chicken cross the kitchen?")
            all_chunks = [
                value
                async for value in output
                if isinstance(value, dspy.streaming.StreamResponse)
            ]

    concat_message = "".join(chunk.chunk for chunk in all_chunks)
    # The listener functions twice.
    assert concat_message == "To get to the other side!To get to the other side!"
0 commit comments