Skip to content

Commit f410958

Browse files
committed
fixup: applies fix for queue contention
Applies a fix for `mpmc::Queue::enqueue` contention using retry parameters. Authored-by: Sosthène Guédon <https://github.com/sosthene-nitrokey>
1 parent 3191f31 commit f410958

File tree

1 file changed

+41
-3
lines changed

1 file changed

+41
-3
lines changed

src/mpmc.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,14 @@ impl<T, S: Storage> QueueInner<T, S> {
208208

209209
/// Returns the item in the front of the queue, or `None` if the queue is empty.
210210
pub fn dequeue(&self) -> Option<T> {
211-
unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
211+
unsafe {
212+
dequeue(
213+
S::as_ptr(self.buffer.get()),
214+
&self.dequeue_pos,
215+
&self.enqueue_pos,
216+
self.mask(),
217+
)
218+
}
212219
}
213220

214221
/// Adds an `item` to the end of the queue.
@@ -218,6 +225,7 @@ impl<T, S: Storage> QueueInner<T, S> {
218225
unsafe {
219226
enqueue(
220227
S::as_ptr(self.buffer.get()),
228+
&self.dequeue_pos,
221229
&self.enqueue_pos,
222230
self.mask(),
223231
item,
@@ -255,18 +263,25 @@ impl<T> Cell<T> {
255263
}
256264
}
257265

266+
const CONTENTION_RETRY_COUNT: usize = 10000;
267+
258268
unsafe fn dequeue<T>(
259269
buffer: *mut Cell<T>,
260270
dequeue_pos: &AtomicTargetSize,
271+
enqueue_pos: &AtomicTargetSize,
261272
mask: UintSize,
262273
) -> Option<T> {
263274
let mut pos = dequeue_pos.load(Ordering::Relaxed);
264275

265276
let mut cell;
277+
let mut seq;
278+
let mut dif;
279+
let mut contention_retry_count = 0;
280+
266281
loop {
267282
cell = buffer.add(usize::from(pos & mask));
268-
let seq = (*cell).sequence.load(Ordering::Acquire);
269-
let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
283+
seq = (*cell).sequence.load(Ordering::Acquire);
284+
dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
270285

271286
match dif.cmp(&0) {
272287
core::cmp::Ordering::Equal => {
@@ -283,6 +298,16 @@ unsafe fn dequeue<T>(
283298
}
284299
}
285300
core::cmp::Ordering::Less => {
301+
if pos != enqueue_pos.load(Ordering::Relaxed)
302+
&& contention_retry_count < CONTENTION_RETRY_COUNT
303+
{
304+
// In this case according to the positions the queue is not empty
305+
// This suggests that there is a enqueue operations that is in progress in some other task
306+
// Therefore we can wait a bit hoping that the other task can finish its `enqueue` operation complete
307+
core::hint::spin_loop();
308+
contention_retry_count += 1;
309+
continue;
310+
}
286311
return None;
287312
}
288313
core::cmp::Ordering::Greater => {
@@ -300,13 +325,15 @@ unsafe fn dequeue<T>(
300325

301326
unsafe fn enqueue<T>(
302327
buffer: *mut Cell<T>,
328+
dequeue_pos: &AtomicTargetSize,
303329
enqueue_pos: &AtomicTargetSize,
304330
mask: UintSize,
305331
item: T,
306332
) -> Result<(), T> {
307333
let mut pos = enqueue_pos.load(Ordering::Relaxed);
308334

309335
let mut cell;
336+
let mut contention_retry_count = 0;
310337
loop {
311338
cell = buffer.add(usize::from(pos & mask));
312339
let seq = (*cell).sequence.load(Ordering::Acquire);
@@ -325,8 +352,19 @@ unsafe fn enqueue<T>(
325352
{
326353
break;
327354
}
355+
pos = enqueue_pos.load(Ordering::Relaxed);
328356
}
329357
core::cmp::Ordering::Less => {
358+
if dequeue_pos.load(Ordering::Relaxed).wrapping_add(mask + 1) != pos
359+
&& contention_retry_count < CONTENTION_RETRY_COUNT
360+
{
361+
// In this case according to the positions the queue is not full
362+
// This suggests that there is a dequeue operation that is in progress in some other task
363+
// Therefore we can wait a bit hoping that the other task can finish its `dequeue` operation completes
364+
core::hint::spin_loop();
365+
contention_retry_count += 1;
366+
continue;
367+
}
330368
return Err(item);
331369
}
332370
core::cmp::Ordering::Greater => {

0 commit comments

Comments
 (0)