Skip to content

Commit 0e37ca3

Browse files
committed
fix: serialize appends to guarantee broadcast ordering
Add mutex around append() to ensure ID generation, write, and broadcast happen atomically. This fixes a race condition where concurrent appends could broadcast frames out of scru128 ID order.
1 parent 1bb982d commit 0e37ca3

File tree

2 files changed: +110 −1 lines changed

src/store/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio::sync::broadcast;
1212
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
1313

1414
use std::collections::HashSet;
15-
use std::sync::{Arc, RwLock};
15+
use std::sync::{Arc, Mutex, RwLock};
1616

1717
use scru128::Scru128Id;
1818

@@ -184,6 +184,7 @@ pub struct Store {
184184
contexts: Arc<RwLock<HashSet<Scru128Id>>>,
185185
broadcast_tx: broadcast::Sender<Frame>,
186186
gc_tx: UnboundedSender<GCTask>,
187+
append_lock: Arc<Mutex<()>>,
187188
}
188189

189190
impl Store {
@@ -222,6 +223,7 @@ impl Store {
222223
contexts: Arc::new(RwLock::new(contexts)),
223224
broadcast_tx,
224225
gc_tx,
226+
append_lock: Arc::new(Mutex::new(())),
225227
};
226228

227229
// Load context registrations
@@ -517,6 +519,11 @@ impl Store {
517519
}
518520

519521
pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
522+
// Serialize all appends to ensure ID generation, write, and broadcast
523+
// happen atomically. This guarantees subscribers receive frames in
524+
// scru128 ID order.
525+
let _guard = self.append_lock.lock().unwrap();
526+
520527
frame.id = scru128::new();
521528

522529
// Special handling for xs.context registration

src/store/tests.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,3 +1207,105 @@ async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>)
12071207
}
12081208
}
12091209
}
1210+
1211+
mod tests_append_race {
1212+
use super::*;
1213+
use std::sync::atomic::{AtomicUsize, Ordering};
1214+
use std::sync::{Arc, Barrier};
1215+
use tempfile::TempDir;
1216+
1217+
/// Test that concurrent appends broadcast frames in scru128 ID order.
1218+
/// This test attempts to expose a race condition between ID generation,
1219+
/// writing, and broadcasting.
1220+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1221+
async fn test_concurrent_append_broadcast_order() {
1222+
let temp_dir = TempDir::new().unwrap();
1223+
let store = Arc::new(Store::new(temp_dir.keep()));
1224+
1225+
// Subscribe to broadcasts before spawning tasks
1226+
let mut rx = store
1227+
.read(
1228+
ReadOptions::builder()
1229+
.follow(FollowOption::On)
1230+
.context_id(ZERO_CONTEXT)
1231+
.build(),
1232+
)
1233+
.await;
1234+
1235+
// Wait for threshold marker
1236+
let threshold = rx.recv().await.unwrap();
1237+
assert_eq!(threshold.topic, "xs.threshold");
1238+
1239+
let num_threads = 8;
1240+
let appends_per_thread = 50;
1241+
1242+
// Use a barrier to maximize concurrent contention
1243+
let barrier = Arc::new(Barrier::new(num_threads));
1244+
let completed = Arc::new(AtomicUsize::new(0));
1245+
1246+
// Spawn OS threads (not async tasks) for true parallelism
1247+
let mut handles = Vec::new();
1248+
for thread_id in 0..num_threads {
1249+
let store = Arc::clone(&store);
1250+
let barrier = Arc::clone(&barrier);
1251+
let completed = Arc::clone(&completed);
1252+
handles.push(std::thread::spawn(move || {
1253+
// All threads wait here, then start simultaneously
1254+
barrier.wait();
1255+
for i in 0..appends_per_thread {
1256+
let _ = store.append(
1257+
Frame::builder("race-test", ZERO_CONTEXT)
1258+
.meta(serde_json::json!({"thread": thread_id, "seq": i}))
1259+
.build(),
1260+
);
1261+
}
1262+
completed.fetch_add(1, Ordering::SeqCst);
1263+
}));
1264+
}
1265+
1266+
// Wait for all threads to complete
1267+
for handle in handles {
1268+
handle.join().unwrap();
1269+
}
1270+
1271+
// Collect all broadcast frames
1272+
let expected_count = num_threads * appends_per_thread;
1273+
let mut received = Vec::with_capacity(expected_count);
1274+
1275+
loop {
1276+
if received.len() >= expected_count {
1277+
break;
1278+
}
1279+
match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
1280+
Ok(Some(frame)) if frame.topic == "race-test" => {
1281+
received.push(frame);
1282+
}
1283+
Ok(Some(_)) => {
1284+
// Skip non-test frames (like pulses)
1285+
continue;
1286+
}
1287+
Ok(None) => panic!("Channel closed unexpectedly"),
1288+
Err(_) => panic!(
1289+
"Timeout waiting for frames, got {} of {}",
1290+
received.len(),
1291+
expected_count
1292+
),
1293+
}
1294+
}
1295+
1296+
// Verify frames were received in scru128 ID order
1297+
let mut out_of_order = Vec::new();
1298+
for i in 1..received.len() {
1299+
if received[i].id < received[i - 1].id {
1300+
out_of_order.push((i - 1, i, received[i - 1].id, received[i].id));
1301+
}
1302+
}
1303+
1304+
assert!(
1305+
out_of_order.is_empty(),
1306+
"Frames received out of scru128 order! Found {} violations:\n{:?}",
1307+
out_of_order.len(),
1308+
out_of_order.iter().take(10).collect::<Vec<_>>()
1309+
);
1310+
}
1311+
}

0 commit comments

Comments (0)