Skip to content

Commit 34006a6

Browse files
committed
updated responses function to include streaming example
1 parent 5bd23f8 commit 34006a6

File tree

2 files changed

+265
-6
lines changed

2 files changed

+265
-6
lines changed

examples/responses-function-call/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ async-openai = {path = "../../async-openai"}
99
serde_json = "1.0.135"
1010
tokio = { version = "1.43.0", features = ["full"] }
1111
serde = { version = "1.0.219", features = ["derive"] }
12+
clap = { version = "4", features = ["derive"] }
13+
futures = "0.3"

examples/responses-function-call/src/main.rs

Lines changed: 263 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
use async_openai::{
2+
traits::EventType,
23
types::responses::{
34
CreateResponseArgs, EasyInputMessage, FunctionCallOutput, FunctionCallOutputItemParam,
4-
FunctionTool, FunctionToolCall, InputItem, InputParam, Item, OutputItem, Tool,
5+
FunctionTool, FunctionToolCall, InputItem, InputParam, Item, OutputItem,
6+
ResponseStreamEvent, Tool,
57
},
68
Client,
79
};
10+
use clap::Parser;
11+
use futures::StreamExt;
812
use serde::Deserialize;
13+
use std::collections::HashMap;
914
use std::error::Error;
15+
use std::io::{stdout, Write};
1016

1117
#[derive(Debug, Deserialize)]
1218
struct WeatherFunctionArgs {
@@ -18,8 +24,27 @@ fn check_weather(location: String, units: String) -> String {
1824
format!("The weather in {location} is 25 {units}")
1925
}
2026

21-
#[tokio::main]
22-
async fn main() -> Result<(), Box<dyn Error>> {
27+
#[derive(Parser, Debug)]
28+
#[command(name = "responses-function-call")]
29+
#[command(about = "Example demonstrating function calls with the Responses API")]
30+
struct Args {
31+
#[command(subcommand)]
32+
command: Command,
33+
}
34+
35+
#[derive(clap::Subcommand, Debug)]
36+
enum Command {
37+
/// Run non-streaming function call example
38+
NonStreaming,
39+
/// Run streaming function call example
40+
Streaming,
41+
/// Run both streaming and non-streaming examples
42+
All,
43+
}
44+
45+
async fn run_non_streaming() -> Result<(), Box<dyn Error>> {
46+
println!("=== Non-Streaming Function Call Example ===\n");
47+
2348
let client = Client::new();
2449

2550
let tools = vec![Tool::Function(FunctionTool {
@@ -62,7 +87,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
6287
.tools(tools.clone())
6388
.build()?;
6489

65-
println!("{}", serde_json::to_string(&request).unwrap());
90+
println!("Request: {}", serde_json::to_string(&request)?);
91+
println!("\n---\n");
6692

6793
let response = client.responses().create(request).await?;
6894

@@ -81,6 +107,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
81107
return Ok(());
82108
};
83109

110+
println!(
111+
"Function call requested: {} with arguments: {}",
112+
function_call_request.name, function_call_request.arguments
113+
);
114+
84115
let function_result = match function_call_request.name.as_str() {
85116
"get_weather" => {
86117
let args: WeatherFunctionArgs = serde_json::from_str(&function_call_request.arguments)?;
@@ -92,6 +123,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
92123
}
93124
};
94125

126+
println!("Function result: {}\n", function_result);
127+
95128
// Add the function call from the assistant back to the conversation
96129
input_items.push(InputItem::Item(Item::FunctionCall(
97130
function_call_request.clone(),
@@ -114,11 +147,235 @@ async fn main() -> Result<(), Box<dyn Error>> {
114147
.tools(tools)
115148
.build()?;
116149

117-
println!("request 2 {}", serde_json::to_string(&request).unwrap());
150+
println!("Second request: {}", serde_json::to_string(&request)?);
151+
println!("\n---\n");
118152

119153
let response = client.responses().create(request).await?;
120154

121-
println!("{}", serde_json::to_string(&response).unwrap());
155+
println!("Final response: {}", serde_json::to_string(&response)?);
156+
157+
Ok(())
158+
}
159+
160+
async fn run_streaming() -> Result<(), Box<dyn Error>> {
161+
println!("=== Streaming Function Call Example ===\n");
162+
163+
let client = Client::new();
164+
165+
let tools = vec![Tool::Function(FunctionTool {
166+
name: "get_weather".to_string(),
167+
description: Some("Retrieves current weather for the given location".to_string()),
168+
parameters: Some(serde_json::json!(
169+
{
170+
"type": "object",
171+
"properties": {
172+
"location": {
173+
"type": "string",
174+
"description": "City and country e.g. Bogotá, Colombia"
175+
},
176+
"units": {
177+
"type": "string",
178+
"enum": [
179+
"celsius",
180+
"fahrenheit"
181+
],
182+
"description": "Units the temperature will be returned in."
183+
}
184+
},
185+
"required": [
186+
"location",
187+
"units"
188+
],
189+
"additionalProperties": false
190+
}
191+
)),
192+
strict: None,
193+
})];
194+
195+
let mut input_items: Vec<InputItem> =
196+
vec![EasyInputMessage::from("What's the weather like in Paris today?").into()];
197+
198+
let request = CreateResponseArgs::default()
199+
.max_output_tokens(512u32)
200+
.model("gpt-4.1")
201+
.stream(true)
202+
.input(InputParam::Items(input_items.clone()))
203+
.tools(tools.clone())
204+
.build()?;
205+
206+
println!("Request: {}", serde_json::to_string(&request)?);
207+
println!("\n---\n");
208+
209+
let mut stream = client.responses().create_stream(request).await?;
210+
211+
// Track function call arguments as they stream in
212+
let mut function_call_args: HashMap<String, String> = HashMap::new();
213+
// Track function call metadata (name, call_id) by item_id
214+
let mut function_call_metadata: HashMap<String, (String, String)> = HashMap::new();
215+
let mut function_call_request: Option<FunctionToolCall> = None;
216+
let mut stdout_lock = stdout().lock();
217+
218+
while let Some(result) = stream.next().await {
219+
match result {
220+
Ok(event) => {
221+
match &event {
222+
ResponseStreamEvent::ResponseOutputItemAdded(added) => {
223+
// When a function call item is added, extract the call_id
224+
if let OutputItem::FunctionCall(fc) = &added.item {
225+
let item_id = fc.id.clone().unwrap_or_default();
226+
function_call_metadata
227+
.insert(item_id.clone(), (fc.name.clone(), fc.call_id.clone()));
228+
writeln!(stdout_lock, "{}: {}\n", added.event_type(), fc.name)?;
229+
}
230+
}
231+
ResponseStreamEvent::ResponseFunctionCallArgumentsDelta(delta) => {
232+
// Accumulate function call arguments
233+
let args = function_call_args
234+
.entry(delta.item_id.clone())
235+
.or_insert_with(String::new);
236+
args.push_str(&delta.delta);
237+
write!(stdout_lock, "{}: {}\n", delta.event_type(), delta.delta)?;
238+
stdout().flush()?;
239+
}
240+
ResponseStreamEvent::ResponseFunctionCallArgumentsDone(done) => {
241+
// Function call arguments are complete
242+
if let Some((name, call_id)) = function_call_metadata.get(&done.item_id) {
243+
let arguments = function_call_args
244+
.remove(&done.item_id)
245+
.unwrap_or_else(|| done.arguments.clone());
246+
247+
writeln!(
248+
stdout_lock,
249+
"{}: [Function call complete: {}]",
250+
done.event_type(),
251+
name
252+
)?;
253+
writeln!(
254+
stdout_lock,
255+
"{}: Arguments: {}\n",
256+
done.event_type(),
257+
arguments
258+
)?;
259+
260+
// Create the function call request
261+
function_call_request = Some(FunctionToolCall {
262+
name: name.clone(),
263+
arguments: arguments,
264+
call_id: call_id.clone(),
265+
id: Some(done.item_id.clone()),
266+
status: None,
267+
});
268+
}
269+
}
270+
ResponseStreamEvent::ResponseOutputTextDelta(delta) => {
271+
write!(stdout_lock, "{}: {}\n", delta.event_type(), delta.delta)?;
272+
stdout().flush()?;
273+
}
274+
ResponseStreamEvent::ResponseCompleted(completed) => {
275+
// todo remove once StreamEnded is fixed
276+
writeln!(stdout_lock, "{}\n", completed.event_type(),)?;
277+
break;
278+
}
279+
_ => {
280+
writeln!(stdout_lock, "{}: skipping\n", event.event_type())?;
281+
}
282+
}
283+
}
284+
Err(e) => {
285+
writeln!(stdout_lock, "\nError: {:?}", e)?;
286+
return Err(Box::new(e));
287+
}
288+
}
289+
}
290+
291+
// Execute the function call if we have one
292+
let Some(function_call_request) = function_call_request else {
293+
println!("\nNo function_call request found");
294+
return Ok(());
295+
};
296+
297+
println!("\n---\n");
298+
299+
let function_result = match function_call_request.name.as_str() {
300+
"get_weather" => {
301+
let args: WeatherFunctionArgs = serde_json::from_str(&function_call_request.arguments)?;
302+
check_weather(args.location, args.units)
303+
}
304+
_ => {
305+
println!("Unknown function {}", function_call_request.name);
306+
return Ok(());
307+
}
308+
};
309+
310+
println!("Function result: {}\n", function_result);
311+
312+
// Add the function call from the assistant back to the conversation
313+
input_items.push(InputItem::Item(Item::FunctionCall(
314+
function_call_request.clone(),
315+
)));
316+
317+
// Add the function call output back to the conversation
318+
input_items.push(InputItem::Item(Item::FunctionCallOutput(
319+
FunctionCallOutputItemParam {
320+
call_id: function_call_request.call_id.clone(),
321+
output: FunctionCallOutput::Text(function_result),
322+
id: None,
323+
status: None,
324+
},
325+
)));
326+
327+
let request = CreateResponseArgs::default()
328+
.max_output_tokens(512u32)
329+
.model("gpt-4.1")
330+
.stream(true)
331+
.input(InputParam::Items(input_items))
332+
.tools(tools)
333+
.build()?;
334+
335+
println!("Second request: {}", serde_json::to_string(&request)?);
336+
println!("\n---\n");
337+
println!("Final response (streaming):\n");
338+
339+
let mut stream = client.responses().create_stream(request).await?;
340+
let mut stdout_lock = stdout().lock();
341+
342+
while let Some(result) = stream.next().await {
343+
match result {
344+
Ok(event) => match &event {
345+
ResponseStreamEvent::ResponseOutputTextDelta(delta) => {
346+
write!(stdout_lock, "{}: {}\n", delta.event_type(), delta.delta)?;
347+
stdout().flush()?;
348+
}
349+
ResponseStreamEvent::ResponseCompleted(completed) => {
350+
// todo remove once StreamEnded is fixed
351+
writeln!(stdout_lock, "{}\n", completed.event_type(),)?;
352+
break;
353+
}
354+
_ => {
355+
writeln!(stdout_lock, "{}: skipping\n", event.event_type())?;
356+
}
357+
},
358+
Err(e) => {
359+
writeln!(stdout_lock, "\nError: {:?}", e)?;
360+
return Err(Box::new(e));
361+
}
362+
}
363+
}
122364

123365
Ok(())
124366
}
367+
368+
#[tokio::main]
369+
async fn main() -> Result<(), Box<dyn Error>> {
370+
let args = Args::parse();
371+
372+
match args.command {
373+
Command::NonStreaming => run_non_streaming().await,
374+
Command::Streaming => run_streaming().await,
375+
Command::All => {
376+
run_non_streaming().await?;
377+
println!("\n\n");
378+
run_streaming().await
379+
}
380+
}
381+
}

0 commit comments

Comments
 (0)