Skip to content

Commit bf9421b

Browse files
thomasnormalthomasahleclaudejxnl
authored
fix(streaming): handle Stream objects in reask handlers (#1992)
Co-authored-by: Thomas Dybdahl Ahle <thomas@ahle.dk> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Jason Liu <jxnl@users.noreply.github.com>
1 parent c9563bc commit bf9421b

File tree

3 files changed

+264
-2
lines changed

3 files changed

+264
-2
lines changed

instructor/providers/anthropic/utils.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,19 @@ def reask_anthropic_tools(
148148
kwargs = kwargs.copy()
149149
from anthropic.types import Message
150150

151-
assert isinstance(response, Message), "Response must be a Anthropic Message"
151+
# Handle Stream objects which are not Message instances
152+
# This happens when streaming mode is used with retries
153+
if not isinstance(response, Message):
154+
kwargs["messages"].append(
155+
{
156+
"role": "user",
157+
"content": (
158+
f"Validation Error found:\n{exception}\n"
159+
"Recall the function correctly, fix the errors"
160+
),
161+
}
162+
)
163+
return kwargs
152164

153165
assistant_content = []
154166
tool_use_id = None
@@ -197,7 +209,19 @@ def reask_anthropic_json(
197209
kwargs = kwargs.copy()
198210
from anthropic.types import Message
199211

200-
assert isinstance(response, Message), "Response must be a Anthropic Message"
212+
# Handle Stream objects which are not Message instances
213+
# This happens when streaming mode is used with retries
214+
if not isinstance(response, Message):
215+
kwargs["messages"].append(
216+
{
217+
"role": "user",
218+
"content": (
219+
f"Validation Errors found:\n{exception}\n"
220+
"Recall the function correctly, fix the errors"
221+
),
222+
}
223+
)
224+
return kwargs
201225

202226
# Filter for text blocks to handle ThinkingBlock and other non-text content
203227
text_blocks = [c for c in response.content if c.type == "text"]

instructor/providers/openai/utils.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@
1919
from ...processing.schema import generate_openai_schema
2020

2121

22+
def _is_stream_response(response: Any) -> bool:
    """Return True when ``response`` has no usable ``choices`` to reask from.

    The detailed reask builders index ``response.choices[0]``, so they can
    only run when ``choices`` exists and is non-empty. This predicate is
    True for every response that cannot satisfy that:

    * ``None`` (no response was captured),
    * objects without a ``choices`` attribute (e.g. a Stream object),
    * objects whose ``choices`` is ``None`` or empty.

    Args:
        response: Whatever the provider call returned; may be ``None``.

    Returns:
        True if callers should fall back to a generic reask message that
        does not reference the response content, False otherwise.
    """
    # getattr with a default covers both ``None`` and attribute-less objects;
    # ``not`` additionally rejects an empty or None ``choices``.
    return not getattr(response, "choices", None)
29+
30+
2231
def reask_tools(
2332
kwargs: dict[str, Any],
2433
response: Any,
@@ -32,6 +41,21 @@ def reask_tools(
3241
- Adds: "messages" (tool response messages indicating validation errors)
3342
"""
3443
kwargs = kwargs.copy()
44+
45+
# Handle Stream objects which don't have choices attribute
46+
# This happens when streaming mode is used with retries
47+
if _is_stream_response(response):
48+
kwargs["messages"].append(
49+
{
50+
"role": "user",
51+
"content": (
52+
f"Validation Error found:\n{exception}\n"
53+
"Recall the function correctly, fix the errors"
54+
),
55+
}
56+
)
57+
return kwargs
58+
3559
reask_msgs = [dump_message(response.choices[0].message)]
3660
for tool_call in response.choices[0].message.tool_calls:
3761
reask_msgs.append(
@@ -62,6 +86,19 @@ def reask_responses_tools(
6286
"""
6387
kwargs = kwargs.copy()
6488

89+
# Handle Stream objects which don't have output attribute
90+
if response is None or not hasattr(response, "output"):
91+
kwargs["messages"].append(
92+
{
93+
"role": "user",
94+
"content": (
95+
f"Validation Error found:\n{exception}\n"
96+
"Recall the function correctly, fix the errors"
97+
),
98+
}
99+
)
100+
return kwargs
101+
65102
reask_messages = []
66103
for tool_call in response.output:
67104
reask_messages.append(
@@ -90,6 +127,17 @@ def reask_md_json(
90127
- Adds: "messages" (user message requesting JSON correction)
91128
"""
92129
kwargs = kwargs.copy()
130+
131+
# Handle Stream objects which don't have choices attribute
132+
if _is_stream_response(response):
133+
kwargs["messages"].append(
134+
{
135+
"role": "user",
136+
"content": f"Correct your JSON ONLY RESPONSE, based on the following errors:\n{exception}",
137+
}
138+
)
139+
return kwargs
140+
93141
reask_msgs = [dump_message(response.choices[0].message)]
94142

95143
reask_msgs.append(
@@ -115,6 +163,19 @@ def reask_default(
115163
- Adds: "messages" (user message requesting function correction)
116164
"""
117165
kwargs = kwargs.copy()
166+
167+
# Handle Stream objects which don't have choices attribute
168+
if _is_stream_response(response):
169+
kwargs["messages"].append(
170+
{
171+
"role": "user",
172+
"content": (
173+
f"Recall the function correctly, fix the errors, exceptions found\n{exception}"
174+
),
175+
}
176+
)
177+
return kwargs
178+
118179
reask_msgs = [dump_message(response.choices[0].message)]
119180

120181
reask_msgs.append(

tests/test_streaming_reask_bug.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""Test for streaming reask bug fix.
2+
3+
Bug: When using streaming mode with max_retries > 1, if validation fails,
4+
the reask handlers crash with "'Stream' object has no attribute 'choices'"
5+
because they expect a ChatCompletion but receive a Stream object.
6+
7+
GitHub Issue: https://github.com/jxnl/instructor/issues/1991
8+
"""
9+
10+
import pytest
11+
from unittest.mock import MagicMock
12+
from pydantic import ValidationError, BaseModel, field_validator
13+
14+
from instructor.mode import Mode
15+
from instructor.processing.response import handle_reask_kwargs
16+
17+
18+
class MockStream:
    """Minimal stand-in for ``openai.Stream`` used by the reask tests.

    It is iterable but deliberately defines no ``choices`` attribute, which
    is the property the reask handlers trip over.
    """

    def __next__(self):
        # Advancing the stream directly reports exhaustion immediately.
        raise StopIteration

    def __iter__(self):
        # Iterating the stream produces no items.
        return iter(())
26+
27+
28+
def create_mock_validation_error() -> ValidationError:
    """Create a real Pydantic ``ValidationError`` for testing.

    A throwaway model whose ``name`` field must contain a space is
    instantiated with ``"John"``; the resulting error is captured and
    returned.

    Returns:
        The ``ValidationError`` raised by the failing model construction.

    Raises:
        AssertionError: If the construction unexpectedly succeeds, so the
            helper can never hand ``None`` to the code under test.
    """

    class TestModel(BaseModel):
        name: str

        @field_validator("name")
        @classmethod
        def must_have_space(cls, v):
            if " " not in v:
                raise ValueError("must contain space")
            return v

    try:
        TestModel(name="John")
    except ValidationError as e:
        return e

    # Falling through would implicitly return None and let callers format
    # "None" into reask messages; fail loudly instead.
    raise AssertionError("TestModel(name='John') was expected to fail validation")
45+
46+
47+
class TestStreamingReaskBug:
    """Tests for the streaming reask bug fix.

    Each test passes a response without usable content (a stream-like
    object or ``None``) to ``handle_reask_kwargs`` and checks that a user
    reask message carrying the validation error is appended instead of the
    handler crashing.
    """

    @staticmethod
    def _assert_reask_appended(result, original_count):
        """Check that ``result`` gained a user message describing the error.

        ``original_count`` must be captured before the call: the handlers
        shallow-copy kwargs, so the input ``messages`` list may be the same
        list object that gets appended to.
        """
        assert "messages" in result
        assert len(result["messages"]) > original_count
        last = result["messages"][-1]
        assert last["role"] == "user"
        # The validator's message is part of the rendered ValidationError.
        assert "must contain space" in str(last["content"])

    def test_reask_tools_with_stream_object_does_not_crash(self):
        """Test that reask_tools handles Stream objects without crashing.

        Previously, this would crash with:
        "'Stream' object has no attribute 'choices'"
        """
        mock_stream = MockStream()
        kwargs = {
            "messages": [{"role": "user", "content": "test"}],
            "tools": [{"type": "function", "function": {"name": "test"}}],
        }
        original_count = len(kwargs["messages"])
        exception = create_mock_validation_error()

        # This should not raise an AttributeError
        result = handle_reask_kwargs(
            kwargs=kwargs,
            mode=Mode.TOOLS,
            response=mock_stream,
            exception=exception,
        )

        # Should return modified kwargs with error message
        self._assert_reask_appended(result, original_count)

    def test_reask_anthropic_tools_with_stream_object(self):
        """Test that Anthropic reask handler handles Stream objects."""
        mock_stream = MockStream()
        kwargs = {
            "messages": [{"role": "user", "content": "test"}],
        }
        original_count = len(kwargs["messages"])
        exception = create_mock_validation_error()

        result = handle_reask_kwargs(
            kwargs=kwargs,
            mode=Mode.ANTHROPIC_TOOLS,
            response=mock_stream,
            exception=exception,
        )

        self._assert_reask_appended(result, original_count)

    def test_reask_with_none_response(self):
        """Test that reask handlers handle None response gracefully."""
        kwargs = {
            "messages": [{"role": "user", "content": "test"}],
        }
        original_count = len(kwargs["messages"])
        exception = create_mock_validation_error()

        result = handle_reask_kwargs(
            kwargs=kwargs,
            mode=Mode.TOOLS,
            response=None,
            exception=exception,
        )

        self._assert_reask_appended(result, original_count)

    def test_reask_md_json_with_stream_object(self):
        """Test that MD_JSON reask handler handles Stream objects."""
        mock_stream = MockStream()
        kwargs = {
            "messages": [{"role": "user", "content": "test"}],
        }
        original_count = len(kwargs["messages"])
        exception = create_mock_validation_error()

        result = handle_reask_kwargs(
            kwargs=kwargs,
            mode=Mode.MD_JSON,
            response=mock_stream,
            exception=exception,
        )

        self._assert_reask_appended(result, original_count)
124+
125+
126+
class TestStreamingReaskIntegration:
    """Integration tests that require the ``openai`` package and an API key.

    Both requirements are checked in the ``client`` fixture, so a missing
    package or key skips only the tests that request the client.
    """

    @pytest.fixture
    def client(self):
        """Create an instructor client, or skip if prerequisites are missing."""
        import os

        # importorskip skips when the module cannot be imported. Calling it
        # here, rather than at import time in a class decorator, keeps a
        # missing ``openai`` from skipping the rest of this module.
        openai = pytest.importorskip("openai", reason="openai not installed")

        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("OPENAI_API_KEY not set")

        import instructor

        return instructor.from_openai(openai.OpenAI())

    def test_streaming_with_retries_and_failing_validator(self, client):
        """Test that streaming with retries doesn't crash on validation failure."""

        class StrictUser(BaseModel):
            name: str
            age: int

            @field_validator("name")
            @classmethod
            def name_must_have_space(cls, v: str) -> str:
                if v and " " not in v:
                    raise ValueError("Name must have first and last name")
                return v

        # This should not crash with AttributeError.
        # The prompt asks for a single-word name, so validation is expected
        # to fail on every attempt and end in InstructorRetryException.
        # NOTE(review): this depends on live model output; confirm it is
        # stable enough for CI.
        from instructor.core.exceptions import InstructorRetryException

        with pytest.raises(InstructorRetryException):
            list(
                client.chat.completions.create_partial(
                    model="gpt-4o-mini",
                    max_retries=2,
                    messages=[
                        {
                            "role": "user",
                            "content": "Extract: John is 25. Return name='John' (no last name).",
                        }
                    ],
                    response_model=StrictUser,
                )
            )

0 commit comments

Comments
 (0)