Skip to content

Commit 7617e80

Browse files
committed
add streaming example
1 parent cc89a7c commit 7617e80

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

examples/responses-structured-outputs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ async-openai = { path = "../../async-openai" }
99
serde_json = "1.0"
1010
tokio = { version = "1", features = ["full"] }
1111
clap = { version = "4", features = ["derive"] }
12+
futures = "0.3"

examples/responses-structured-outputs/src/main.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use async_openai::{
66
chat::ResponseFormatJsonSchema,
77
responses::{
88
CreateResponseArgs, InputMessage, InputRole, OutputItem, OutputMessageContent,
9+
ResponseStreamEvent,
910
},
1011
},
1112
Client,
1213
};
1314
use clap::Parser;
15+
use futures::StreamExt;
1416
use serde_json::json;
17+
use std::io::{stdout, Write};
1518

1619
/// Chain of thought example: Guides the model through step-by-step reasoning
1720
async fn chain_of_thought(client: &Client<OpenAIConfig>) -> Result<(), Box<dyn Error>> {
@@ -289,6 +292,114 @@ async fn moderation(client: &Client<OpenAIConfig>) -> Result<(), Box<dyn Error>>
289292
Ok(())
290293
}
291294

295+
/// Streaming structured output example: Extract entities from text with streaming
296+
async fn streaming_structured_output(client: &Client<OpenAIConfig>) -> Result<(), Box<dyn Error>> {
297+
println!("=== Streaming Structured Output Example ===\n");
298+
299+
let schema = json!({
300+
"type": "object",
301+
"properties": {
302+
"attributes": {
303+
"type": "array",
304+
"items": { "type": "string" }
305+
},
306+
"colors": {
307+
"type": "array",
308+
"items": { "type": "string" }
309+
},
310+
"animals": {
311+
"type": "array",
312+
"items": { "type": "string" }
313+
}
314+
},
315+
"required": ["attributes", "colors", "animals"],
316+
"additionalProperties": false
317+
});
318+
319+
let request = CreateResponseArgs::default()
320+
.model("gpt-4.1")
321+
.stream(true)
322+
.text(ResponseFormatJsonSchema {
323+
description: Some("Extract entities from the input text".to_string()),
324+
name: "entities".to_string(),
325+
schema: Some(schema),
326+
strict: Some(true),
327+
})
328+
.input(vec![
329+
InputMessage {
330+
role: InputRole::System,
331+
content: vec!["Extract entities from the input text".into()],
332+
status: None,
333+
},
334+
InputMessage {
335+
role: InputRole::User,
336+
content: vec![
337+
"The quick brown fox jumps over the lazy dog with piercing blue eyes".into(),
338+
],
339+
status: None,
340+
},
341+
])
342+
.build()?;
343+
344+
let mut stream = client.responses().create_stream(request).await?;
345+
let mut lock = stdout().lock();
346+
let mut final_response = None;
347+
348+
while let Some(result) = stream.next().await {
349+
match result {
350+
Ok(event) => match event {
351+
ResponseStreamEvent::ResponseRefusalDelta(delta) => {
352+
write!(lock, "{}", delta.delta)?;
353+
lock.flush()?;
354+
}
355+
ResponseStreamEvent::ResponseOutputTextDelta(delta) => {
356+
write!(lock, "{}", delta.delta)?;
357+
lock.flush()?;
358+
}
359+
ResponseStreamEvent::ResponseError(error) => {
360+
writeln!(lock, "\nError: {}", error.message)?;
361+
if let Some(code) = &error.code {
362+
writeln!(lock, "Code: {}", code)?;
363+
}
364+
if let Some(param) = &error.param {
365+
writeln!(lock, "Param: {}", param)?;
366+
}
367+
}
368+
ResponseStreamEvent::ResponseCompleted(completed) => {
369+
writeln!(lock, "\nCompleted")?;
370+
final_response = Some(completed.response);
371+
break;
372+
}
373+
ResponseStreamEvent::ResponseFailed(_)
374+
| ResponseStreamEvent::ResponseIncomplete(_) => {
375+
break;
376+
}
377+
_ => {
378+
// Ignore other events
379+
}
380+
},
381+
Err(e) => {
382+
writeln!(lock, "\nStream error: {:#?}", e)?;
383+
}
384+
}
385+
}
386+
387+
if let Some(response) = final_response {
388+
writeln!(lock, "\nFinal response:")?;
389+
for output in response.output {
390+
if let OutputItem::Message(message) = output {
391+
for content in message.content {
392+
if let OutputMessageContent::OutputText(text) = content {
393+
writeln!(lock, "{}", text.text)?;
394+
}
395+
}
396+
}
397+
}
398+
}
399+
400+
Ok(())
401+
}
402+
292403
#[derive(Parser, Debug)]
293404
#[command(name = "responses-structured-outputs")]
294405
#[command(about = "Examples of structured outputs using the Responses API", long_about = None)]
@@ -308,6 +419,8 @@ enum Example {
308419
UiGeneration,
309420
/// Moderation: Analyze content for policy violations
310421
Moderation,
422+
/// Streaming structured output: Extract entities with streaming
423+
Streaming,
311424
/// Run all examples
312425
All,
313426
}
@@ -330,11 +443,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
330443
Example::Moderation => {
331444
moderation(&client).await?;
332445
}
446+
Example::Streaming => {
447+
streaming_structured_output(&client).await?;
448+
}
333449
Example::All => {
334450
chain_of_thought(&client).await?;
335451
structured_data_extraction(&client).await?;
336452
ui_generation(&client).await?;
337453
moderation(&client).await?;
454+
streaming_structured_output(&client).await?;
338455
}
339456
}
340457

0 commit comments

Comments
 (0)