Skip to content

Commit 383da87

Browse files
authored
sync: implement oneshot::Receiver::is_empty() (#7153)
1 parent 17117b5 commit 383da87

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

tokio/src/sync/oneshot.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,91 @@ impl<T> Receiver<T> {
989989
self.inner.is_none()
990990
}
991991

992+
/// Checks if a channel is empty.
993+
///
994+
/// This method returns `true` if the channel has no messages.
995+
///
996+
/// It is not necessarily safe to poll an empty receiver, which may have
997+
/// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
998+
/// to check whether or not a receiver can be safely polled, instead.
999+
///
1000+
/// # Examples
1001+
///
1002+
/// Sending a value.
1003+
///
1004+
/// ```
1005+
/// use tokio::sync::oneshot;
1006+
///
1007+
/// #[tokio::main]
1008+
/// async fn main() {
1009+
/// let (tx, mut rx) = oneshot::channel();
1010+
/// assert!(rx.is_empty());
1011+
///
1012+
/// tx.send(0).unwrap();
1013+
/// assert!(!rx.is_empty());
1014+
///
1015+
/// let _ = (&mut rx).await;
1016+
/// assert!(rx.is_empty());
1017+
/// }
1018+
/// ```
1019+
///
1020+
/// Dropping the sender.
1021+
///
1022+
/// ```
1023+
/// use tokio::sync::oneshot;
1024+
///
1025+
/// #[tokio::main]
1026+
/// async fn main() {
1027+
/// let (tx, mut rx) = oneshot::channel::<()>();
1028+
///
1029+
/// // A channel is empty if the sender is dropped.
1030+
/// drop(tx);
1031+
/// assert!(rx.is_empty());
1032+
///
1033+
/// // A closed channel still yields an error, however.
1034+
/// (&mut rx).await.expect_err("should yield an error");
1035+
/// assert!(rx.is_empty());
1036+
/// }
1037+
/// ```
1038+
///
1039+
/// Terminated channels are empty.
1040+
///
1041+
/// ```should_panic
1042+
/// use tokio::sync::oneshot;
1043+
///
1044+
/// #[tokio::main]
1045+
/// async fn main() {
1046+
/// let (tx, mut rx) = oneshot::channel();
1047+
/// tx.send(0).unwrap();
1048+
/// let _ = (&mut rx).await;
1049+
///
1050+
/// // NB: an empty channel is not necessarily safe to poll!
1051+
/// assert!(rx.is_empty());
1052+
/// let _ = (&mut rx).await;
1053+
/// }
1054+
/// ```
1055+
pub fn is_empty(&self) -> bool {
1056+
let Some(inner) = self.inner.as_ref() else {
1057+
// The channel has already terminated.
1058+
return true;
1059+
};
1060+
1061+
let state = State::load(&inner.state, Acquire);
1062+
if state.is_complete() {
1063+
// SAFETY: If `state.is_complete()` returns true, then the
1064+
// `VALUE_SENT` bit has been set and the sender side of the
1065+
// channel will no longer attempt to access the inner
1066+
// `UnsafeCell`. Therefore, it is now safe for us to access the
1067+
// cell.
1068+
//
1069+
// The channel is empty if it does not have a value.
1070+
unsafe { !inner.has_value() }
1071+
} else {
1072+
// The receiver closed the channel or no value has been sent yet.
1073+
true
1074+
}
1075+
}
1076+
9921077
/// Attempts to receive a value.
9931078
///
9941079
/// If a pending value exists in the channel, it is returned. If no value
@@ -1291,6 +1376,19 @@ impl<T> Inner<T> {
12911376
unsafe fn consume_value(&self) -> Option<T> {
12921377
self.value.with_mut(|ptr| (*ptr).take())
12931378
}
1379+
1380+
/// Returns true if there is a value. This function does not check `state`.
1381+
///
1382+
/// # Safety
1383+
///
1384+
/// Calling this method concurrently on multiple threads will result in a
1385+
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
1386+
/// sender *or* the receiver will call this method at a given point in time.
1387+
/// If `VALUE_SENT` is not set, then only the sender may call this method;
1388+
/// if it is set, then only the receiver may call this method.
1389+
unsafe fn has_value(&self) -> bool {
1390+
self.value.with(|ptr| (*ptr).is_some())
1391+
}
12941392
}
12951393

12961394
unsafe impl<T: Send> Send for Inner<T> {}

tokio/tests/sync_oneshot.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,55 @@ fn receiver_is_terminated_rx_close() {
386386
"channel IS terminated after value is read"
387387
);
388388
}
389+
390+
#[test]
391+
fn receiver_is_empty_send() {
392+
let (tx, mut rx) = oneshot::channel::<i32>();
393+
394+
assert!(rx.is_empty(), "channel IS empty before value is sent");
395+
tx.send(17).unwrap();
396+
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");
397+
398+
let mut task = task::spawn(());
399+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
400+
assert_ready_eq!(poll, Ok(17));
401+
402+
assert!(rx.is_empty(), "channel IS empty after value is read");
403+
}
404+
405+
#[test]
406+
fn receiver_is_empty_try_recv() {
407+
let (tx, mut rx) = oneshot::channel::<i32>();
408+
409+
assert!(rx.is_empty(), "channel IS empty before value is sent");
410+
tx.send(17).unwrap();
411+
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");
412+
413+
let value = rx.try_recv().expect("value is waiting");
414+
assert_eq!(value, 17);
415+
416+
assert!(rx.is_empty(), "channel IS empty after value is read");
417+
}
418+
419+
#[test]
420+
fn receiver_is_empty_drop() {
421+
let (tx, mut rx) = oneshot::channel::<i32>();
422+
423+
assert!(rx.is_empty(), "channel IS empty before sender is dropped");
424+
drop(tx);
425+
assert!(rx.is_empty(), "channel IS empty after sender is dropped");
426+
427+
let mut task = task::spawn(());
428+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
429+
assert_ready_err!(poll);
430+
431+
assert!(rx.is_empty(), "channel IS empty after value is read");
432+
}
433+
434+
#[test]
435+
fn receiver_is_empty_rx_close() {
436+
let (_tx, mut rx) = oneshot::channel::<i32>();
437+
assert!(rx.is_empty());
438+
rx.close();
439+
assert!(rx.is_empty());
440+
}

0 commit comments

Comments
 (0)