diff --git a/codex-rs/utils/readiness/src/lib.rs b/codex-rs/utils/readiness/src/lib.rs index 8b92f9387a1..ff99c3df8e9 100644 --- a/codex-rs/utils/readiness/src/lib.rs +++ b/codex-rs/utils/readiness/src/lib.rs @@ -118,26 +118,25 @@ impl Readiness for ReadinessFlag { return Err(errors::ReadinessError::FlagAlreadyReady); } - // Generate a token; ensure it's not 0. - let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed)); - // Recheck readiness while holding the lock so mark_ready can't flip the flag between the - // check above and inserting the token. - let inserted = self + // check above and inserting the token. Also ensure the token is non-zero and unique in + // the presence of `i32` wrap-around. + let token = self .with_tokens(|tokens| { if self.load_ready() { - return false; + return None; + } + + loop { + let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed)); + if token.0 != 0 && tokens.insert(token) { + return Some(token); + } } - tokens.insert(token); - true }) .await?; - if !inserted { - return Err(errors::ReadinessError::FlagAlreadyReady); - } - - Ok(token) + token.ok_or(errors::ReadinessError::FlagAlreadyReady) } async fn mark_ready(&self, token: Token) -> Result { @@ -199,6 +198,7 @@ mod errors { #[cfg(test)] mod tests { use std::sync::Arc; + use std::sync::atomic::Ordering; use super::Readiness; use super::ReadinessFlag; @@ -289,4 +289,26 @@ mod tests { .expect_err("contended subscribe should report a lock failure"); assert_matches!(err, ReadinessError::TokenLockFailed); } + + #[tokio::test] + async fn subscribe_skips_zero_token() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + flag.next_id.store(0, Ordering::Relaxed); + + let token = flag.subscribe().await?; + assert_ne!(token, Token(0)); + assert!(flag.mark_ready(token).await?); + Ok(()) + } + + #[tokio::test] + async fn subscribe_avoids_duplicate_tokens() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + let token = flag.subscribe().await?; + flag.next_id.store(token.0, Ordering::Relaxed); + + let token2 = flag.subscribe().await?; + assert_ne!(token2, token); + Ok(()) + } }