@@ -22,17 +22,19 @@ Channels have their own backpressure that does not affect other channels.
2222Incoming streams are by default set to channel 0 and can be moved to other channels via `ControlMessage`s.
2323
2424```rust
25+ # use std::error::Error;
2526use bytes::Bytes;
2627use tokio::net::TcpListener;
2728use tokio::io::{AsyncReadExt, AsyncWriteExt};
2829use tokio::sync::mpsc;
2930use stream_multiplexer::{Multiplexer, HalvesStream, ControlMessage, IncomingPacket, OutgoingPacket};
3031use futures::stream::StreamExt;
3132
33+ # fn main() -> Result<(), Box<dyn Error>> {
3234# tokio::runtime::Builder::new().basic_scheduler().enable_all().build().unwrap().block_on(async move {
3335// 3 channels of incoming streams, 0 is the channel that new streams join.
3436// Backpressure is per channel. Streams can be moved between channels by
35- // sending an OutgoingPackt ::ChangeChannel message.
37+ // sending an OutgoingPacket ::ChangeChannel message.
3638let (channel0_tx, mut channel0_rx) = mpsc::channel(32);
3739let (channel1_tx, mut channel1_rx) = mpsc::channel(32);
3840let (channel2_tx, mut channel2_rx) = mpsc::channel(32);
@@ -50,9 +52,9 @@ let multiplexer = Multiplexer::new(
5052);
5153
5254// Bind to a random port on localhost
53- let socket = TcpListener::bind("127.0.0.1:0").await.unwrap() ;
55+ let socket = TcpListener::bind("127.0.0.1:0").await? ;
5456
55- let local_addr = socket.local_addr().unwrap() ;
57+ let local_addr = socket.local_addr()? ;
5658
5759// Use the HalvesStream utility struct to map the stream of new sockets.
5860// It will use LengthDelimitedCodec with 2 bytes as the packet size.
@@ -63,11 +65,11 @@ let (control_write, control_read) = mpsc::unbounded_channel();
6365let mp_joinhandle = tokio::task::spawn(multiplexer.run(halves, control_read));
6466
6567// Make a test connection:
66- let mut client = tokio::net::TcpStream::connect(local_addr).await.unwrap() ;
68+ let mut client = tokio::net::TcpStream::connect(local_addr).await? ;
6769
6870// Send 'a message'
6971let mut data = Bytes::from("\x00\x09a message");
70- client.write_buf(&mut data).await.unwrap() ;
72+ client.write_buf(&mut data).await? ;
7173client.flush();
7274
7375// Receive 'a message' on channel 0
@@ -84,12 +86,11 @@ assert_eq!(
8486// Move the client to channel 1
8587outgoing_tx
8688 .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 1))
87- .await
88- .unwrap();
89+ .await?;
8990
9091// Send 'a message' again, on channel 1 this time.
9192let mut data = Bytes::from("\x00\x09a message");
92- client.write_buf(&mut data).await.unwrap() ;
93+ client.write_buf(&mut data).await? ;
9394client.flush();
9495
9596// Receive 'a message' on channel 1
@@ -106,12 +107,11 @@ assert_eq!(
106107// Move the client to channel 2
107108outgoing_tx
108109 .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 2))
109- .await
110- .unwrap();
110+ .await?;
111111
112112// Send 'a message' again, on channel 2 this time.
113113let mut data = Bytes::from("\x00\x09a message");
114- client.write_buf(&mut data).await.unwrap() ;
114+ client.write_buf(&mut data).await? ;
115115client.flush();
116116
117117// Receive 'a message' on channel 2
@@ -125,11 +125,14 @@ assert_eq!(
125125 &Bytes::from("a message")
126126);
127127
128- // Tell multiplexer te shut down
129- control_write.send(ControlMessage::Shutdown).unwrap() ;
128+ // Tell multiplexer to shut down
129+ control_write.send(ControlMessage::Shutdown)? ;
130130
131131mp_joinhandle.await.unwrap();
132+ # Ok::<_, Box<dyn Error>>(())
132133# });
134+ # Ok(())
135+ # }
133136```
134137*/
135138mod error;
0 commit comments