Skip to content

Commit e5e40e2

Browse files
authored
feat: add token usage on memories (openai#11618)
Add aggregated token usage metrics on phase 1 of memories
1 parent e6eb6be commit e5e40e2

File tree

2 files changed: +181 −51 lines changed

codex-rs/core/src/memories/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ mod metrics {
7676
pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1";
7777
/// Number of raw memories produced by phase-1 startup extraction.
7878
pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output";
79+
/// Histogram for aggregate token usage across one phase-1 startup run.
80+
pub(super) const MEMORY_PHASE_ONE_TOKEN_USAGE: &str = "codex.memory.phase1.token_usage";
7981
/// Number of phase-2 startup jobs grouped by status.
8082
pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2";
8183
/// Number of stage-1 memories included in each phase-2 consolidation step.

codex-rs/core/src/memories/phase1.rs

Lines changed: 179 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use codex_protocol::models::ResponseItem;
1717
use codex_protocol::openai_models::ModelInfo;
1818
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
1919
use codex_protocol::protocol::RolloutItem;
20+
use codex_protocol::protocol::TokenUsage;
2021
use codex_utils_sanitizer::redact_secrets;
2122
use futures::StreamExt;
2223
use serde::Deserialize;
@@ -28,26 +29,32 @@ use tracing::info;
2829
use tracing::warn;
2930

3031
#[derive(Clone, Debug)]
31-
pub(in crate::memories) struct Phase1RequestContext {
32+
pub(in crate::memories) struct RequestContext {
3233
pub(in crate::memories) model_info: ModelInfo,
3334
pub(in crate::memories) otel_manager: OtelManager,
3435
pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
3536
pub(in crate::memories) reasoning_summary: ReasoningSummaryConfig,
3637
pub(in crate::memories) turn_metadata_header: Option<String>,
3738
}
3839

40+
struct JobResult {
41+
outcome: JobOutcome,
42+
token_usage: Option<TokenUsage>,
43+
}
44+
3945
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
40-
enum PhaseOneJobOutcome {
46+
enum JobOutcome {
4147
SucceededWithOutput,
4248
SucceededNoOutput,
4349
Failed,
4450
}
4551

46-
struct PhaseOneOutcomeCounts {
52+
struct Stats {
4753
claimed: usize,
4854
succeeded_with_output: usize,
4955
succeeded_no_output: usize,
5056
failed: usize,
57+
total_token_usage: Option<TokenUsage>,
5158
}
5259

5360
/// Phase 1 model output payload.
@@ -92,7 +99,7 @@ pub(in crate::memories) async fn run(session: &Arc<Session>) {
9299
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;
93100

94101
// 4. Metrics and logs.
95-
let counts = count_outcomes(outcomes);
102+
let counts = aggregate_stats(outcomes);
96103
emit_metrics(session, &counts);
97104
info!(
98105
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
@@ -118,7 +125,7 @@ pub fn output_schema() -> Value {
118125
})
119126
}
120127

121-
impl Phase1RequestContext {
128+
impl RequestContext {
122129
pub(in crate::memories) fn from_turn_context(
123130
turn_context: &TurnContext,
124131
turn_metadata_header: Option<String>,
@@ -172,9 +179,9 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
172179
}
173180
}
174181

175-
async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
182+
async fn build_request_context(session: &Arc<Session>) -> RequestContext {
176183
let turn_context = session.new_default_turn().await;
177-
Phase1RequestContext::from_turn_context(
184+
RequestContext::from_turn_context(
178185
turn_context.as_ref(),
179186
turn_context.resolve_turn_metadata_header().await,
180187
)
@@ -183,8 +190,8 @@ async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
183190
async fn run_jobs(
184191
session: &Arc<Session>,
185192
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
186-
stage_one_context: Phase1RequestContext,
187-
) -> Vec<PhaseOneJobOutcome> {
193+
stage_one_context: RequestContext,
194+
) -> Vec<JobResult> {
188195
futures::stream::iter(claimed_candidates.into_iter())
189196
.map(|claim| {
190197
let session = Arc::clone(session);
@@ -202,10 +209,10 @@ mod job {
202209
pub(in crate::memories) async fn run(
203210
session: &Session,
204211
claim: codex_state::Stage1JobClaim,
205-
stage_one_context: &Phase1RequestContext,
206-
) -> PhaseOneJobOutcome {
212+
stage_one_context: &RequestContext,
213+
) -> JobResult {
207214
let thread = claim.thread;
208-
let stage_one_output = match sample(
215+
let (stage_one_output, token_usage) = match sample(
209216
session,
210217
&thread.rollout_path,
211218
&thread.cwd,
@@ -222,32 +229,41 @@ mod job {
222229
&reason.to_string(),
223230
)
224231
.await;
225-
return PhaseOneJobOutcome::Failed;
232+
return JobResult {
233+
outcome: JobOutcome::Failed,
234+
token_usage: None,
235+
};
226236
}
227237
};
228238

229239
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
230-
return result::no_output(session, thread.id, &claim.ownership_token).await;
240+
return JobResult {
241+
outcome: result::no_output(session, thread.id, &claim.ownership_token).await,
242+
token_usage,
243+
};
231244
}
232245

233-
result::success(
234-
session,
235-
thread.id,
236-
&claim.ownership_token,
237-
thread.updated_at.timestamp(),
238-
&stage_one_output.raw_memory,
239-
&stage_one_output.rollout_summary,
240-
)
241-
.await
246+
JobResult {
247+
outcome: result::success(
248+
session,
249+
thread.id,
250+
&claim.ownership_token,
251+
thread.updated_at.timestamp(),
252+
&stage_one_output.raw_memory,
253+
&stage_one_output.rollout_summary,
254+
)
255+
.await,
256+
token_usage,
257+
}
242258
}
243259

244260
/// Extract the rollout and perform the actual sampling.
245261
async fn sample(
246262
session: &Session,
247263
rollout_path: &Path,
248264
rollout_cwd: &Path,
249-
stage_one_context: &Phase1RequestContext,
250-
) -> anyhow::Result<StageOneOutput> {
265+
stage_one_context: &RequestContext,
266+
) -> anyhow::Result<(StageOneOutput, Option<TokenUsage>)> {
251267
let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(rollout_path).await?;
252268
let rollout_contents = serialize_filtered_rollout_response_items(&rollout_items)?;
253269

@@ -290,6 +306,7 @@ mod job {
290306
// TODO(jif) we should have a shared helper somewhere for this.
291307
// Unwrap the stream.
292308
let mut result = String::new();
309+
let mut token_usage = None;
293310
while let Some(message) = stream.next().await.transpose()? {
294311
match message {
295312
ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta),
@@ -301,7 +318,12 @@ mod job {
301318
result.push_str(&text);
302319
}
303320
}
304-
ResponseEvent::Completed { .. } => break,
321+
ResponseEvent::Completed {
322+
token_usage: usage, ..
323+
} => {
324+
token_usage = usage;
325+
break;
326+
}
305327
_ => {}
306328
}
307329
}
@@ -310,7 +332,7 @@ mod job {
310332
output.raw_memory = redact_secrets(output.raw_memory);
311333
output.rollout_summary = redact_secrets(output.rollout_summary);
312334

313-
Ok(output)
335+
Ok((output, token_usage))
314336
}
315337

316338
mod result {
@@ -339,19 +361,19 @@ mod job {
339361
session: &Session,
340362
thread_id: codex_protocol::ThreadId,
341363
ownership_token: &str,
342-
) -> PhaseOneJobOutcome {
364+
) -> JobOutcome {
343365
let Some(state_db) = session.services.state_db.as_deref() else {
344-
return PhaseOneJobOutcome::Failed;
366+
return JobOutcome::Failed;
345367
};
346368

347369
if state_db
348370
.mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
349371
.await
350372
.unwrap_or(false)
351373
{
352-
PhaseOneJobOutcome::SucceededNoOutput
374+
JobOutcome::SucceededNoOutput
353375
} else {
354-
PhaseOneJobOutcome::Failed
376+
JobOutcome::Failed
355377
}
356378
}
357379

@@ -362,9 +384,9 @@ mod job {
362384
source_updated_at: i64,
363385
raw_memory: &str,
364386
rollout_summary: &str,
365-
) -> PhaseOneJobOutcome {
387+
) -> JobOutcome {
366388
let Some(state_db) = session.services.state_db.as_deref() else {
367-
return PhaseOneJobOutcome::Failed;
389+
return JobOutcome::Failed;
368390
};
369391

370392
if state_db
@@ -378,9 +400,9 @@ mod job {
378400
.await
379401
.unwrap_or(false)
380402
{
381-
PhaseOneJobOutcome::SucceededWithOutput
403+
JobOutcome::SucceededWithOutput
382404
} else {
383-
PhaseOneJobOutcome::Failed
405+
JobOutcome::Failed
384406
}
385407
}
386408
}
@@ -407,29 +429,37 @@ mod job {
407429
}
408430
}
409431

410-
fn count_outcomes(outcomes: Vec<PhaseOneJobOutcome>) -> PhaseOneOutcomeCounts {
411-
let succeeded_with_output = outcomes
412-
.iter()
413-
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
414-
.count();
415-
let succeeded_no_output = outcomes
416-
.iter()
417-
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
418-
.count();
419-
let failed = outcomes
420-
.iter()
421-
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
422-
.count();
432+
fn aggregate_stats(outcomes: Vec<JobResult>) -> Stats {
433+
let claimed = outcomes.len();
434+
let mut succeeded_with_output = 0;
435+
let mut succeeded_no_output = 0;
436+
let mut failed = 0;
437+
let mut total_token_usage = TokenUsage::default();
438+
let mut has_token_usage = false;
439+
440+
for outcome in outcomes {
441+
match outcome.outcome {
442+
JobOutcome::SucceededWithOutput => succeeded_with_output += 1,
443+
JobOutcome::SucceededNoOutput => succeeded_no_output += 1,
444+
JobOutcome::Failed => failed += 1,
445+
}
423446

424-
PhaseOneOutcomeCounts {
425-
claimed: outcomes.len(),
447+
if let Some(token_usage) = outcome.token_usage {
448+
total_token_usage.add_assign(&token_usage);
449+
has_token_usage = true;
450+
}
451+
}
452+
453+
Stats {
454+
claimed,
426455
succeeded_with_output,
427456
succeeded_no_output,
428457
failed,
458+
total_token_usage: has_token_usage.then_some(total_token_usage),
429459
}
430460
}
431461

432-
fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
462+
fn emit_metrics(session: &Session, counts: &Stats) {
433463
if counts.claimed > 0 {
434464
session.services.otel_manager.counter(
435465
metrics::MEMORY_PHASE_ONE_JOBS,
@@ -463,4 +493,102 @@ fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
463493
&[("status", "failed")],
464494
);
465495
}
496+
if let Some(token_usage) = counts.total_token_usage.as_ref() {
497+
session.services.otel_manager.histogram(
498+
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
499+
token_usage.total_tokens.max(0),
500+
&[("token_type", "total")],
501+
);
502+
session.services.otel_manager.histogram(
503+
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
504+
token_usage.input_tokens.max(0),
505+
&[("token_type", "input")],
506+
);
507+
session.services.otel_manager.histogram(
508+
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
509+
token_usage.cached_input(),
510+
&[("token_type", "cached_input")],
511+
);
512+
session.services.otel_manager.histogram(
513+
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
514+
token_usage.output_tokens.max(0),
515+
&[("token_type", "output")],
516+
);
517+
session.services.otel_manager.histogram(
518+
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
519+
token_usage.reasoning_output_tokens.max(0),
520+
&[("token_type", "reasoning_output")],
521+
);
522+
}
523+
}
524+
525+
#[cfg(test)]
526+
mod tests {
527+
use super::JobOutcome;
528+
use super::JobResult;
529+
use super::aggregate_stats;
530+
use codex_protocol::protocol::TokenUsage;
531+
use pretty_assertions::assert_eq;
532+
533+
#[test]
534+
fn count_outcomes_sums_token_usage_across_all_jobs() {
535+
let counts = aggregate_stats(vec![
536+
JobResult {
537+
outcome: JobOutcome::SucceededWithOutput,
538+
token_usage: Some(TokenUsage {
539+
input_tokens: 10,
540+
cached_input_tokens: 2,
541+
output_tokens: 3,
542+
reasoning_output_tokens: 1,
543+
total_tokens: 13,
544+
}),
545+
},
546+
JobResult {
547+
outcome: JobOutcome::SucceededNoOutput,
548+
token_usage: Some(TokenUsage {
549+
input_tokens: 7,
550+
cached_input_tokens: 1,
551+
output_tokens: 2,
552+
reasoning_output_tokens: 0,
553+
total_tokens: 9,
554+
}),
555+
},
556+
JobResult {
557+
outcome: JobOutcome::Failed,
558+
token_usage: None,
559+
},
560+
]);
561+
562+
assert_eq!(counts.claimed, 3);
563+
assert_eq!(counts.succeeded_with_output, 1);
564+
assert_eq!(counts.succeeded_no_output, 1);
565+
assert_eq!(counts.failed, 1);
566+
assert_eq!(
567+
counts.total_token_usage,
568+
Some(TokenUsage {
569+
input_tokens: 17,
570+
cached_input_tokens: 3,
571+
output_tokens: 5,
572+
reasoning_output_tokens: 1,
573+
total_tokens: 22,
574+
})
575+
);
576+
}
577+
578+
#[test]
579+
fn count_outcomes_keeps_usage_empty_when_no_job_reports_it() {
580+
let counts = aggregate_stats(vec![
581+
JobResult {
582+
outcome: JobOutcome::SucceededWithOutput,
583+
token_usage: None,
584+
},
585+
JobResult {
586+
outcome: JobOutcome::Failed,
587+
token_usage: None,
588+
},
589+
]);
590+
591+
assert_eq!(counts.claimed, 2);
592+
assert_eq!(counts.total_token_usage, None);
593+
}
466594
}

0 commit comments

Comments (0)