diff --git a/AGENTS.md b/AGENTS.md index e924ceeae1b..3c36e718e3c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -52,6 +52,12 @@ See `codex-rs/tui/styles.md`. - If you need to indent wrapped lines, use the initial_indent / subsequent_indent options from RtOptions if you can, rather than writing custom logic. - If you have a list of lines and you need to prefix them all with some prefix (optionally different on the first vs subsequent lines), use the `prefix_lines` helper from line_utils. +## Code + +### Coding conventions + +Keep functions relatively simple. Do not produce functions with dozens of return statements. Create helper functions if needed to keep code flow easy to understand. + ## Tests ### Snapshot tests diff --git a/codex-rs/core/src/auth.rs b/codex-rs/core/src/auth.rs index 96714e3f74b..59e5f868e1c 100644 --- a/codex-rs/core/src/auth.rs +++ b/codex-rs/core/src/auth.rs @@ -100,6 +100,14 @@ impl From for std::io::Error { } impl CodexAuth { + /// Workspace/account identifier associated with this auth snapshot, derived from the cached + /// ID token claims. This is used to prevent accidentally adopting credentials for a different + /// identity when multiple Codex instances share a credential store. + pub fn chatgpt_account_id(&self) -> Option { + self.get_current_token_data() + .and_then(|t| t.id_token.chatgpt_account_id) + } + pub async fn refresh_token(&self) -> Result { tracing::info!("Refreshing token"); @@ -719,6 +727,165 @@ mod tests { assert_eq!(auth, None); } + #[tokio::test] + async fn sync_from_storage_applies_updated_tokens_for_matching_identity() { + let codex_home = tempdir().unwrap(); + let jwt = write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: "pro".to_string(), + chatgpt_account_id: Some("acct_123".to_string()), + }, + codex_home.path(), + ) + .expect("seed auth"); + + let manager = AuthManager::new( + codex_home.path().to_path_buf(), + false, + AuthCredentialsStoreMode::File, + ); + let expected = manager.auth().expect("auth should be loaded"); + + let updated = AuthDotJson { + openai_api_key: None, + tokens: Some(TokenData { + id_token: parse_id_token(&jwt).expect("jwt should parse"), + access_token: "new-access".to_string(), + refresh_token: "new-refresh".to_string(), + account_id: None, + }), + last_refresh: Some(Utc::now()), + }; + save_auth(codex_home.path(), &updated, AuthCredentialsStoreMode::File) + .expect("write updated auth"); + + let sync = manager + .sync_from_storage_for_request(&expected) + .await + .expect("sync should succeed"); + assert_eq!(sync, SyncFromStorageResult::Applied { changed: true }); + + let current = manager.auth().expect("auth should exist"); + let guard = current.auth_dot_json.lock().unwrap(); + let tokens = guard + .as_ref() + .and_then(|a| a.tokens.as_ref()) + .expect("tokens should exist"); + assert_eq!(tokens.access_token, "new-access"); + assert_eq!(tokens.refresh_token, "new-refresh"); + } + + #[tokio::test] + async fn sync_from_storage_returns_mismatch_for_different_identity() { + let codex_home = tempdir().unwrap(); + let _jwt_expected = write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: "pro".to_string(), + chatgpt_account_id: Some("acct_123".to_string()), + }, + codex_home.path(), + ) + .expect("seed auth"); + + let manager = AuthManager::new( + codex_home.path().to_path_buf(), + false, + AuthCredentialsStoreMode::File, + ); + let expected = manager.auth().expect("auth should be loaded"); + + let jwt_other = write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: "pro".to_string(), + chatgpt_account_id: Some("acct_other".to_string()), + }, + codex_home.path(), + ) + .expect("seed other auth"); + + let mismatched = AuthDotJson { + openai_api_key: None, + tokens: Some(TokenData { + id_token: parse_id_token(&jwt_other).expect("jwt should parse"), + access_token: "other-access".to_string(), + refresh_token: "other-refresh".to_string(), + account_id: None, + }), + last_refresh: Some(Utc::now()), + }; + save_auth( + codex_home.path(), + &mismatched, + AuthCredentialsStoreMode::File, + ) + .expect("write mismatched auth"); + + let sync = manager + .sync_from_storage_for_request(&expected) + .await + .expect("sync should succeed"); + assert_eq!(sync, SyncFromStorageResult::IdentityMismatch); + } + + #[tokio::test] + async fn sync_from_storage_skips_when_identity_missing() { + let codex_home = tempdir().unwrap(); + write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: "pro".to_string(), + chatgpt_account_id: None, + }, + codex_home.path(), + ) + .expect("seed auth"); + + let manager = AuthManager::new( + codex_home.path().to_path_buf(), + false, + AuthCredentialsStoreMode::File, + ); + let expected = manager.auth().expect("auth should be loaded"); + + let sync = manager + .sync_from_storage_for_request(&expected) + .await + .expect("sync should succeed"); + assert_eq!(sync, SyncFromStorageResult::SkippedMissingIdentity); + } + + #[tokio::test] + async fn sync_from_storage_reports_logged_out_when_storage_empty() { + let codex_home = tempdir().unwrap(); + write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: "pro".to_string(), + chatgpt_account_id: Some("acct_123".to_string()), + }, + codex_home.path(), + ) + .expect("seed auth"); + + let manager = AuthManager::new( + codex_home.path().to_path_buf(), + false, + AuthCredentialsStoreMode::File, + ); + let expected = manager.auth().expect("auth should be loaded"); + + std::fs::remove_file(codex_home.path().join("auth.json")).expect("remove auth"); + + let sync = manager + .sync_from_storage_for_request(&expected) + .await + .expect("sync should succeed"); + assert_eq!(sync, SyncFromStorageResult::LoggedOut); + } + #[tokio::test] #[serial(codex_api_key)] async fn pro_account_with_no_api_key_uses_chatgpt_auth() { @@ -1052,14 +1219,15 @@ mod tests { } } -/// Central manager providing a single source of truth for auth.json derived +/// Central manager providing a single source of truth for auth storage derived /// authentication data. It loads once (or on preference change) and then /// hands out cloned `CodexAuth` values so the rest of the program has a /// consistent snapshot. /// -/// External modifications to `auth.json` will NOT be observed until -/// `reload()` is called explicitly. This matches the design goal of avoiding -/// different parts of the program seeing inconsistent auth data mid‑run. +/// External modifications to credentials in storage will generally NOT be +/// observed until `reload()` is called explicitly. One exception is the +/// token-refresh recovery path, which may consult storage in order to adopt +/// tokens rotated by another concurrently-running Codex instance. #[derive(Debug)] pub struct AuthManager { codex_home: PathBuf, @@ -1068,6 +1236,35 @@ pub struct AuthManager { auth_credentials_store_mode: AuthCredentialsStoreMode, } +/// Outcome of attempting to sync the currently cached ChatGPT credentials from storage. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum SyncFromStorageResult { + /// The request auth did not include a workspace identifier, so we cannot safely compare. + SkippedMissingIdentity, + /// No credentials were found in storage (user logged out). + LoggedOut, + /// Storage contains credentials for a different identity (workspace/account). + IdentityMismatch, + /// Storage credentials match identity and were applied to the in-memory snapshot. + Applied { changed: bool }, +} + +/// Per-request state used to coordinate a limited 401 recovery flow. +/// +/// This is intentionally managed by `AuthManager` so callers don't need to +/// understand the recovery stages. +#[derive(Default, Debug)] +pub(crate) struct UnauthorizedRecovery { + synced_from_storage: bool, + refreshed_token: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum UnauthorizedRecoveryDecision { + Retry, + GiveUp, +} + impl AuthManager { /// Create a new manager loading the initial auth using the provided /// preferred auth method. Errors loading auth are swallowed; `auth()` will @@ -1174,7 +1371,7 @@ impl AuthManager { } /// Attempt to refresh the current auth token (if any). On success, reload - /// the auth state from disk so other components observe refreshed token. + /// the auth state from storage so other components observe refreshed token. /// If the token refresh fails in a permanent (non‑transient) way, logs out /// to clear invalid auth state. pub async fn refresh_token(&self) -> Result, RefreshTokenError> { @@ -1182,17 +1379,8 @@ impl AuthManager { Some(a) => a, None => return Ok(None), }; - match auth.refresh_token().await { - Ok(token) => { - // Reload to pick up persisted changes. - self.reload(); - Ok(Some(token)) - } - Err(e) => { - tracing::error!("Failed to refresh token: {}", e); - Err(e) - } - } + + self.refresh_token_for_request(&auth).await } /// Log out by deleting the on‑disk auth.json (if present). Returns Ok(true) @@ -1209,4 +1397,233 @@ impl AuthManager { pub fn get_auth_mode(&self) -> Option { self.auth().map(|a| a.mode) } + + pub(crate) async fn sync_from_storage_for_request( + &self, + expected: &CodexAuth, + ) -> std::io::Result { + let Some(expected_account_id) = expected.chatgpt_account_id() else { + return Ok(SyncFromStorageResult::SkippedMissingIdentity); + }; + + let storage = + create_auth_storage(self.codex_home.clone(), self.auth_credentials_store_mode); + let Some(stored) = load_auth_dot_json_with_retries(&storage).await? else { + // Ensure the cached auth reflects the logged-out state. + self.reload(); + return Ok(SyncFromStorageResult::LoggedOut); + }; + + // We only support ChatGPT in this sync path. If storage now points to an API key or is + // otherwise missing tokens, treat it as an identity mismatch for this request. + let Some(tokens) = stored.tokens.clone() else { + self.reload(); + return Ok(SyncFromStorageResult::IdentityMismatch); + }; + + let Some(stored_account_id) = tokens.id_token.chatgpt_account_id.as_deref() else { + // Cannot prove identity match. + return Ok(SyncFromStorageResult::SkippedMissingIdentity); + }; + + if stored_account_id != expected_account_id { + // Keep cached auth in sync for subsequent requests, but do not retry the in-flight + // request under a different identity. + self.reload(); + return Ok(SyncFromStorageResult::IdentityMismatch); + } + + let changed = if let Some(current) = self.auth() { + if let Ok(mut guard) = current.auth_dot_json.lock() { + let current_auth = guard.clone(); + *guard = Some(stored.clone()); + current_auth != Some(stored) + } else { + false + } + } else { + false + }; + + Ok(SyncFromStorageResult::Applied { changed }) + } + + pub(crate) async fn refresh_token_for_request( + &self, + expected: &CodexAuth, + ) -> Result, RefreshTokenError> { + if expected.mode != AuthMode::ChatGPT { + return Ok(None); + } + + let Some(auth) = self.auth() else { + return Ok(None); + }; + + let expected_account_id = expected.chatgpt_account_id(); + let Some(expected_account_id) = expected_account_id.as_deref() else { + // Cannot safely consult storage without a stable identity; fall back to current + // in-memory refresh behavior. + let token = auth.refresh_token().await?; + self.reload(); + return Ok(Some(token)); + }; + + let storage = + create_auth_storage(self.codex_home.clone(), self.auth_credentials_store_mode); + let Some(mut attempted_refresh_token) = + load_stored_refresh_token_if_identity_matches(&storage, expected_account_id).await? + else { + return Ok(None); + }; + + let mut retried = false; + loop { + let refresh_response = + try_refresh_token(attempted_refresh_token.clone(), &auth.client).await; + + match refresh_response { + Ok(refresh_response) => { + let updated = update_tokens( + &storage, + refresh_response.id_token, + refresh_response.access_token, + refresh_response.refresh_token, + ) + .await + .map_err(RefreshTokenError::from)?; + + if let Ok(mut auth_lock) = auth.auth_dot_json.lock() { + *auth_lock = Some(updated.clone()); + } + self.reload(); + + let access = updated + .tokens + .as_ref() + .map(|t| t.access_token.clone()) + .ok_or_else(|| { + RefreshTokenError::other_with_message( + "Token data is not available after refresh.", + ) + })?; + return Ok(Some(access)); + } + Err(RefreshTokenError::Permanent(failed)) + if !retried + && matches!( + failed.reason, + RefreshTokenFailedReason::Expired | RefreshTokenFailedReason::Exhausted + ) => + { + // Another instance may have refreshed and rotated the refresh token while we + // were attempting our refresh. Reload and retry once if the stored refresh + // token differs and identity still matches. + let Some(stored_refresh_token) = load_stored_refresh_token_if_identity_matches( + &storage, + expected_account_id, + ) + .await? + else { + return Ok(None); + }; + if stored_refresh_token != attempted_refresh_token { + attempted_refresh_token = stored_refresh_token; + retried = true; + continue; + } + + return Err(RefreshTokenError::Permanent(failed)); + } + Err(err) => return Err(err), + } + } + } + + pub(crate) async fn recover_from_unauthorized_for_request( + &self, + expected: &CodexAuth, + recovery: &mut UnauthorizedRecovery, + ) -> Result { + if recovery.refreshed_token { + return Ok(UnauthorizedRecoveryDecision::GiveUp); + } + + if expected.mode != AuthMode::ChatGPT { + return Ok(UnauthorizedRecoveryDecision::GiveUp); + } + + if !recovery.synced_from_storage { + let sync = self + .sync_from_storage_for_request(expected) + .await + .map_err(RefreshTokenError::Transient)?; + recovery.synced_from_storage = true; + match sync { + SyncFromStorageResult::Applied { changed } => { + tracing::debug!(changed, "synced ChatGPT credentials from storage after 401"); + Ok(UnauthorizedRecoveryDecision::Retry) + } + SyncFromStorageResult::SkippedMissingIdentity => { + Ok(UnauthorizedRecoveryDecision::Retry) + } + SyncFromStorageResult::LoggedOut | SyncFromStorageResult::IdentityMismatch => { + Ok(UnauthorizedRecoveryDecision::GiveUp) + } + } + } else { + match self.refresh_token_for_request(expected).await? { + Some(_) => { + recovery.refreshed_token = true; + Ok(UnauthorizedRecoveryDecision::Retry) + } + None => Ok(UnauthorizedRecoveryDecision::GiveUp), + } + } + } +} + +async fn load_stored_refresh_token_if_identity_matches( + storage: &Arc, + expected_account_id: &str, +) -> Result, RefreshTokenError> { + let Some(stored) = load_auth_dot_json_with_retries(storage) + .await + .map_err(RefreshTokenError::Transient)? + else { + return Ok(None); + }; + + let Some(tokens) = stored.tokens else { + return Ok(None); + }; + + if tokens.id_token.chatgpt_account_id.as_deref() != Some(expected_account_id) { + return Ok(None); + } + + Ok(Some(tokens.refresh_token)) +} + +async fn load_auth_dot_json_with_retries( + storage: &Arc, +) -> std::io::Result> { + // This primarily mitigates concurrent file writes where another process truncates and rewrites + // auth.json, which can cause transient JSON parse errors for readers. + const MAX_ATTEMPTS: usize = 3; + const BASE_DELAY_MS: u64 = 25; + + for attempt in 0..MAX_ATTEMPTS { + match storage.load() { + Ok(value) => return Ok(value), + Err(err) if err.kind() == ErrorKind::InvalidData && attempt + 1 < MAX_ATTEMPTS => { + let delay = Duration::from_millis(BASE_DELAY_MS * (attempt as u64 + 1)); + tokio::time::sleep(delay).await; + continue; + } + Err(err) => return Err(err), + } + } + + Ok(None) } diff --git a/codex-rs/core/src/auth/storage.rs b/codex-rs/core/src/auth/storage.rs index a238eb9c38e..133200674d5 100644 --- a/codex-rs/core/src/auth/storage.rs +++ b/codex-rs/core/src/auth/storage.rs @@ -7,6 +7,7 @@ use sha2::Sha256; use std::fmt::Debug; use std::fs::File; use std::fs::OpenOptions; +use std::io::ErrorKind; use std::io::Read; use std::io::Write; #[cfg(unix)] @@ -81,7 +82,8 @@ impl FileAuthStorage { let mut file = File::open(auth_file)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; - let auth_dot_json: AuthDotJson = serde_json::from_str(&contents)?; + let auth_dot_json: AuthDotJson = serde_json::from_str(&contents) + .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))?; Ok(auth_dot_json) } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 11a3c5c65f3..415f4b4294f 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -17,7 +17,6 @@ use codex_api::TransportError; use codex_api::common::Reasoning; use codex_api::create_text_param_for_request; use codex_api::error::ApiError; -use codex_app_server_protocol::AuthMode; use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; @@ -153,7 +152,7 @@ impl ModelClient { let conversation_id = self.conversation_id.to_string(); let session_source = self.session_source.clone(); - let mut refreshed = false; + let mut recovery = crate::auth::UnauthorizedRecovery::default(); loop { let auth = auth_manager.as_ref().and_then(|m| m.auth()); let api_provider = self @@ -179,7 +178,7 @@ impl ModelClient { Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => { - handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?; + handle_unauthorized(status, &mut recovery, &auth_manager, &auth).await?; continue; } Err(err) => return Err(map_api_error(err)), @@ -242,7 +241,7 @@ impl ModelClient { let conversation_id = self.conversation_id.to_string(); let session_source = self.session_source.clone(); - let mut refreshed = false; + let mut recovery = crate::auth::UnauthorizedRecovery::default(); loop { let auth = auth_manager.as_ref().and_then(|m| m.auth()); let api_provider = self @@ -276,7 +275,7 @@ impl ModelClient { Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => { - handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?; + handle_unauthorized(status, &mut recovery, &auth_manager, &auth).await?; continue; } Err(err) => return Err(map_api_error(err)), @@ -485,22 +484,20 @@ where /// the mapped `CodexErr` is returned to the caller. async fn handle_unauthorized( status: StatusCode, - refreshed: &mut bool, + recovery: &mut crate::auth::UnauthorizedRecovery, auth_manager: &Option>, auth: &Option, ) -> Result<()> { - if *refreshed { - return Err(map_unauthorized_status(status)); - } - if let Some(manager) = auth_manager.as_ref() && let Some(auth) = auth.as_ref() - && auth.mode == AuthMode::ChatGPT { - match manager.refresh_token().await { - Ok(_) => { - *refreshed = true; - Ok(()) + match manager + .recover_from_unauthorized_for_request(auth, recovery) + .await + { + Ok(crate::auth::UnauthorizedRecoveryDecision::Retry) => Ok(()), + Ok(crate::auth::UnauthorizedRecoveryDecision::GiveUp) => { + Err(map_unauthorized_status(status)) } Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)), Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),