Skip to content

Commit d4ef15c

Browse files
authored
[TST] TOCTTOU test for log fork (#5793)
## Description of changes

_Summarize the changes made by this PR._

- Improvements & Bug fixes
  - N/A
- New functionality
  - Implements a TOCTTOU test for log forking:
    - Create an empty log.
    - Concurrently:
      - Write to the log.
      - Wait for 0-5 ms, then copy the log (copy is faster than write).
    - Check the copied log.

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 54c2de9 commit d4ef15c

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::sync::Arc;
2+
use tokio::sync::Barrier;
3+
4+
use chroma_storage::s3_client_for_test_with_new_bucket;
5+
6+
use wal3::{
7+
LogPosition, LogReader, LogReaderOptions, LogWriter, LogWriterOptions, Manifest,
8+
ThrottleOptions,
9+
};
10+
11+
#[tokio::test]
12+
async fn test_k8s_integration_85_copy_race_condition() {
13+
const DELAYS_MS: &[u64] = &[0, 1, 2, 3, 5];
14+
const ATTEMPTS_PER_DELAY: usize = 5;
15+
16+
let mut race_detected_count = 0;
17+
let mut total_attempts = 0;
18+
19+
for &delay_ms in DELAYS_MS {
20+
for attempt in 0..ATTEMPTS_PER_DELAY {
21+
total_attempts += 1;
22+
println!(
23+
"\n========== Delay: {}ms, Attempt {} ==========",
24+
delay_ms, attempt
25+
);
26+
if run_single_attempt(total_attempts, delay_ms).await {
27+
race_detected_count += 1;
28+
println!(
29+
"!!! Race condition detected with {}ms delay, attempt {} !!!",
30+
delay_ms, attempt
31+
);
32+
}
33+
}
34+
}
35+
36+
println!("\n========== SUMMARY ==========");
37+
println!(
38+
"Race condition detected in {} out of {} attempts",
39+
race_detected_count, total_attempts
40+
);
41+
42+
if race_detected_count > 0 {
43+
panic!(
44+
"Race condition detected in {} out of {} attempts!",
45+
race_detected_count, total_attempts
46+
);
47+
}
48+
49+
println!("Test passed: Race condition was not triggered in any attempts.");
50+
}
51+
52+
async fn run_single_attempt(attempt: usize, delay_ms: u64) -> bool {
53+
let storage = Arc::new(s3_client_for_test_with_new_bucket().await);
54+
let prefix = format!("test_copy_empty_concurrent_{}", attempt);
55+
56+
Manifest::initialize(&LogWriterOptions::default(), &storage, &prefix, "init")
57+
.await
58+
.unwrap();
59+
60+
let reader = LogReader::open(
61+
LogReaderOptions::default(),
62+
Arc::clone(&storage),
63+
prefix.clone(),
64+
)
65+
.await
66+
.unwrap();
67+
68+
let manifest_before = reader.manifest().await.unwrap().unwrap();
69+
let next_write_before = manifest_before.next_write_timestamp();
70+
let next_seq_no_before = manifest_before.next_fragment_seq_no();
71+
72+
let barrier_start = Arc::new(Barrier::new(2));
73+
let barrier_start_clone = Arc::clone(&barrier_start);
74+
75+
let storage_clone = Arc::clone(&storage);
76+
let prefix_clone = prefix.clone();
77+
78+
let writer_task = tokio::spawn(async move {
79+
let log = LogWriter::open(
80+
LogWriterOptions {
81+
throttle_fragment: ThrottleOptions {
82+
batch_size_bytes: 1,
83+
batch_interval_us: 1,
84+
..ThrottleOptions::default()
85+
},
86+
..LogWriterOptions::default()
87+
},
88+
storage_clone,
89+
&prefix_clone,
90+
"concurrent_writer",
91+
(),
92+
)
93+
.await
94+
.unwrap();
95+
96+
barrier_start_clone.wait().await;
97+
98+
log.append_many(vec![Vec::from("concurrent data")])
99+
.await
100+
.unwrap();
101+
});
102+
103+
barrier_start.wait().await;
104+
105+
if delay_ms > 0 {
106+
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
107+
}
108+
109+
wal3::copy(
110+
&storage,
111+
&LogWriterOptions::default(),
112+
&reader,
113+
LogPosition::default(),
114+
format!("{}_target", prefix),
115+
)
116+
.await
117+
.unwrap();
118+
119+
writer_task.await.unwrap();
120+
121+
let copied_reader = LogReader::open(
122+
LogReaderOptions::default(),
123+
Arc::clone(&storage),
124+
format!("{}_target", prefix),
125+
)
126+
.await
127+
.unwrap();
128+
129+
let copied_manifest = copied_reader.manifest().await.unwrap().unwrap();
130+
131+
// Check if race condition was triggered:
132+
// - Copied log has 0 fragments (scan() saw empty log)
133+
// - BUT copied log has updated next_write/next_seq_no (second manifest load saw writer's changes)
134+
let race_detected = copied_manifest.fragments.is_empty()
135+
&& (copied_manifest.next_write_timestamp() != next_write_before
136+
|| copied_manifest.next_fragment_seq_no() != next_seq_no_before);
137+
138+
if race_detected {
139+
println!(" Race detected: fragments={}, next_write={:?} (expected {:?}), next_seq_no={:?} (expected {:?})",
140+
copied_manifest.fragments.len(),
141+
copied_manifest.next_write_timestamp(),
142+
next_write_before,
143+
copied_manifest.next_fragment_seq_no(),
144+
next_seq_no_before);
145+
}
146+
147+
race_detected
148+
}

0 commit comments

Comments
 (0)