Skip to content

Commit de96717

Browse files
author
rusty
committed
Add StopSource and StopToken cancellation types
1 parent a60f84f commit de96717

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async-session = { version = "3.0", optional = true }
3838
async-sse = "4.0.1"
3939
async-std = { version = "1.6.5", features = ["unstable"] }
4040
async-trait = "0.1.41"
41+
event-listener = "2.5.1"
4142
femme = { version = "2.1.1", optional = true }
4243
futures-util = "0.3.6"
4344
http-client = { version = "6.1.0", default-features = false }

src/cancellation.rs

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
use std::pin::Pin;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
4+
use async_std::future::Future;
5+
use async_std::stream::Stream;
6+
use async_std::sync::Arc;
7+
use async_std::task::{Context, Poll};
8+
9+
use event_listener::{Event, EventListener};
10+
use pin_project_lite::pin_project;
11+
12+
#[derive(Debug)]
13+
pub struct StopSource {
14+
stopped: Arc<AtomicBool>,
15+
event: Arc<Event>,
16+
}
17+
18+
impl StopSource {
19+
pub fn new() -> Self {
20+
Self {
21+
stopped: Arc::new(AtomicBool::new(false)),
22+
event: Arc::new(Event::new()),
23+
}
24+
}
25+
26+
pub fn token(&self) -> StopToken {
27+
StopToken {
28+
stopped: self.stopped.clone(),
29+
event_listener: self.event.listen(),
30+
event: self.event.clone(),
31+
}
32+
}
33+
}
34+
35+
impl Drop for StopSource {
36+
fn drop(&mut self) {
37+
self.stopped.store(true, Ordering::SeqCst);
38+
self.event.notify(usize::MAX);
39+
}
40+
}
41+
42+
pin_project! {
43+
#[derive(Debug)]
44+
pub struct StopToken {
45+
#[pin]
46+
stopped: Arc<AtomicBool>,
47+
#[pin]
48+
event_listener: EventListener,
49+
event: Arc<Event>,
50+
}
51+
}
52+
53+
impl StopToken {
54+
pub fn never() -> Self {
55+
let event = Event::new();
56+
Self {
57+
stopped: Arc::new(AtomicBool::new(false)),
58+
event_listener: event.listen(),
59+
event: Arc::new(event),
60+
}
61+
}
62+
}
63+
64+
impl Clone for StopToken {
65+
fn clone(&self) -> Self {
66+
Self {
67+
stopped: self.stopped.clone(),
68+
event_listener: self.event.listen(),
69+
event: self.event.clone(),
70+
}
71+
}
72+
}
73+
74+
impl Future for StopToken {
75+
type Output = ();
76+
77+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78+
let mut this = self.project();
79+
let _ = Future::poll(Pin::new(&mut this.event_listener), cx);
80+
if this.stopped.load(Ordering::Relaxed) {
81+
Poll::Ready(())
82+
} else {
83+
Poll::Pending
84+
}
85+
}
86+
}
87+
88+
pin_project! {
89+
#[derive(Debug)]
90+
pub struct StopStream<S> {
91+
#[pin]
92+
stream: S,
93+
#[pin]
94+
stop_token: StopToken,
95+
}
96+
}
97+
98+
impl<S> StopStream<S> {
99+
pub fn new(stream: S, stop_token: StopToken) -> Self {
100+
Self { stream, stop_token }
101+
}
102+
}
103+
104+
impl<S> Stream for StopStream<S>
105+
where
106+
S: Stream,
107+
{
108+
type Item = S::Item;
109+
110+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111+
let mut this = self.project();
112+
if Future::poll(Pin::new(&mut this.stop_token), cx).is_ready() {
113+
Poll::Ready(None)
114+
} else {
115+
this.stream.poll_next(cx)
116+
}
117+
}
118+
}
119+
120+
pub trait StopStreamExt: Sized {
121+
fn stop_on(self, stop_token: StopToken) -> StopStream<Self> {
122+
StopStream::new(self, stop_token)
123+
}
124+
}
125+
126+
impl<S> StopStreamExt for S where S: Stream {}
127+
128+
pin_project! {
129+
#[derive(Debug)]
130+
pub struct StopFuture<F> {
131+
#[pin]
132+
future: F,
133+
#[pin]
134+
stop_token: StopToken,
135+
}
136+
}
137+
138+
impl<F> StopFuture<F> {
139+
pub fn new(future: F, stop_token: StopToken) -> Self {
140+
Self { future, stop_token }
141+
}
142+
}
143+
144+
impl<F> Future for StopFuture<F>
145+
where
146+
F: Future,
147+
{
148+
type Output = Option<F::Output>;
149+
150+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151+
let mut this = self.project();
152+
if Future::poll(Pin::new(&mut this.stop_token), cx).is_ready() {
153+
Poll::Ready(None)
154+
} else {
155+
match this.future.poll(cx) {
156+
Poll::Pending => Poll::Pending,
157+
Poll::Ready(output) => Poll::Ready(Some(output)),
158+
}
159+
}
160+
}
161+
}
162+
163+
pub trait StopFutureExt: Sized {
164+
fn stop_on(self, stop_token: StopToken) -> StopFuture<Self> {
165+
StopFuture::new(self, stop_token)
166+
}
167+
}
168+
169+
impl<F> StopFutureExt for F where F: Future {}
170+
171+
#[cfg(test)]
172+
mod tests {
173+
use std::thread;
174+
use std::time::Duration;
175+
176+
use async_std::prelude::{FutureExt, StreamExt};
177+
178+
use super::*;
179+
180+
#[test]
181+
fn test_cancellation() {
182+
let source = StopSource::new();
183+
let stop_token = source.token();
184+
185+
let pending_stream1 = async_std::stream::pending::<()>();
186+
let pending_stream2 = async_std::stream::pending::<()>();
187+
let pending_future1 = async_std::future::pending::<()>();
188+
let pending_future2 = async_std::future::pending::<()>();
189+
let wrapped_stream1 = pending_stream1.stop_on(stop_token.clone());
190+
let wrapped_stream2 = pending_stream2.stop_on(stop_token.clone());
191+
let wrapped_future1 = pending_future1.stop_on(stop_token.clone());
192+
let wrapped_future2 = pending_future2.stop_on(stop_token);
193+
194+
let join_future = wrapped_stream1
195+
.last()
196+
.join(wrapped_stream2.last())
197+
.join(wrapped_future1)
198+
.join(wrapped_future2);
199+
200+
thread::spawn(move || {
201+
let source = source;
202+
thread::sleep(Duration::from_secs(1));
203+
drop(source);
204+
});
205+
206+
let res = async_std::task::block_on(join_future);
207+
assert_eq!(res, (((None, None), None), None));
208+
}
209+
210+
#[test]
211+
fn test_never() {
212+
let pending_future = async_std::future::pending::<()>();
213+
let wrapped_future = pending_future.stop_on(StopToken::never());
214+
215+
let res = async_std::task::block_on(wrapped_future.timeout(Duration::from_secs(1)));
216+
assert!(res.is_err());
217+
}
218+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ mod route;
7676
mod router;
7777
mod server;
7878

79+
pub mod cancellation;
7980
pub mod convert;
8081
pub mod listener;
8182
pub mod log;

0 commit comments

Comments
 (0)