Skip to content
66 changes: 20 additions & 46 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
Expand Down Expand Up @@ -2239,51 +2238,25 @@ impl CodexMessageProcessor {
.await
{
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();

// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
}
}
// Do not wait on conversation.next_event(); the listener task already consumes
// the stream. Request shutdown and ensure the rollout file is flushed before moving it.
if let Err(err) = conversation.submit(Op::Shutdown).await {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
}

let flush_result =
tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await;
match flush_result {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!(
"conversation {conversation_id} rollout flush failed before archive: {err}"
);
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_waiters();
Err(_) => {
warn!(
"conversation {conversation_id} rollout flush timed out; proceeding with archive"
);
}
}
}
Expand All @@ -2295,7 +2268,8 @@ impl CodexMessageProcessor {
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
tokio::fs::create_dir_all(&archive_folder).await?;
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
let destination = archive_folder.join(&file_name);
tokio::fs::rename(&canonical_rollout_path, &destination).await?;
Ok(())
}
.await;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/common/rollout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub fn create_fake_rollout(
instructions: None,
source: SessionSource::Cli,
model_provider: model_provider.map(str::to_string),
name: None,
})?;

let lines = [
Expand Down
71 changes: 71 additions & 0 deletions codex-rs/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ enum Subcommand {
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
Resume(ResumeCommand),

/// Fork an existing session into a new conversation.
Fork(ForkCommand),

/// [EXPERIMENTAL] Browse tasks from Codex Cloud and apply changes locally.
#[clap(name = "cloud", alias = "cloud-tasks")]
Cloud(CloudTasksCli),
Expand Down Expand Up @@ -142,6 +145,16 @@ struct ResumeCommand {
config_overrides: TuiCli,
}

#[derive(Debug, Parser)]
struct ForkCommand {
/// Resume from a saved session name or rollout id, but start a new conversation.
#[arg(value_name = "ID|NAME")]
target: String,

#[clap(flatten)]
config_overrides: TuiCli,
}

#[derive(Debug, Parser)]
struct SandboxArgs {
#[command(subcommand)]
Expand Down Expand Up @@ -466,6 +479,19 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Fork(ForkCommand {
target,
config_overrides,
})) => {
interactive = finalize_fork_interactive(
interactive,
root_config_overrides.clone(),
target,
config_overrides,
);
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Login(mut login_cli)) => {
prepend_config_flags(
&mut login_cli.config_overrides,
Expand Down Expand Up @@ -627,6 +653,7 @@ fn finalize_resume_interactive(
interactive.resume_last = last;
interactive.resume_session_id = resume_session_id;
interactive.resume_show_all = show_all;
interactive.fork_source = None;

// Merge resume-scoped flags and overrides with highest precedence.
merge_resume_cli_flags(&mut interactive, resume_cli);
Expand All @@ -637,6 +664,21 @@ fn finalize_resume_interactive(
interactive
}

fn finalize_fork_interactive(
mut interactive: TuiCli,
root_config_overrides: CliConfigOverrides,
target: String,
fork_cli: TuiCli,
) -> TuiCli {
interactive.resume_picker = false;
interactive.resume_last = false;
interactive.resume_session_id = None;
interactive.fork_source = Some(target);
merge_resume_cli_flags(&mut interactive, fork_cli);
prepend_config_flags(&mut interactive.config_overrides, root_config_overrides);
interactive
}

/// Merge flags provided to `codex resume` so they take precedence over any
/// root-level flags. Only overrides fields explicitly set on the resume-scoped
/// CLI. Also appends `-c key=value` overrides with highest precedence.
Expand Down Expand Up @@ -727,6 +769,26 @@ mod tests {
)
}

fn fork_from_args(args: &[&str]) -> TuiCli {
let cli = MultitoolCli::try_parse_from(args).expect("parse");
let MultitoolCli {
interactive,
config_overrides: root_overrides,
subcommand,
feature_toggles: _,
} = cli;

let Subcommand::Fork(ForkCommand {
target,
config_overrides,
}) = subcommand.expect("fork present")
else {
unreachable!()
};

finalize_fork_interactive(interactive, root_overrides, target, config_overrides)
}

fn sample_exit_info(conversation: Option<&str>) -> AppExitInfo {
let token_usage = TokenUsage {
output_tokens: 2,
Expand Down Expand Up @@ -819,6 +881,15 @@ mod tests {
assert!(interactive.resume_show_all);
}

#[test]
fn fork_sets_target_and_disables_resume_controls() {
let interactive = fork_from_args(["codex", "fork", "saved"].as_ref());
assert_eq!(interactive.fork_source.as_deref(), Some("saved"));
assert!(!interactive.resume_picker);
assert!(!interactive.resume_last);
assert!(interactive.resume_session_id.is_none());
}

#[test]
fn resume_merges_option_flags_and_full_auto() {
let interactive = finalize_from_args(
Expand Down
Loading
Loading