diff --git a/dspy/streaming/streaming_listener.py b/dspy/streaming/streaming_listener.py index 1ea93c2fd8..5c1cc144df 100644 --- a/dspy/streaming/streaming_listener.py +++ b/dspy/streaming/streaming_listener.py @@ -274,7 +274,7 @@ def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> S except ValueError: pass - if token: + if token or self.stream_end: return StreamResponse( self.predict_name, self.signature_field_name, @@ -292,7 +292,7 @@ def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> Strea token = token + last_token if token else last_token token = token.rstrip() # Remove the trailing \n\n - if token: + if token or self.stream_end: return StreamResponse( self.predict_name, self.signature_field_name, diff --git a/tests/streaming/test_streaming.py b/tests/streaming/test_streaming.py index ae5ea57843..c4efaff7f4 100644 --- a/tests/streaming/test_streaming.py +++ b/tests/streaming/test_streaming.py @@ -613,10 +613,11 @@ async def gemini_stream_2(*args, **kwargs): assert all_chunks[0].predict_name == "predict1" assert all_chunks[0].signature_field_name == "answer" assert all_chunks[0].chunk == "To get to the other side." + assert all_chunks[1].is_last_chunk is True - assert all_chunks[1].predict_name == "predict2" - assert all_chunks[1].signature_field_name == "judgement" - assert all_chunks[1].chunk == ( + assert all_chunks[2].predict_name == "predict2" + assert all_chunks[2].signature_field_name == "judgement" + assert all_chunks[2].chunk == ( "The answer provides the standard punchline for this classic joke format, adapted to the specific location " "mentioned in the question. It is the expected and appropriate response." ) @@ -1649,6 +1650,127 @@ async def reasoning_stream(*args, **kwargs): assert final_prediction.reasoning.content == expected_reasoning +@pytest.mark.anyio +async def test_stream_listener_empty_last_chunk_chat_adapter(): + """Test that StreamListener emits an empty chunk marking field end. 
+ + This test covers the scenario where: + 1. Tokens that cannot form the end identifier are immediately yielded + 2. The last chunk received contains only the marker for the next field (or completion marker) + 3. An empty chunk with is_last_chunk=True is emitted to properly mark field end + """ + + predict = dspy.Predict("question->reasoning, answer") + + async def mock_stream(*args, **kwargs): + yield ModelResponseStream( + model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="[[ ## reasoning ## ]]\n"))] + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))], + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. "))], + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[ + StreamingChoices(delta=Delta(content="The chicken likely wants to reach something on the other side. 
")) + ], + ) + yield ModelResponseStream( + model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## answer ## ]]\n"))] + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="To get to the other side!"))], + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## completed ## ]]"))], + ) + + with mock.patch("litellm.acompletion", side_effect=mock_stream): + program = dspy.streamify( + predict, + stream_listeners=[ + dspy.streaming.StreamListener(signature_field_name="reasoning"), + dspy.streaming.StreamListener(signature_field_name="answer"), + ], + ) + with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="Why did the chicken cross the kitchen?") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Find reasoning and answer chunks + reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"] + answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"] + + # The last chunk should be marked as last chunk for both fields. + assert answer_chunks[-1].is_last_chunk is True + assert reasoning_chunks[-1].is_last_chunk is True + + +@pytest.mark.anyio +async def test_stream_listener_empty_last_chunk_json_adapter(): + predict = dspy.Predict("question->reasoning, answer") + + async def mock_stream(*args, **kwargs): + yield ModelResponseStream( + model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content='{"reasoning": "'))] + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))], + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. 
"))], + ) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[ + StreamingChoices( + delta=Delta(content='The chicken likely wants to reach something on the other side. "') + ) + ], + ) + yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=',"answer": "'))]) + yield ModelResponseStream( + model="gpt-4o-mini", + choices=[StreamingChoices(delta=Delta(content='To get to the other side!"'))], + ) + yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n}"))]) + + with mock.patch("litellm.acompletion", side_effect=mock_stream): + program = dspy.streamify( + predict, + stream_listeners=[ + dspy.streaming.StreamListener(signature_field_name="reasoning"), + dspy.streaming.StreamListener(signature_field_name="answer"), + ], + ) + with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()): + output = program(question="Why did the chicken cross the kitchen?") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Find reasoning and answer chunks + reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"] + answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"] + + # The last chunk should be marked as last chunk for both fields. + assert answer_chunks[-1].is_last_chunk is True + assert reasoning_chunks[-1].is_last_chunk is True + + @pytest.mark.anyio async def test_streaming_reasoning_fallback(): """Test fallback behavior for non-reasoning models using dspy.Reasoning.