diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index bf9f0b9403..8afa2075da 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -148,7 +148,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::select; use tokio::sync::Mutex; use tokio::sync::oneshot; use tracing::error; @@ -2239,51 +2238,25 @@ impl CodexMessageProcessor { .await { info!("conversation {conversation_id} was active; shutting down"); - let conversation_clone = conversation.clone(); - let notify = Arc::new(tokio::sync::Notify::new()); - let notify_clone = notify.clone(); - - // Establish the listener for ShutdownComplete before submitting - // Shutdown so it is not missed. - let is_shutdown = tokio::spawn(async move { - // Create the notified future outside the loop to avoid losing notifications. - let notified = notify_clone.notified(); - tokio::pin!(notified); - loop { - select! { - _ = &mut notified => { break; } - event = conversation_clone.next_event() => { - match event { - Ok(event) => { - if matches!(event.msg, EventMsg::ShutdownComplete) { break; } - } - // Break on errors to avoid tight loops when the agent loop has exited. - Err(_) => { break; } - } - } - } - } - }); - // Request shutdown. - match conversation.submit(Op::Shutdown).await { - Ok(_) => { - // Successfully submitted Shutdown; wait before proceeding. - select! { - _ = is_shutdown => { - // Normal shutdown: proceed with archive. - } - _ = tokio::time::sleep(Duration::from_secs(10)) => { - warn!("conversation {conversation_id} shutdown timed out; proceeding with archive"); - // Wake any waiter; use notify_waiters to avoid missing the signal. - notify.notify_waiters(); - // Perhaps we lost a shutdown race, so let's continue to - // clean up the .jsonl file. - } - } + // Do not wait on conversation.next_event(); the listener task already consumes + // the stream. Request shutdown and ensure the rollout file is flushed before moving it. + if let Err(err) = conversation.submit(Op::Shutdown).await { + error!("failed to submit Shutdown to conversation {conversation_id}: {err}"); + } + + let flush_result = + tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await; + match flush_result { + Ok(Ok(())) => {} + Ok(Err(err)) => { + warn!( + "conversation {conversation_id} rollout flush failed before archive: {err}" + ); } - Err(err) => { - error!("failed to submit Shutdown to conversation {conversation_id}: {err}"); - notify.notify_waiters(); + Err(_) => { + warn!( + "conversation {conversation_id} rollout flush timed out; proceeding with archive" + ); } } } @@ -2295,7 +2268,8 @@ impl CodexMessageProcessor { .codex_home .join(codex_core::ARCHIVED_SESSIONS_SUBDIR); tokio::fs::create_dir_all(&archive_folder).await?; - tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?; + let destination = archive_folder.join(&file_name); + tokio::fs::rename(&canonical_rollout_path, &destination).await?; Ok(()) } .await; diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index c8197a046d..203b989d3e 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -46,6 +46,7 @@ pub fn create_fake_rollout( instructions: None, source: SessionSource::Cli, model_provider: model_provider.map(str::to_string), + name: None, })?; let lines = [ diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index bb60870de5..f0c4b2e34c 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -100,6 +100,9 @@ enum Subcommand { /// Resume a previous interactive session (picker by default; use --last to continue the most recent). Resume(ResumeCommand), + /// Fork an existing session into a new conversation. + Fork(ForkCommand), + /// [EXPERIMENTAL] Browse tasks from Codex Cloud and apply changes locally. #[clap(name = "cloud", alias = "cloud-tasks")] Cloud(CloudTasksCli), @@ -142,6 +145,16 @@ struct ResumeCommand { config_overrides: TuiCli, } +#[derive(Debug, Parser)] +struct ForkCommand { + /// Resume from a saved session name or rollout id, but start a new conversation. + #[arg(value_name = "ID|NAME")] + target: String, + + #[clap(flatten)] + config_overrides: TuiCli, +} + #[derive(Debug, Parser)] struct SandboxArgs { #[command(subcommand)] @@ -466,6 +479,19 @@ async fn cli_main(codex_linux_sandbox_exe: Option) -> anyhow::Result<() let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?; handle_app_exit(exit_info)?; } + Some(Subcommand::Fork(ForkCommand { + target, + config_overrides, + })) => { + interactive = finalize_fork_interactive( + interactive, + root_config_overrides.clone(), + target, + config_overrides, + ); + let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?; + handle_app_exit(exit_info)?; + } Some(Subcommand::Login(mut login_cli)) => { prepend_config_flags( &mut login_cli.config_overrides, @@ -627,6 +653,7 @@ fn finalize_resume_interactive( interactive.resume_last = last; interactive.resume_session_id = resume_session_id; interactive.resume_show_all = show_all; + interactive.fork_source = None; // Merge resume-scoped flags and overrides with highest precedence. merge_resume_cli_flags(&mut interactive, resume_cli); @@ -637,6 +664,21 @@ fn finalize_resume_interactive( interactive } +fn finalize_fork_interactive( + mut interactive: TuiCli, + root_config_overrides: CliConfigOverrides, + target: String, + fork_cli: TuiCli, +) -> TuiCli { + interactive.resume_picker = false; + interactive.resume_last = false; + interactive.resume_session_id = None; + interactive.fork_source = Some(target); + merge_resume_cli_flags(&mut interactive, fork_cli); + prepend_config_flags(&mut interactive.config_overrides, root_config_overrides); + interactive +} + /// Merge flags provided to `codex resume` so they take precedence over any /// root-level flags. Only overrides fields explicitly set on the resume-scoped /// CLI. Also appends `-c key=value` overrides with highest precedence. @@ -727,6 +769,26 @@ mod tests { ) } + fn fork_from_args(args: &[&str]) -> TuiCli { + let cli = MultitoolCli::try_parse_from(args).expect("parse"); + let MultitoolCli { + interactive, + config_overrides: root_overrides, + subcommand, + feature_toggles: _, + } = cli; + + let Subcommand::Fork(ForkCommand { + target, + config_overrides, + }) = subcommand.expect("fork present") + else { + unreachable!() + }; + + finalize_fork_interactive(interactive, root_overrides, target, config_overrides) + } + fn sample_exit_info(conversation: Option<&str>) -> AppExitInfo { let token_usage = TokenUsage { output_tokens: 2, @@ -819,6 +881,15 @@ mod tests { assert!(interactive.resume_show_all); } + #[test] + fn fork_sets_target_and_disables_resume_controls() { + let interactive = fork_from_args(["codex", "fork", "saved"].as_ref()); + assert_eq!(interactive.fork_source.as_deref(), Some("saved")); + assert!(!interactive.resume_picker); + assert!(!interactive.resume_last); + assert!(interactive.resume_session_id.is_none()); + } + #[test] fn resume_merges_option_flags_and_full_auto() { let interactive = finalize_from_args( diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 4d9285dd24..ad8283169d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -100,6 +100,8 @@ use crate::protocol::TurnDiffEvent; use crate::protocol::WarningEvent; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; +use crate::saved_sessions::build_saved_session_entry; +use crate::saved_sessions::upsert_saved_session; use crate::shell; use crate::state::ActiveTurn; use crate::state::SessionServices; @@ -134,6 +136,8 @@ use codex_protocol::user_input::UserInput; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; use codex_utils_tokenizer::warm_model_cache; +use reqwest::StatusCode; +use std::path::Path; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. @@ -149,6 +153,7 @@ pub struct Codex { pub struct CodexSpawnOk { pub codex: Codex, pub conversation_id: ConversationId, + pub(crate) session: Arc, } pub(crate) const INITIAL_SUBMIT_ID: &str = ""; @@ -209,7 +214,8 @@ impl Codex { let conversation_id = session.conversation_id; // This task will run until Op::Shutdown is received. - tokio::spawn(submission_loop(session, config, rx_sub)); + let submission_session = Arc::clone(&session); + tokio::spawn(submission_loop(submission_session, config, rx_sub)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, @@ -219,6 +225,7 @@ impl Codex { Ok(CodexSpawnOk { codex, conversation_id, + session, }) } @@ -632,18 +639,68 @@ impl Session { } /// Ensure all rollout writes are durably flushed. - pub(crate) async fn flush_rollout(&self) { + pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> { let recorder = { let guard = self.services.rollout.lock().await; guard.clone() }; - if let Some(rec) = recorder - && let Err(e) = rec.flush().await - { - warn!("failed to flush rollout recorder: {e}"); + if let Some(rec) = recorder { + rec.flush().await + } else { + Ok(()) + } + } + + pub(crate) async fn set_session_name(&self, name: Option) -> std::io::Result<()> { + let recorder = { + let guard = self.services.rollout.lock().await; + guard.clone() + }; + if let Some(rec) = recorder { + rec.set_session_name(name).await + } else { + Ok(()) } } + pub(crate) async fn rollout_path(&self) -> CodexResult { + let guard = self.services.rollout.lock().await; + let Some(rec) = guard.as_ref() else { + return Err(CodexErr::Fatal( + "Rollout recorder is not initialized; cannot save session.".to_string(), + )); + }; + Ok(rec.rollout_path.clone()) + } + + pub(crate) async fn model(&self) -> String { + let state = self.state.lock().await; + state.session_configuration.model.clone() + } + + pub(crate) async fn save_session( + &self, + codex_home: &Path, + name: &str, + ) -> CodexResult { + let trimmed = name.trim(); + if trimmed.is_empty() { + return Err(CodexErr::Fatal("Usage: /save ".to_string())); + } + let rollout_path = self.rollout_path().await?; + self.flush_rollout() + .await + .map_err(|e| CodexErr::Fatal(format!("failed to flush rollout recorder: {e}")))?; + self.set_session_name(Some(trimmed.to_string())) + .await + .map_err(|e| CodexErr::Fatal(format!("failed to write session name: {e}")))?; + let entry = + build_saved_session_entry(trimmed.to_string(), rollout_path, self.model().await) + .await?; + upsert_saved_session(codex_home, entry.clone()).await?; + Ok(entry) + } + fn next_internal_sub_id(&self) -> String { let id = self .next_internal_sub_id @@ -659,7 +716,9 @@ impl Session { let items = self.build_initial_context(&turn_context); self.record_conversation_items(&turn_context, &items).await; // Ensure initial items are visible to immediate readers (e.g., tests, forks). - self.flush_rollout().await; + if let Err(e) = self.flush_rollout().await { + warn!("failed to flush rollout recorder: {e}"); + } } InitialHistory::Resumed(_) | InitialHistory::Forked(_) => { let rollout_items = conversation_history.get_rollout_items(); @@ -703,10 +762,18 @@ impl Session { // If persisting, persist all rollout items as-is (recorder filters) if persist && !rollout_items.is_empty() { - self.persist_rollout_items(&rollout_items).await; + // Drop legacy SessionMeta lines from the source rollout so the forked + // session only contains its own SessionMeta written by the recorder. + let filtered = + InitialHistory::Forked(rollout_items.clone()).without_session_meta(); + if !filtered.is_empty() { + self.persist_rollout_items(&filtered).await; + } } // Flush after seeding history and any persisted rollout copy. - self.flush_rollout().await; + if let Err(e) = self.flush_rollout().await { + warn!("failed to flush rollout recorder: {e}"); + } } } } @@ -1425,6 +1492,9 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv Op::Review { review_request } => { handlers::review(&sess, &config, sub.id.clone(), review_request).await; } + Op::SaveSession { name } => { + handlers::save_session(&sess, &config, sub.id.clone(), name).await; + } _ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions. } } @@ -1452,6 +1522,7 @@ mod handlers { use codex_protocol::protocol::Op; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::ReviewRequest; + use codex_protocol::protocol::SaveSessionResponseEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::user_input::UserInput; @@ -1673,6 +1744,38 @@ mod handlers { .await; } + pub async fn save_session( + sess: &Arc, + config: &Arc, + sub_id: String, + name: String, + ) { + match sess.save_session(&config.codex_home, &name).await { + Ok(entry) => { + let event = Event { + id: sub_id, + msg: EventMsg::SaveSessionResponse(SaveSessionResponseEvent { + name: entry.name, + rollout_path: entry.rollout_path, + conversation_id: entry.conversation_id, + }), + }; + sess.send_event_raw(event).await; + } + Err(err) => { + let message = format!("Failed to save session '{name}': {err}"); + let event = Event { + id: sub_id, + msg: EventMsg::Error(ErrorEvent { + message, + http_status_code: None, + }), + }; + sess.send_event_raw(event).await; + } + } + } + pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; info!("Shutting down Codex instance"); diff --git a/codex-rs/core/src/codex_conversation.rs b/codex-rs/core/src/codex_conversation.rs index 5bb9c97c5b..6c3d0ecd4f 100644 --- a/codex-rs/core/src/codex_conversation.rs +++ b/codex-rs/core/src/codex_conversation.rs @@ -1,22 +1,26 @@ use crate::codex::Codex; +use crate::codex::Session; use crate::error::Result as CodexResult; use crate::protocol::Event; use crate::protocol::Op; use crate::protocol::Submission; use std::path::PathBuf; +use std::sync::Arc; pub struct CodexConversation { codex: Codex, rollout_path: PathBuf, + session: Arc, } /// Conduit for the bidirectional stream of messages that compose a conversation /// in Codex. impl CodexConversation { - pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self { + pub(crate) fn new(codex: Codex, rollout_path: PathBuf, session: Arc) -> Self { Self { codex, rollout_path, + session, } } @@ -36,4 +40,24 @@ impl CodexConversation { pub fn rollout_path(&self) -> PathBuf { self.rollout_path.clone() } + + pub async fn flush_rollout(&self) -> CodexResult<()> { + Ok(self.session.flush_rollout().await?) + } + + pub async fn set_session_name(&self, name: Option) -> CodexResult<()> { + Ok(self.session.set_session_name(name).await?) + } + + pub async fn model(&self) -> String { + self.session.model().await + } + + pub async fn save_session( + &self, + codex_home: &std::path::Path, + name: &str, + ) -> CodexResult { + self.session.save_session(codex_home, name).await + } } diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 8ffefd5679..986e41b057 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,6 +3,7 @@ use crate::CodexAuth; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::INITIAL_SUBMIT_ID; +use crate::codex::Session; use crate::codex_conversation::CodexConversation; use crate::config::Config; use crate::error::CodexErr; @@ -11,6 +12,7 @@ use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; use crate::rollout::RolloutRecorder; +use crate::saved_sessions::resolve_rollout_path; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; @@ -18,6 +20,7 @@ use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use std::collections::HashMap; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; @@ -56,6 +59,7 @@ impl ConversationManager { ) } + /// Start a brand new conversation with default initial history. pub async fn new_conversation(&self, config: Config) -> CodexResult { self.spawn_conversation(config, self.auth_manager.clone()) .await @@ -69,6 +73,7 @@ impl ConversationManager { let CodexSpawnOk { codex, conversation_id, + session, } = Codex::spawn( config, auth_manager, @@ -76,13 +81,14 @@ impl ConversationManager { self.session_source.clone(), ) .await?; - self.finalize_spawn(codex, conversation_id).await + self.finalize_spawn(codex, conversation_id, session).await } async fn finalize_spawn( &self, codex: Codex, conversation_id: ConversationId, + session: Arc, ) -> CodexResult { // The first event must be `SessionInitialized`. Validate and forward it // to the caller so that they can display it in the conversation @@ -101,6 +107,7 @@ impl ConversationManager { let conversation = Arc::new(CodexConversation::new( codex, session_configured.rollout_path.clone(), + session, )); self.conversations .write() @@ -125,6 +132,7 @@ impl ConversationManager { .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) } + /// Resume a conversation from an on-disk rollout file. pub async fn resume_conversation_from_rollout( &self, config: Config, @@ -136,6 +144,23 @@ impl ConversationManager { .await } + /// Resume a conversation by saved-session name or rollout id string. + pub async fn resume_conversation_from_identifier( + &self, + config: Config, + identifier: &str, + auth_manager: Arc, + ) -> CodexResult { + let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else { + return Err(CodexErr::Fatal(format!( + "No saved session or rollout found for '{identifier}'" + ))); + }; + self.resume_conversation_from_rollout(config, path, auth_manager) + .await + } + + /// Resume a conversation from provided rollout history items. pub async fn resume_conversation_with_history( &self, config: Config, @@ -145,6 +170,7 @@ impl ConversationManager { let CodexSpawnOk { codex, conversation_id, + session, } = Codex::spawn( config, auth_manager, @@ -152,7 +178,54 @@ impl ConversationManager { self.session_source.clone(), ) .await?; - self.finalize_spawn(codex, conversation_id).await + self.finalize_spawn(codex, conversation_id, session).await + } + + /// Fork a new conversation from the given rollout path. + pub async fn fork_from_rollout( + &self, + config: Config, + path: PathBuf, + auth_manager: Arc, + ) -> CodexResult { + let initial_history = RolloutRecorder::get_rollout_history(&path).await?; + let forked = match initial_history { + InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history), + InitialHistory::Forked(items) => InitialHistory::Forked(items), + InitialHistory::New => InitialHistory::New, + }; + self.resume_conversation_with_history(config, forked, auth_manager) + .await + } + + /// Fork a new conversation from a saved-session name or rollout id string. + pub async fn fork_from_identifier( + &self, + config: Config, + identifier: &str, + auth_manager: Arc, + ) -> CodexResult { + let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else { + return Err(CodexErr::Fatal(format!( + "No saved session or rollout found for '{identifier}'" + ))); + }; + self.fork_from_rollout(config, path, auth_manager).await + } + + /// Persist a human-friendly session name and record it in saved_sessions.json. + pub async fn save_session( + &self, + conversation_id: ConversationId, + codex_home: &Path, + name: &str, + ) -> CodexResult { + let trimmed = name.trim(); + if trimmed.is_empty() { + return Err(CodexErr::Fatal("Usage: /save ".to_string())); + } + let conversation = self.get_conversation(conversation_id).await?; + conversation.save_session(codex_home, trimmed).await } /// Removes the conversation from the manager's internal map, though the @@ -185,9 +258,10 @@ impl ConversationManager { let CodexSpawnOk { codex, conversation_id, + session, } = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?; - self.finalize_spawn(codex, conversation_id).await + self.finalize_spawn(codex, conversation_id, session).await } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 2ae11d79d0..27d7cdaea3 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -66,6 +66,7 @@ mod openai_model_info; pub mod project_doc; mod rollout; pub(crate) mod safety; +pub mod saved_sessions; pub mod seatbelt; pub mod shell; pub mod spawn; @@ -83,6 +84,12 @@ pub use rollout::list::ConversationsPage; pub use rollout::list::Cursor; pub use rollout::list::parse_cursor; pub use rollout::list::read_head_for_summary; +pub use saved_sessions::SavedSessionEntry; +pub use saved_sessions::build_saved_session_entry; +pub use saved_sessions::list_saved_sessions; +pub use saved_sessions::resolve_rollout_path; +pub use saved_sessions::resolve_saved_session; +pub use saved_sessions::upsert_saved_session; mod function_tool; mod state; mod tasks; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 9e0e308362..6bbe20bdad 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -66,6 +66,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) | EventMsg::BackgroundEvent(_) + | EventMsg::SaveSessionResponse(_) | EventMsg::StreamError(_) | EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index a39f85c823..47faf28a9e 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -11,6 +11,8 @@ use serde_json::Value; use time::OffsetDateTime; use time::format_description::FormatItem; use time::macros::format_description; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; @@ -70,6 +72,10 @@ enum RolloutCmd { Shutdown { ack: oneshot::Sender<()>, }, + SetName { + name: Option, + ack: oneshot::Sender>, + }, } impl RolloutRecorderParams { @@ -148,11 +154,14 @@ impl RolloutRecorder { instructions, source, model_provider: Some(config.model_provider_id.clone()), + name: None, }), ) } RolloutRecorderParams::Resume { path } => ( tokio::fs::OpenOptions::new() + .read(true) + .write(true) .append(true) .open(&path) .await?, @@ -196,6 +205,21 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed to queue rollout items: {e}"))) } + /// Update the session name stored in the rollout's SessionMeta line. + pub async fn set_session_name(&self, name: Option) -> std::io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(RolloutCmd::SetName { name, ack: tx }) + .await + .map_err(|e| IoError::other(format!("failed to queue session name update: {e}")))?; + match rx.await { + Ok(result) => result, + Err(e) => Err(IoError::other(format!( + "failed waiting for session name update: {e}" + ))), + } + } + /// Flush all queued writes and wait until they are committed by the writer task. pub async fn flush(&self) -> std::io::Result<()> { let (tx, rx) = oneshot::channel(); @@ -334,6 +358,7 @@ fn create_log_file( let path = dir.join(filename); let file = std::fs::OpenOptions::new() + .read(true) .append(true) .create(true) .open(&path)?; @@ -389,6 +414,10 @@ async fn rollout_writer( RolloutCmd::Shutdown { ack } => { let _ = ack.send(()); } + RolloutCmd::SetName { name, ack } => { + let result = rewrite_session_meta_name(&mut writer.file, name).await; + let _ = ack.send(result); + } } } @@ -422,3 +451,232 @@ impl JsonlWriter { Ok(()) } } + +async fn rewrite_session_meta_name( + file: &mut tokio::fs::File, + name: Option, +) -> std::io::Result<()> { + use std::io::SeekFrom; + + file.flush().await?; + file.seek(SeekFrom::Start(0)).await?; + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await?; + if contents.is_empty() { + return Err(IoError::other("empty rollout file")); + } + let newline_idx = contents + .iter() + .position(|&b| b == b'\n') + .ok_or_else(|| IoError::other("rollout missing newline after SessionMeta"))?; + let first_line = &contents[..newline_idx]; + let mut rollout_line: RolloutLine = serde_json::from_slice(first_line) + .map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?; + let RolloutItem::SessionMeta(ref mut session_meta_line) = rollout_line.item else { + return Err(IoError::other("first rollout item is not SessionMeta")); + }; + session_meta_line.meta.name = name; + let mut updated = serde_json::to_vec(&rollout_line)?; + updated.push(b'\n'); + updated.extend_from_slice(&contents[newline_idx + 1..]); + file.set_len(0).await?; + file.seek(SeekFrom::Start(0)).await?; + file.write_all(&updated).await?; + file.flush().await?; + file.seek(SeekFrom::End(0)).await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::rewrite_session_meta_name; + use codex_protocol::ConversationId; + use codex_protocol::models::ContentItem; + use codex_protocol::models::ResponseItem; + use codex_protocol::protocol::RolloutItem; + use codex_protocol::protocol::RolloutLine; + use codex_protocol::protocol::SessionMeta; + use codex_protocol::protocol::SessionMetaLine; + use tempfile::NamedTempFile; + use tokio::fs::OpenOptions; + use tokio::io::AsyncReadExt; + use tokio::io::AsyncSeekExt; + use tokio::io::AsyncWriteExt; + + fn sample_meta(name: Option<&str>) -> RolloutItem { + RolloutItem::SessionMeta(SessionMetaLine { + meta: SessionMeta { + id: ConversationId::from_string("00000000-0000-4000-8000-000000000001") + .expect("conversation id"), + timestamp: "2025-01-01T00:00:00.000Z".to_string(), + cwd: "/tmp".into(), + originator: "tester".to_string(), + cli_version: "1.0.0".to_string(), + instructions: None, + source: codex_protocol::protocol::SessionSource::Cli, + model_provider: Some("provider".to_string()), + name: name.map(str::to_string), + }, + git: None, + }) + } + + fn sample_line() -> RolloutLine { + RolloutLine { + timestamp: "2025-01-01T00:00:00.000Z".to_string(), + item: sample_meta(None), + } + } + + async fn write_rollout(lines: &[RolloutLine]) -> (NamedTempFile, tokio::fs::File) { + let temp = NamedTempFile::new().expect("temp file"); + let mut file = OpenOptions::new() + .read(true) + .write(true) + .truncate(true) + .create(true) + .open(temp.path()) + .await + .expect("open temp file"); + for line in lines { + let mut json = serde_json::to_vec(line).expect("serialize line"); + json.push(b'\n'); + file.write_all(&json).await.expect("write line"); + } + file.seek(std::io::SeekFrom::Start(0)) + .await + .expect("rewind"); + (temp, file) + } + + async fn read_first_line(path: &std::path::Path) -> RolloutLine { + let mut contents = String::new(); + let mut file = OpenOptions::new() + .read(true) + .open(path) + .await + .expect("open for read"); + file.read_to_string(&mut contents).await.expect("read file"); + let first = contents.lines().next().expect("first line"); + serde_json::from_str(first).expect("parse first line") + } + + #[tokio::test] + async fn updates_meta_name_and_preserves_rest() { + let events = vec![ + sample_line(), + RolloutLine { + timestamp: "2025-01-01T00:00:01.000Z".to_string(), + item: RolloutItem::ResponseItem(ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "hello".to_string(), + }], + }), + }, + ]; + let (temp, mut file) = write_rollout(&events).await; + + rewrite_session_meta_name(&mut file, Some("renamed".to_string())) + .await + .expect("rewrite ok"); + + let first = read_first_line(temp.path()).await; + let RolloutItem::SessionMeta(meta_line) = first.item else { + panic!("expected SessionMeta line"); + }; + assert_eq!(meta_line.meta.name.as_deref(), Some("renamed")); + + let contents = tokio::fs::read_to_string(temp.path()) + .await + .expect("read file"); + let lines: Vec<_> = contents.lines().collect(); + assert_eq!(lines.len(), 2); + let parsed: RolloutLine = serde_json::from_str(lines[1]).expect("parse second line"); + let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = parsed.item + else { + panic!("expected response item"); + }; + assert_eq!(role, "assistant"); + assert_eq!( + content, + vec![ContentItem::OutputText { + text: "hello".to_string() + }] + ); + } + + #[tokio::test] + async fn clearing_name_sets_none() { + let mut first = sample_line(); + first.item = sample_meta(Some("existing")); + let (temp, mut file) = write_rollout(&[first]).await; + + rewrite_session_meta_name(&mut file, None) + .await + .expect("rewrite ok"); + + let first = read_first_line(temp.path()).await; + let RolloutItem::SessionMeta(meta_line) = first.item else { + panic!("expected SessionMeta line"); + }; + assert_eq!(meta_line.meta.name, None); + } + + #[tokio::test] + async fn errors_on_empty_file() { + let temp = NamedTempFile::new().expect("temp file"); + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(temp.path()) + .await + .expect("open temp file"); + let err = rewrite_session_meta_name(&mut file, Some("x".to_string())) + .await + .expect_err("expected error"); + assert!(format!("{err}").contains("empty rollout file")); + } + + #[tokio::test] + async fn errors_when_first_line_not_session_meta() { + let wrong = RolloutLine { + timestamp: "t".to_string(), + item: RolloutItem::ResponseItem(ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "hello".to_string(), + }], + }), + }; + let (_temp, mut file) = write_rollout(&[wrong]).await; + let err = rewrite_session_meta_name(&mut file, Some("x".to_string())) + .await + .expect_err("expected error"); + assert!(format!("{err}").contains("first rollout item is not SessionMeta")); + // ensure file pointer is rewound to end after failure paths + let pos = file + .seek(std::io::SeekFrom::Current(0)) + .await + .expect("seek"); + assert!(pos > 0); + } + + #[tokio::test] + async fn errors_when_missing_newline() { + let temp = NamedTempFile::new().expect("temp file"); + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(temp.path()) + .await + .expect("open temp file"); + file.write_all(b"no newline").await.expect("write"); + let err = rewrite_session_meta_name(&mut file, Some("x".to_string())) + .await + .expect_err("expected error"); + assert!(format!("{err}").contains("rollout missing newline after SessionMeta")); + } +} diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index f367782b12..1471f12370 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -594,6 +594,7 @@ async fn test_tail_includes_last_response_items() -> Result<()> { cli_version: "test_version".into(), source: SessionSource::VSCode, model_provider: Some("test-provider".into()), + name: None, }, git: None, }), @@ -687,6 +688,7 @@ async fn test_tail_handles_short_sessions() -> Result<()> { cli_version: "test_version".into(), source: SessionSource::VSCode, model_provider: Some("test-provider".into()), + name: None, }, git: None, }), @@ -781,6 +783,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> { cli_version: "test_version".into(), source: SessionSource::VSCode, model_provider: Some("test-provider".into()), + name: None, }, git: None, }), diff --git a/codex-rs/core/src/saved_sessions.rs b/codex-rs/core/src/saved_sessions.rs new file mode 100644 index 0000000000..775d537c92 --- /dev/null +++ b/codex-rs/core/src/saved_sessions.rs @@ -0,0 +1,144 @@ +use crate::error::Result; +use crate::find_conversation_path_by_id_str; +use crate::rollout::list::read_head_for_summary; +use codex_protocol::ConversationId; +use codex_protocol::protocol::SessionMetaLine; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::path::Path; +use std::path::PathBuf; +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; +use tracing::warn; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct SavedSessionEntry { + pub name: String, + pub conversation_id: ConversationId, + pub rollout_path: PathBuf, + pub cwd: PathBuf, + pub model: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub model_provider: Option, + pub saved_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct SavedSessionsFile { + #[serde(default)] + entries: BTreeMap, +} + +fn saved_sessions_path(codex_home: &Path) -> PathBuf { + codex_home.join("saved_sessions.json") +} + +async fn load_saved_sessions_file(path: &Path) -> Result { + match tokio::fs::read_to_string(path).await { + Ok(text) => { + let parsed = serde_json::from_str(&text) + .map_err(|e| IoError::other(format!("failed to parse saved sessions: {e}")))?; + Ok(parsed) + } + Err(err) if err.kind() == ErrorKind::NotFound => Ok(SavedSessionsFile::default()), + Err(err) => Err(err.into()), + } +} + +async fn write_saved_sessions_file(path: &Path, file: &SavedSessionsFile) -> Result<()> { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let json = serde_json::to_string_pretty(file) + .map_err(|e| IoError::other(format!("failed to serialize saved sessions: {e}")))?; + let tmp_path = path.with_extension("json.tmp"); + tokio::fs::write(&tmp_path, json).await?; + tokio::fs::rename(tmp_path, path).await?; + Ok(()) +} + +/// Create a new entry from the rollout's SessionMeta line. +pub async fn build_saved_session_entry( + name: String, + rollout_path: PathBuf, + model: String, +) -> Result { + let head = read_head_for_summary(&rollout_path).await?; + let first = head.first().ok_or_else(|| { + IoError::other(format!( + "rollout at {} has no SessionMeta", + rollout_path.display() + )) + })?; + let SessionMetaLine { mut meta, .. } = serde_json::from_value::(first.clone()) + .map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?; + meta.name = Some(name.clone()); + let saved_at = OffsetDateTime::now_utc() + .format(&Rfc3339) + .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; + let created_at = if meta.timestamp.is_empty() { + None + } else { + Some(meta.timestamp.clone()) + }; + Ok(SavedSessionEntry { + name, + conversation_id: meta.id, + rollout_path, + cwd: meta.cwd, + model, + model_provider: meta.model_provider, + saved_at, + created_at, + }) +} + +/// Insert or replace a saved session entry in `saved_sessions.json`. +pub async fn upsert_saved_session(codex_home: &Path, entry: SavedSessionEntry) -> Result<()> { + let path = saved_sessions_path(codex_home); + let mut file = load_saved_sessions_file(&path).await?; + file.entries.insert(entry.name.clone(), entry); + write_saved_sessions_file(&path, &file).await +} + +/// Lookup a saved session by name, if present. +pub async fn resolve_saved_session( + codex_home: &Path, + name: &str, +) -> Result> { + let path = saved_sessions_path(codex_home); + let file = load_saved_sessions_file(&path).await?; + Ok(file.entries.get(name).cloned()) +} + +/// Return all saved sessions ordered by newest `saved_at` first. +pub async fn list_saved_sessions(codex_home: &Path) -> Result> { + let path = saved_sessions_path(codex_home); + let file = load_saved_sessions_file(&path).await?; + let mut entries: Vec = file.entries.values().cloned().collect(); + entries.sort_by(|a, b| b.saved_at.cmp(&a.saved_at)); + Ok(entries) +} + +/// Resolve a rollout path from either a saved-session name or rollout id string. +/// Returns `Ok(None)` when nothing matches. +pub async fn resolve_rollout_path(codex_home: &Path, identifier: &str) -> Result> { + if let Some(entry) = resolve_saved_session(codex_home, identifier).await? { + if entry.rollout_path.exists() { + return Ok(Some(entry.rollout_path)); + } + warn!( + "saved session '{}' points to missing rollout at {}", + identifier, + entry.rollout_path.display() + ); + } + Ok(find_conversation_path_by_id_str(codex_home, identifier).await?) +} diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 9bda02c34e..a6258fd3a0 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -128,7 +128,9 @@ impl Session { task_cancellation_token.child_token(), ) .await; - session_ctx.clone_session().flush_rollout().await; + if let Err(e) = session_ctx.clone_session().flush_rollout().await { + tracing::warn!("failed to flush rollout recorder: {e}"); + } if !task_cancellation_token.is_cancelled() { // Emit completion uniformly from spawn site so all tasks share the same lifecycle. let sess = session_ctx.clone_session(); diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 60b828b1c3..042e96d907 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -45,6 +45,7 @@ mod resume; mod review; mod rmcp_client; mod rollout_list_find; +mod saved_sessions; mod seatbelt; mod shell_serialization; mod stream_error_allows_next_turn; diff --git a/codex-rs/core/tests/suite/saved_sessions.rs b/codex-rs/core/tests/suite/saved_sessions.rs new file mode 100644 index 0000000000..99c5fc2eaa --- /dev/null +++ b/codex-rs/core/tests/suite/saved_sessions.rs @@ -0,0 +1,457 @@ +#![allow(clippy::expect_used)] +use anyhow::Result; +use codex_core::AuthManager; +use codex_core::CodexAuth; +use codex_core::CodexConversation; +use codex_core::ConversationManager; +use codex_core::SavedSessionEntry; +use codex_core::build_saved_session_entry; +use codex_core::config::Config; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::RolloutItem; +use codex_core::protocol::RolloutLine; +use codex_core::protocol::SaveSessionResponseEvent; +use codex_core::protocol::SessionSource; +use codex_core::resolve_saved_session; +use codex_core::upsert_saved_session; +use codex_protocol::user_input::UserInput; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; +use pretty_assertions::assert_eq; +use serde_json::json; +use std::path::Path; +use std::sync::Arc; + +fn completion_body(idx: usize, message: &str) -> String { + let resp_id = format!("resp-{idx}"); + let msg_id = format!("msg-{idx}"); + sse(vec![ + ev_response_created(&resp_id), + ev_assistant_message(&msg_id, message), + ev_completed(&resp_id), + ]) +} + +fn rollout_lines(path: &Path) -> Vec { + let text = std::fs::read_to_string(path).expect("read rollout"); + text.lines() + .filter_map(|line| { + if line.trim().is_empty() { + return None; + } + let value: serde_json::Value = serde_json::from_str(line).expect("rollout line json"); + Some(serde_json::from_value::(value).expect("rollout line")) + }) + .collect() +} + +fn rollout_items_without_meta(path: &Path) -> Vec { + rollout_lines(path) + .into_iter() + .filter_map(|line| match line.item { + RolloutItem::SessionMeta(_) => None, + other => Some(other), + }) + .collect() +} + +fn session_meta_count(path: &Path) -> usize { + rollout_lines(path) + .iter() + .filter(|line| matches!(line.item, RolloutItem::SessionMeta(_))) + .count() +} + +async fn submit_text(codex: &Arc, text: &str) -> Result<()> { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: text.to_string(), + }], + }) + .await?; + let _ = wait_for_event(codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + Ok(()) +} + +async fn save_session( + name: &str, + codex: &Arc, + config: &Config, +) -> Result { + codex.flush_rollout().await?; + codex.set_session_name(Some(name.to_string())).await?; + let entry = + build_saved_session_entry(name.to_string(), codex.rollout_path(), codex.model().await) + .await?; + upsert_saved_session(&config.codex_home, entry.clone()).await?; + Ok(entry) +} + +async fn save_session_via_op( + codex: &Arc, + name: &str, +) -> Result { + codex + .submit(Op::SaveSession { + name: name.to_string(), + }) + .await?; + let response: SaveSessionResponseEvent = wait_for_event_match(codex, |ev| match ev { + EventMsg::SaveSessionResponse(resp) => Some(resp.clone()), + _ => None, + }) + .await; + Ok(response) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn save_and_resume_by_name() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "first turn").await?; + + let name = "alpha"; + let entry = save_session(name, &initial.codex, &initial.config).await?; + let resolved = resolve_saved_session(&initial.config.codex_home, name) + .await? + .expect("saved session"); + assert_eq!(entry, resolved); + assert_eq!(session_meta_count(&entry.rollout_path), 1); + + let saved_items = rollout_items_without_meta(&entry.rollout_path); + + let resumed = builder + .resume(&server, initial.home.clone(), entry.rollout_path.clone()) + .await?; + assert_eq!(resumed.session_configured.session_id, entry.conversation_id); + let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path); + + assert_eq!( + serde_json::to_value(saved_items)?, + serde_json::to_value(resumed_items)? + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn save_session_op_persists_and_emits_response() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "first turn").await?; + + let name = "via-op"; + let response = save_session_via_op(&initial.codex, name).await?; + + assert_eq!(response.name, name); + assert_eq!( + response.conversation_id, + initial.session_configured.session_id + ); + assert!(response.rollout_path.exists()); + + let resolved = resolve_saved_session(&initial.config.codex_home, name) + .await? + .expect("saved session"); + assert_eq!(resolved.rollout_path, response.rollout_path); + assert_eq!(resolved.conversation_id, response.conversation_id); + assert_eq!(session_meta_count(&resolved.rollout_path), 1); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn fork_from_identifier_after_save_op() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence( + &server, + vec![ + completion_body(1, "seed"), + completion_body(2, "fork-extra-1"), + completion_body(3, "fork-extra-2"), + ], + ) + .await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "seeded").await?; + + let name = "forkable-op"; + let response = save_session_via_op(&initial.codex, name).await?; + + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); + let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec); + let forked = conversation_manager + .fork_from_identifier(initial.config.clone(), name, auth_manager) + .await?; + + assert_ne!( + forked.session_configured.session_id, + response.conversation_id + ); + + // Record the baseline rollout for the saved session. + let base_items = rollout_items_without_meta(&response.rollout_path); + + // Send additional turns to the forked conversation and flush. + submit_text(&forked.conversation, "fork one").await?; + submit_text(&forked.conversation, "fork two").await?; + forked.conversation.flush_rollout().await?; + + // Re-read both rollouts: source should remain unchanged. + let base_after = rollout_items_without_meta(&response.rollout_path); + assert_eq!( + serde_json::to_value(&base_items)?, + serde_json::to_value(&base_after)? + ); + + // Forked rollout should extend the baseline. + let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path()); + assert!( + fork_items.len() > base_items.len(), + "expected forked rollout to contain additional items" + ); + let fork_prefix: Vec<_> = fork_items.iter().take(base_items.len()).cloned().collect(); + assert_eq!( + serde_json::to_value(&base_items)?, + serde_json::to_value(&fork_prefix)?, + "forked rollout should extend the baseline history" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn save_and_fork_by_name() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![completion_body(1, "base")]).await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "original").await?; + + let entry = save_session("forkable", &initial.codex, &initial.config).await?; + + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); + let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec); + let forked = conversation_manager + .fork_from_rollout( + initial.config.clone(), + entry.rollout_path.clone(), + auth_manager, + ) + .await?; + + assert_ne!(forked.session_configured.session_id, entry.conversation_id); + assert_ne!(forked.conversation.rollout_path(), entry.rollout_path); + assert_eq!(session_meta_count(&forked.conversation.rollout_path()), 1); + + let base_items = rollout_items_without_meta(&entry.rollout_path); + let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path()); + assert_eq!( + serde_json::to_value(base_items)?, + serde_json::to_value(fork_items)? + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn forked_messages_do_not_touch_original() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence( + &server, + vec![ + completion_body(1, "base"), + completion_body(2, "fork-1"), + completion_body(3, "fork-2"), + ], + ) + .await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "first").await?; + + let entry = save_session("branch", &initial.codex, &initial.config).await?; + let baseline_items = rollout_items_without_meta(&entry.rollout_path); + + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); + let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec); + let forked = conversation_manager + .fork_from_rollout( + initial.config.clone(), + entry.rollout_path.clone(), + auth_manager.clone(), + ) + .await?; + + submit_text(&forked.conversation, "fork message one").await?; + submit_text(&forked.conversation, "fork message two").await?; + + let resumed = builder + .resume(&server, initial.home.clone(), entry.rollout_path.clone()) + .await?; + let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path); + + assert_eq!( + serde_json::to_value(baseline_items.clone())?, + serde_json::to_value(resumed_items)? + ); + assert_eq!( + serde_json::to_value(baseline_items)?, + serde_json::to_value(rollout_items_without_meta(&entry.rollout_path))? + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn resumed_messages_are_present_in_new_fork() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence( + &server, + vec![ + completion_body(1, "original"), + completion_body(2, "fork-extra"), + completion_body(3, "resumed-extra"), + ], + ) + .await; + + let mut builder = test_codex(); + let initial = builder.build(&server).await?; + submit_text(&initial.codex, "start").await?; + + let entry = save_session("seed", &initial.codex, &initial.config).await?; + + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); + let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec); + let forked = conversation_manager + .fork_from_rollout( + initial.config.clone(), + entry.rollout_path.clone(), + auth_manager.clone(), + ) + .await?; + submit_text(&forked.conversation, "fork only").await?; + + let resumed = builder + .resume(&server, initial.home.clone(), entry.rollout_path.clone()) + .await?; + submit_text(&resumed.codex, "resumed addition").await?; + resumed.codex.flush_rollout().await?; + let updated_base_items = rollout_items_without_meta(&entry.rollout_path); + + let fork_again = conversation_manager + .fork_from_rollout( + initial.config.clone(), + entry.rollout_path.clone(), + auth_manager, + ) + .await?; + let fork_again_items = rollout_items_without_meta(&fork_again.conversation.rollout_path()); + assert_eq!( + serde_json::to_value(updated_base_items)?, + serde_json::to_value(fork_again_items)? + ); + assert_eq!( + session_meta_count(&fork_again.conversation.rollout_path()), + 1 + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn duplicate_name_overwrites_entry() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence( + &server, + vec![completion_body(1, "one"), completion_body(2, "two")], + ) + .await; + + let mut builder = test_codex(); + let first = builder.build(&server).await?; + submit_text(&first.codex, "first session").await?; + let name = "shared"; + let entry_one = save_session(name, &first.codex, &first.config).await?; + + let second = builder.build(&server).await?; + submit_text(&second.codex, "second session").await?; + let entry_two = save_session(name, &second.codex, &second.config).await?; + + let resolved = resolve_saved_session(&second.config.codex_home, name) + .await? + .expect("latest entry present"); + assert_eq!(resolved, entry_two); + assert_ne!(resolved.conversation_id, entry_one.conversation_id); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn same_session_multiple_names() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![completion_body(1, "hello")]).await; + + let mut builder = test_codex(); + let session = builder.build(&server).await?; + submit_text(&session.codex, "save twice").await?; + + let entry_first = save_session("first", &session.codex, &session.config).await?; + let entry_second = save_session("second", &session.codex, &session.config).await?; + + let resolved_first = resolve_saved_session(&session.config.codex_home, "first") + .await? + .expect("first entry"); + let resolved_second = resolve_saved_session(&session.config.codex_home, "second") + .await? + .expect("second entry"); + + assert_eq!(entry_first.conversation_id, entry_second.conversation_id); + assert_eq!( + resolved_first.conversation_id, + resolved_second.conversation_id + ); + assert_eq!(resolved_first.rollout_path, resolved_second.rollout_path); + + let names: serde_json::Value = json!([entry_first.name, entry_second.name]); + assert_eq!(names, json!(["first", "second"])); + + Ok(()) +} diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 1f007bbe0e..58d7a9c5fd 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -567,6 +567,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::UndoCompleted(_) + | EventMsg::SaveSessionResponse(_) | EventMsg::UndoStarted(_) => {} } CodexStatus::Running diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 8dccb51250..2e0075643d 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -302,6 +302,7 @@ async fn run_codex_tool_session_inner( | EventMsg::UndoStarted(_) | EventMsg::UndoCompleted(_) | EventMsg::ExitedReviewMode(_) + | EventMsg::SaveSessionResponse(_) | EventMsg::DeprecationNotice(_) => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index f20e412831..a4eff723b6 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -102,6 +102,9 @@ pub enum Op { final_output_json_schema: Option, }, + /// Persist the current session under a user-provided name. + SaveSession { name: String }, + /// Override parts of the persistent turn context for subsequent turns. /// /// All fields are optional; when omitted, the existing value is preserved. @@ -513,6 +516,9 @@ pub enum EventMsg { BackgroundEvent(BackgroundEventEvent), + /// Result of a save-session request. + SaveSessionResponse(SaveSessionResponseEvent), + UndoStarted(UndoStartedEvent), UndoCompleted(UndoCompletedEvent), @@ -1036,6 +1042,13 @@ impl InitialHistory { } } + pub fn without_session_meta(&self) -> Vec { + self.get_rollout_items() + .into_iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .collect() + } + pub fn get_event_msgs(&self) -> Option> { match self { InitialHistory::New => None, @@ -1096,6 +1109,8 @@ pub struct SessionMeta { #[serde(default)] pub source: SessionSource, pub model_provider: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, } impl Default for SessionMeta { @@ -1109,6 +1124,7 @@ impl Default for SessionMeta { instructions: None, source: SessionSource::default(), model_provider: None, + name: None, } } } @@ -1370,6 +1386,13 @@ pub struct StreamInfoEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +pub struct SaveSessionResponseEvent { + pub name: String, + pub rollout_path: PathBuf, + pub conversation_id: ConversationId, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct PatchApplyBeginEvent { /// Identifier so this can be paired with the PatchApplyEnd event. diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 7c86dd3b6e..749a145b75 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -7,6 +7,8 @@ use crate::diff_render::DiffSummary; use crate::exec_command::strip_bash_lc_and_escape; use crate::file_search::FileSearchManager; use crate::history_cell::HistoryCell; +#[cfg(not(debug_assertions))] +use crate::history_cell::UpdateAvailableHistoryCell; use crate::model_migration::ModelMigrationOutcome; use crate::model_migration::migration_copy_for_config; use crate::model_migration::run_model_migration_prompt; @@ -53,9 +55,6 @@ use std::time::Duration; use tokio::select; use tokio::sync::mpsc::unbounded_channel; -#[cfg(not(debug_assertions))] -use crate::history_cell::UpdateAvailableHistoryCell; - const GPT_5_1_MIGRATION_AUTH_MODES: [AuthMode; 2] = [AuthMode::ChatGPT, AuthMode::ApiKey]; const GPT_5_1_CODEX_MIGRATION_AUTH_MODES: [AuthMode; 1] = [AuthMode::ChatGPT]; @@ -296,6 +295,27 @@ impl App { resumed.session_configured, ) } + ResumeSelection::Fork(path) => { + let resumed = conversation_manager + .fork_from_rollout(config.clone(), path.clone(), auth_manager.clone()) + .await + .wrap_err_with(|| format!("Failed to fork session from {}", path.display()))?; + let init = crate::chatwidget::ChatWidgetInit { + config: config.clone(), + frame_requester: tui.frame_requester(), + app_event_tx: app_event_tx.clone(), + initial_prompt: initial_prompt.clone(), + initial_images: initial_images.clone(), + enhanced_keys_supported, + auth_manager: auth_manager.clone(), + feedback: feedback.clone(), + }; + ChatWidget::new_from_existing( + init, + resumed.conversation, + resumed.session_configured, + ) + } }; chat_widget.maybe_prompt_windows_sandbox_enable(); diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index 30727c298b..d956c70c4f 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -69,7 +69,10 @@ const LARGE_PASTE_CHAR_THRESHOLD: usize = 1000; #[derive(Debug, PartialEq)] pub enum InputResult { Submitted(String), - Command(SlashCommand), + Command { + command: SlashCommand, + args: Option, + }, None, } @@ -333,6 +336,19 @@ impl ChatComposer { PasteBurst::recommended_flush_delay() } + fn command_args_from_line(line: &str, command: SlashCommand) -> Option { + if let Some((name, rest)) = parse_slash_name(line) + && name == command.command() + { + let trimmed = rest.trim(); + if trimmed.is_empty() { + return None; + } + return Some(trimmed.to_string()); + } + None + } + /// Integrate results from an asynchronous file search. pub(crate) fn on_file_search_result(&mut self, query: String, matches: Vec) { // Only apply if user is still editing a token starting with `query`. @@ -505,8 +521,9 @@ impl ChatComposer { if let Some(sel) = popup.selected_item() { match sel { CommandItem::Builtin(cmd) => { + let args = Self::command_args_from_line(first_line, cmd); self.textarea.set_text(""); - return (InputResult::Command(cmd), true); + return (InputResult::Command { command: cmd, args }, true); } CommandItem::UserPrompt(idx) => { if let Some(prompt) = popup.prompt(idx) { @@ -920,22 +937,21 @@ impl ChatComposer { modifiers: KeyModifiers::NONE, .. } => { - // If the first line is a bare built-in slash command (no args), - // dispatch it even when the slash popup isn't visible. This preserves - // the workflow: type a prefix ("/di"), press Tab to complete to - // "/diff ", then press Enter to run it. Tab moves the cursor beyond - // the '/name' token and our caret-based heuristic hides the popup, - // but Enter should still dispatch the command rather than submit - // literal text. + // If the first line is a built-in slash command, dispatch it even when + // the slash popup isn't visible. This preserves the workflow: + // type a prefix ("/di"), press Tab to complete to "/diff ", then press + // Enter to run it. Tab moves the cursor beyond the '/name' token and + // our caret-based heuristic hides the popup, but Enter should still + // dispatch the command rather than submit literal text. let first_line = self.textarea.text().lines().next().unwrap_or(""); - if let Some((name, rest)) = parse_slash_name(first_line) - && rest.is_empty() + if let Some((name, _rest)) = parse_slash_name(first_line) && let Some((_n, cmd)) = built_in_slash_commands() .into_iter() .find(|(n, _)| *n == name) { + let args = Self::command_args_from_line(first_line, cmd); self.textarea.set_text(""); - return (InputResult::Command(cmd), true); + return (InputResult::Command { command: cmd, args }, true); } // If we're in a paste-like burst capture, treat Enter as part of the burst // and accumulate it rather than submitting or inserting immediately. @@ -2399,8 +2415,9 @@ mod tests { // When a slash command is dispatched, the composer should return a // Command result (not submit literal text) and clear its textarea. match result { - InputResult::Command(cmd) => { + InputResult::Command { command: cmd, args } => { assert_eq!(cmd.command(), "init"); + assert!(args.is_none()); } InputResult::Submitted(text) => { panic!("expected command dispatch, but composer submitted literal text: {text}") @@ -2474,7 +2491,10 @@ mod tests { let (result, _needs_redraw) = composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); match result { - InputResult::Command(cmd) => assert_eq!(cmd.command(), "diff"), + InputResult::Command { command: cmd, args } => { + assert_eq!(cmd.command(), "diff"); + assert!(args.is_none()); + } InputResult::Submitted(text) => { panic!("expected command dispatch after Tab completion, got literal submit: {text}") } @@ -2483,6 +2503,42 @@ mod tests { assert!(composer.textarea.is_empty()); } + #[test] + fn slash_command_with_args_dispatches_and_preserves_args() { + use crossterm::event::KeyCode; + use crossterm::event::KeyEvent; + use crossterm::event::KeyModifiers; + + let (tx, _rx) = unbounded_channel::(); + let sender = AppEventSender::new(tx); + let mut composer = ChatComposer::new( + true, + sender, + false, + "Ask Codex to do anything".to_string(), + false, + ); + + composer.textarea.set_text("/save feature-one"); + + let (result, _needs_redraw) = + composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + + match result { + InputResult::Command { command: cmd, args } => { + assert_eq!(cmd, SlashCommand::Save); + assert_eq!(args.as_deref(), Some("feature-one")); + } + InputResult::Submitted(text) => { + panic!( + "expected slash command dispatch, but composer submitted literal text: {text}" + ) + } + InputResult::None => panic!("expected Command result for '/save feature-one'"), + } + assert!(composer.textarea.is_empty()); + } + #[test] fn slash_mention_dispatches_command_and_inserts_at() { use crossterm::event::KeyCode; @@ -2505,8 +2561,9 @@ mod tests { composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); match result { - InputResult::Command(cmd) => { + InputResult::Command { command: cmd, args } => { assert_eq!(cmd.command(), "mention"); + assert!(args.is_none()); } InputResult::Submitted(text) => { panic!("expected command dispatch, but composer submitted literal text: {text}") diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index a5728ab14f..6ecb3891c7 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1326,8 +1326,8 @@ impl ChatWidget { }; self.queue_user_message(user_message); } - InputResult::Command(cmd) => { - self.dispatch_command(cmd); + InputResult::Command { command: cmd, args } => { + self.dispatch_command(cmd, args); } InputResult::None => {} } @@ -1350,7 +1350,7 @@ impl ChatWidget { self.request_redraw(); } - fn dispatch_command(&mut self, cmd: SlashCommand) { + fn dispatch_command(&mut self, cmd: SlashCommand, args: Option) { if !cmd.available_during_task() && self.bottom_pane.is_task_running() { let message = format!( "'/{}' is disabled while a task is in progress.", @@ -1371,6 +1371,9 @@ impl ChatWidget { SlashCommand::New => { self.app_event_tx.send(AppEvent::NewSession); } + SlashCommand::Save => { + self.handle_save_command(args); + } SlashCommand::Init => { let init_target = self.config.cwd.join(DEFAULT_PROJECT_DOC_FILENAME); if init_target.exists() { @@ -1488,6 +1491,31 @@ impl ChatWidget { } } + fn handle_save_command(&mut self, args: Option) { + let Some(name_raw) = args else { + self.add_to_history(history_cell::new_error_event( + "Usage: /save ".to_string(), + )); + return; + }; + let name = name_raw.trim(); + if name.is_empty() { + self.add_to_history(history_cell::new_error_event( + "Usage: /save ".to_string(), + )); + return; + } + if self.conversation_id.is_none() { + self.add_to_history(history_cell::new_error_event( + "Session is not ready yet; try /save again in a moment.".to_string(), + )); + return; + } + self.app_event_tx + .send(AppEvent::CodexOp(Op::SaveSession { name: name.into() })); + self.add_info_message(format!("Saving session '{name}'..."), None); + } + pub(crate) fn handle_paste(&mut self, text: String) { self.bottom_pane.handle_paste(text); } @@ -1658,6 +1686,12 @@ impl ChatWidget { EventMsg::Error(ErrorEvent { message }) => self.on_error(message), EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev), EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev), + EventMsg::SaveSessionResponse(ev) => { + self.add_info_message( + format!("Saved session '{}' ({}).", ev.name, ev.conversation_id), + Some(format!("Rollout: {}", ev.rollout_path.display())), + ); + } EventMsg::TurnAborted(ev) => match ev.reason { TurnAbortReason::Interrupted => { self.on_interrupted_turn(ev.reason); diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 1393e52886..840d3f5d4a 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1049,7 +1049,7 @@ fn slash_init_skips_when_project_doc_exists() { std::fs::write(&existing_path, "existing instructions").unwrap(); chat.config.cwd = tempdir.path().to_path_buf(); - chat.dispatch_command(SlashCommand::Init); + chat.dispatch_command(SlashCommand::Init, None); match op_rx.try_recv() { Err(TryRecvError::Empty) => {} @@ -1077,7 +1077,7 @@ fn slash_init_skips_when_project_doc_exists() { fn slash_quit_requests_exit() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); - chat.dispatch_command(SlashCommand::Quit); + chat.dispatch_command(SlashCommand::Quit, None); assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest)); } @@ -1086,7 +1086,7 @@ fn slash_quit_requests_exit() { fn slash_exit_requests_exit() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); - chat.dispatch_command(SlashCommand::Exit); + chat.dispatch_command(SlashCommand::Exit, None); assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest)); } @@ -1095,7 +1095,7 @@ fn slash_exit_requests_exit() { fn slash_undo_sends_op() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); - chat.dispatch_command(SlashCommand::Undo); + chat.dispatch_command(SlashCommand::Undo, None); match rx.try_recv() { Ok(AppEvent::CodexOp(Op::Undo)) => {} @@ -1109,7 +1109,7 @@ fn slash_rollout_displays_current_path() { let rollout_path = PathBuf::from("/tmp/codex-test-rollout.jsonl"); chat.current_rollout_path = Some(rollout_path.clone()); - chat.dispatch_command(SlashCommand::Rollout); + chat.dispatch_command(SlashCommand::Rollout, None); let cells = drain_insert_history(&mut rx); assert_eq!(cells.len(), 1, "expected info message for rollout path"); @@ -1124,7 +1124,7 @@ fn slash_rollout_displays_current_path() { fn slash_rollout_handles_missing_path() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); - chat.dispatch_command(SlashCommand::Rollout); + chat.dispatch_command(SlashCommand::Rollout, None); let cells = drain_insert_history(&mut rx); assert_eq!( @@ -1719,7 +1719,7 @@ fn feedback_selection_popup_snapshot() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(); // Open the feedback category selection popup via slash command. - chat.dispatch_command(SlashCommand::Feedback); + chat.dispatch_command(SlashCommand::Feedback, None); let popup = render_bottom_popup(&chat, 80); assert_snapshot!("feedback_selection_popup", popup); @@ -1797,7 +1797,7 @@ fn disabled_slash_command_while_task_running_snapshot() { chat.bottom_pane.set_task_running(true); // Dispatch a command that is unavailable while a task runs (e.g., /model) - chat.dispatch_command(SlashCommand::Model); + chat.dispatch_command(SlashCommand::Model, None); // Drain history and snapshot the rendered error line(s) let cells = drain_insert_history(&mut rx); diff --git a/codex-rs/tui/src/cli.rs b/codex-rs/tui/src/cli.rs index 2b19b4c064..8984cdc31d 100644 --- a/codex-rs/tui/src/cli.rs +++ b/codex-rs/tui/src/cli.rs @@ -32,6 +32,9 @@ pub struct Cli { #[clap(skip)] pub resume_show_all: bool, + #[clap(skip)] + pub fork_source: Option, + /// Model the agent should use. #[arg(long, short = 'm')] pub model: Option, diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 33bd18c437..1dfa0a292b 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -19,9 +19,9 @@ use codex_core::config::ConfigOverrides; use codex_core::config::find_codex_home; use codex_core::config::load_config_as_toml_with_cli_overrides; use codex_core::config::resolve_oss_provider; -use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::protocol::AskForApproval; +use codex_core::resolve_rollout_path; use codex_protocol::config_types::SandboxMode; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use std::fs::OpenOptions; @@ -429,8 +429,29 @@ async fn run_ratatui_app( }; // Determine resume behavior: explicit id, then resume last, then picker. - let resume_selection = if let Some(id_str) = cli.resume_session_id.as_deref() { - match find_conversation_path_by_id_str(&config.codex_home, id_str).await? { + let resume_selection = if let Some(target) = cli.fork_source.as_deref() { + match resolve_rollout_path(&config.codex_home, target).await? { + Some(path) => resume_picker::ResumeSelection::Fork(path), + None => { + error!("Error finding conversation path: {target}"); + restore(); + session_log::log_session_end(); + let _ = tui.terminal.clear(); + if let Err(err) = writeln!( + std::io::stdout(), + "No saved session or rollout found for '{target}'. Run `codex resume` without a name to choose from existing sessions." + ) { + error!("Failed to write resume error message: {err}"); + } + return Ok(AppExitInfo { + token_usage: codex_core::protocol::TokenUsage::default(), + conversation_id: None, + update_action: None, + }); + } + } + } else if let Some(id_str) = cli.resume_session_id.as_deref() { + match resolve_rollout_path(&config.codex_home, id_str).await? { Some(path) => resume_picker::ResumeSelection::Resume(path), None => { error!("Error finding conversation path: {id_str}"); @@ -439,7 +460,7 @@ async fn run_ratatui_app( let _ = tui.terminal.clear(); if let Err(err) = writeln!( std::io::stdout(), - "No saved session found with ID {id_str}. Run `codex resume` without an ID to choose from existing sessions." + "No saved session or rollout found for '{id_str}'. Run `codex resume` without a name to choose from existing sessions." ) { error!("Failed to write resume error message: {err}"); } diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 6a21496d34..d7cc07116c 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -42,6 +42,7 @@ const LOAD_NEAR_THRESHOLD: usize = 5; pub enum ResumeSelection { StartFresh, Resume(PathBuf), + Fork(PathBuf), Exit, } diff --git a/codex-rs/tui/src/slash_command.rs b/codex-rs/tui/src/slash_command.rs index 969d279b07..494223b085 100644 --- a/codex-rs/tui/src/slash_command.rs +++ b/codex-rs/tui/src/slash_command.rs @@ -16,6 +16,7 @@ pub enum SlashCommand { Approvals, Review, New, + Save, Init, Compact, Undo, @@ -37,6 +38,7 @@ impl SlashCommand { match self { SlashCommand::Feedback => "send logs to maintainers", SlashCommand::New => "start a new chat during a conversation", + SlashCommand::Save => "save this session so it can be resumed later", SlashCommand::Init => "create an AGENTS.md file with instructions for Codex", SlashCommand::Compact => "summarize conversation to prevent hitting the context limit", SlashCommand::Review => "review my current changes and find issues", @@ -64,6 +66,7 @@ impl SlashCommand { pub fn available_during_task(self) -> bool { match self { SlashCommand::New + | SlashCommand::Save | SlashCommand::Init | SlashCommand::Compact | SlashCommand::Undo