Skip to content

Commit c9c5b9c

Browse files
authored
Add select_biased! macro (#1040)
1 parent 8cec8ec commit c9c5b9c

File tree

4 files changed

+177
-24
lines changed

4 files changed

+177
-24
lines changed

crossbeam-channel/src/select.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ enum Timeout {
177177
fn run_select(
178178
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
179179
timeout: Timeout,
180+
is_biased: bool,
180181
) -> Option<(Token, usize, *const u8)> {
181182
if handles.is_empty() {
182183
// Wait until the timeout and return.
@@ -193,8 +194,10 @@ fn run_select(
193194
}
194195
}
195196

196-
// Shuffle the operations for fairness.
197-
utils::shuffle(handles);
197+
if !is_biased {
198+
// Shuffle the operations for fairness.
199+
utils::shuffle(handles);
200+
}
198201

199202
// Create a token, which serves as a temporary variable that gets initialized in this function
200203
// and is later used by a call to `channel::read()` or `channel::write()` that completes the
@@ -325,6 +328,7 @@ fn run_select(
325328
fn run_ready(
326329
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
327330
timeout: Timeout,
331+
is_biased: bool,
328332
) -> Option<usize> {
329333
if handles.is_empty() {
330334
// Wait until the timeout and return.
@@ -341,8 +345,10 @@ fn run_ready(
341345
}
342346
}
343347

344-
// Shuffle the operations for fairness.
345-
utils::shuffle(handles);
348+
if !is_biased {
349+
// Shuffle the operations for fairness.
350+
utils::shuffle(handles);
351+
}
346352

347353
loop {
348354
let backoff = Backoff::new();
@@ -450,8 +456,9 @@ fn run_ready(
450456
#[inline]
451457
pub fn try_select<'a>(
452458
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459+
is_biased: bool,
453460
) -> Result<SelectedOperation<'a>, TrySelectError> {
454-
match run_select(handles, Timeout::Now) {
461+
match run_select(handles, Timeout::Now, is_biased) {
455462
None => Err(TrySelectError),
456463
Some((token, index, ptr)) => Ok(SelectedOperation {
457464
token,
@@ -467,12 +474,13 @@ pub fn try_select<'a>(
467474
#[inline]
468475
pub fn select<'a>(
469476
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477+
is_biased: bool,
470478
) -> SelectedOperation<'a> {
471479
if handles.is_empty() {
472480
panic!("no operations have been added to `Select`");
473481
}
474482

475-
let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
483+
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
476484
SelectedOperation {
477485
token,
478486
index,
@@ -487,10 +495,11 @@ pub fn select<'a>(
487495
pub fn select_timeout<'a>(
488496
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
489497
timeout: Duration,
498+
is_biased: bool,
490499
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
491500
match Instant::now().checked_add(timeout) {
492-
Some(deadline) => select_deadline(handles, deadline),
493-
None => Ok(select(handles)),
501+
Some(deadline) => select_deadline(handles, deadline, is_biased),
502+
None => Ok(select(handles, is_biased)),
494503
}
495504
}
496505

@@ -499,8 +508,9 @@ pub fn select_timeout<'a>(
499508
pub(crate) fn select_deadline<'a>(
500509
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
501510
deadline: Instant,
511+
is_biased: bool,
502512
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
503-
match run_select(handles, Timeout::At(deadline)) {
513+
match run_select(handles, Timeout::At(deadline), is_biased) {
504514
None => Err(SelectTimeoutError),
505515
Some((token, index, ptr)) => Ok(SelectedOperation {
506516
token,
@@ -764,7 +774,7 @@ impl<'a> Select<'a> {
764774
/// }
765775
/// ```
766776
pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
767-
try_select(&mut self.handles)
777+
try_select(&mut self.handles, false)
768778
}
769779

770780
/// Blocks until one of the operations becomes ready and selects it.
@@ -815,7 +825,7 @@ impl<'a> Select<'a> {
815825
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
816826
/// ```
817827
pub fn select(&mut self) -> SelectedOperation<'a> {
818-
select(&mut self.handles)
828+
select(&mut self.handles, false)
819829
}
820830

821831
/// Blocks for a limited time until one of the operations becomes ready and selects it.
@@ -869,7 +879,7 @@ impl<'a> Select<'a> {
869879
&mut self,
870880
timeout: Duration,
871881
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
872-
select_timeout(&mut self.handles, timeout)
882+
select_timeout(&mut self.handles, timeout, false)
873883
}
874884

875885
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
@@ -925,7 +935,7 @@ impl<'a> Select<'a> {
925935
&mut self,
926936
deadline: Instant,
927937
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
928-
select_deadline(&mut self.handles, deadline)
938+
select_deadline(&mut self.handles, deadline, false)
929939
}
930940

931941
/// Attempts to find a ready operation without blocking.
@@ -964,7 +974,7 @@ impl<'a> Select<'a> {
964974
/// }
965975
/// ```
966976
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
967-
match run_ready(&mut self.handles, Timeout::Now) {
977+
match run_ready(&mut self.handles, Timeout::Now, false) {
968978
None => Err(TryReadyError),
969979
Some(index) => Ok(index),
970980
}
@@ -1021,7 +1031,7 @@ impl<'a> Select<'a> {
10211031
panic!("no operations have been added to `Select`");
10221032
}
10231033

1024-
run_ready(&mut self.handles, Timeout::Never).unwrap()
1034+
run_ready(&mut self.handles, Timeout::Never, false).unwrap()
10251035
}
10261036

10271037
/// Blocks for a limited time until one of the operations becomes ready.
@@ -1122,7 +1132,7 @@ impl<'a> Select<'a> {
11221132
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
11231133
/// ```
11241134
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1125-
match run_ready(&mut self.handles, Timeout::At(deadline)) {
1135+
match run_ready(&mut self.handles, Timeout::At(deadline), false) {
11261136
None => Err(ReadyTimeoutError),
11271137
Some(index) => Ok(index),
11281138
}

crossbeam-channel/src/select_macro.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal {
750750
$cases:tt
751751
) => {{
752752
let _oper: $crate::SelectedOperation<'_> = {
753-
let _oper = $crate::internal::select(&mut $sel);
753+
let _oper = $crate::internal::select(&mut $sel, _IS_BIASED);
754754

755755
// Erase the lifetime so that `sel` can be dropped early even without NLL.
756756
unsafe { ::std::mem::transmute(_oper) }
@@ -772,7 +772,7 @@ macro_rules! crossbeam_channel_internal {
772772
$cases:tt
773773
) => {{
774774
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
775-
let _oper = $crate::internal::try_select(&mut $sel);
775+
let _oper = $crate::internal::try_select(&mut $sel, _IS_BIASED);
776776

777777
// Erase the lifetime so that `sel` can be dropped early even without NLL.
778778
unsafe { ::std::mem::transmute(_oper) }
@@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal {
802802
$cases:tt
803803
) => {{
804804
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
805-
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout);
805+
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, _IS_BIASED);
806806

807807
// Erase the lifetime so that `sel` can be dropped early even without NLL.
808808
unsafe { ::std::mem::transmute(_oper) }
@@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal {
985985
///
986986
/// This macro allows you to define a set of channel operations, wait until any one of them becomes
987987
/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
988-
/// among them is selected.
988+
/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased
989+
/// selection.
989990
///
990991
/// It is also possible to define a `default` case that gets executed if none of the operations are
991992
/// ready, either right away or for a certain duration of time.
@@ -1121,8 +1122,33 @@ macro_rules! crossbeam_channel_internal {
11211122
#[macro_export]
11221123
macro_rules! select {
11231124
($($tokens:tt)*) => {
1124-
$crate::crossbeam_channel_internal!(
1125-
$($tokens)*
1126-
)
1125+
{
1126+
const _IS_BIASED: bool = false;
1127+
1128+
$crate::crossbeam_channel_internal!(
1129+
$($tokens)*
1130+
)
1131+
}
1132+
};
1133+
}
1134+
1135+
/// Selects from a set of channel operations.
1136+
///
1137+
/// This macro allows you to define a list of channel operations, wait until any one of them
1138+
/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the
1139+
/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use
1140+
/// [`select!`] for the unbiased selection.
1141+
///
1142+
/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax.
1143+
#[macro_export]
1144+
macro_rules! select_biased {
1145+
($($tokens:tt)*) => {
1146+
{
1147+
const _IS_BIASED: bool = true;
1148+
1149+
$crate::crossbeam_channel_internal!(
1150+
$($tokens)*
1151+
)
1152+
}
11271153
};
11281154
}

crossbeam-channel/tests/mpsc.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ macro_rules! select {
176176
(
177177
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
178178
) => ({
179+
const _IS_BIASED: bool = false;
180+
179181
cc::crossbeam_channel_internal! {
180182
$(
181183
$meth(($rx).inner) -> res => {

crossbeam-channel/tests/select_macro.rs

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::ops::Deref;
99
use std::thread;
1010
use std::time::{Duration, Instant};
1111

12-
use crossbeam_channel::{after, bounded, never, select, tick, unbounded};
12+
use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
1313
use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
1414
use crossbeam_utils::thread::scope;
1515

@@ -943,6 +943,121 @@ fn fairness_send() {
943943
assert!(hits.iter().all(|x| *x >= COUNT / 4));
944944
}
945945

946+
#[test]
947+
fn unfairness() {
948+
#[cfg(miri)]
949+
const COUNT: usize = 100;
950+
#[cfg(not(miri))]
951+
const COUNT: usize = 10_000;
952+
953+
let (s1, r1) = unbounded::<()>();
954+
let (s2, r2) = unbounded::<()>();
955+
let (s3, r3) = unbounded::<()>();
956+
957+
for _ in 0..COUNT {
958+
s1.send(()).unwrap();
959+
s2.send(()).unwrap();
960+
}
961+
s3.send(()).unwrap();
962+
963+
let mut hits = [0usize; 3];
964+
for _ in 0..COUNT {
965+
select_biased! {
966+
recv(r1) -> _ => hits[0] += 1,
967+
recv(r2) -> _ => hits[1] += 1,
968+
recv(r3) -> _ => hits[2] += 1,
969+
}
970+
}
971+
assert_eq!(hits, [COUNT, 0, 0]);
972+
973+
for _ in 0..COUNT {
974+
select_biased! {
975+
recv(r1) -> _ => hits[0] += 1,
976+
recv(r2) -> _ => hits[1] += 1,
977+
recv(r3) -> _ => hits[2] += 1,
978+
}
979+
}
980+
assert_eq!(hits, [COUNT, COUNT, 0]);
981+
}
982+
983+
#[test]
984+
fn unfairness_timeout() {
985+
#[cfg(miri)]
986+
const COUNT: usize = 100;
987+
#[cfg(not(miri))]
988+
const COUNT: usize = 10_000;
989+
990+
let (s1, r1) = unbounded::<()>();
991+
let (s2, r2) = unbounded::<()>();
992+
let (s3, r3) = unbounded::<()>();
993+
994+
for _ in 0..COUNT {
995+
s1.send(()).unwrap();
996+
s2.send(()).unwrap();
997+
}
998+
s3.send(()).unwrap();
999+
1000+
let mut hits = [0usize; 3];
1001+
for _ in 0..COUNT {
1002+
select_biased! {
1003+
recv(r1) -> _ => hits[0] += 1,
1004+
recv(r2) -> _ => hits[1] += 1,
1005+
recv(r3) -> _ => hits[2] += 1,
1006+
default(ms(1000)) => unreachable!(),
1007+
}
1008+
}
1009+
assert_eq!(hits, [COUNT, 0, 0]);
1010+
1011+
for _ in 0..COUNT {
1012+
select_biased! {
1013+
recv(r1) -> _ => hits[0] += 1,
1014+
recv(r2) -> _ => hits[1] += 1,
1015+
recv(r3) -> _ => hits[2] += 1,
1016+
default(ms(1000)) => unreachable!(),
1017+
}
1018+
}
1019+
assert_eq!(hits, [COUNT, COUNT, 0]);
1020+
}
1021+
1022+
#[test]
1023+
fn unfairness_try() {
1024+
#[cfg(miri)]
1025+
const COUNT: usize = 100;
1026+
#[cfg(not(miri))]
1027+
const COUNT: usize = 10_000;
1028+
1029+
let (s1, r1) = unbounded::<()>();
1030+
let (s2, r2) = unbounded::<()>();
1031+
let (s3, r3) = unbounded::<()>();
1032+
1033+
for _ in 0..COUNT {
1034+
s1.send(()).unwrap();
1035+
s2.send(()).unwrap();
1036+
}
1037+
s3.send(()).unwrap();
1038+
1039+
let mut hits = [0usize; 3];
1040+
for _ in 0..COUNT {
1041+
select_biased! {
1042+
recv(r1) -> _ => hits[0] += 1,
1043+
recv(r2) -> _ => hits[1] += 1,
1044+
recv(r3) -> _ => hits[2] += 1,
1045+
default() => unreachable!(),
1046+
}
1047+
}
1048+
assert_eq!(hits, [COUNT, 0, 0]);
1049+
1050+
for _ in 0..COUNT {
1051+
select_biased! {
1052+
recv(r1) -> _ => hits[0] += 1,
1053+
recv(r2) -> _ => hits[1] += 1,
1054+
recv(r3) -> _ => hits[2] += 1,
1055+
default() => unreachable!(),
1056+
}
1057+
}
1058+
assert_eq!(hits, [COUNT, COUNT, 0]);
1059+
}
1060+
9461061
#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
9471062
#[test]
9481063
fn references() {

0 commit comments

Comments
 (0)