diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 6d583e46ed7..ce8b35849cf 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -156,6 +156,10 @@ use crate::error::Result as CodexResult; use crate::exec::StreamOutput; use codex_config::CONFIG_TOML_FILE; +mod rollout_reconstruction; +#[cfg(test)] +mod rollout_reconstruction_tests; + #[derive(Debug, PartialEq)] pub enum SteerInputError { NoActiveTurn(Vec), @@ -1720,21 +1724,14 @@ impl Session { let rollout_items = resumed_history.history; let restored_tool_selection = Self::extract_mcp_tool_selection_from_rollout(&rollout_items); - let (previous_regular_turn_context_item, crossed_compaction_after_turn) = - Self::last_rollout_regular_turn_context_lookup(&rollout_items); - let previous_model = - previous_regular_turn_context_item.map(|ctx| ctx.model.clone()); + let reconstructed_rollout = self + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + let previous_model = reconstructed_rollout.previous_model.clone(); let curr = turn_context.model_info.slug.as_str(); - let reference_context_item = if !crossed_compaction_after_turn { - previous_regular_turn_context_item.cloned() - } else { - // Keep the baseline empty when compaction may have stripped the referenced - // context diffs so the first resumed regular turn fully reinjects context. - None - }; { let mut state = self.state.lock().await; - state.set_reference_context_item(reference_context_item); + state.set_reference_context_item(reconstructed_rollout.reference_context_item); } self.set_previous_model(previous_model.clone()).await; @@ -1754,9 +1751,7 @@ impl Session { } // Always add response items to conversation history - let reconstructed_history = self - .reconstruct_history_from_rollout(&turn_context, &rollout_items) - .await; + let reconstructed_history = reconstructed_rollout.history; if !reconstructed_history.is_empty() { self.record_into_history(&reconstructed_history, &turn_context) .await; @@ -1779,16 +1774,14 @@ impl Session { InitialHistory::Forked(rollout_items) => { let restored_tool_selection = Self::extract_mcp_tool_selection_from_rollout(&rollout_items); - let (previous_regular_turn_context_item, _) = - Self::last_rollout_regular_turn_context_lookup(&rollout_items); - let previous_model = - previous_regular_turn_context_item.map(|ctx| ctx.model.clone()); + let reconstructed_rollout = self + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + let previous_model = reconstructed_rollout.previous_model.clone(); self.set_previous_model(previous_model).await; // Always add response items to conversation history - let reconstructed_history = self - .reconstruct_history_from_rollout(&turn_context, &rollout_items) - .await; + let reconstructed_history = reconstructed_rollout.history; if !reconstructed_history.is_empty() { self.record_into_history(&reconstructed_history, &turn_context) .await; @@ -1827,150 +1820,6 @@ impl Session { } } - /// Returns `(last_turn_context_item, crossed_compaction_after_turn)` from the - /// rollback-adjusted rollout view. - /// - /// This relies on the invariant that only regular turns persist `TurnContextItem`. - /// `ThreadRolledBack` markers are applied so resume/fork uses the post-rollback history view. - /// - /// Returns `(None, false)` when no persisted `TurnContextItem` can be found. - /// - /// Older/minimal rollouts may only contain `RolloutItem::TurnContext` entries without turn - /// lifecycle events. In that case we fall back to the last `TurnContextItem` (plus whether a - /// later `Compacted` item appears in rollout order). - // TODO(ccunningham): Simplify this lookup by sharing rollout traversal/rollback application - // with `reconstruct_history_from_rollout` so resume/fork baseline hydration does not need a - // second bespoke rollout scan. - fn last_rollout_regular_turn_context_lookup( - rollout_items: &[RolloutItem], - ) -> (Option<&TurnContextItem>, bool) { - // Reverse scan over rollout items. `ThreadRolledBack(num_turns)` is naturally handled by - // skipping the next `num_turns` completed turn spans we encounter while walking backward. - // - // "Active turn" here means: we have seen `TurnComplete`/`TurnAborted` and are currently - // scanning backward through that completed turn until its matching `TurnStarted`. - let mut turns_to_skip_due_to_rollback = 0usize; - let mut saw_surviving_compaction_after_candidate = false; - let mut saw_turn_lifecycle_event = false; - let mut active_turn_id: Option<&str> = None; - let mut active_turn_saw_user_message = false; - let mut active_turn_context: Option<&TurnContextItem> = None; - let mut active_turn_contains_compaction = false; - - for item in rollout_items.iter().rev() { - match item { - RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { - // Rollbacks count completed turns, not `TurnContextItem`s. We must continue - // ignoring all items inside each skipped turn until we reach its - // corresponding `TurnStarted`. - let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX); - turns_to_skip_due_to_rollback = - turns_to_skip_due_to_rollback.saturating_add(num_turns); - } - RolloutItem::EventMsg(EventMsg::TurnComplete(event)) => { - saw_turn_lifecycle_event = true; - // Enter the reverse "turn span" for this completed turn. - active_turn_id = Some(event.turn_id.as_str()); - active_turn_saw_user_message = false; - active_turn_context = None; - active_turn_contains_compaction = false; - } - RolloutItem::EventMsg(EventMsg::TurnAborted(event)) => { - saw_turn_lifecycle_event = true; - // Same reverse-turn handling as `TurnComplete`. Some aborted turns may not - // have a turn id; in that case we cannot match `TurnContextItem`s to them. - active_turn_id = event.turn_id.as_deref(); - active_turn_saw_user_message = false; - active_turn_context = None; - active_turn_contains_compaction = false; - } - RolloutItem::EventMsg(EventMsg::UserMessage(_)) => { - if active_turn_id.is_some() { - active_turn_saw_user_message = true; - } - } - RolloutItem::EventMsg(EventMsg::TurnStarted(event)) => { - saw_turn_lifecycle_event = true; - if active_turn_id == Some(event.turn_id.as_str()) { - let active_turn_is_rolled_back = - active_turn_saw_user_message && turns_to_skip_due_to_rollback > 0; - if active_turn_is_rolled_back { - // `ThreadRolledBack(num_turns)` counts user turns, so only consume a - // skip once we've confirmed this reverse-scanned turn span contains a - // user message. Standalone task turns must not consume rollback skips. - turns_to_skip_due_to_rollback -= 1; - } - if !active_turn_is_rolled_back { - if let Some(context_item) = active_turn_context { - return ( - Some(context_item), - saw_surviving_compaction_after_candidate, - ); - } - // No `TurnContextItem` in this surviving turn; keep scanning older - // turns, but remember if this turn compacted so the eventual - // candidate reports "compaction happened after it". - if active_turn_contains_compaction { - saw_surviving_compaction_after_candidate = true; - } - } - active_turn_id = None; - active_turn_saw_user_message = false; - active_turn_context = None; - active_turn_contains_compaction = false; - } - } - RolloutItem::TurnContext(ctx) => { - // Capture the latest turn context seen in this reverse-scanned turn span. If - // the turn later proves to be rolled back, we discard it when we hit the - // matching `TurnStarted`. Older rollouts may have lifecycle events but omit - // `TurnContextItem.turn_id`; accept those as belonging to the active turn - // span for resume/fork hydration. - if let Some(active_id) = active_turn_id - && ctx - .turn_id - .as_deref() - .is_none_or(|turn_id| turn_id == active_id) - { - // Reverse scan sees the latest `TurnContextItem` for the turn first. - active_turn_context.get_or_insert(ctx); - } - } - RolloutItem::Compacted(_) => { - if active_turn_id.is_some() { - // Compaction inside the currently scanned turn is only "after" the - // eventual candidate if this turn has no `TurnContextItem` and we keep - // scanning into older turns. - active_turn_contains_compaction = true; - } else { - saw_surviving_compaction_after_candidate = true; - } - } - _ => {} - } - } - - // Legacy/minimal rollouts may only persist `TurnContextItem`/`Compacted` without turn - // lifecycle events. Fall back to the last `TurnContextItem` in rollout order so - // resume/fork can still hydrate `previous_model` and detect compaction-after-baseline. - if !saw_turn_lifecycle_event { - let mut saw_compaction_after_last_turn_context = false; - for item in rollout_items.iter().rev() { - match item { - RolloutItem::Compacted(_) => { - saw_compaction_after_last_turn_context = true; - } - RolloutItem::TurnContext(ctx) => { - return (Some(ctx), saw_compaction_after_last_turn_context); - } - _ => {} - } - } - } - - (None, false) - } - fn last_token_info_from_rollout(rollout_items: &[RolloutItem]) -> Option { rollout_items.iter().rev().find_map(|item| match item { RolloutItem::EventMsg(EventMsg::TokenCount(ev)) => ev.info.clone(), @@ -2879,42 +2728,6 @@ impl Session { self.send_raw_response_items(turn_context, items).await; } - async fn reconstruct_history_from_rollout( - &self, - turn_context: &TurnContext, - rollout_items: &[RolloutItem], - ) -> Vec { - let mut history = ContextManager::new(); - for item in rollout_items { - match item { - RolloutItem::ResponseItem(response_item) => { - history.record_items( - std::iter::once(response_item), - turn_context.truncation_policy, - ); - } - RolloutItem::Compacted(compacted) => { - if let Some(replacement) = &compacted.replacement_history { - history.replace(replacement.clone()); - } else { - let user_messages = collect_user_messages(history.raw_items()); - let rebuilt = compact::build_compacted_history( - self.build_initial_context(turn_context, None).await, - &user_messages, - &compacted.message, - ); - history.replace(rebuilt); - } - } - RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { - history.drop_last_n_user_turns(rollback.num_turns); - } - _ => {} - } - } - history.raw_items().to_vec() - } - /// Append ResponseItems to the in-memory conversation history only. pub(crate) async fn record_into_history( &self, @@ -3158,7 +2971,8 @@ impl Session { state.reference_context_item() } - /// Persist the latest turn context snapshot and emit any required model-visible context updates. + /// Persist the latest turn context snapshot for the first real user turn and for + /// steady-state turns that emit model-visible context updates. /// /// When the reference snapshot is missing, this injects full initial context. Otherwise, it /// emits only settings diff items. @@ -3166,15 +2980,19 @@ impl Session { /// If full context is injected and a model switch occurred, this prepends the /// `` developer message so model-specific instructions are not lost. /// - /// Invariant: this is the only runtime path that writes a non-`None` - /// `reference_context_item`. Non-regular tasks intentionally do not update that - /// baseline; `reference_context_item` tracks the latest regular model turn. + /// This is the normal runtime path that establishes a new `reference_context_item`. + /// Mid-turn compaction is the other path that can re-establish that baseline when it + /// reinjects full initial context into replacement history. Other non-regular tasks + /// intentionally do not update the baseline. pub(crate) async fn record_context_updates_and_set_reference_context_item( &self, turn_context: &TurnContext, previous_user_turn_model: Option<&str>, ) { - let reference_context_item = self.reference_context_item().await; + let reference_context_item = { + let state = self.state.lock().await; + state.reference_context_item() + }; let should_inject_full_context = reference_context_item.is_none(); let context_items = if should_inject_full_context { self.build_initial_context(turn_context, previous_user_turn_model) @@ -3187,13 +3005,20 @@ impl Session { turn_context, ) }; + let turn_context_item = turn_context.to_turn_context_item(); if !context_items.is_empty() { self.record_conversation_items(turn_context, &context_items) .await; } + // Persist one `TurnContextItem` per real user turn so resume/lazy replay can recover the + // latest durable baseline even when this turn emitted no model-visible context diffs. + self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item.clone())]) + .await; + // Advance the in-memory diff baseline even when this turn emitted no model-visible + // context items. This keeps later runtime diffing aligned with the current turn state. let mut state = self.state.lock().await; - state.set_reference_context_item(Some(turn_context.to_turn_context_item())); + state.set_reference_context_item(Some(turn_context_item)); } pub(crate) async fn update_token_usage_info( @@ -6203,11 +6028,6 @@ async fn try_run_sampling_request( prompt: &Prompt, cancellation_token: CancellationToken, ) -> CodexResult { - // Persist one TurnContext marker per sampling request (not just per user turn) so rollout - // analysis can reconstruct API-turn boundaries. `run_turn` persists model-visible context - // diffs/full reinjection earlier in the same regular turn before reaching this path. - let rollout_item = RolloutItem::TurnContext(turn_context.to_turn_context_item()); - feedback_tags!( model = turn_context.model_info.slug.clone(), approval_policy = turn_context.approval_policy.value(), @@ -6216,8 +6036,6 @@ async fn try_run_sampling_request( auth_mode = sess.services.auth_manager.auth_mode(), features = sess.features.enabled_features(), ); - - sess.persist_rollout_items(&[rollout_item]).await; let mut stream = client_session .stream( prompt, @@ -6579,6 +6397,10 @@ mod tests { use crate::protocol::TokenCountEvent; use crate::protocol::TokenUsage; use crate::protocol::TokenUsageInfo; + use crate::protocol::UserMessageEvent; + use crate::rollout::policy::EventPersistenceMode; + use crate::rollout::recorder::RolloutRecorder; + use crate::rollout::recorder::RolloutRecorderParams; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; @@ -7219,7 +7041,7 @@ mod tests { .reconstruct_history_from_rollout(reconstruction_turn.as_ref(), &rollout_items) .await; - assert_eq!(expected, reconstructed); + assert_eq!(expected, reconstructed.history); } #[tokio::test] @@ -7255,7 +7077,7 @@ mod tests { .reconstruct_history_from_rollout(&turn_context, &rollout_items) .await; - assert_eq!(reconstructed, replacement_history); + assert_eq!(reconstructed.history, replacement_history); } #[tokio::test] @@ -7275,221 +7097,6 @@ mod tests { assert_eq!(expected, history.raw_items()); } - #[tokio::test] - async fn record_initial_history_resumed_hydrates_previous_model() { - let (session, turn_context) = make_session_and_context().await; - let previous_model = "previous-rollout-model"; - let previous_context_item = TurnContextItem { - turn_id: Some(turn_context.sub_id.clone()), - cwd: turn_context.cwd.clone(), - current_date: turn_context.current_date.clone(), - timezone: turn_context.timezone.clone(), - approval_policy: turn_context.approval_policy.value(), - sandbox_policy: turn_context.sandbox_policy.get().clone(), - network: None, - model: previous_model.to_string(), - personality: turn_context.personality, - collaboration_mode: Some(turn_context.collaboration_mode.clone()), - effort: turn_context.reasoning_effort, - summary: turn_context.reasoning_summary, - user_instructions: None, - developer_instructions: None, - final_output_json_schema: None, - truncation_policy: Some(turn_context.truncation_policy.into()), - }; - let rollout_items = vec![RolloutItem::TurnContext(previous_context_item)]; - - session - .record_initial_history(InitialHistory::Resumed(ResumedHistory { - conversation_id: ThreadId::default(), - history: rollout_items, - rollout_path: PathBuf::from("/tmp/resume.jsonl"), - })) - .await; - - assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) - ); - } - - #[tokio::test] - async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_turn_with_missing_turn_context_id() - { - let (session, turn_context) = make_session_and_context().await; - let previous_model = "previous-rollout-model"; - let mut previous_context_item = TurnContextItem { - turn_id: Some(turn_context.sub_id.clone()), - cwd: turn_context.cwd.clone(), - current_date: turn_context.current_date.clone(), - timezone: turn_context.timezone.clone(), - approval_policy: turn_context.approval_policy.value(), - sandbox_policy: turn_context.sandbox_policy.get().clone(), - network: None, - model: previous_model.to_string(), - personality: turn_context.personality, - collaboration_mode: Some(turn_context.collaboration_mode.clone()), - effort: turn_context.reasoning_effort, - summary: turn_context.reasoning_summary, - user_instructions: None, - developer_instructions: None, - final_output_json_schema: None, - truncation_policy: Some(turn_context.truncation_policy.into()), - }; - let turn_id = previous_context_item - .turn_id - .clone() - .expect("turn context should have turn_id"); - previous_context_item.turn_id = None; - - let rollout_items = vec![ - RolloutItem::EventMsg(EventMsg::TurnStarted( - codex_protocol::protocol::TurnStartedEvent { - turn_id: turn_id.clone(), - model_context_window: Some(128_000), - collaboration_mode_kind: ModeKind::Default, - }, - )), - RolloutItem::EventMsg(EventMsg::UserMessage( - codex_protocol::protocol::UserMessageEvent { - message: "seed".to_string(), - images: None, - local_images: Vec::new(), - text_elements: Vec::new(), - }, - )), - RolloutItem::TurnContext(previous_context_item), - RolloutItem::EventMsg(EventMsg::TurnComplete( - codex_protocol::protocol::TurnCompleteEvent { - turn_id, - last_agent_message: None, - }, - )), - ]; - - session - .record_initial_history(InitialHistory::Resumed(ResumedHistory { - conversation_id: ThreadId::default(), - history: rollout_items, - rollout_path: PathBuf::from("/tmp/resume.jsonl"), - })) - .await; - - assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) - ); - } - - #[tokio::test] - async fn record_initial_history_resumed_rollback_skips_only_user_turns() { - let (session, turn_context) = make_session_and_context().await; - let previous_context_item = turn_context.to_turn_context_item(); - let user_turn_id = previous_context_item - .turn_id - .clone() - .expect("turn context should have turn_id"); - let standalone_turn_id = "standalone-task-turn".to_string(); - let rollout_items = vec![ - RolloutItem::EventMsg(EventMsg::TurnStarted( - codex_protocol::protocol::TurnStartedEvent { - turn_id: user_turn_id.clone(), - model_context_window: Some(128_000), - collaboration_mode_kind: ModeKind::Default, - }, - )), - RolloutItem::EventMsg(EventMsg::UserMessage( - codex_protocol::protocol::UserMessageEvent { - message: "seed".to_string(), - images: None, - local_images: Vec::new(), - text_elements: Vec::new(), - }, - )), - RolloutItem::TurnContext(previous_context_item), - RolloutItem::EventMsg(EventMsg::TurnComplete( - codex_protocol::protocol::TurnCompleteEvent { - turn_id: user_turn_id, - last_agent_message: None, - }, - )), - // Standalone task turn (no UserMessage) should not consume rollback skips. - RolloutItem::EventMsg(EventMsg::TurnStarted( - codex_protocol::protocol::TurnStartedEvent { - turn_id: standalone_turn_id.clone(), - model_context_window: Some(128_000), - collaboration_mode_kind: ModeKind::Default, - }, - )), - RolloutItem::EventMsg(EventMsg::TurnComplete( - codex_protocol::protocol::TurnCompleteEvent { - turn_id: standalone_turn_id, - last_agent_message: None, - }, - )), - RolloutItem::EventMsg(EventMsg::ThreadRolledBack( - codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, - )), - ]; - - session - .record_initial_history(InitialHistory::Resumed(ResumedHistory { - conversation_id: ThreadId::default(), - history: rollout_items, - rollout_path: PathBuf::from("/tmp/resume.jsonl"), - })) - .await; - - assert_eq!(session.previous_model().await, None); - assert!(session.reference_context_item().await.is_none()); - } - - #[tokio::test] - async fn record_initial_history_resumed_seeds_reference_context_item_without_compaction() { - let (session, turn_context) = make_session_and_context().await; - let previous_context_item = turn_context.to_turn_context_item(); - let rollout_items = vec![RolloutItem::TurnContext(previous_context_item.clone())]; - - session - .record_initial_history(InitialHistory::Resumed(ResumedHistory { - conversation_id: ThreadId::default(), - history: rollout_items, - rollout_path: PathBuf::from("/tmp/resume.jsonl"), - })) - .await; - - assert_eq!( - serde_json::to_value(session.reference_context_item().await) - .expect("serialize seeded reference context item"), - serde_json::to_value(Some(previous_context_item)) - .expect("serialize expected reference context item") - ); - } - - #[tokio::test] - async fn record_initial_history_resumed_does_not_seed_reference_context_item_after_compaction() - { - let (session, turn_context) = make_session_and_context().await; - let previous_context_item = turn_context.to_turn_context_item(); - let rollout_items = vec![ - RolloutItem::TurnContext(previous_context_item), - RolloutItem::Compacted(CompactedItem { - message: String::new(), - replacement_history: Some(Vec::new()), - }), - ]; - - session - .record_initial_history(InitialHistory::Resumed(ResumedHistory { - conversation_id: ThreadId::default(), - history: rollout_items, - rollout_path: PathBuf::from("/tmp/resume.jsonl"), - })) - .await; - - assert!(session.reference_context_item().await.is_none()); - } - #[tokio::test] async fn resumed_history_injects_initial_context_on_first_context_update_only() { let (session, turn_context) = make_session_and_context().await; @@ -7703,7 +7310,34 @@ mod tests { final_output_json_schema: None, truncation_policy: Some(turn_context.truncation_policy.into()), }; - let rollout_items = vec![RolloutItem::TurnContext(previous_context_item)]; + let turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "forked seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id, + last_agent_message: None, + }, + )), + ]; session .record_initial_history(InitialHistory::Forked(rollout_items)) @@ -8947,6 +8581,84 @@ mod tests { assert_eq!(history.raw_items().to_vec(), expected_history); } + #[tokio::test] + async fn record_context_updates_and_set_reference_context_item_persists_baseline_without_emitting_diffs() + { + let (session, previous_context) = make_session_and_context().await; + let next_model = if previous_context.model_info.slug == "gpt-5.1" { + "gpt-5" + } else { + "gpt-5.1" + }; + let turn_context = previous_context + .with_model(next_model.to_string(), &session.services.models_manager) + .await; + let previous_context_item = previous_context.to_turn_context_item(); + { + let mut state = session.state.lock().await; + state.set_reference_context_item(Some(previous_context_item.clone())); + } + let config = session.get_config().await; + let recorder = RolloutRecorder::new( + config.as_ref(), + RolloutRecorderParams::new( + ThreadId::default(), + None, + SessionSource::Exec, + BaseInstructions::default(), + Vec::new(), + EventPersistenceMode::Limited, + ), + None, + None, + ) + .await + .expect("create rollout recorder"); + let rollout_path = recorder.rollout_path().to_path_buf(); + { + let mut rollout = session.services.rollout.lock().await; + *rollout = Some(recorder); + } + + let update_items = + session.build_settings_update_items(Some(&previous_context_item), None, &turn_context); + assert_eq!(update_items, Vec::new()); + + session + .record_context_updates_and_set_reference_context_item(&turn_context, None) + .await; + + assert_eq!( + session.clone_history().await.raw_items().to_vec(), + Vec::new() + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize current context item"), + serde_json::to_value(Some(turn_context.to_turn_context_item())) + .expect("serialize expected context item") + ); + session.ensure_rollout_materialized().await; + session.flush_rollout().await; + + let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path) + .await + .expect("read rollout history") + else { + panic!("expected resumed rollout history"); + }; + let persisted_turn_context = resumed.history.iter().find_map(|item| match item { + RolloutItem::TurnContext(ctx) => Some(ctx.clone()), + _ => None, + }); + assert_eq!( + serde_json::to_value(persisted_turn_context) + .expect("serialize persisted turn context item"), + serde_json::to_value(Some(turn_context.to_turn_context_item())) + .expect("serialize expected turn context item") + ); + } + #[tokio::test] async fn build_initial_context_prepends_model_switch_message() { let (session, turn_context) = make_session_and_context().await; @@ -8965,6 +8677,83 @@ mod tests { assert!(text.contains("")); } + #[tokio::test] + async fn record_context_updates_and_set_reference_context_item_persists_full_reinjection_to_rollout() + { + let (session, previous_context) = make_session_and_context().await; + let next_model = if previous_context.model_info.slug == "gpt-5.1" { + "gpt-5" + } else { + "gpt-5.1" + }; + let turn_context = previous_context + .with_model(next_model.to_string(), &session.services.models_manager) + .await; + let config = session.get_config().await; + let recorder = RolloutRecorder::new( + config.as_ref(), + RolloutRecorderParams::new( + ThreadId::default(), + None, + SessionSource::Exec, + BaseInstructions::default(), + Vec::new(), + EventPersistenceMode::Limited, + ), + None, + None, + ) + .await + .expect("create rollout recorder"); + let rollout_path = recorder.rollout_path().to_path_buf(); + { + let mut rollout = session.services.rollout.lock().await; + *rollout = Some(recorder); + } + + session + .persist_rollout_items(&[RolloutItem::EventMsg(EventMsg::UserMessage( + UserMessageEvent { + message: "seed rollout".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + ))]) + .await; + { + let mut state = session.state.lock().await; + state.set_reference_context_item(None); + } + + session + .record_context_updates_and_set_reference_context_item( + &turn_context, + Some(previous_context.model_info.slug.as_str()), + ) + .await; + session.ensure_rollout_materialized().await; + session.flush_rollout().await; + + let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path) + .await + .expect("read rollout history") + else { + panic!("expected resumed rollout history"); + }; + let persisted_turn_context = resumed.history.iter().find_map(|item| match item { + RolloutItem::TurnContext(ctx) => Some(ctx.clone()), + _ => None, + }); + + assert_eq!( + serde_json::to_value(persisted_turn_context) + .expect("serialize persisted turn context item"), + serde_json::to_value(Some(turn_context.to_turn_context_item())) + .expect("serialize expected turn context item") + ); + } + #[tokio::test] async fn run_user_shell_command_does_not_set_reference_context_item() { let (session, _turn_context, rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/codex/rollout_reconstruction.rs b/codex-rs/core/src/codex/rollout_reconstruction.rs new file mode 100644 index 00000000000..d2fa9e2a3e0 --- /dev/null +++ b/codex-rs/core/src/codex/rollout_reconstruction.rs @@ -0,0 +1,278 @@ +use super::*; + +// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with +// the resume/fork hydration metadata derived from the same replay. +#[derive(Debug)] +pub(super) struct RolloutReconstruction { + pub(super) history: Vec, + pub(super) previous_model: Option, + pub(super) reference_context_item: Option, +} + +#[derive(Debug, Default)] +enum TurnReferenceContextItem { + /// No `TurnContextItem` has been seen for this replay span yet. + /// + /// This differs from `Cleared`: `NeverSet` means there is no evidence this turn ever + /// established a baseline, while `Cleared` means a baseline existed and a later compaction + /// invalidated it. Only the latter must emit an explicit clearing segment for resume/fork + /// hydration. + #[default] + NeverSet, + /// A previously established baseline was invalidated by later compaction. + Cleared, + /// The latest baseline established by this replay span. + Latest(Box), +} + +#[derive(Debug, Default)] +struct ActiveReplaySegment<'a> { + turn_id: Option, + counts_as_user_turn: bool, + previous_model: Option, + reference_context_item: TurnReferenceContextItem, + base_replacement_history: Option<&'a [ResponseItem]>, +} + +fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool { + active_turn_id + .is_none_or(|turn_id| item_turn_id.is_none_or(|item_turn_id| item_turn_id == turn_id)) +} + +fn finalize_active_segment<'a>( + active_segment: ActiveReplaySegment<'a>, + base_replacement_history: &mut Option<&'a [ResponseItem]>, + previous_model: &mut Option, + reference_context_item: &mut TurnReferenceContextItem, + pending_rollback_turns: &mut usize, +) { + // Thread rollback always targets the newest surviving user turns, so consume that + // skip budget before letting this segment contribute metadata or a compaction base. + if *pending_rollback_turns > 0 { + if active_segment.counts_as_user_turn { + *pending_rollback_turns -= 1; + } + return; + } + + // A surviving replacement-history checkpoint is a complete history base. Once we + // know the newest surviving one, older rollout items do not affect rebuilt history. + if base_replacement_history.is_none() + && let Some(segment_base_replacement_history) = active_segment.base_replacement_history + { + *base_replacement_history = Some(segment_base_replacement_history); + } + + // `previous_model` comes from the newest surviving user turn that established one. + if previous_model.is_none() && active_segment.counts_as_user_turn { + *previous_model = active_segment.previous_model; + } + + // `reference_context_item` comes from the newest surviving user turn baseline, or + // from a surviving compaction that explicitly cleared that baseline. + if matches!(reference_context_item, TurnReferenceContextItem::NeverSet) + && (active_segment.counts_as_user_turn + || matches!( + active_segment.reference_context_item, + TurnReferenceContextItem::Cleared + )) + { + *reference_context_item = active_segment.reference_context_item; + } +} + +impl Session { + pub(super) async fn reconstruct_history_from_rollout( + &self, + turn_context: &TurnContext, + rollout_items: &[RolloutItem], + ) -> RolloutReconstruction { + // Replay metadata should already match the shape of the future lazy reverse loader, even + // while history materialization still uses an eager bridge. Scan newest-to-oldest, + // stopping once a surviving replacement-history checkpoint and the required resume metadata + // are both known; then replay only the buffered surviving tail forward to preserve exact + // history semantics. + let mut base_replacement_history: Option<&[ResponseItem]> = None; + let mut previous_model = None; + let mut reference_context_item = TurnReferenceContextItem::NeverSet; + // Rollback is "drop the newest N user turns". While scanning in reverse, that becomes + // "skip the next N user-turn segments we finalize". + let mut pending_rollback_turns = 0usize; + // Borrowed suffix of rollout items newer than the newest surviving replacement-history + // checkpoint. If no such checkpoint exists, this remains the full rollout. + let mut rollout_suffix = rollout_items; + // Reverse replay accumulates rollout items into the newest in-progress turn segment until + // we hit its matching `TurnStarted`, at which point the segment can be finalized. + let mut active_segment: Option> = None; + + for (index, item) in rollout_items.iter().enumerate().rev() { + match item { + RolloutItem::Compacted(compacted) => { + let active_segment = + active_segment.get_or_insert_with(ActiveReplaySegment::default); + // Looking backward, compaction clears any older baseline unless a newer + // `TurnContextItem` in this same segment has already re-established it. + if matches!( + active_segment.reference_context_item, + TurnReferenceContextItem::NeverSet + ) { + active_segment.reference_context_item = TurnReferenceContextItem::Cleared; + } + if active_segment.base_replacement_history.is_none() + && let Some(replacement_history) = &compacted.replacement_history + { + active_segment.base_replacement_history = Some(replacement_history); + rollout_suffix = &rollout_items[index + 1..]; + } + } + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { + pending_rollback_turns = pending_rollback_turns + .saturating_add(usize::try_from(rollback.num_turns).unwrap_or(usize::MAX)); + } + RolloutItem::EventMsg(EventMsg::TurnComplete(event)) => { + let active_segment = + active_segment.get_or_insert_with(ActiveReplaySegment::default); + // Reverse replay often sees `TurnComplete` before any turn-scoped metadata. + // Capture the turn id early so later `TurnContext` / abort items can match it. + if active_segment.turn_id.is_none() { + active_segment.turn_id = Some(event.turn_id.clone()); + } + } + RolloutItem::EventMsg(EventMsg::TurnAborted(event)) => { + if let Some(active_segment) = active_segment.as_mut() { + if active_segment.turn_id.is_none() + && let Some(turn_id) = &event.turn_id + { + active_segment.turn_id = Some(turn_id.clone()); + } + } else if let Some(turn_id) = &event.turn_id { + active_segment = Some(ActiveReplaySegment { + turn_id: Some(turn_id.clone()), + ..Default::default() + }); + } + } + RolloutItem::EventMsg(EventMsg::UserMessage(_)) => { + let active_segment = + active_segment.get_or_insert_with(ActiveReplaySegment::default); + active_segment.counts_as_user_turn = true; + } + RolloutItem::TurnContext(ctx) => { + let active_segment = + active_segment.get_or_insert_with(ActiveReplaySegment::default); + // `TurnContextItem` can attach metadata to an existing segment, but only a + // real `UserMessage` event should make the segment count as a user turn. + if active_segment.turn_id.is_none() { + active_segment.turn_id = ctx.turn_id.clone(); + } + if turn_ids_are_compatible( + active_segment.turn_id.as_deref(), + ctx.turn_id.as_deref(), + ) { + active_segment.previous_model = Some(ctx.model.clone()); + if matches!( + active_segment.reference_context_item, + TurnReferenceContextItem::NeverSet + ) { + active_segment.reference_context_item = + TurnReferenceContextItem::Latest(Box::new(ctx.clone())); + } + } + } + RolloutItem::EventMsg(EventMsg::TurnStarted(event)) => { + // `TurnStarted` is the oldest boundary of the active reverse segment. + if active_segment.as_ref().is_some_and(|active_segment| { + turn_ids_are_compatible( + active_segment.turn_id.as_deref(), + Some(event.turn_id.as_str()), + ) + }) && let Some(active_segment) = active_segment.take() + { + finalize_active_segment( + active_segment, + &mut base_replacement_history, + &mut previous_model, + &mut reference_context_item, + &mut pending_rollback_turns, + ); + } + } + RolloutItem::ResponseItem(_) + | RolloutItem::EventMsg(_) + | RolloutItem::SessionMeta(_) => {} + } + + if base_replacement_history.is_some() + && previous_model.is_some() + && !matches!(reference_context_item, TurnReferenceContextItem::NeverSet) + { + // At this point we have both eager resume metadata values and the replacement- + // history base for the surviving tail, so older rollout items cannot affect this + // result. + break; + } + } + + if let Some(active_segment) = active_segment.take() { + finalize_active_segment( + active_segment, + &mut base_replacement_history, + &mut previous_model, + &mut reference_context_item, + &mut pending_rollback_turns, + ); + } + + let initial_context = self.build_initial_context(turn_context, None).await; + let mut history = ContextManager::new(); + if let Some(base_replacement_history) = base_replacement_history { + history.replace(base_replacement_history.to_vec()); + } + // Temporary eager bridge: rebuild exact history semantics from the borrowed rollout suffix + // discovered by reverse replay. This keeps the history result stable while the control + // flow moves toward the future lazy reverse loader design without cloning the full rollout + // in the common no-compaction case. + for item in rollout_suffix { + match item { + RolloutItem::ResponseItem(response_item) => { + history.record_items( + std::iter::once(response_item), + turn_context.truncation_policy, + ); + } + RolloutItem::Compacted(compacted) => { + if let Some(replacement_history) = &compacted.replacement_history { + history.replace(replacement_history.clone()); + } else { + let user_messages = collect_user_messages(history.raw_items()); + let rebuilt = compact::build_compacted_history( + initial_context.clone(), + &user_messages, + &compacted.message, + ); + history.replace(rebuilt); + } + } + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { + history.drop_last_n_user_turns(rollback.num_turns); + } + RolloutItem::EventMsg(_) + | RolloutItem::TurnContext(_) + | RolloutItem::SessionMeta(_) => {} + } + } + + let reference_context_item = match reference_context_item { + TurnReferenceContextItem::NeverSet | TurnReferenceContextItem::Cleared => None, + TurnReferenceContextItem::Latest(turn_reference_context_item) => { + Some(*turn_reference_context_item) + } + }; + + RolloutReconstruction { + history: history.raw_items().to_vec(), + previous_model, + reference_context_item, + } + } +} diff --git a/codex-rs/core/src/codex/rollout_reconstruction_tests.rs b/codex-rs/core/src/codex/rollout_reconstruction_tests.rs new file mode 100644 index 00000000000..5765bda145b --- /dev/null +++ b/codex-rs/core/src/codex/rollout_reconstruction_tests.rs @@ -0,0 +1,1168 @@ +use super::*; + +use crate::protocol::CompactedItem; +use crate::protocol::InitialHistory; +use crate::protocol::ResumedHistory; +use codex_protocol::ThreadId; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use pretty_assertions::assert_eq; +use std::path::PathBuf; + +fn user_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: text.to_string(), + }], + end_turn: None, + phase: None, + } +} + +fn assistant_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + end_turn: None, + phase: None, + } +} + +#[tokio::test] +async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_model() { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let rollout_items = vec![RolloutItem::TurnContext(previous_context_item)]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!(session.previous_model().await, None); + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_turn_with_missing_turn_context_id() + { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let mut previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + previous_context_item.turn_id = None; + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id, + last_agent_message: None, + }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(previous_model.to_string()) + ); +} + +#[tokio::test] +async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_completed_turns() { + let (session, turn_context) = make_session_and_context().await; + let first_context_item = turn_context.to_turn_context_item(); + let first_turn_id = first_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let mut rolled_back_context_item = first_context_item.clone(); + rolled_back_context_item.turn_id = Some("rolled-back-turn".to_string()); + rolled_back_context_item.model = "rolled-back-model".to_string(); + let rolled_back_turn_id = rolled_back_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let turn_one_user = user_message("turn 1 user"); + let turn_one_assistant = assistant_message("turn 1 assistant"); + let turn_two_user = user_message("turn 2 user"); + let turn_two_assistant = assistant_message("turn 2 assistant"); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: first_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 1 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(first_context_item.clone()), + RolloutItem::ResponseItem(turn_one_user.clone()), + RolloutItem::ResponseItem(turn_one_assistant.clone()), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: first_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: rolled_back_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 2 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(rolled_back_context_item), + RolloutItem::ResponseItem(turn_two_user), + RolloutItem::ResponseItem(turn_two_assistant), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: rolled_back_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, + )), + ]; + + let reconstructed = session + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + + assert_eq!( + reconstructed.history, + vec![turn_one_user, turn_one_assistant] + ); + assert_eq!( + reconstructed.previous_model, + Some(turn_context.model_info.slug.clone()) + ); + assert_eq!( + serde_json::to_value(reconstructed.reference_context_item) + .expect("serialize reconstructed reference context item"), + serde_json::to_value(Some(first_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_incomplete_turn() { + let (session, turn_context) = make_session_and_context().await; + let first_context_item = turn_context.to_turn_context_item(); + let first_turn_id = first_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let incomplete_turn_id = "incomplete-rolled-back-turn".to_string(); + let turn_one_user = user_message("turn 1 user"); + let turn_one_assistant = assistant_message("turn 1 assistant"); + let turn_two_user = user_message("turn 2 user"); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: first_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 1 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(first_context_item.clone()), + RolloutItem::ResponseItem(turn_one_user.clone()), + RolloutItem::ResponseItem(turn_one_assistant.clone()), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: first_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: incomplete_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 2 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::ResponseItem(turn_two_user), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, + )), + ]; + + let reconstructed = session + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + + assert_eq!( + reconstructed.history, + vec![turn_one_user, turn_one_assistant] + ); + assert_eq!( + reconstructed.previous_model, + Some(turn_context.model_info.slug.clone()) + ); + assert_eq!( + serde_json::to_value(reconstructed.reference_context_item) + .expect("serialize reconstructed reference context item"), + serde_json::to_value(Some(first_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metadata() { + let (session, turn_context) = make_session_and_context().await; + let first_context_item = turn_context.to_turn_context_item(); + let first_turn_id = first_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let second_turn_id = "rolled-back-user-turn".to_string(); + let standalone_turn_id = "standalone-turn".to_string(); + let turn_one_user = user_message("turn 1 user"); + let turn_one_assistant = assistant_message("turn 1 assistant"); + let turn_two_user = user_message("turn 2 user"); + let turn_two_assistant = assistant_message("turn 2 assistant"); + let standalone_assistant = assistant_message("standalone assistant"); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: first_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 1 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(first_context_item.clone()), + RolloutItem::ResponseItem(turn_one_user.clone()), + RolloutItem::ResponseItem(turn_one_assistant.clone()), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: first_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: second_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "turn 2 user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::ResponseItem(turn_two_user), + RolloutItem::ResponseItem(turn_two_assistant), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: second_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: standalone_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::ResponseItem(standalone_assistant), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: standalone_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, + )), + ]; + + let reconstructed = session + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + + assert_eq!( + reconstructed.history, + vec![turn_one_user, turn_one_assistant] + ); + assert_eq!( + reconstructed.previous_model, + Some(turn_context.model_info.slug.clone()) + ); + assert_eq!( + serde_json::to_value(reconstructed.reference_context_item) + .expect("serialize reconstructed reference context item"), + serde_json::to_value(Some(first_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding_user_turns() { + let (session, turn_context) = make_session_and_context().await; + let only_context_item = turn_context.to_turn_context_item(); + let only_turn_id = only_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: only_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "only user".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(only_context_item), + RolloutItem::ResponseItem(user_message("only user")), + RolloutItem::ResponseItem(assistant_message("only assistant")), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: only_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 99 }, + )), + ]; + + let reconstructed = session + .reconstruct_history_from_rollout(&turn_context, &rollout_items) + .await; + + assert_eq!(reconstructed.history, Vec::new()); + assert_eq!(reconstructed.previous_model, None); + assert!(reconstructed.reference_context_item.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_rollback_skips_only_user_turns() { + let (session, turn_context) = make_session_and_context().await; + let previous_context_item = turn_context.to_turn_context_item(); + let user_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let standalone_turn_id = "standalone-task-turn".to_string(); + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: user_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: user_turn_id, + last_agent_message: None, + }, + )), + // Standalone task turn (no UserMessage) should not consume rollback skips. + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: standalone_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: standalone_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!(session.previous_model().await, None); + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_compaction_metadata() { + let (session, turn_context) = make_session_and_context().await; + let previous_context_item = turn_context.to_turn_context_item(); + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let incomplete_turn_id = "incomplete-compacted-user-turn".to_string(); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item.clone()), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: incomplete_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "rolled back".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { num_turns: 1 }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(turn_context.model_info.slug.clone()) + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize seeded reference context item"), + serde_json::to_value(Some(previous_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn record_initial_history_resumed_bare_turn_context_does_not_seed_reference_context_item() { + let (session, turn_context) = make_session_and_context().await; + let previous_context_item = turn_context.to_turn_context_item(); + let rollout_items = vec![RolloutItem::TurnContext(previous_context_item.clone())]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_does_not_seed_reference_context_item_after_compaction() { + let (session, turn_context) = make_session_and_context().await; + let previous_context_item = turn_context.to_turn_context_item(); + let rollout_items = vec![ + RolloutItem::TurnContext(previous_context_item), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!(session.previous_model().await, None); + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_turn_context_after_compaction_reestablishes_reference_context_item() + { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + // Compaction clears baseline until a later TurnContextItem re-establishes it. + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(previous_model.to_string()) + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize seeded reference context item"), + serde_json::to_value(Some(TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + })) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_turn_for_compaction_accounting() + { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let aborted_turn_id = "aborted-turn-without-id".to_string(); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: aborted_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "aborted".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::EventMsg(EventMsg::TurnAborted( + codex_protocol::protocol::TurnAbortedEvent { + turn_id: None, + reason: TurnAbortReason::Interrupted, + }, + )), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(previous_model.to_string()) + ); + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_for_later_turn_context() + { + let (session, turn_context) = make_session_and_context().await; + let previous_context_item = turn_context.to_turn_context_item(); + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let current_model = "current-rollout-model"; + let current_turn_id = "current-turn".to_string(); + let unmatched_abort_turn_id = "other-turn".to_string(); + let current_context_item = TurnContextItem { + turn_id: Some(current_turn_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: current_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: current_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "current".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::EventMsg(EventMsg::TurnAborted( + codex_protocol::protocol::TurnAbortedEvent { + turn_id: Some(unmatched_abort_turn_id), + reason: TurnAbortReason::Interrupted, + }, + )), + RolloutItem::TurnContext(current_context_item.clone()), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: current_turn_id, + last_agent_message: None, + }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(current_model.to_string()) + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize seeded reference context item"), + serde_json::to_value(Some(current_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clears_reference_context_item() + { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let incomplete_turn_id = "trailing-incomplete-turn".to_string(); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: incomplete_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "incomplete".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(previous_model.to_string()) + ); + assert!(session.reference_context_item().await.is_none()); +} + +#[tokio::test] +async fn record_initial_history_resumed_trailing_incomplete_turn_preserves_turn_context_item() { + let (session, turn_context) = make_session_and_context().await; + let current_context_item = turn_context.to_turn_context_item(); + let current_turn_id = current_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: current_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "incomplete".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(current_context_item.clone()), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(turn_context.model_info.slug.clone()) + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize seeded reference context item"), + serde_json::to_value(Some(current_context_item)) + .expect("serialize expected reference context item") + ); +} + +#[tokio::test] +async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clears_reference_context_item() + { + let (session, turn_context) = make_session_and_context().await; + let previous_model = "previous-rollout-model"; + let previous_context_item = TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), + cwd: turn_context.cwd.clone(), + current_date: turn_context.current_date.clone(), + timezone: turn_context.timezone.clone(), + approval_policy: turn_context.approval_policy.value(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + network: None, + model: previous_model.to_string(), + personality: turn_context.personality, + collaboration_mode: Some(turn_context.collaboration_mode.clone()), + effort: turn_context.reasoning_effort, + summary: turn_context.reasoning_summary, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: Some(turn_context.truncation_policy.into()), + }; + let previous_turn_id = previous_context_item + .turn_id + .clone() + .expect("turn context should have turn_id"); + let compacted_incomplete_turn_id = "compacted-incomplete-turn".to_string(); + let replacing_turn_id = "replacing-turn".to_string(); + + let rollout_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: previous_turn_id.clone(), + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "seed".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::TurnContext(previous_context_item), + RolloutItem::EventMsg(EventMsg::TurnComplete( + codex_protocol::protocol::TurnCompleteEvent { + turn_id: previous_turn_id, + last_agent_message: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: compacted_incomplete_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "compacted".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + )), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: Some(Vec::new()), + }), + // A newer TurnStarted replaces the incomplete compacted turn without a matching + // completion/abort for the old one. + RolloutItem::EventMsg(EventMsg::TurnStarted( + codex_protocol::protocol::TurnStartedEvent { + turn_id: replacing_turn_id, + model_context_window: Some(128_000), + collaboration_mode_kind: ModeKind::Default, + }, + )), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.previous_model().await, + Some(previous_model.to_string()) + ); + assert!(session.reference_context_item().await.is_none()); +} diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 7470a148958..0441f9c4969 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -224,7 +224,7 @@ async fn run_compact_task_inner( InitialContextInjection::DoNotInject => None, InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()), }; - sess.replace_history(new_history.clone(), reference_context_item) + sess.replace_history(new_history.clone(), reference_context_item.clone()) .await; sess.recompute_token_usage(&turn_context).await; @@ -232,7 +232,15 @@ async fn run_compact_task_inner( message: summary_text.clone(), replacement_history: Some(new_history), }); - sess.persist_rollout_items(&[rollout_item]).await; + let rollout_items = if let Some(turn_context_item) = reference_context_item { + // Mid-turn compaction re-injected initial context into the replacement history, so + // persist a fresh `TurnContextItem` after `Compacted` to re-establish the baseline for + // resume/fork replay. + vec![rollout_item, RolloutItem::TurnContext(turn_context_item)] + } else { + vec![rollout_item] + }; + sess.persist_rollout_items(&rollout_items).await; sess.emit_turn_item_completed(&turn_context, compaction_item) .await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index cc5f5164c39..3019a4ad8b6 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -159,7 +159,7 @@ async fn run_remote_compact_task_inner_impl( InitialContextInjection::DoNotInject => None, InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()), }; - sess.replace_history(new_history.clone(), reference_context_item) + sess.replace_history(new_history.clone(), reference_context_item.clone()) .await; sess.recompute_token_usage(turn_context).await; @@ -167,8 +167,18 @@ async fn run_remote_compact_task_inner_impl( message: String::new(), replacement_history: Some(new_history), }; - sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) - .await; + let rollout_items = if let Some(turn_context_item) = reference_context_item { + // Mid-turn compaction re-injected initial context into the replacement history, so + // persist a fresh `TurnContextItem` after `Compacted` to re-establish the baseline for + // resume/fork replay. + vec![ + RolloutItem::Compacted(compacted_item), + RolloutItem::TurnContext(turn_context_item), + ] + } else { + vec![RolloutItem::Compacted(compacted_item)] + }; + sess.persist_rollout_items(&rollout_items).await; sess.emit_turn_item_completed(turn_context, compaction_item) .await; diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 9e376ce494e..618861f33e3 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -362,7 +362,7 @@ async fn summarize_context_three_requests_and_instructions() { codex.submit(Op::Shutdown).await.unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - // Verify rollout contains regular sampling TurnContext entries and a Compacted entry. + // Verify rollout contains user-turn TurnContext entries and a Compacted entry. println!("rollout path: {}", rollout_path.display()); let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| { panic!( @@ -393,9 +393,9 @@ async fn summarize_context_three_requests_and_instructions() { } } - assert!( - regular_turn_context_count == 2, - "expected two regular sampling TurnContext entries in rollout" + assert_eq!( + regular_turn_context_count, 2, + "rollout should contain one TurnContext entry per real user turn" ); assert!( saw_compacted_summary, @@ -2080,9 +2080,9 @@ async fn auto_compact_persists_rollout_entries() { } } - assert!( - turn_context_count >= 2, - "expected at least two turn context entries, got {turn_context_count}" + assert_eq!( + turn_context_count, 3, + "rollout should contain one TurnContext entry per real user turn" ); } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index b5aaf47ea65..880e6a24fe5 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2124,10 +2124,10 @@ pub struct TurnContextNetworkItem { pub denied_domains: Vec, } -/// Persist only when the same turn also persists the corresponding -/// model-visible context updates (diffs or full reinjection), so -/// resume/fork does not use a `reference_context_item` whose context -/// was never actually visible to the model. +/// Persist once per real user turn after computing that turn's model-visible +/// context updates, and again after mid-turn compaction when replacement +/// history re-establishes full context, so resume/fork replay can recover the +/// latest durable baseline. #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] pub struct TurnContextItem { #[serde(default, skip_serializing_if = "Option::is_none")]