Skip to content

Commit 6377816

Browse files
committed
Cache cursors
1 parent 756963e commit 6377816

File tree

2 files changed

+158
-39
lines changed

2 files changed

+158
-39
lines changed

src/queue.rs

Lines changed: 157 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub struct YCQueue<'a> {
3030
slot_size: u16,
3131
// slot count is a power of two, so we can store its exponent for fast calculations
3232
slot_count_exp: u16,
33+
cached_consumer_cursor: YCQueueCursor,
34+
cached_producer_cursor: YCQueueCursor,
3335
}
3436

3537
impl<'a> YCQueue<'a> {
@@ -83,6 +85,8 @@ impl<'a> YCQueue<'a> {
8385
slot_count: slot_count as u16,
8486
slot_size: slot_size as u16,
8587
slot_count_exp,
88+
cached_consumer_cursor: shared_metadata.consumer_cursor.load(Ordering::Acquire),
89+
cached_producer_cursor: shared_metadata.producer_cursor.load(Ordering::Acquire),
8690
})
8791
}
8892

@@ -260,16 +264,20 @@ impl<'a> YCQueue<'a> {
260264
Ok(())
261265
}
262266

263-
fn cursor_index(&self, cursor: YCQueueCursor) -> u16 {
264-
cursor_index(cursor, self.slot_count_exp)
267+
fn refresh_consumer_cursor(&mut self) -> YCQueueCursor {
268+
let cursor = self.shared_metadata.consumer_cursor.load(Ordering::Acquire);
269+
self.cached_consumer_cursor = cursor;
270+
cursor
265271
}
266272

267-
fn producer_cursor(&self) -> YCQueueCursor {
268-
self.shared_metadata.producer_cursor.load(Ordering::Acquire)
273+
fn refresh_producer_cursor(&mut self) -> YCQueueCursor {
274+
let cursor = self.shared_metadata.producer_cursor.load(Ordering::Acquire);
275+
self.cached_producer_cursor = cursor;
276+
cursor
269277
}
270278

271-
fn consumer_cursor(&self) -> YCQueueCursor {
272-
self.shared_metadata.consumer_cursor.load(Ordering::Acquire)
279+
fn cursor_index(&self, cursor: YCQueueCursor) -> u16 {
280+
cursor_index(cursor, self.slot_count_exp)
273281
}
274282

275283
/// Returns the number of slots that have been produced (or are being produced into) but not yet consumed.
@@ -292,10 +300,11 @@ impl<'a> YCQueue<'a> {
292300
/// assert_eq!(queue.in_flight_count(), 1);
293301
/// ```
294302
#[inline]
295-
pub fn in_flight_count(&self) -> u16 {
296-
(self.producer_cursor() - self.consumer_cursor())
297-
.try_into()
298-
.expect("in_flight_count overflow")
303+
pub fn in_flight_count(&mut self) -> u16 {
304+
self.refresh_consumer_cursor();
305+
self.refresh_producer_cursor();
306+
self.slots_used_cached()
307+
.expect("producer cursor should not trail consumer")
299308
}
300309

301310
/// Returns the circular index that will be reserved by the next producer call.
@@ -304,7 +313,7 @@ impl<'a> YCQueue<'a> {
304313
/// The slot index measured modulo the queue capacity.
305314
#[inline]
306315
pub fn produce_idx(&self) -> u16 {
307-
self.cursor_index(self.producer_cursor())
316+
self.cursor_index(self.cached_producer_cursor)
308317
}
309318

310319
/// Returns the circular index that will be reserved by the next consumer call.
@@ -313,7 +322,7 @@ impl<'a> YCQueue<'a> {
313322
/// The slot index measured modulo the queue capacity.
314323
#[inline]
315324
pub fn consume_idx(&self) -> u16 {
316-
self.cursor_index(self.consumer_cursor())
325+
self.cursor_index(self.cached_consumer_cursor)
317326
}
318327

319328
/// Returns the total number of slots managed by this queue.
@@ -322,6 +331,31 @@ impl<'a> YCQueue<'a> {
322331
self.slot_count
323332
}
324333

334+
fn slots_used_cached(&self) -> Option<u16> {
335+
let producer_cursor = self.cached_producer_cursor;
336+
let consumer_cursor = self.cached_consumer_cursor;
337+
338+
// is producer is behind consumer, it (producer) is stale
339+
if producer_cursor < consumer_cursor {
340+
return None;
341+
}
342+
343+
// if producer is more than slot_count ahead, consumer is stale
344+
if producer_cursor > consumer_cursor + self.slot_count as u64 {
345+
return None;
346+
}
347+
348+
let used = u16::try_from(producer_cursor - consumer_cursor).unwrap_or_else(|_| {
349+
panic!("distance should fit in u16: {producer_cursor} - {consumer_cursor}")
350+
});
351+
Some(used)
352+
}
353+
354+
fn slots_available_cached(&self) -> Option<u16> {
355+
let used = self.slots_used_cached()?;
356+
Some(self.slot_count - used)
357+
}
358+
325359
/// Reserve contiguous slots for producers, optionally in best-effort mode.
326360
///
327361
/// When `best_effort` is `false`, the function succeeds only if all `num_slots` are available;
@@ -366,41 +400,71 @@ impl<'a> YCQueue<'a> {
366400
return Err(YCQueueError::InvalidArgs);
367401
}
368402

403+
let requested_slots = num_slots;
404+
let mut refreshed = false;
369405
let old_producer = loop {
370-
let consumer = self.shared_metadata.consumer_cursor.load(Ordering::Acquire);
371-
let producer = self.shared_metadata.producer_cursor.load(Ordering::Acquire);
372-
let producer_limit = cursor_advance(consumer, self.slot_count);
406+
let producer = self.refresh_producer_cursor();
407+
408+
// first check available slots against cached consumer, if it's too stale we re-try
409+
let mut available_slots = match self.slots_available_cached() {
410+
Some(slots) => slots,
411+
None => {
412+
if !refreshed {
413+
self.refresh_consumer_cursor();
414+
refreshed = true;
415+
num_slots = requested_slots;
416+
continue;
417+
} else {
418+
return Err(YCQueueError::OutOfSpace);
419+
}
420+
}
421+
};
373422

374-
if (!best_effort && cursor_advance(producer, num_slots) > producer_limit)
375-
|| (best_effort && producer >= producer_limit)
376-
{
377-
return Err(YCQueueError::OutOfSpace);
423+
if best_effort {
424+
num_slots = num_slots.min(available_slots).max(1);
425+
}
426+
427+
if num_slots > available_slots {
428+
if !refreshed {
429+
self.refresh_consumer_cursor();
430+
refreshed = true;
431+
num_slots = requested_slots;
432+
continue;
433+
} else {
434+
return Err(YCQueueError::OutOfSpace);
435+
}
378436
}
379437

380-
let available_slots = self.check_owner(
438+
available_slots = self.check_owner(
381439
self.cursor_index(producer),
382440
num_slots,
383441
YCQueueOwner::Producer,
384442
);
385443

386-
if (!best_effort && available_slots != num_slots)
387-
|| (best_effort && available_slots == 0)
388-
{
389-
return Err(YCQueueError::SlotNotReady);
444+
if num_slots > available_slots {
445+
if best_effort && available_slots > 0 {
446+
num_slots = available_slots;
447+
} else {
448+
return Err(YCQueueError::SlotNotReady);
449+
}
390450
}
391451

392452
debug_assert!(available_slots > 0);
393453
debug_assert!(available_slots <= num_slots);
394454
num_slots = available_slots;
395455

396-
let next_producer = cursor_advance(producer, available_slots);
456+
let new_producer = cursor_advance(producer, available_slots);
457+
397458
match self.shared_metadata.producer_cursor.compare_exchange(
398459
producer,
399-
next_producer,
460+
new_producer,
400461
Ordering::AcqRel,
401462
Ordering::Acquire,
402463
) {
403-
Ok(_) => break producer,
464+
Ok(_) => {
465+
self.cached_producer_cursor = new_producer;
466+
break producer;
467+
}
404468
Err(_) => std::hint::spin_loop(),
405469
}
406470
};
@@ -609,17 +673,42 @@ impl<'a> YCQueue<'a> {
609673
return Err(YCQueueError::InvalidArgs);
610674
}
611675

676+
let requested_slots = num_slots;
677+
let mut refreshed = false;
612678
let old_consumer = loop {
613-
let producer = self.shared_metadata.producer_cursor.load(Ordering::Acquire);
614-
let consumer = self.shared_metadata.consumer_cursor.load(Ordering::Acquire);
679+
let consumer = self.refresh_consumer_cursor();
680+
681+
// first check available slots against cached producer, if it's too stale we re-try
682+
let mut available_slots = match self.slots_used_cached() {
683+
Some(slots) => slots,
684+
None => {
685+
if !refreshed {
686+
self.refresh_producer_cursor();
687+
refreshed = true;
688+
num_slots = requested_slots;
689+
continue;
690+
} else {
691+
return Err(YCQueueError::EmptyQueue);
692+
}
693+
}
694+
};
615695

616-
if (!best_effort && cursor_advance(consumer, num_slots) > producer)
617-
|| (best_effort && consumer >= producer)
618-
{
619-
return Err(YCQueueError::EmptyQueue);
696+
if best_effort {
697+
num_slots = num_slots.min(available_slots).max(1);
620698
}
621699

622-
let available_slots = self.check_owner(
700+
if num_slots > available_slots {
701+
if !refreshed {
702+
self.refresh_producer_cursor();
703+
refreshed = true;
704+
num_slots = requested_slots;
705+
continue;
706+
} else {
707+
return Err(YCQueueError::EmptyQueue);
708+
}
709+
}
710+
711+
available_slots = self.check_owner(
623712
self.cursor_index(consumer),
624713
num_slots,
625714
YCQueueOwner::Consumer,
@@ -629,20 +718,26 @@ impl<'a> YCQueue<'a> {
629718
{
630719
return Err(YCQueueError::SlotNotReady);
631720
}
721+
if num_slots > available_slots {
722+
debug_assert!(best_effort);
723+
num_slots = available_slots;
724+
}
632725

633726
debug_assert!(available_slots > 0);
634727
debug_assert!(available_slots <= num_slots);
635-
num_slots = available_slots;
636728

637-
let new_consumer = cursor_advance(consumer, num_slots);
729+
let next_consumer = cursor_advance(consumer, available_slots);
638730

639731
match self.shared_metadata.consumer_cursor.compare_exchange(
640732
consumer,
641-
new_consumer,
733+
next_consumer,
642734
Ordering::AcqRel,
643735
Ordering::Acquire,
644736
) {
645-
Ok(_) => break consumer,
737+
Ok(_) => {
738+
self.cached_consumer_cursor = next_consumer;
739+
break consumer;
740+
}
646741
Err(_) => std::hint::spin_loop(),
647742
}
648743
};
@@ -925,6 +1020,30 @@ mod tests {
9251020
assert_eq!(queue.consume_idx(), 3);
9261021
}
9271022

1023+
#[test]
1024+
fn in_flight_count_tracks_other_writers() {
1025+
use std::sync::atomic::Ordering;
1026+
1027+
let slot_count: u16 = 8;
1028+
let slot_size: u16 = 16;
1029+
1030+
let owned = YCQueueOwnedData::new(slot_count, slot_size);
1031+
let mut queue = YCQueue::from_owned_data(&owned).unwrap();
1032+
1033+
assert_eq!(queue.in_flight_count(), 0);
1034+
1035+
queue
1036+
.shared_metadata
1037+
.producer_cursor
1038+
.store(3, Ordering::Release);
1039+
queue
1040+
.shared_metadata
1041+
.consumer_cursor
1042+
.store(1, Ordering::Release);
1043+
1044+
assert_eq!(queue.in_flight_count(), 2);
1045+
}
1046+
9281047
#[test]
9291048
fn best_effort_produce_partial_batch() {
9301049
let slot_count: u16 = 4;

tests/multi_thread_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ mod multi_thread_tests {
676676
"large consumer never completed a batch"
677677
);
678678

679-
let final_queue = YCQueue::from_owned_data(&owned_data).unwrap();
679+
let mut final_queue = YCQueue::from_owned_data(&owned_data).unwrap();
680680
assert_eq!(final_queue.in_flight_count(), 0);
681681
assert_eq!(final_queue.produce_idx(), final_queue.consume_idx());
682682
}

0 commit comments

Comments
 (0)