Skip to content

Commit 2104d8b

Browse files
renfredxhms-jpq
authored andcommitted
fix: provide parsed outputs for resumed streams (#756)
* provide `parsed` to final response on resumed stream * update example * remove stream.close calls * use internal method --------- Co-authored-by: dogisgreat <[email protected]>
1 parent 4d79701 commit 2104d8b

File tree

3 files changed

+193
-77
lines changed

3 files changed

+193
-77
lines changed

examples/responses/streaming_previous_response.rb

Lines changed: 137 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,73 +7,150 @@
77

88
client = OpenAI::Client.new
99

10-
# Request 1: Create a new streaming response with store=true
11-
puts "Creating a new streaming response..."
12-
stream = client.responses.stream(
13-
model: "o4-mini",
14-
input: "Tell me a short story about a robot learning to paint.",
15-
instructions: "You are a creative storyteller.",
16-
background: true
17-
)
18-
19-
events = []
20-
response_id = ""
21-
22-
stream.each do |event|
23-
events << event
24-
puts "Event from initial stream: #{event.type} (seq: #{event.sequence_number})"
25-
case event
26-
27-
when OpenAI::Models::Responses::ResponseCreatedEvent
28-
response_id = event.response.id if response_id.empty?
29-
puts("Captured response ID: #{response_id}")
10+
begin
11+
puts "----- resuming stream from a previous response -----"
12+
13+
# Request 1: Create a new streaming response with background=true
14+
puts "Creating a new streaming response..."
15+
stream = client.responses.stream(
16+
model: "o4-mini",
17+
input: "Tell me a short story about a robot learning to paint.",
18+
instructions: "You are a creative storyteller.",
19+
background: true
20+
)
21+
22+
events = []
23+
response_id = ""
24+
25+
stream.each do |event|
26+
events << event
27+
puts "Event from initial stream: #{event.type} (seq: #{event.sequence_number})"
28+
case event
29+
30+
when OpenAI::Models::Responses::ResponseCreatedEvent
31+
response_id = event.response.id if response_id.empty?
32+
puts("Captured response ID: #{response_id}")
33+
end
34+
35+
# Simulate stopping after a few events
36+
if events.length >= 5
37+
puts "Terminating after #{events.length} events"
38+
break
39+
end
3040
end
3141

32-
# Simulate stopping after a few events
33-
if events.length >= 5
34-
puts "Terminating after #{events.length} events"
35-
break
42+
puts "Collected #{events.length} events"
43+
puts "Response ID: #{response_id}"
44+
puts "Last event sequence number: #{events.last.sequence_number}.\n"
45+
46+
# Give the background response some time to process more events.
47+
puts "Waiting a moment for the background response to progress...\n"
48+
sleep(3)
49+
50+
# Request 2: Resume the stream using the captured response_id.
51+
puts
52+
puts "Resuming stream from sequence #{events.last.sequence_number}..."
53+
54+
resumed_stream = client.responses.stream(
55+
previous_response_id: response_id,
56+
starting_after: events.last.sequence_number
57+
)
58+
59+
resumed_events = []
60+
resumed_stream.each do |event|
61+
resumed_events << event
62+
puts "Event from resumed stream: #{event.type} (seq: #{event.sequence_number})"
63+
# Stop when we get the completed event or collect enough events.
64+
if event.is_a?(OpenAI::Models::Responses::ResponseCompletedEvent)
65+
puts "Response completed!"
66+
break
67+
end
68+
69+
break if resumed_events.length >= 10
70+
end
71+
72+
puts "Collected #{resumed_events.length} additional events"
73+
74+
# Show that we properly resumed from where we left off.
75+
if resumed_events.any?
76+
first_resumed_event = resumed_events.first
77+
last_initial_event = events.last
78+
puts "First resumed event sequence: #{first_resumed_event.sequence_number}"
79+
puts "Should be greater than last initial event: #{last_initial_event.sequence_number}"
3680
end
3781
end
3882

39-
stream.close
40-
41-
puts
42-
puts "Collected #{events.length} events"
43-
puts "Response ID: #{response_id}"
44-
puts "Last event sequence number: #{events.last.sequence_number}.\n"
45-
46-
# Give the background response some time to process more events.
47-
puts "Waiting a moment for the background response to progress...\n"
48-
sleep(2)
49-
50-
# Request 2: Resume the stream using the captured response_id.
51-
puts "Resuming stream from sequence #{events.last.sequence_number}..."
52-
53-
resumed_stream = client.responses.stream(
54-
previous_response_id: response_id,
55-
starting_after: events.last.sequence_number
56-
)
57-
58-
resumed_events = []
59-
resumed_stream.each do |event|
60-
resumed_events << event
61-
puts "Event from resumed stream: #{event.type} (seq: #{event.sequence_number})"
62-
# Stop when we get the completed event or collect enough events.
63-
if event.is_a?(OpenAI::Models::Responses::ResponseCompletedEvent)
64-
puts "Response completed!"
65-
break
83+
begin
84+
puts "\n----- resuming stream with structured outputs -----"
85+
86+
class Step < OpenAI::BaseModel
87+
required :explanation, String
88+
required :output, String
6689
end
6790

68-
break if resumed_events.length >= 10
69-
end
91+
class MathResponse < OpenAI::BaseModel
92+
required :steps, OpenAI::ArrayOf[Step]
93+
required :final_answer, String
94+
end
95+
96+
puts "Creating a background streaming response with structured output..."
97+
stream = client.responses.stream(
98+
input: "solve 8x + 31 = 2",
99+
model: "gpt-4o-2024-08-06",
100+
text: MathResponse,
101+
background: true
102+
)
70103

71-
puts "\nCollected #{resumed_events.length} additional events"
104+
events = []
105+
response_id = ""
106+
107+
stream.each do |event|
108+
events << event
109+
110+
case event
111+
when OpenAI::Models::Responses::ResponseCreatedEvent
112+
response_id = event.response.id if response_id.empty?
113+
end
114+
115+
if events.length >= 5
116+
break
117+
end
118+
end
119+
120+
puts "Waiting for the background response to complete...\n"
121+
sleep(3)
122+
123+
puts
124+
puts "Resuming stream from sequence #{events.last.sequence_number}..."
125+
126+
resumed_stream = client.responses.stream(
127+
previous_response_id: response_id,
128+
starting_after: events.last.sequence_number,
129+
# NOTE: You must pass the structured output format when resuming to access parsed
130+
# outputs in the resumed stream.
131+
text: MathResponse
132+
)
133+
134+
resumed_stream.each do |event|
135+
case event
136+
when OpenAI::Streaming::ResponseTextDeltaEvent
137+
print(event.delta)
138+
when OpenAI::Streaming::ResponseTextDoneEvent
139+
puts
140+
puts("--- Parsed object from resumed stream ---")
141+
pp(event.parsed)
142+
when OpenAI::Models::Responses::ResponseCompletedEvent
143+
puts("Response completed.")
144+
break
145+
end
146+
end
72147

73-
# Show that we properly resumed from where we left off.
74-
if resumed_events.any?
75-
first_resumed_event = resumed_events.first
76-
last_initial_event = events.last
77-
puts "First resumed event sequence: #{first_resumed_event.sequence_number}"
78-
puts "Should be greater than last initial event: #{last_initial_event.sequence_number}"
148+
puts "\nFinal response parsed outputs:"
149+
response = resumed_stream.get_final_response
150+
response
151+
.output
152+
.flat_map { _1.content }
153+
.each do |content|
154+
pp(content.parsed)
155+
end
79156
end

lib/openai/resources/responses.rb

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,22 @@ def stream(params)
170170
end
171171
model, tool_models = get_structured_output_models(parsed)
172172

173+
unwrap = ->(raw) do
174+
if raw[:type] == "response.completed" && raw[:response]
175+
parse_structured_outputs!(raw[:response], model, tool_models)
176+
end
177+
raw
178+
end
179+
173180
if previous_response_id
174-
retrieve_params = {}
175-
retrieve_params[:include] = params[:include] if params[:include]
176-
retrieve_params[:request_options] = params[:request_options] if params[:request_options]
181+
retrieve_params = params.slice(:include, :request_options)
177182

178-
raw_stream = retrieve_streaming(previous_response_id, retrieve_params)
183+
raw_stream = retrieve_streaming_internal(
184+
previous_response_id,
185+
params: retrieve_params,
186+
unwrap: unwrap
187+
)
179188
else
180-
unwrap = ->(raw) do
181-
if raw[:type] == "response.completed" && raw[:response]
182-
parse_structured_outputs!(raw[:response], model, tool_models)
183-
end
184-
raw
185-
end
186-
187189
parsed[:stream] = true
188190

189191
raw_stream = @client.request(
@@ -365,6 +367,21 @@ def retrieve_streaming(response_id, params = {})
365367
)
366368
end
367369

370+
private def retrieve_streaming_internal(response_id, params:, unwrap:)
371+
parsed, options = OpenAI::Responses::ResponseRetrieveParams.dump_request(params)
372+
parsed.store(:stream, true)
373+
@client.request(
374+
method: :get,
375+
path: ["responses/%1$s", response_id],
376+
query: parsed,
377+
headers: {"accept" => "text/event-stream"},
378+
stream: OpenAI::Internal::Stream,
379+
model: OpenAI::Responses::ResponseStreamEvent,
380+
options: options,
381+
unwrap: unwrap
382+
)
383+
end
384+
368385
# Deletes a model response with the given ID.
369386
#
370387
# @overload delete(response_id, request_options: {})

test/openai/resources/responses/streaming_test.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,23 +305,23 @@ def test_resume_stream_with_response_id_and_starting_after
305305
end
306306

307307
def test_resume_stream_with_structured_output
308-
# Stub the GET request to retrieve the response (streaming).
309-
stub_request(:get, "http://localhost/responses/msg_structured?stream=true")
308+
stub_request(:get, "http://localhost/responses/msg_background_structured?stream=true")
310309
.to_return(
311310
status: 200,
312311
headers: {"Content-Type" => "text/event-stream"},
313312
body: resume_stream_structured_output_sse_response
314313
)
315314

316-
stream = @client.responses.stream(previous_response_id: "msg_structured", text: WeatherModel)
315+
stream = @client.responses.stream(
316+
previous_response_id: "msg_background_structured",
317+
text: WeatherModel
318+
)
317319
events = stream.to_a
318320

319321
text_done = events.find { |e| e.type == :"response.output_text.done" }
320-
assert_equal("{\"location\":\"San Francisco\",\"temperature\":72}", text_done.text)
321-
322-
# Verify the parsed content is available.
323322
assert_pattern do
324323
text_done => OpenAI::Streaming::ResponseTextDoneEvent[
324+
text: "{\"location\":\"San Francisco\",\"temperature\":72}",
325325
parsed: WeatherModel[
326326
location: "San Francisco",
327327
temperature: 72
@@ -338,6 +338,28 @@ def test_resume_stream_with_structured_output
338338
}
339339
]
340340
end
341+
342+
# Also verify the parsed field is available in the final response for resumed streams.
343+
final_response = stream.get_final_response
344+
assert_pattern do
345+
final_response => OpenAI::Models::Responses::Response[
346+
id: "msg_structured",
347+
status: :completed,
348+
output: [
349+
{
350+
content: [
351+
{
352+
text: "{\"location\":\"San Francisco\",\"temperature\":72}",
353+
parsed: WeatherModel[
354+
location: "San Francisco",
355+
temperature: 72
356+
]
357+
}
358+
]
359+
}
360+
]
361+
]
362+
end
341363
end
342364

343365
def test_structured_output_parsed_in_final_response

0 commit comments

Comments
 (0)