Skip to content

Commit e344fbf

Browse files
authored
Merge pull request #13 from halzy/halzy/10-docs
feat!: Added HalvesStream, example.
2 parents 6cf1960 + b9f0fd7 commit e344fbf

File tree

4 files changed

+180
-4
lines changed

4 files changed

+180
-4
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "stream_multiplexer"
3-
version = "0.4.1"
3+
version = "0.5.0"
44
authors = ["Benjamin Halsted <bhalsted@gmail.com>"]
55
edition = "2018"
66
license = "MIT OR Apache-2.0"
@@ -17,13 +17,13 @@ byteorder = "1.3"
1717
bytes = "0.5"
1818
futures = { version = "0.3", default-features = false, features = ["alloc"] }
1919
thiserror = "1.0"
20-
tokio = { version = "0.2", features = ["full"] }
20+
tokio = { version = "0.2.13", features = ["full"] }
2121
tokio-util = { version = "0.2", features = ["codec"] }
2222
tracing = { version = "0.1", features = ["log"] }
2323
tracing-futures = "0.2"
2424

2525
[dev-dependencies]
2626
futures = { version = "0.3", default-features = false, features = ["alloc","std"] }
2727
matches = "0.1"
28-
tokio = { version = "0.2", features = ["full", "test-util"] }
28+
tokio = { version = "0.2.13", features = ["full", "test-util"] }
2929
tracing-subscriber = "0.2"

src/halves_stream.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use futures::stream::TryStream;
2+
use futures::Stream;
3+
use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
4+
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
5+
6+
use std::pin::Pin;
7+
use std::task::Poll;
8+
9+
/// Takes a Stream<Item=AsyncRead + AsyncWrite> and provides a
10+
/// Stream<Item=( FramedWrite<WriteHalf, LengthDelimitedCodec>, FramedRead<ReadHalf, LengthDelimitedCodec>)>
11+
#[derive(Debug)]
12+
pub struct HalvesStream<St> {
13+
inner: St,
14+
length_field_length: usize,
15+
}
16+
17+
impl<St> HalvesStream<St> {
18+
/// Takes a TcpListener to help own the listener while producing TcpStreams
19+
pub fn new(inner: St, length_field_length: usize) -> Self {
20+
Self {
21+
inner,
22+
length_field_length,
23+
}
24+
}
25+
}
26+
27+
impl<St> Stream for HalvesStream<St>
28+
where
29+
St: TryStream<Error = std::io::Error> + Unpin,
30+
St::Ok: AsyncRead + AsyncWrite,
31+
{
32+
type Item = Result<
33+
(
34+
FramedWrite<WriteHalf<St::Ok>, LengthDelimitedCodec>,
35+
FramedRead<ReadHalf<St::Ok>, LengthDelimitedCodec>,
36+
),
37+
St::Error,
38+
>;
39+
fn poll_next(
40+
mut self: Pin<&mut Self>,
41+
ctx: &mut std::task::Context,
42+
) -> Poll<Option<Self::Item>> {
43+
match futures::ready!(Pin::new(&mut self.inner).try_poll_next(ctx)) {
44+
None => None.into(),
45+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
46+
Some(Ok(stream)) => {
47+
let (reader, writer) = tokio::io::split(stream);
48+
49+
// Wrap the writer in a FramedCodec
50+
let framed_write = LengthDelimitedCodec::builder()
51+
.length_field_length(self.length_field_length)
52+
.new_write(writer);
53+
54+
let framed_read = LengthDelimitedCodec::builder()
55+
.length_field_length(self.length_field_length)
56+
.new_read(reader);
57+
58+
Poll::Ready(Some(Ok((framed_write, framed_read))))
59+
}
60+
}
61+
}
62+
}

src/lib.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,121 @@ This crate provides stream multiplexing with channels.
2020
Channels have their own backpressure that does not affect other channels.
2121
2222
Incoming streams are by default set to channel 0 and can be moved to other channels via `ControlMessage`s.
23+
24+
```rust
25+
use bytes::Bytes;
26+
use tokio::net::TcpListener;
27+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
28+
use tokio::sync::mpsc;
29+
use stream_multiplexer::{Multiplexer, HalvesStream, ControlMessage, IncomingPacket, OutgoingPacket};
30+
use futures::stream::StreamExt;
31+
32+
# tokio::runtime::Builder::new().basic_scheduler().enable_all().build().unwrap().block_on(async move {
33+
// 3 channels of incoming streams, 0 is the channel that new streams join.
34+
// Backpressure is per channel. Streams can be moved between channels by
35+
// sending an OutgoingPackt::ChangeChannel message.
36+
let (channel0_tx, mut channel0_rx) = mpsc::channel(32);
37+
let (channel1_tx, mut channel1_rx) = mpsc::channel(32);
38+
let (channel2_tx, mut channel2_rx) = mpsc::channel(32);
39+
40+
// A Stream for outgoing messages.
41+
let (mut outgoing_tx, outgoing_rx) = mpsc::channel::<OutgoingPacket<Bytes>>(32);
42+
43+
// Construct the multiplexer, giving it the OutgoingPacket stream, and a vector of incoming
44+
// streams. The backlog controls how much of an internal buffer each WriteHalf (TcpSocket in this example) can have.
45+
let outgoing_streams_backlog = 128;
46+
let multiplexer = Multiplexer::new(
47+
outgoing_streams_backlog,
48+
outgoing_rx,
49+
vec![channel0_tx, channel1_tx, channel2_tx],
50+
);
51+
52+
// Bind to a random port on localhost
53+
let socket = TcpListener::bind("127.0.0.1:0").await.unwrap();
54+
55+
let local_addr = socket.local_addr().unwrap();
56+
57+
// Use the HalvesStream utility struct to map the stream of new sockets.
58+
// It will use LengthDelimitedCodec with 2 bytes as the packet size.
59+
let halves = HalvesStream::new(socket, 2);
60+
61+
// Control channel for shutting down the multiplexer
62+
let (control_write, control_read) = mpsc::unbounded_channel();
63+
let mp_joinhandle = tokio::task::spawn(multiplexer.run(halves, control_read));
64+
65+
// Make a test connection:
66+
let mut client = tokio::net::TcpStream::connect(local_addr).await.unwrap();
67+
68+
// Send 'a message'
69+
let mut data = Bytes::from("\x00\x09a message");
70+
client.write_buf(&mut data).await.unwrap();
71+
client.flush();
72+
73+
// Receive 'a message' on channel 0
74+
let incoming_packet = channel0_rx.recv().await.unwrap();
75+
assert_eq!(
76+
incoming_packet
77+
.value()
78+
.expect("should have a value")
79+
.as_ref()
80+
.unwrap(),
81+
&Bytes::from("a message")
82+
);
83+
84+
// Move the client to channel 1
85+
outgoing_tx
86+
.send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 1))
87+
.await
88+
.unwrap();
89+
90+
// Send 'a message' again, on channel 1 this time.
91+
let mut data = Bytes::from("\x00\x09a message");
92+
client.write_buf(&mut data).await.unwrap();
93+
client.flush();
94+
95+
// Receive 'a message' on channel 1
96+
let incoming_packet = channel1_rx.recv().await.unwrap();
97+
assert_eq!(
98+
incoming_packet
99+
.value()
100+
.expect("should have a value")
101+
.as_ref()
102+
.unwrap(),
103+
&Bytes::from("a message")
104+
);
105+
106+
// Move the client to channel 2
107+
outgoing_tx
108+
.send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 2))
109+
.await
110+
.unwrap();
111+
112+
// Send 'a message' again, on channel 2 this time.
113+
let mut data = Bytes::from("\x00\x09a message");
114+
client.write_buf(&mut data).await.unwrap();
115+
client.flush();
116+
117+
// Receive 'a message' on channel 2
118+
let incoming_packet = channel2_rx.recv().await.unwrap();
119+
assert_eq!(
120+
incoming_packet
121+
.value()
122+
.expect("should have a value")
123+
.as_ref()
124+
.unwrap(),
125+
&Bytes::from("a message")
126+
);
127+
128+
// Tell multiplexer te shut down
129+
control_write.send(ControlMessage::Shutdown).unwrap();
130+
131+
mp_joinhandle.await.unwrap();
132+
# });
133+
```
23134
*/
24135
mod error;
25136
mod halt;
137+
mod halves_stream;
26138
mod id_gen;
27139
mod multiplexer;
28140
mod multiplexer_senders;
@@ -32,6 +144,7 @@ mod stream_mover;
32144

33145
pub use error::*;
34146
use halt::*;
147+
pub use halves_stream::*;
35148
pub use id_gen::*;
36149
pub use multiplexer::*;
37150
use multiplexer_senders::*;
@@ -45,6 +158,7 @@ type StreamId = usize;
45158
pub struct IncomingMessage<V> {
46159
/// Stream Id that the message if for
47160
pub id: StreamId,
161+
48162
/// Value received from a stream
49163
pub value: V,
50164
}

src/multiplexer_senders.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ mod tests {
260260

261261
#[tokio::test(basic_scheduler)]
262262
async fn send_message_and_shutdown() {
263-
// crate::tests::init_logging();
263+
//crate::tests::init_logging();
264264

265265
// Set up a teest incrementer
266266
let id_gen = IncrementIdGen::default();

0 commit comments

Comments
 (0)