Skip to content

Commit 9364ca6

Browse files
asonixspacejam
authored andcommitted
Expand the store-pending pattern to Subscriber::next_timeout
1 parent c8a31e3 commit 9364ca6

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

src/oneshot.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,12 @@ impl<T> OneShot<T> {
6363
/// returning an error if not filled
6464
/// before a given timeout or if the
6565
/// system shuts down before then.
66+
///
67+
/// Upon a successful receive, the
68+
/// oneshot should be dropped, as it
69+
/// will never yield that value again.
6670
pub fn wait_timeout(
67-
self,
71+
&mut self,
6872
mut timeout: Duration,
6973
) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
7074
let mut inner = self.mu.lock();

src/subscriber.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,16 @@ impl Subscriber {
108108
/// an error if no event arrives within the provided `Duration`
109109
/// or if the backing `Db` shuts down.
110110
pub fn next_timeout(
111-
&self,
111+
&mut self,
112112
mut timeout: Duration,
113113
) -> std::result::Result<Event, std::sync::mpsc::RecvTimeoutError> {
114114
loop {
115115
let start = Instant::now();
116-
let future_rx = self.rx.recv_timeout(timeout)?;
116+
let mut future_rx = if let Some(future_rx) = self.existing.take() {
117+
future_rx
118+
} else {
119+
self.rx.recv_timeout(timeout)?
120+
};
117121
timeout =
118122
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
119123
timeout
@@ -122,8 +126,13 @@ impl Subscriber {
122126
};
123127

124128
let start = Instant::now();
125-
if let Some(event) = future_rx.wait_timeout(timeout)? {
126-
return Ok(event);
129+
match future_rx.wait_timeout(timeout) {
130+
Ok(Some(event)) => return Ok(event),
131+
Ok(None) => (),
132+
Err(timeout_error) => {
133+
self.existing = Some(future_rx);
134+
return Err(timeout_error);
135+
}
127136
}
128137
timeout =
129138
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {

0 commit comments

Comments
 (0)