Skip to content

Commit 437e667

Browse files
committed
rch::mpsc: add Sender::try_reserve
1 parent 726277d commit 437e667

File tree

2 files changed

+214
-1
lines changed

2 files changed

+214
-1
lines changed

remoc/src/rch/mpsc/sender.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,32 @@ where
436436
}
437437
}
438438

439+
/// Tries to acquire a slot in the channel without waiting for the slot to become available.
440+
/// If capacity to send one message is available, it is reserved for the caller.
441+
/// Otherwise an error is returned.
442+
///
443+
/// # Error reporting
444+
/// Sending and error reporting are done asynchronously.
445+
/// Thus, the reporting of an error may be delayed and this function may
446+
/// return errors caused by previous invocations.
447+
pub fn try_reserve(&self) -> Result<Permit<T>, TrySendError<()>> {
448+
if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
449+
return Err(TrySendError::from_remote_send_error(err.clone(), ()));
450+
}
451+
452+
match self.tx.upgrade() {
453+
Some(tx) => {
454+
let tx = (*tx).clone();
455+
match tx.try_reserve_owned() {
456+
Ok(permit) => Ok(Permit(permit)),
457+
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Err(TrySendError::Full(())),
458+
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err(TrySendError::Closed(())),
459+
}
460+
}
461+
_ => Err(TrySendError::Closed(())),
462+
}
463+
}
464+
439465
/// Returns the current capacity of the channel.
440466
///
441467
/// Zero is returned when the channel has been closed or an error has occurred.

remoc/tests/rch/mpsc.rs

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use remoc::{
1212
rch::{
1313
ClosedReason, SendResultExt, SendingError,
1414
base::{self, SendErrorKind},
15-
mpsc::{self, SendError},
15+
mpsc::{self, SendError, TrySendError},
1616
},
1717
};
1818

@@ -606,3 +606,190 @@ async fn max_item_size_exceeded() {
606606
assert_eq!(tx.closed_reason(), Some(ClosedReason::Failed));
607607
println!("Close reason: {:?}", tx.closed_reason());
608608
}
609+
610+
#[cfg_attr(not(feature = "js"), tokio::test)]
611+
#[cfg_attr(feature = "js", wasm_bindgen_test)]
612+
async fn try_reserve_send() {
613+
crate::init();
614+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<mpsc::Receiver<i16>>().await;
615+
616+
println!("Sending remote mpsc channel receiver");
617+
let (tx, rx) = mpsc::channel(16);
618+
a_tx.send(rx).await.unwrap();
619+
println!("Receiving remote mpsc channel receiver");
620+
let mut rx = b_rx.recv().await.unwrap().unwrap();
621+
622+
for i in 1..100 {
623+
println!("Reserving slot for {i}");
624+
let permit = tx.try_reserve().expect("try_reserve should succeed");
625+
println!("Sending {i} via permit");
626+
permit.send(i);
627+
let r = rx.recv().await.unwrap().unwrap();
628+
println!("Received {r}");
629+
assert_eq!(i, r, "send/receive mismatch");
630+
}
631+
632+
println!("Verifying that channel is open");
633+
assert!(!tx.is_closed());
634+
assert_eq!(tx.closed_reason(), None);
635+
rx.close();
636+
637+
println!("Closing channel");
638+
tx.closed().await;
639+
assert!(tx.is_closed());
640+
assert_eq!(tx.closed_reason(), Some(ClosedReason::Closed));
641+
}
642+
643+
#[cfg_attr(not(feature = "js"), tokio::test)]
644+
#[cfg_attr(feature = "js", wasm_bindgen_test)]
645+
async fn try_reserve_full() {
646+
crate::init();
647+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<mpsc::Receiver<i16>>().await;
648+
649+
println!("Sending remote mpsc channel receiver");
650+
let buffer_size = 4;
651+
let (tx, rx) = mpsc::channel(buffer_size);
652+
a_tx.send(rx).await.unwrap();
653+
println!("Receiving remote mpsc channel receiver");
654+
let mut rx = b_rx.recv().await.unwrap().unwrap();
655+
656+
println!("Filling the channel buffer");
657+
let mut permits = Vec::new();
658+
for i in 0..buffer_size {
659+
println!("Reserving slot {i}");
660+
let permit = tx.try_reserve().expect("try_reserve should succeed while buffer not full");
661+
permits.push(permit);
662+
}
663+
664+
println!("Trying to reserve when channel is full");
665+
match tx.try_reserve() {
666+
Err(TrySendError::Full(())) => println!("Correctly got Full error"),
667+
Ok(_) => panic!("try_reserve should fail when channel is full"),
668+
Err(other) => panic!("expected Full error, got: {other}"),
669+
}
670+
671+
println!("Sending values via permits");
672+
for (i, permit) in permits.into_iter().enumerate() {
673+
permit.send(i as i16);
674+
}
675+
676+
println!("Receiving all sent values");
677+
for i in 0..buffer_size {
678+
let r = rx.recv().await.unwrap().unwrap();
679+
println!("Received {r}");
680+
assert_eq!(i as i16, r, "send/receive mismatch");
681+
}
682+
683+
println!("Verifying try_reserve works again after draining");
684+
let permit = tx.try_reserve().expect("try_reserve should succeed after draining buffer");
685+
permit.send(42);
686+
let r = rx.recv().await.unwrap().unwrap();
687+
assert_eq!(42, r);
688+
}
689+
690+
#[cfg_attr(not(feature = "js"), tokio::test)]
691+
#[cfg_attr(feature = "js", wasm_bindgen_test)]
692+
async fn try_reserve_closed() {
693+
crate::init();
694+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<mpsc::Receiver<i16>>().await;
695+
696+
println!("Sending remote mpsc channel receiver");
697+
let (tx, rx) = mpsc::channel(16);
698+
a_tx.send(rx).await.unwrap();
699+
println!("Receiving remote mpsc channel receiver");
700+
let mut rx = b_rx.recv().await.unwrap().unwrap();
701+
702+
println!("Verifying try_reserve works before close");
703+
let permit = tx.try_reserve().expect("try_reserve should succeed on open channel");
704+
permit.send(1);
705+
let r = rx.recv().await.unwrap().unwrap();
706+
assert_eq!(1, r);
707+
708+
println!("Closing receiver");
709+
rx.close();
710+
711+
println!("Waiting for sender to notice closure");
712+
tx.closed().await;
713+
assert!(tx.is_closed());
714+
assert_eq!(tx.closed_reason(), Some(ClosedReason::Closed));
715+
716+
println!("Trying try_reserve after close");
717+
match tx.try_reserve() {
718+
Err(err) => {
719+
println!("Got expected error: {err}");
720+
assert!(err.is_closed() || err.is_disconnected(), "error should indicate closed/disconnected");
721+
assert!(err.is_final(), "error after close should be final");
722+
}
723+
Ok(_) => panic!("try_reserve should fail after channel is closed"),
724+
}
725+
}
726+
727+
#[cfg_attr(not(feature = "js"), tokio::test)]
728+
#[cfg_attr(feature = "js", wasm_bindgen_test)]
729+
async fn reserve_send() {
730+
crate::init();
731+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<mpsc::Receiver<i16>>().await;
732+
733+
println!("Sending remote mpsc channel receiver");
734+
let (tx, rx) = mpsc::channel(16);
735+
a_tx.send(rx).await.unwrap();
736+
println!("Receiving remote mpsc channel receiver");
737+
let mut rx = b_rx.recv().await.unwrap().unwrap();
738+
739+
for i in 1..100 {
740+
println!("Reserving slot for {i}");
741+
let permit = tx.reserve().await.expect("reserve should succeed");
742+
println!("Sending {i} via permit");
743+
permit.send(i);
744+
let r = rx.recv().await.unwrap().unwrap();
745+
println!("Received {r}");
746+
assert_eq!(i, r, "send/receive mismatch");
747+
}
748+
749+
println!("Verifying that channel is open");
750+
assert!(!tx.is_closed());
751+
assert_eq!(tx.closed_reason(), None);
752+
rx.close();
753+
754+
println!("Closing channel");
755+
tx.closed().await;
756+
assert!(tx.is_closed());
757+
assert_eq!(tx.closed_reason(), Some(ClosedReason::Closed));
758+
}
759+
760+
#[cfg_attr(not(feature = "js"), tokio::test)]
761+
#[cfg_attr(feature = "js", wasm_bindgen_test)]
762+
async fn reserve_closed() {
763+
crate::init();
764+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<mpsc::Receiver<i16>>().await;
765+
766+
println!("Sending remote mpsc channel receiver");
767+
let (tx, rx) = mpsc::channel(16);
768+
a_tx.send(rx).await.unwrap();
769+
println!("Receiving remote mpsc channel receiver");
770+
let mut rx = b_rx.recv().await.unwrap().unwrap();
771+
772+
println!("Verifying reserve works before close");
773+
let permit = tx.reserve().await.expect("reserve should succeed on open channel");
774+
permit.send(1);
775+
let r = rx.recv().await.unwrap().unwrap();
776+
assert_eq!(1, r);
777+
778+
println!("Closing receiver");
779+
rx.close();
780+
781+
println!("Waiting for sender to notice closure");
782+
tx.closed().await;
783+
assert!(tx.is_closed());
784+
assert_eq!(tx.closed_reason(), Some(ClosedReason::Closed));
785+
786+
println!("Trying reserve after close");
787+
match tx.reserve().await {
788+
Err(err) => {
789+
println!("Got expected error: {err}");
790+
assert!(err.is_closed() && err.is_disconnected(), "error should indicate closed/disconnected");
791+
assert_eq!(err.closed_reason(), Some(ClosedReason::Closed));
792+
}
793+
Ok(_) => panic!("reserve should fail after channel is closed"),
794+
}
795+
}

0 commit comments

Comments
 (0)