Skip to content

Commit fe8122e

Browse files
authored
fix: change log_sse_event() so it no longer takes a closure (#4953)
Unlikely fix for #4381, but worth a shot given that #2103 changed around the same time.
1 parent 876d4f4 commit fe8122e

File tree

3 files changed

+18
-22
lines changed

3 files changed

+18
-22
lines changed

codex-rs/core/src/chat_completions.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -389,10 +389,12 @@ async fn process_chat_sse<S>(
389389
let mut reasoning_text = String::new();
390390

391391
loop {
392-
let sse = match otel_event_manager
393-
.log_sse_event(|| timeout(idle_timeout, stream.next()))
394-
.await
395-
{
392+
let start = std::time::Instant::now();
393+
let response = timeout(idle_timeout, stream.next()).await;
394+
let duration = start.elapsed();
395+
otel_event_manager.log_sse_event(&response, duration);
396+
397+
let sse = match response {
396398
Ok(Some(Ok(ev))) => ev,
397399
Ok(Some(Err(e))) => {
398400
let _ = tx_event

codex-rs/core/src/client.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -649,10 +649,12 @@ async fn process_sse<S>(
649649
let mut response_error: Option<CodexErr> = None;
650650

651651
loop {
652-
let sse = match otel_event_manager
653-
.log_sse_event(|| timeout(idle_timeout, stream.next()))
654-
.await
655-
{
652+
let start = std::time::Instant::now();
653+
let response = timeout(idle_timeout, stream.next()).await;
654+
let duration = start.elapsed();
655+
otel_event_manager.log_sse_event(&response, duration);
656+
657+
let sse = match response {
656658
Ok(Some(Ok(sse))) => sse,
657659
Ok(Some(Err(e))) => {
658660
debug!("SSE Error: {e:#}");

codex-rs/otel/src/otel_event_manager.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,21 +148,15 @@ impl OtelEventManager {
148148
response
149149
}
150150

151-
pub async fn log_sse_event<Next, Fut, E>(
151+
pub fn log_sse_event<E>(
152152
&self,
153-
next: Next,
154-
) -> Result<Option<Result<StreamEvent, StreamError<E>>>, Elapsed>
155-
where
156-
Next: FnOnce() -> Fut,
157-
Fut: Future<Output = Result<Option<Result<StreamEvent, StreamError<E>>>, Elapsed>>,
153+
response: &Result<Option<Result<StreamEvent, StreamError<E>>>, Elapsed>,
154+
duration: Duration,
155+
) where
158156
E: Display,
159157
{
160-
let start = std::time::Instant::now();
161-
let response = next().await;
162-
let duration = start.elapsed();
163-
164158
match response {
165-
Ok(Some(Ok(ref sse))) => {
159+
Ok(Some(Ok(sse))) => {
166160
if sse.data.trim() == "[DONE]" {
167161
self.sse_event(&sse.event, duration);
168162
} else {
@@ -191,16 +185,14 @@ impl OtelEventManager {
191185
}
192186
}
193187
}
194-
Ok(Some(Err(ref error))) => {
188+
Ok(Some(Err(error))) => {
195189
self.sse_event_failed(None, duration, error);
196190
}
197191
Ok(None) => {}
198192
Err(_) => {
199193
self.sse_event_failed(None, duration, &"idle timeout waiting for SSE");
200194
}
201195
}
202-
203-
response
204196
}
205197

206198
fn sse_event(&self, kind: &str, duration: Duration) {

0 commit comments

Comments
 (0)