Skip to content

Commit 3a6a43f

Browse files
authored
Extract single responses SSE event parsing (#9114)
To be reused in WebSockets parsing.
1 parent d7cdcfc commit 3a6a43f

File tree

1 file changed

+129
-113
lines changed

1 file changed

+129
-113
lines changed

codex-rs/codex-api/src/sse/responses.rs

Lines changed: 129 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ struct ResponseCompletedOutputTokensDetails {
126126
}
127127

128128
#[derive(Deserialize, Debug)]
129-
struct SseEvent {
129+
struct ResponsesStreamEvent {
130130
#[serde(rename = "type")]
131131
kind: String,
132132
response: Option<Value>,
@@ -136,14 +136,130 @@ struct SseEvent {
136136
content_index: Option<i64>,
137137
}
138138

139+
#[derive(Debug)]
140+
pub enum ResponsesEventError {
141+
Api(ApiError),
142+
}
143+
144+
impl ResponsesEventError {
145+
pub fn into_api_error(self) -> ApiError {
146+
match self {
147+
Self::Api(error) => error,
148+
}
149+
}
150+
}
151+
152+
fn process_responses_event(
153+
event: ResponsesStreamEvent,
154+
) -> std::result::Result<Option<ResponseEvent>, ResponsesEventError> {
155+
match event.kind.as_str() {
156+
"response.output_item.done" => {
157+
if let Some(item_val) = event.item {
158+
if let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) {
159+
return Ok(Some(ResponseEvent::OutputItemDone(item)));
160+
}
161+
debug!("failed to parse ResponseItem from output_item.done");
162+
}
163+
}
164+
"response.output_text.delta" => {
165+
if let Some(delta) = event.delta {
166+
return Ok(Some(ResponseEvent::OutputTextDelta(delta)));
167+
}
168+
}
169+
"response.reasoning_summary_text.delta" => {
170+
if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) {
171+
return Ok(Some(ResponseEvent::ReasoningSummaryDelta {
172+
delta,
173+
summary_index,
174+
}));
175+
}
176+
}
177+
"response.reasoning_text.delta" => {
178+
if let (Some(delta), Some(content_index)) = (event.delta, event.content_index) {
179+
return Ok(Some(ResponseEvent::ReasoningContentDelta {
180+
delta,
181+
content_index,
182+
}));
183+
}
184+
}
185+
"response.created" => {
186+
if event.response.is_some() {
187+
return Ok(Some(ResponseEvent::Created {}));
188+
}
189+
}
190+
"response.failed" => {
191+
if let Some(resp_val) = event.response {
192+
let mut response_error = ApiError::Stream("response.failed event received".into());
193+
if let Some(error) = resp_val.get("error")
194+
&& let Ok(error) = serde_json::from_value::<Error>(error.clone())
195+
{
196+
if is_context_window_error(&error) {
197+
response_error = ApiError::ContextWindowExceeded;
198+
} else if is_quota_exceeded_error(&error) {
199+
response_error = ApiError::QuotaExceeded;
200+
} else if is_usage_not_included(&error) {
201+
response_error = ApiError::UsageNotIncluded;
202+
} else {
203+
let delay = try_parse_retry_after(&error);
204+
let message = error.message.unwrap_or_default();
205+
response_error = ApiError::Retryable { message, delay };
206+
}
207+
}
208+
return Err(ResponsesEventError::Api(response_error));
209+
}
210+
211+
return Err(ResponsesEventError::Api(ApiError::Stream(
212+
"response.failed event received".into(),
213+
)));
214+
}
215+
"response.completed" => {
216+
if let Some(resp_val) = event.response {
217+
match serde_json::from_value::<ResponseCompleted>(resp_val) {
218+
Ok(resp) => {
219+
return Ok(Some(ResponseEvent::Completed {
220+
response_id: resp.id,
221+
token_usage: resp.usage.map(Into::into),
222+
}));
223+
}
224+
Err(err) => {
225+
let error = format!("failed to parse ResponseCompleted: {err}");
226+
debug!("{error}");
227+
return Err(ResponsesEventError::Api(ApiError::Stream(error)));
228+
}
229+
}
230+
}
231+
}
232+
"response.output_item.added" => {
233+
if let Some(item_val) = event.item {
234+
if let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) {
235+
return Ok(Some(ResponseEvent::OutputItemAdded(item)));
236+
}
237+
debug!("failed to parse ResponseItem from output_item.done");
238+
}
239+
}
240+
"response.reasoning_summary_part.added" => {
241+
if let Some(summary_index) = event.summary_index {
242+
return Ok(Some(ResponseEvent::ReasoningSummaryPartAdded {
243+
summary_index,
244+
}));
245+
}
246+
}
247+
_ => {
248+
trace!("unhandled responses event: {}", event.kind);
249+
}
250+
}
251+
252+
Ok(None)
253+
}
254+
139255
pub async fn process_sse(
140256
stream: ByteStream,
141257
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
142258
idle_timeout: Duration,
143259
telemetry: Option<Arc<dyn SseTelemetry>>,
144260
) {
145261
let mut stream = stream.eventsource();
146-
let mut response_completed: Option<ResponseCompleted> = None;
262+
let mut response_completed: Option<ResponseEvent> = None;
147263
let mut response_error: Option<ApiError> = None;
148264

149265
loop {
@@ -161,11 +277,7 @@ pub async fn process_sse(
161277
}
162278
Ok(None) => {
163279
match response_completed.take() {
164-
Some(ResponseCompleted { id, usage }) => {
165-
let event = ResponseEvent::Completed {
166-
response_id: id,
167-
token_usage: usage.map(Into::into),
168-
};
280+
Some(event) => {
169281
let _ = tx_event.send(Ok(event)).await;
170282
}
171283
None => {
@@ -188,123 +300,27 @@ pub async fn process_sse(
188300
let raw = sse.data.clone();
189301
trace!("SSE event: {raw}");
190302

191-
let event: SseEvent = match serde_json::from_str(&sse.data) {
303+
let event: ResponsesStreamEvent = match serde_json::from_str(&sse.data) {
192304
Ok(event) => event,
193305
Err(e) => {
194306
debug!("Failed to parse SSE event: {e}, data: {}", &sse.data);
195307
continue;
196308
}
197309
};
198310

199-
match event.kind.as_str() {
200-
"response.output_item.done" => {
201-
let Some(item_val) = event.item else { continue };
202-
let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
203-
debug!("failed to parse ResponseItem from output_item.done");
204-
continue;
205-
};
206-
207-
let event = ResponseEvent::OutputItemDone(item);
208-
if tx_event.send(Ok(event)).await.is_err() {
209-
return;
210-
}
211-
}
212-
"response.output_text.delta" => {
213-
if let Some(delta) = event.delta {
214-
let event = ResponseEvent::OutputTextDelta(delta);
215-
if tx_event.send(Ok(event)).await.is_err() {
216-
return;
217-
}
218-
}
219-
}
220-
"response.reasoning_summary_text.delta" => {
221-
if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) {
222-
let event = ResponseEvent::ReasoningSummaryDelta {
223-
delta,
224-
summary_index,
225-
};
226-
if tx_event.send(Ok(event)).await.is_err() {
227-
return;
228-
}
229-
}
230-
}
231-
"response.reasoning_text.delta" => {
232-
if let (Some(delta), Some(content_index)) = (event.delta, event.content_index) {
233-
let event = ResponseEvent::ReasoningContentDelta {
234-
delta,
235-
content_index,
236-
};
237-
if tx_event.send(Ok(event)).await.is_err() {
238-
return;
239-
}
240-
}
241-
}
242-
"response.created" => {
243-
if event.response.is_some() {
244-
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
245-
}
246-
}
247-
"response.failed" => {
248-
if let Some(resp_val) = event.response {
249-
response_error =
250-
Some(ApiError::Stream("response.failed event received".into()));
251-
252-
if let Some(error) = resp_val.get("error")
253-
&& let Ok(error) = serde_json::from_value::<Error>(error.clone())
254-
{
255-
if is_context_window_error(&error) {
256-
response_error = Some(ApiError::ContextWindowExceeded);
257-
} else if is_quota_exceeded_error(&error) {
258-
response_error = Some(ApiError::QuotaExceeded);
259-
} else if is_usage_not_included(&error) {
260-
response_error = Some(ApiError::UsageNotIncluded);
261-
} else {
262-
let delay = try_parse_retry_after(&error);
263-
let message = error.message.clone().unwrap_or_default();
264-
response_error = Some(ApiError::Retryable { message, delay });
265-
}
266-
}
267-
}
268-
}
269-
"response.completed" => {
270-
if let Some(resp_val) = event.response {
271-
match serde_json::from_value::<ResponseCompleted>(resp_val) {
272-
Ok(r) => {
273-
response_completed = Some(r);
274-
}
275-
Err(e) => {
276-
let error = format!("failed to parse ResponseCompleted: {e}");
277-
debug!(error);
278-
response_error = Some(ApiError::Stream(error));
279-
continue;
280-
}
281-
};
282-
};
283-
}
284-
"response.output_item.added" => {
285-
let Some(item_val) = event.item else { continue };
286-
let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
287-
debug!("failed to parse ResponseItem from output_item.done");
288-
continue;
289-
};
290-
291-
let event = ResponseEvent::OutputItemAdded(item);
292-
if tx_event.send(Ok(event)).await.is_err() {
311+
match process_responses_event(event) {
312+
Ok(Some(event)) => {
313+
if matches!(event, ResponseEvent::Completed { .. }) {
314+
response_completed = Some(event);
315+
} else if tx_event.send(Ok(event)).await.is_err() {
293316
return;
294317
}
295318
}
296-
"response.reasoning_summary_part.added" => {
297-
if let Some(summary_index) = event.summary_index {
298-
let event = ResponseEvent::ReasoningSummaryPartAdded { summary_index };
299-
if tx_event.send(Ok(event)).await.is_err() {
300-
return;
301-
}
302-
}
319+
Ok(None) => {}
320+
Err(error) => {
321+
response_error = Some(error.into_api_error());
303322
}
304-
_ => {
305-
trace!("unhandled SSE event: {:#?}", event.kind);
306-
}
307-
}
323+
};
308324
}
309325
}
310326

0 commit comments

Comments
 (0)