Skip to content

Commit 4a16757

Browse files
committed
initial
0 parents  commit 4a16757

File tree

5 files changed

+272
-0
lines changed

5 files changed

+272
-0
lines changed

.github/workflows/ci.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
name: CI
2+
3+
on:
4+
pull_request:
5+
push:
6+
branches:
7+
- staging
8+
- trying
9+
10+
env:
11+
RUSTFLAGS: -Dwarnings
12+
13+
jobs:
14+
build_and_test:
15+
name: Build and test
16+
runs-on: ubuntu-latest
17+
18+
steps:
19+
- uses: actions/checkout@master
20+
21+
- name: Install nightly
22+
uses: actions-rs/toolchain@v1
23+
with:
24+
toolchain: nightly
25+
override: true
26+
27+
- name: tests
28+
uses: actions-rs/cargo@v1
29+
with:
30+
command: test
31+
args: --features unstable

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/target
2+
**/*.rs.bk
3+
Cargo.lock

Cargo.toml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "stop-token"
3+
version = "0.1.0"
4+
authors = ["Aleksey Kladov <[email protected]>"]
5+
edition = "2018"
6+
license = "MIT OR Apache-2.0"
7+
repository = "https://github.com/async-rs/stop-token"
8+
9+
description = "Experimental cooperative cancellation for async-std"
10+
11+
[dependencies]
12+
pin-project-lite = "0.1.0"
13+
async-std = "0.99"
14+
15+
[features]
16+
unstable = ["async-std/unstable"]

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Cooperative cancellation for [async-std](https://async.rs/).
2+
3+
Status: experimental.
4+
5+
See crate docs for details
6+
7+
```rust
8+
use stop_token::StopToken;
9+
10+
async fn do_work(work: impl Stream<Item = Event>, stop_token: StopToken) {
11+
// The `work` stream will end early: as soon as `stop_token` is cancelled.
12+
let mut work = stop_token.stop_stream(work);
13+
while let Some(event) = work.next().await {
14+
process_event(event).await
15+
}
16+
}
17+
```

src/lib.rs

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
//! Cooperative cancellation for [async-std](https://async.rs/).
2+
//!
3+
//! # Status
4+
//!
5+
//! Experimental. The library works as is, breaking changes will bump major
6+
//! version, but there are no guarantees of long-term support.
7+
//!
8+
//! Additionally, this library uses unstable cargo feature feature of `async-std` and, for
9+
//! this reason, should be used like this:
10+
//!
11+
//! ```toml
12+
//! [dependencies.stop-token]
13+
//! version = "0.1.0"
14+
//! features = [ "unstable" ]
15+
//! ```
16+
//!
17+
//! # Motivation
18+
//!
19+
//! Rust futures come with a build-in cancellation mechanism: dropping a future
20+
//! prevents any further progress of the future. This is a *hard* cancellation
21+
//! mechanism, meaning that the future can potentially be cancelled at any
22+
//! `.await` expression.
23+
//!
24+
//! Sometimes, you need are more fine-grained cancellation mechanism. Imagine a
25+
//! chat server that relays messages to peers. Sending a single message
26+
//! potentially needs several writes on the socket object. That means that, if
27+
//! we use hard-cancellation for client connections, a connection can be
28+
//! abruptly terminated mid-message (even mid-emoji, if we are especially
29+
//! unlucky). What we need here is cooperative cancellation: client connection
30+
//! should be gracefully shutdown *between* the messages.
31+
//!
32+
//! More generally, if you have an event processing loop like
33+
//!
34+
//! ```ignore
35+
//! while let Some(event) = work.next().await {
36+
//! process_event(event).await
37+
//! }
38+
//! ```
39+
//!
40+
//! you usually want to maintain an invariant that each event is either fully
41+
//! processed or not processed at all. If you need to terminate this loop early,
42+
//! you want to do this *between* iteration.
43+
//!
44+
//! # Usage
45+
//!
46+
//! You can use `stop_token` for this:
47+
//!
48+
//! ```
49+
//! use async_std::prelude::*;
50+
//! use stop_token::StopToken;
51+
//!
52+
//! struct Event;
53+
//!
54+
//! async fn do_work(work: impl Stream<Item = Event> + Unpin, stop_token: StopToken) {
55+
//! let mut work = stop_token.stop_stream(work);
56+
//! while let Some(event) = work.next().await {
57+
//! process_event(event).await
58+
//! }
59+
//! }
60+
//!
61+
//! async fn process_event(_event: Event) {
62+
//! }
63+
//! ```
64+
//!
65+
//! # Lineage
66+
//!
67+
//! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads).
68+
//! The `StopToken / StopTokenSource` terminology is from C++ paper P0660: https://wg21.link/p0660.
69+
70+
use std::pin::Pin;
71+
use std::task::{Context, Poll};
72+
73+
use async_std::prelude::*;
74+
use async_std::sync::{channel, Receiver, Sender};
75+
use pin_project_lite::pin_project;
76+
77+
enum Never {}
78+
79+
/// `StopSource` produces `StopToken` and cancels all of its tokens on drop.
80+
///
81+
/// # Example:
82+
///
83+
/// ```ignore
84+
/// let stop_source = StopSource::new();
85+
/// let stop_token = stop_source.stop_token();
86+
/// schedule_some_work(stop_token);
87+
/// drop(stop_source); // At this point, scheduled work notices that it is canceled.
88+
/// ```
89+
#[derive(Debug)]
90+
pub struct StopSource {
91+
/// Solely for `Drop`.
92+
_chan: Sender<Never>,
93+
stop_token: StopToken,
94+
}
95+
96+
/// `StopToken` is a future which completes when the associated `StopSource` is dropped.
97+
#[derive(Debug, Clone)]
98+
pub struct StopToken {
99+
chan: Receiver<Never>,
100+
}
101+
102+
impl Default for StopSource {
103+
fn default() -> StopSource {
104+
let (sender, receiver) = channel::<Never>(0);
105+
106+
StopSource {
107+
_chan: sender,
108+
stop_token: StopToken { chan: receiver },
109+
}
110+
}
111+
}
112+
113+
impl StopSource {
114+
/// Creates a new `StopSource`.
115+
pub fn new() -> StopSource {
116+
StopSource::default()
117+
}
118+
119+
/// Produces a new `StopToken`, associated with this source.
120+
///
121+
/// Once the source is destroyed, `StopToken` future completes.
122+
pub fn stop_token(&self) -> StopToken {
123+
self.stop_token.clone()
124+
}
125+
}
126+
127+
impl Future for StopToken {
128+
type Output = ();
129+
130+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
131+
let chan = Pin::new(&mut self.chan);
132+
match Stream::poll_next(chan, cx) {
133+
Poll::Pending => Poll::Pending,
134+
Poll::Ready(Some(never)) => match never {},
135+
Poll::Ready(None) => Poll::Ready(()),
136+
}
137+
}
138+
}
139+
140+
impl StopToken {
141+
/// Applies the token to the `stream`, such that the resulting stream
142+
/// produces no more items once the token becomes cancelled.
143+
pub fn stop_stream<S: Stream>(&self, stream: S) -> StopStream<S> {
144+
StopStream {
145+
stop_token: self.clone(),
146+
stream,
147+
}
148+
}
149+
150+
/// Applies the token to the `future`, such that the resulting future
151+
/// completes with `None` if the token is cancelled.
152+
pub fn stop_future<F: Future>(&self, future: F) -> StopFuture<F> {
153+
StopFuture {
154+
stop_token: self.clone(),
155+
future,
156+
}
157+
}
158+
}
159+
160+
pin_project! {
161+
#[derive(Debug)]
162+
pub struct StopStream<S> {
163+
#[pin]
164+
stop_token: StopToken,
165+
#[pin]
166+
stream: S,
167+
}
168+
}
169+
170+
impl<S: Stream> Stream for StopStream<S> {
171+
type Item = S::Item;
172+
173+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174+
let this = self.project();
175+
if let Poll::Ready(()) = this.stop_token.poll(cx) {
176+
return Poll::Ready(None);
177+
}
178+
this.stream.poll_next(cx)
179+
}
180+
}
181+
182+
pin_project! {
183+
#[derive(Debug)]
184+
pub struct StopFuture<F> {
185+
#[pin]
186+
stop_token: StopToken,
187+
#[pin]
188+
future: F,
189+
}
190+
}
191+
192+
impl<F: Future> Future for StopFuture<F> {
193+
type Output = Option<F::Output>;
194+
195+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
196+
let this = self.project();
197+
if let Poll::Ready(()) = this.stop_token.poll(cx) {
198+
return Poll::Ready(None);
199+
}
200+
match this.future.poll(cx) {
201+
Poll::Pending => Poll::Pending,
202+
Poll::Ready(it) => Poll::Ready(Some(it)),
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)