Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a2d6b8e
Unify rollout reconstruction and resume/fork hydration
charley-oai Feb 23, 2026
fd4f195
Handle missing abort turn IDs in rollout replay
charley-oai Feb 23, 2026
11d00b2
Box rollout replay meta turn variant for clippy
charley-oai Feb 23, 2026
f6b47b6
Document rollout replay metadata reconstruction
charley-oai Feb 24, 2026
4e112da
Clarify compaction-after-turn replay comment
charley-oai Feb 24, 2026
59ce703
Make preturn vs mid-turn compaction replay explicit
charley-oai Feb 24, 2026
f7d34f1
Clarify CompactionOutsideTurn as legacy fallback
charley-oai Feb 25, 2026
54da324
Clarify outside-turn compaction replay semantics
charley-oai Feb 25, 2026
f171e98
Simplify legacy compaction fallback accounting
charley-oai Feb 25, 2026
028e6c9
Rename compaction baseline-clear replay flag
charley-oai Feb 25, 2026
b8663ce
Document compacted-turn fallback baseline invalidation
charley-oai Feb 25, 2026
8e08ea9
Handle trailing incomplete compacted turns in replay
charley-oai Feb 25, 2026
7b2c735
Preserve compacted incomplete turn replacement on replay
charley-oai Feb 25, 2026
1b86daf
Align rollback replay with incomplete user turns
charley-oai Feb 25, 2026
9ced301
Finalize incomplete replay turns
charley-oai Feb 25, 2026
952c5b5
comment
charley-oai Feb 25, 2026
36fff69
Add rollback replay equivalence tests
charley-oai Feb 25, 2026
d599804
Ignore unmatched aborts during replay
charley-oai Feb 25, 2026
d0a43d0
Move rollout reconstruction out of codex.rs
charley-oai Feb 25, 2026
e47e0ac
Persist TurnContext only when model-visible context changes
charley-oai Feb 25, 2026
b3e4201
Move rollout reconstruction tests out of codex.rs
charley-oai Feb 25, 2026
21902a7
Make rollout reconstruction comment timeless
charley-oai Feb 25, 2026
1b74077
Clarify replay comment for reference context
charley-oai Feb 25, 2026
ce4dd33
Trim legacy detail from rollout replay comment
charley-oai Feb 25, 2026
7660215
Rename replay compaction marker for clarity
charley-oai Feb 25, 2026
7fe48bc
Simplify rollout replay metadata resolution
charley-oai Feb 25, 2026
19b7d05
Advance reference context baseline without prompt diffs
charley-oai Feb 26, 2026
415e44c
Narrow TurnContext rollout persistence
charley-oai Feb 26, 2026
b8908ce
Clarify replay TurnContext comment
charley-oai Feb 26, 2026
96377fb
Make rollout metadata resolution tail-oriented
charley-oai Feb 26, 2026
ac88442
Make rollout replay reverse-scan friendly
charley-oai Feb 26, 2026
0a8645e
Document reverse replay rollback buffer
charley-oai Feb 26, 2026
3086586
Document reverse replay collector fields
charley-oai Feb 26, 2026
fb4b6f7
Document rollout reconstruction return type
charley-oai Feb 26, 2026
8fa9bf7
Add rollout lazy loading design note
charley-oai Feb 26, 2026
d065ead
Remove rollout lazy loading design note
charley-oai Feb 26, 2026
ed000ee
Simplify unified rollout reconstruction
charley-oai Feb 26, 2026
7d1297b
Persist TurnContext after full-context reinjection
charley-oai Feb 26, 2026
709dae6
Make replay reference-context state explicit
charley-oai Feb 26, 2026
6c7e9bf
Rename replay reference-context enum
charley-oai Feb 26, 2026
8436614
Rename replay turn reference-context state
charley-oai Feb 26, 2026
501d658
Refactor rollout reconstruction around reverse replay
charley-oai Feb 26, 2026
e74fe7e
Document reverse replay state in rollout reconstruction
charley-oai Feb 26, 2026
1137b3a
Simplify rollout reconstruction turn id matching
charley-oai Feb 26, 2026
44b3006
Name reconstructed reference context binding
charley-oai Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
596 changes: 163 additions & 433 deletions codex-rs/core/src/codex.rs

Large diffs are not rendered by default.

272 changes: 272 additions & 0 deletions codex-rs/core/src/codex/rollout_reconstruction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
use super::*;

// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with
// the resume/fork hydration metadata derived from the same replay.
#[derive(Debug)]
pub(super) struct RolloutReconstruction {
pub(super) history: Vec<ResponseItem>,
pub(super) previous_model: Option<String>,
pub(super) reference_context_item: Option<TurnContextItem>,
}

#[derive(Debug, Default)]
enum TurnReferenceContextItem {
/// No `TurnContextItem` has been seen for this replay span yet.
///
/// This differs from `Cleared`: `NeverSet` means there is no evidence this turn ever
/// established a baseline, while `Cleared` means a baseline existed and a later compaction
/// invalidated it. Only the latter must emit an explicit clearing segment for resume/fork
/// hydration.
#[default]
NeverSet,
/// A previously established baseline was invalidated by later compaction.
Cleared,
/// The latest baseline established by this replay span.
LatestSet(Box<TurnContextItem>),
}

#[derive(Debug, Default)]
struct ActiveReplaySegment {
turn_id: Option<String>,
counts_as_user_turn: bool,
previous_model: Option<String>,
reference_context_item: TurnReferenceContextItem,
replacement_history_index: Option<usize>,
}

fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool {
active_turn_id
.is_none_or(|turn_id| item_turn_id.is_none_or(|item_turn_id| item_turn_id == turn_id))
}

impl Session {
pub(super) async fn reconstruct_history_from_rollout(
&self,
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> RolloutReconstruction {
// Replay metadata should already match the shape of the future lazy reverse loader, even
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
// stopping once a surviving replacement-history checkpoint and the required resume metadata
// are both known; then replay only that suffix forward to preserve exact history semantics.
// Index of the earliest rollout item that still needs forward materialization. Once reverse
// replay finds a surviving `replacement_history` checkpoint, everything before that index
// is obsolete for rebuilt history.
let mut replay_start_index = None;
let mut previous_model = None;
let mut reference_context_item = TurnReferenceContextItem::NeverSet;
// Rollback is "drop the newest N user turns". While scanning in reverse, that becomes
// "skip the next N user-turn segments we finalize".
let mut pending_rollback_turns = 0usize;
// Reverse replay accumulates rollout items into the newest in-progress turn segment until
// we hit its matching `TurnStarted`, at which point the segment can be finalized.
let mut active_segment: Option<ActiveReplaySegment> = None;

let finalize_active_segment =
|active_segment: ActiveReplaySegment,
replay_start_index: &mut Option<usize>,
previous_model: &mut Option<String>,
reference_context_item: &mut TurnReferenceContextItem,
pending_rollback_turns: &mut usize| {
// Thread rollback always targets the newest surviving user turns, so consume that
// skip budget before letting this segment contribute metadata or a compaction base.
if *pending_rollback_turns > 0 {
if active_segment.counts_as_user_turn {
*pending_rollback_turns -= 1;
}
return;
}

// A surviving replacement-history checkpoint is a complete history base. Once we
// know the newest surviving one, older rollout items do not affect rebuilt history.
if replay_start_index.is_none()
&& let Some(replacement_history_index) =
active_segment.replacement_history_index
{
*replay_start_index = Some(replacement_history_index);
}

// `previous_model` comes from the newest surviving user turn that established one.
if previous_model.is_none() && active_segment.counts_as_user_turn {
*previous_model = active_segment.previous_model;
}

// `reference_context_item` comes from the newest surviving user turn baseline, or
// from a surviving compaction that explicitly cleared that baseline.
if matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
&& (active_segment.counts_as_user_turn
|| matches!(
active_segment.reference_context_item,
TurnReferenceContextItem::Cleared
))
{
*reference_context_item = active_segment.reference_context_item;
}
};

for (index, item) in rollout_items.iter().enumerate().rev() {
match item {
RolloutItem::Compacted(compacted) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
// Looking backward, compaction clears any older baseline unless a newer
// `TurnContextItem` in this same segment has already re-established it.
if matches!(
active_segment.reference_context_item,
TurnReferenceContextItem::NeverSet
) {
active_segment.reference_context_item = TurnReferenceContextItem::Cleared;
}
if active_segment.replacement_history_index.is_none()
&& compacted.replacement_history.is_some()
{
active_segment.replacement_history_index = Some(index);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
pending_rollback_turns = pending_rollback_turns
.saturating_add(usize::try_from(rollback.num_turns).unwrap_or(usize::MAX));
}
RolloutItem::EventMsg(EventMsg::TurnStarted(event)) => {
// `TurnStarted` is the oldest boundary of the active reverse segment.
if active_segment.as_ref().is_some_and(|active_segment| {
turn_ids_are_compatible(
active_segment.turn_id.as_deref(),
Some(event.turn_id.as_str()),
)
}) && let Some(active_segment) = active_segment.take()
{
finalize_active_segment(
active_segment,
&mut replay_start_index,
&mut previous_model,
&mut reference_context_item,
&mut pending_rollback_turns,
);
}
}
RolloutItem::EventMsg(EventMsg::TurnComplete(event)) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
// Reverse replay often sees `TurnComplete` before any turn-scoped metadata.
// Capture the turn id early so later `TurnContext` / abort items can match it.
if active_segment.turn_id.is_none() {
active_segment.turn_id = Some(event.turn_id.clone());
}
}
RolloutItem::EventMsg(EventMsg::TurnAborted(event)) => {
if let Some(active_segment) = active_segment.as_mut() {
if active_segment.turn_id.is_none()
&& let Some(turn_id) = &event.turn_id
{
active_segment.turn_id = Some(turn_id.clone());
}
} else if let Some(turn_id) = &event.turn_id {
active_segment = Some(ActiveReplaySegment {
turn_id: Some(turn_id.clone()),
..Default::default()
});
}
}
RolloutItem::EventMsg(EventMsg::UserMessage(_)) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
active_segment.counts_as_user_turn = true;
}
RolloutItem::TurnContext(ctx) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
// Legacy rollouts can omit lifecycle ids, so a bare `TurnContextItem` still
// establishes a user-turn segment and its metadata by itself.
if active_segment.turn_id.is_none() {
active_segment.turn_id = ctx.turn_id.clone();
active_segment.counts_as_user_turn = true;
}
if turn_ids_are_compatible(
active_segment.turn_id.as_deref(),
ctx.turn_id.as_deref(),
) {
active_segment.previous_model = Some(ctx.model.clone());
if matches!(
active_segment.reference_context_item,
TurnReferenceContextItem::NeverSet
) {
active_segment.reference_context_item =
TurnReferenceContextItem::LatestSet(Box::new(ctx.clone()));
}
}
}
RolloutItem::ResponseItem(_)
| RolloutItem::EventMsg(_)
| RolloutItem::SessionMeta(_) => {}
}

if replay_start_index.is_some()
&& previous_model.is_some()
&& !matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
{
// At this point we have both eager resume metadata values and a history checkpoint
// for the surviving suffix, so older rollout items cannot affect this result.
break;
}
}

if let Some(active_segment) = active_segment.take() {
finalize_active_segment(
active_segment,
&mut replay_start_index,
&mut previous_model,
&mut reference_context_item,
&mut pending_rollback_turns,
);
}

let initial_context = self.build_initial_context(turn_context, None).await;
let mut history = ContextManager::new();
// Temporary eager bridge: rebuild exact history semantics from only the surviving suffix
// discovered by reverse replay. This keeps the history result stable while the control
// flow moves toward the future lazy reverse loader design.
for item in &rollout_items[replay_start_index.unwrap_or(0)..] {
match item {
RolloutItem::ResponseItem(response_item) => {
history.record_items(
std::iter::once(response_item),
turn_context.truncation_policy,
);
}
RolloutItem::Compacted(compacted) => {
if let Some(replacement_history) = &compacted.replacement_history {
history.replace(replacement_history.clone());
} else {
let user_messages = collect_user_messages(history.raw_items());
let rebuilt = compact::build_compacted_history(
initial_context.clone(),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
history.drop_last_n_user_turns(rollback.num_turns);
}
RolloutItem::EventMsg(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}
}

let reference_context_item = match reference_context_item {
TurnReferenceContextItem::NeverSet | TurnReferenceContextItem::Cleared => None,
TurnReferenceContextItem::LatestSet(turn_reference_context_item) => {
Some(*turn_reference_context_item)
}
};

RolloutReconstruction {
history: history.raw_items().to_vec(),
previous_model,
reference_context_item,
}
}
}
Loading
Loading