4 changes: 2 additions & 2 deletions dspy/streaming/streaming_listener.py
@@ -274,7 +274,7 @@ def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> S
        except ValueError:
            pass

-        if token:
+        if token or self.stream_end:
            return StreamResponse(
                self.predict_name,
                self.signature_field_name,
@@ -292,7 +292,7 @@ def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> Strea
        token = token + last_token if token else last_token
        token = token.rstrip()  # Remove the trailing \n\n

-        if token:
+        if token or self.stream_end:
            return StreamResponse(
                self.predict_name,
                self.signature_field_name,
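In effect, the two hunks above relax the emission condition from `if token:` to `if token or self.stream_end:`, so the listener still emits a final `StreamResponse` (possibly with an empty `chunk`) once the stream ends, and downstream code can rely on `is_last_chunk` to detect the end of a field. Below is a minimal consumer-side sketch, not part of this PR, assuming the same `dspy.streamify` / `StreamListener` setup exercised by the tests that follow (the question text and single listener are illustrative):

import asyncio

import dspy

predict = dspy.Predict("question->answer")

# Wrap the module so that tokens for the "answer" field are streamed.
program = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)


async def main():
    output = program(question="Why did the chicken cross the road?")
    async for value in output:
        if isinstance(value, dspy.streaming.StreamResponse):
            # The chunk may be empty when it only marks the end of the field.
            print(value.chunk, end="", flush=True)
            if value.is_last_chunk:
                # With this change, the final chunk of a field is always
                # flagged, even when there is no trailing text left to emit.
                print()


# asyncio.run(main())  # requires a configured LM, e.g. dspy.LM("openai/gpt-4o-mini")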
128 changes: 125 additions & 3 deletions tests/streaming/test_streaming.py
@@ -613,10 +613,11 @@ async def gemini_stream_2(*args, **kwargs):
    assert all_chunks[0].predict_name == "predict1"
    assert all_chunks[0].signature_field_name == "answer"
    assert all_chunks[0].chunk == "To get to the other side."
+    assert all_chunks[1].is_last_chunk is True

-    assert all_chunks[1].predict_name == "predict2"
-    assert all_chunks[1].signature_field_name == "judgement"
-    assert all_chunks[1].chunk == (
+    assert all_chunks[2].predict_name == "predict2"
+    assert all_chunks[2].signature_field_name == "judgement"
+    assert all_chunks[2].chunk == (
        "The answer provides the standard punchline for this classic joke format, adapted to the specific location "
        "mentioned in the question. It is the expected and appropriate response."
    )
@@ -1649,6 +1650,127 @@ async def reasoning_stream(*args, **kwargs):
    assert final_prediction.reasoning.content == expected_reasoning


@pytest.mark.anyio
async def test_stream_listener_empty_last_chunk_chat_adapter():
    """Test that StreamListener emits an empty chunk marking field end.

    This test covers the scenario where:
    1. Tokens that cannot form the end identifier are immediately yielded
    2. The last chunk received contains only the marker for the next field (or completion marker)
    3. An empty chunk with is_last_chunk=True is emitted to properly mark field end
    """

    predict = dspy.Predict("question->reasoning, answer")

    async def mock_stream(*args, **kwargs):
        yield ModelResponseStream(
            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="[[ ## reasoning ## ]]\n"))]
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. "))],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[
                StreamingChoices(delta=Delta(content="The chicken likely wants to reach something on the other side. "))
            ],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## answer ## ]]\n"))]
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="To get to the other side!"))],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## completed ## ]]"))],
        )

    with mock.patch("litellm.acompletion", side_effect=mock_stream):
        program = dspy.streamify(
            predict,
            stream_listeners=[
                dspy.streaming.StreamListener(signature_field_name="reasoning"),
                dspy.streaming.StreamListener(signature_field_name="answer"),
            ],
        )
        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.ChatAdapter()):
            output = program(question="Why did the chicken cross the kitchen?")
            all_chunks = []
            async for value in output:
                if isinstance(value, dspy.streaming.StreamResponse):
                    all_chunks.append(value)

    # Find reasoning and answer chunks
    reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"]
    answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"]

    # The last chunk should be marked as last chunk for both fields.
    assert answer_chunks[-1].is_last_chunk is True
    assert reasoning_chunks[-1].is_last_chunk is True


@pytest.mark.anyio
async def test_stream_listener_empty_last_chunk_json_adapter():
    predict = dspy.Predict("question->reasoning, answer")

    async def mock_stream(*args, **kwargs):
        yield ModelResponseStream(
            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content='{"reasoning": "'))]
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. "))],
        )
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[
                StreamingChoices(
                    delta=Delta(content='The chicken likely wants to reach something on the other side. "')
                )
            ],
        )
        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=',"answer": "'))])
        yield ModelResponseStream(
            model="gpt-4o-mini",
            choices=[StreamingChoices(delta=Delta(content='To get to the other side!"'))],
        )
        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n}"))])

    with mock.patch("litellm.acompletion", side_effect=mock_stream):
        program = dspy.streamify(
            predict,
            stream_listeners=[
                dspy.streaming.StreamListener(signature_field_name="reasoning"),
                dspy.streaming.StreamListener(signature_field_name="answer"),
            ],
        )
        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()):
            output = program(question="Why did the chicken cross the kitchen?")
            all_chunks = []
            async for value in output:
                if isinstance(value, dspy.streaming.StreamResponse):
                    all_chunks.append(value)

    # Find reasoning and answer chunks
    reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"]
    answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"]

    # The last chunk should be marked as last chunk for both fields.
    assert answer_chunks[-1].is_last_chunk is True
    assert reasoning_chunks[-1].is_last_chunk is True


@pytest.mark.anyio
async def test_streaming_reasoning_fallback():
"""Test fallback behavior for non-reasoning models using dspy.Reasoning.