Skip to content

Commit 6eaa246

Browse files
committed
spillway docs
1 parent 6aab04a commit 6eaa246

File tree

6 files changed

+82
-1
lines changed

6 files changed

+82
-1
lines changed

Cargo.lock

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ socket2 = { version = "0.6" }
5757
thiserror = { version = "2.0" }
5858
tokio = { version = "1.39", features = ["net", "rt"] }
5959
tokio-rustls = { version = "0.26" }
60+
tokio-test = { version = "0.4" }
6061
tokio-util = { version = "0.7" }
6162
tracing = { version = "0.1" }
6263
webpki-roots = { version = "1" }

spillway/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ authors.workspace = true
66
repository.workspace = true
77
edition.workspace = true
88
license.workspace = true
9-
readme.workspace = true
109
keywords.workspace = true
1110
categories.workspace = true
1211

1312
[dependencies]
1413
futures = { workspace = true }
1514
log = { workspace = true }
1615
rand = { workspace = true }
16+
17+
[dev-dependencies]
18+
tokio-test = { workspace = true }

spillway/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#![deny(missing_docs)]
2+
#![doc = include_str!("../README.md")]
3+
14
mod receiver;
25
mod sender;
36
mod shared;
@@ -7,6 +10,7 @@ use std::sync::Arc;
710
pub use receiver::Receiver;
811
pub use sender::Sender;
912

13+
/// Get a new spillway channel with a default concurrency level.
1014
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
1115
// const PARALLELISM: std::sync::LazyLock<usize> = std::sync::LazyLock::new(|| {
1216
// std::thread::available_parallelism()
@@ -17,6 +21,12 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
1721
channel_with_concurrency(8)
1822
}
1923

24+
/// Get a new spillway channel with the given concurrency level.
25+
///
26+
/// Use this when you need lots of parallelism, or when you know how many Senders
27+
/// you will have. Higher numbers reduce contention, but increase the cost of
28+
/// parking the Receiver when idle. Thread count is a good starting point for
29+
/// concurrency.
2030
pub fn channel_with_concurrency<T>(concurrency: usize) -> (Sender<T>, Receiver<T>) {
2131
let shared = Arc::new(shared::Shared::new(concurrency));
2232
let sender = Sender::new(shared.clone());

spillway/src/receiver.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{collections::VecDeque, sync::Arc};
22

33
use crate::shared::Shared;
44

5+
/// The receiving half of a Spillway channel.
56
pub struct Receiver<T> {
67
cursor: usize,
78
buffer: VecDeque<T>,
@@ -26,6 +27,20 @@ impl<T> Receiver<T> {
2627
}
2728
}
2829

30+
/// This pattern does check whether the channel is closed. It's useful for examples and
31+
/// some kinds of synchronous code, but use `next` or `poll_next` with async code.
32+
pub fn try_next(&mut self) -> Option<T> {
33+
match self.poll_next(&mut std::task::Context::from_waker(std::task::Waker::noop())) {
34+
std::task::Poll::Ready(next) => next,
35+
std::task::Poll::Pending => None,
36+
}
37+
}
38+
39+
/// The raw next value for the Receiver.
40+
///
41+
/// * `Poll::Pending` when caught up and waiting for new messages.
42+
/// * `Poll::Ready(Some(value))` for the next value.
43+
/// * `Poll::Ready(None)` when all senders have been dropped and the Receiver is caught up. The Receiver will never receive more messages and you should drop it.
2944
pub fn poll_next(&mut self, context: &mut std::task::Context) -> std::task::Poll<Option<T>> {
3045
match self.buffer.pop_front() {
3146
Some(next) => std::task::Poll::Ready(Some(next)),
@@ -79,6 +94,10 @@ impl<T> Receiver<T> {
7994
}
8095
}
8196

97+
/// The next value for the Receiver.
98+
///
99+
/// * Some(T) is the next value.
100+
/// * None when all senders have been dropped and the Receiver is caught up. The Receiver will never receive more messages and you should drop it.
82101
#[inline]
83102
pub async fn next(&mut self) -> Option<T> {
84103
std::future::poll_fn(|context| self.poll_next(context)).await

spillway/src/sender.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ use std::sync::Arc;
22

33
use crate::shared::Shared;
44

5+
/// The sending half of a Spillway channel.
6+
///
7+
/// You `clone()` to create more Senders.
58
pub struct Sender<T> {
69
chute: usize,
710
shared: Arc<Shared<T>>,
@@ -43,6 +46,16 @@ impl<T> Sender<T> {
4346
}
4447
}
4548

49+
/// Send a value to the Receiver.
50+
///
51+
/// Messages are only guaranteed to arrive in order on a per-Sender basis.
52+
///
53+
/// If you send 1, 2, 3 in this Sender, you will receive 1, 2, 3 in that order.
54+
/// If you send 4, 5, 6 from another Sender, you will receive 4, 5, 6 in that order too.
55+
///
56+
/// However, you might receive 1, 4, 5, 2, 3, 6 or any other interleaving. But
57+
/// 1 will always appear before 2, and 2 before 3; and 4 will always appear before 5,
58+
/// and 5 before 6.
4659
pub fn send(&self, value: T) -> Result<(), T> {
4760
self.shared.send(self.chute, value)
4861
}

0 commit comments

Comments
 (0)