Skip to content

Commit 65e4157

Browse files
committed
feat: reduce atomic usage on spmc receiver
1 parent ed6eb04 commit 65e4157

File tree

1 file changed

+28
-18
lines changed

1 file changed

+28
-18
lines changed

pulsebeam-runtime/src/sync/spmc.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl<T> Ring<T> {
5959
#[derive(Debug)]
6060
pub struct Sender<T> {
6161
ring: Arc<Ring<T>>,
62-
head: u64,
62+
local_head: u64,
6363
}
6464

6565
impl<T> Sender<T> {
@@ -68,16 +68,16 @@ impl<T> Sender<T> {
6868
return;
6969
}
7070

71-
let idx = (self.head as usize) & self.ring.mask;
71+
let idx = (self.local_head as usize) & self.ring.mask;
7272

7373
{
7474
let mut slot = self.ring.slots[idx].write();
7575
slot.val = Some(val);
76-
slot.seq = self.head;
76+
slot.seq = self.local_head;
7777
}
7878

79-
self.ring.head.store(self.head + 1, Ordering::Release);
80-
self.head += 1;
79+
self.ring.head.store(self.local_head + 1, Ordering::Release);
80+
self.local_head += 1;
8181
self.ring.event.notify(usize::MAX);
8282
}
8383
}
@@ -93,6 +93,7 @@ impl<T> Drop for Sender<T> {
9393
pub struct Receiver<T> {
9494
ring: Arc<Ring<T>>,
9595
next_seq: u64,
96+
local_head: u64,
9697
listener: Option<EventListener>,
9798
}
9899

@@ -109,18 +110,24 @@ impl<T: Clone> Receiver<T> {
109110
loop {
110111
let coop = std::task::ready!(tokio::task::coop::poll_proceed(cx));
111112

112-
// Snapshot producer head
113-
let head = self.ring.head.load(Ordering::Acquire);
113+
// Snapshot producer head. This allows batching efficiency without a batching API.
114+
// It creates a fast-path for a slightly behind receiver to catchup the producer
115+
// without spending atomic load on every iteration.
116+
//
117+
// Safety: head is strictly monotonically increasing
118+
if self.next_seq == self.local_head {
119+
self.local_head = self.ring.head.load(Ordering::Acquire);
120+
}
114121
let capacity = self.ring.mask as u64 + 1;
115-
let earliest = head.saturating_sub(capacity);
122+
let earliest = self.local_head.saturating_sub(capacity);
116123

117124
// Closed and nothing left
118-
if self.ring.closed.load(Ordering::Acquire) == 1 && self.next_seq >= head {
125+
if self.ring.closed.load(Ordering::Acquire) == 1 && self.next_seq >= self.local_head {
119126
return Poll::Ready(Err(RecvError::Closed));
120127
}
121128

122129
// No new items — wait
123-
if self.next_seq >= head {
130+
if self.next_seq >= self.local_head {
124131
match &mut self.listener {
125132
Some(l) => {
126133
if Pin::new(l).poll(cx).is_pending() {
@@ -144,15 +151,15 @@ impl<T: Clone> Receiver<T> {
144151
// Slot overwritten before we got here
145152
if slot_seq < earliest {
146153
drop(slot);
147-
self.next_seq = head;
148-
return Poll::Ready(Err(RecvError::Lagged(head)));
154+
self.next_seq = self.local_head;
155+
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
149156
}
150157

151158
// Seq mismatch — producer overwrote after head snapshot
152159
if slot_seq != self.next_seq {
153160
drop(slot);
154-
self.next_seq = head;
155-
return Poll::Ready(Err(RecvError::Lagged(head)));
161+
self.next_seq = self.local_head;
162+
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
156163
}
157164

158165
// Valid message
@@ -167,8 +174,8 @@ impl<T: Clone> Receiver<T> {
167174
// This shouldn't never happen, but just in case..
168175
// Seq was correct but value missing — treat as lag
169176
drop(slot);
170-
self.next_seq = head;
171-
return Poll::Ready(Err(RecvError::Lagged(head)));
177+
self.next_seq = self.local_head;
178+
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
172179
}
173180
}
174181
}
@@ -190,9 +197,11 @@ impl<T: Clone> Stream for Receiver<T> {
190197

191198
impl<T: Clone> Clone for Receiver<T> {
192199
fn clone(&self) -> Self {
200+
let head = self.ring.head.load(Ordering::Acquire);
193201
Self {
194202
ring: self.ring.clone(),
195-
next_seq: self.ring.head.load(Ordering::Acquire),
203+
next_seq: head,
204+
local_head: head,
196205
listener: None,
197206
}
198207
}
@@ -203,11 +212,12 @@ pub fn channel<T: Send + Sync + Clone + 'static>(capacity: usize) -> (Sender<T>,
203212
(
204213
Sender {
205214
ring: ring.clone(),
206-
head: 0,
215+
local_head: 0,
207216
},
208217
Receiver {
209218
ring: ring.clone(),
210219
next_seq: 0,
220+
local_head: 0,
211221
listener: None,
212222
},
213223
)

0 commit comments

Comments
 (0)