
Commit 1e2cc8d

MatejKosec and claude committed
fix: address CodeRabbit v2 review findings
- Add stream_handle.arm() to streaming responses path for client disconnect detection (matches chat_completions handler pattern)
- Add check_for_backend_error() to non-streaming responses path to detect backend errors before committing to HTTP 200
- Fix error messages referencing "chat completions" → "responses"
- Populate output items in response.completed streaming event (was sending empty output array)
- Remove dead current_fc_index field from ResponseStreamConverter
- Remove duplicate ListInputItemsOrder enum (reuse ListOrder)
- Remove redundant with_model override in ResponsesStreamPayload
- Add tracing::debug for skipped unsupported input item types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Matej Kosec <mkosec@nvidia.com>
1 parent 916fd40 commit 1e2cc8d

5 files changed: +54 -32 lines changed

lib/async-openai/src/types/responses/api.rs

Lines changed: 1 addition & 11 deletions
@@ -30,16 +30,6 @@ pub struct ListConversationItemsQuery {
     pub include: Option<Vec<IncludeParam>>,
 }
 
-/// Sort order for listing input items.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum ListInputItemsOrder {
-    /// Ascending order
-    Asc,
-    /// Descending order
-    Desc,
-}
-
 /// Query parameters for getting a response.
 #[derive(Debug, Serialize, Deserialize, Default, Clone, Builder, PartialEq, ToSchema)]
 #[builder(name = "GetResponseQueryArgs")]
@@ -75,7 +65,7 @@ pub struct ListInputItemsQuery {
     pub limit: Option<u32>,
     /// The order to return the input items in. Default is `desc`.
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub order: Option<ListInputItemsOrder>,
+    pub order: Option<ListOrder>,
     /// An item ID to list items after, used in pagination.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub after: Option<String>,
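The dedicated ListInputItemsOrder enum is gone; the query now reuses the crate's existing ListOrder. A minimal sketch of the resulting serialization behavior, using simplified stand-in types rather than the crate's actual definitions (which also carry Builder/ToSchema derives):

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for the shared enum that now backs the query
// (assumed to mirror the crate's ListOrder, which the diff reuses in
// place of the deleted ListInputItemsOrder).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
enum ListOrder {
    Asc,
    Desc,
}

#[derive(Debug, Serialize)]
struct ListInputItemsQuery {
    #[serde(skip_serializing_if = "Option::is_none")]
    limit: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    order: Option<ListOrder>,
}

fn main() {
    let query = ListInputItemsQuery {
        limit: Some(20),
        order: Some(ListOrder::Desc),
    };
    // Serializes the same way the deleted enum did, so the wire format
    // is unchanged: {"limit":20,"order":"desc"}
    println!("{}", serde_json::to_string(&query).unwrap());
}
```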

lib/llm/src/http/service/openai.rs

Lines changed: 20 additions & 5 deletions
@@ -1164,7 +1164,7 @@ async fn handler_responses(
         .await
         .map_err(|e| {
             ErrorMessage::internal_server_error(&format!(
-                "Failed to await chat completions task: {:?}",
+                "Failed to await responses task: {:?}",
                 e,
             ))
         })?;
@@ -1181,7 +1181,7 @@ async fn responses(
     state: Arc<service_v2::State>,
     template: Option<RequestTemplate>,
     mut request: Context<NvCreateResponse>,
-    stream_handle: ConnectionHandle,
+    mut stream_handle: ConnectionHandle,
 ) -> Result<Response, ErrorResponse> {
     // return a 503 if the service is not ready
     check_ready(&state)?;
@@ -1267,6 +1267,11 @@
         .create_inflight_guard(&model, Endpoint::Responses, streaming);
 
     if streaming {
+        // For streaming responses, we return HTTP 200 immediately without checking for errors.
+        // Once HTTP 200 OK is sent, we cannot change the status code, so any backend errors
+        // must be delivered as SSE events in the stream. This is standard SSE behavior.
+        stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
+
         // Streaming path: convert chat completion stream chunks to Responses API SSE events.
         // The engine yields Annotated<NvCreateChatCompletionStreamResponse>. We extract the
         // inner stream response data and convert it to Responses API events.
@@ -1328,8 +1333,18 @@
         Ok(sse_stream.into_response())
     } else {
         // Non-streaming path: aggregate stream into single response
+
+        // Check first event for backend errors before aggregating (non-streaming only)
+        let stream_with_check =
+            check_for_backend_error(engine_stream)
+                .await
+                .map_err(|error_response| {
+                    tracing::error!(request_id, "Backend error detected: {:?}", error_response);
+                    error_response
+                })?;
+
         let mut http_queue_guard = Some(http_queue_guard);
-        let stream = engine_stream.inspect(move |response| {
+        let stream = stream_with_check.inspect(move |response| {
             process_response_and_observe_metrics(
                 response,
                 &mut response_collector,
@@ -1343,11 +1358,11 @@
             .map_err(|e| {
                 tracing::error!(
                     request_id,
-                    "Failed to fold chat completions stream for: {:?}",
+                    "Failed to fold responses stream: {:?}",
                     e
                 );
                 ErrorMessage::internal_server_error(&format!(
-                    "Failed to fold chat completions stream: {}",
+                    "Failed to fold responses stream: {}",
                     e
                 ))
             })?;
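Two distinct fixes land here. The streaming path arms the connection handle before the 200 is committed, since SSE cannot change the status code once headers are sent; the non-streaming path can still fail fast, so it peeks at the first engine event before aggregating. The crate's check_for_backend_error is defined elsewhere; below is a minimal sketch of the peek-then-rechain pattern it presumably follows, with a hypothetical Event type standing in for Annotated<NvCreateChatCompletionStreamResponse>:

```rust
use futures::stream::{self, Stream, StreamExt};

// Hypothetical stand-in for the handler's annotated engine events.
#[derive(Debug)]
enum Event {
    Data(String),
    Error(String),
}

// Await the first event before committing to an HTTP status. An immediate
// backend error becomes an Err (mappable to a non-200 response); otherwise
// the peeked event is pushed back onto the front and the stream continues
// unchanged.
async fn check_for_backend_error(
    mut events: impl Stream<Item = Event> + Unpin,
) -> Result<impl Stream<Item = Event>, String> {
    match events.next().await {
        Some(Event::Error(msg)) => Err(msg),
        Some(first) => Ok(stream::iter(vec![first]).chain(events)),
        None => Ok(stream::iter(vec![]).chain(events)),
    }
}

fn main() {
    futures::executor::block_on(async {
        let ok = stream::iter(vec![Event::Data("a".into()), Event::Data("b".into())]);
        let checked = check_for_backend_error(ok).await.expect("no backend error");
        assert_eq!(checked.collect::<Vec<_>>().await.len(), 2); // peeked event not lost

        let bad = stream::iter(vec![Event::Error("boom".into())]);
        assert!(check_for_backend_error(bad).await.is_err());
    });
}
```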

lib/llm/src/protocols/openai/responses/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -319,8 +319,8 @@ fn convert_input_items_to_messages(
                 },
             ));
         }
-        _ => {
-            // Skip other item types (file search, computer call, etc.)
+        other => {
+            tracing::debug!("Skipping unsupported input item type during conversion: {:?}", std::mem::discriminant(other));
         }
     },
     InputItem::EasyMessage(easy) => {
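Binding `other` instead of `_` keeps the skipped value available so the debug log can at least identify the variant. std::mem::discriminant is a good fit because it requires no Debug impl on the variant payloads; note that its own Debug output is opaque (it distinguishes variants by comparison, not by name). A small self-contained illustration of those semantics, using a hypothetical enum in place of the crate's input item types:

```rust
use std::mem;

// Hypothetical stand-in for the crate's input item variants.
enum Item {
    Text(String),
    FileSearch { query: String },
}

fn main() {
    let a = Item::Text("hi".into());
    let b = Item::Text("bye".into());
    let c = Item::FileSearch { query: "docs".into() };

    // Discriminants compare equal exactly when the variants match,
    // regardless of payload, and need no Debug impl on the payload.
    assert_eq!(mem::discriminant(&a), mem::discriminant(&b));
    assert_ne!(mem::discriminant(&a), mem::discriminant(&c));

    // Printable with {:?} even though Item itself derives nothing;
    // the output is opaque rather than the variant name.
    println!("{:?}", mem::discriminant(&c));
}
```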

lib/llm/src/protocols/openai/responses/stream_converter.rs

Lines changed: 31 additions & 8 deletions
@@ -41,7 +41,6 @@ pub struct ResponseStreamConverter {
     accumulated_text: String,
     // Function call tracking
     function_call_items: Vec<FunctionCallState>,
-    current_fc_index: Option<usize>,
     // Output index counter
     next_output_index: u32,
 }
@@ -72,7 +71,6 @@ impl ResponseStreamConverter {
             message_output_index: 0,
             accumulated_text: String::new(),
             function_call_items: Vec::new(),
-            current_fc_index: None,
             next_output_index: 0,
         }
     }
@@ -83,7 +81,7 @@
         seq
     }
 
-    fn make_response(&self, status: Status) -> Response {
+    fn make_response(&self, status: Status, output: Vec<OutputItem>) -> Response {
         let completed_at = if status == Status::Completed {
             Some(
                 SystemTime::now()
@@ -101,7 +99,7 @@
             completed_at,
             status,
             model: self.model.clone(),
-            output: vec![],
+            output,
             // Spec-required defaults
             background: Some(false),
             frequency_penalty: Some(0.0),
@@ -144,13 +142,13 @@
 
         let created = ResponseStreamEvent::ResponseCreated(ResponseCreatedEvent {
             sequence_number: self.next_seq(),
-            response: self.make_response(Status::InProgress),
+            response: self.make_response(Status::InProgress, vec![]),
         });
         events.push(make_sse_event(&created));
 
         let in_progress = ResponseStreamEvent::ResponseInProgress(ResponseInProgressEvent {
             sequence_number: self.next_seq(),
-            response: self.make_response(Status::InProgress),
+            response: self.make_response(Status::InProgress, vec![]),
         });
         events.push(make_sse_event(&in_progress));
 
@@ -301,7 +299,6 @@
                 }
             }
 
-            self.current_fc_index = Some(tc_index);
         }
     }
 }
@@ -399,10 +396,36 @@
             events.push(make_sse_event(&item_done));
         }
 
+        // Build the final output vector from accumulated state
+        let mut output = Vec::new();
+        if self.message_started {
+            output.push(OutputItem::Message(OutputMessage {
+                id: self.message_item_id.clone(),
+                content: vec![OutputMessageContent::OutputText(OutputTextContent {
+                    text: self.accumulated_text.clone(),
+                    annotations: vec![],
+                    logprobs: Some(vec![]),
+                })],
+                role: AssistantRole::Assistant,
+                status: OutputStatus::Completed,
+            }));
+        }
+        for fc in &self.function_call_items {
+            if fc.started {
+                output.push(OutputItem::FunctionCall(FunctionToolCall {
+                    id: Some(fc.item_id.clone()),
+                    call_id: fc.call_id.clone(),
+                    name: fc.name.clone(),
+                    arguments: fc.accumulated_args.clone(),
+                    status: Some(OutputStatus::Completed),
+                }));
+            }
+        }
+
         // Emit response.completed
         let completed = ResponseStreamEvent::ResponseCompleted(ResponseCompletedEvent {
             sequence_number: self.next_seq(),
-            response: self.make_response(Status::Completed),
+            response: self.make_response(Status::Completed, output),
        });
         events.push(make_sse_event(&completed));
 
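Previously response.completed carried output: [] even after streaming a full message; the converter now folds its accumulated state into the terminal event. A stripped-down sketch of that accumulation step, with hypothetical simplified types in place of the crate's OutputItem/OutputMessage structures:

```rust
// Hypothetical simplified output items; the real converter builds
// OutputItem::Message and OutputItem::FunctionCall values.
#[derive(Debug)]
enum OutputItem {
    Message { id: String, text: String },
    FunctionCall { id: String, name: String, arguments: String },
}

struct FunctionCallState {
    started: bool,
    item_id: String,
    name: String,
    accumulated_args: String,
}

struct Converter {
    message_started: bool,
    message_item_id: String,
    accumulated_text: String,
    function_call_items: Vec<FunctionCallState>,
}

impl Converter {
    // Mirror of the new finish-step logic: emit the message item only if
    // one was started, then every function call that actually began.
    fn final_output(&self) -> Vec<OutputItem> {
        let mut output = Vec::new();
        if self.message_started {
            output.push(OutputItem::Message {
                id: self.message_item_id.clone(),
                text: self.accumulated_text.clone(),
            });
        }
        for fc in &self.function_call_items {
            if fc.started {
                output.push(OutputItem::FunctionCall {
                    id: fc.item_id.clone(),
                    name: fc.name.clone(),
                    arguments: fc.accumulated_args.clone(),
                });
            }
        }
        output
    }
}

fn main() {
    let conv = Converter {
        message_started: true,
        message_item_id: "msg_1".into(),
        accumulated_text: "Hello!".into(),
        function_call_items: vec![FunctionCallState {
            started: true,
            item_id: "fc_1".into(),
            name: "get_weather".into(),
            accumulated_args: r#"{"city":"SF"}"#.into(),
        }],
    };
    // The terminal response.completed event now carries both accumulated
    // items instead of an empty array.
    assert_eq!(conv.final_output().len(), 2);
    println!("{:?}", conv.final_output());
}
```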

tests/utils/payloads.py

Lines changed: 0 additions & 6 deletions
@@ -547,12 +547,6 @@ class ResponsesStreamPayload(BasePayload):
     endpoint: str = "/v1/responses"
     http_stream: bool = True
 
-    def with_model(self, model):
-        p = deepcopy(self)
-        if "model" not in p.body:
-            p.body = {**p.body, "model": model}
-        return p
-
     @staticmethod
     def extract_content(response):
         """Parse SSE stream and validate event structure."""
