Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions codex-rs/utils/readiness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,25 @@ impl Readiness for ReadinessFlag {
return Err(errors::ReadinessError::FlagAlreadyReady);
}

// Generate a token; ensure it's not 0.
let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed));

// Recheck readiness while holding the lock so mark_ready can't flip the flag between the
// check above and inserting the token.
let inserted = self
// check above and inserting the token. Also ensure the token is non-zero and unique in
// the presence of `i32` wrap-around.
let token = self
.with_tokens(|tokens| {
if self.load_ready() {
return false;
return None;
}

loop {
let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed));
if token.0 != 0 && tokens.insert(token) {
return Some(token);
}
}
tokens.insert(token);
true
})
.await?;

if !inserted {
return Err(errors::ReadinessError::FlagAlreadyReady);
}

Ok(token)
token.ok_or(errors::ReadinessError::FlagAlreadyReady)
}

async fn mark_ready(&self, token: Token) -> Result<bool, errors::ReadinessError> {
Expand Down Expand Up @@ -199,6 +198,7 @@ mod errors {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::Ordering;

use super::Readiness;
use super::ReadinessFlag;
Expand Down Expand Up @@ -289,4 +289,26 @@ mod tests {
.expect_err("contended subscribe should report a lock failure");
assert_matches!(err, ReadinessError::TokenLockFailed);
}

#[tokio::test]
async fn subscribe_skips_zero_token() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
flag.next_id.store(0, Ordering::Relaxed);

let token = flag.subscribe().await?;
assert_ne!(token, Token(0));
assert!(flag.mark_ready(token).await?);
Ok(())
}

#[tokio::test]
async fn subscribe_avoids_duplicate_tokens() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
let token = flag.subscribe().await?;
flag.next_id.store(token.0, Ordering::Relaxed);

let token2 = flag.subscribe().await?;
assert_ne!(token2, token);
Ok(())
}
}
Loading