Skip to content

Commit 4e2b3d2

Browse files
authored
Merge pull request #18 from halzy/halzy/17-multi-value
feat!: Updated OutgoingMessage to have multiple values for a payload.
2 parents 1a46c6d + 47d7f59 commit 4e2b3d2

File tree

4 files changed

+42
-24
lines changed

4 files changed

+42
-24
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "stream_multiplexer"
3-
version = "0.6.0"
3+
version = "0.7.0"
44
authors = ["Benjamin Halsted <[email protected]>"]
55
edition = "2018"
66
license = "MIT OR Apache-2.0"
@@ -21,6 +21,7 @@ tokio = { version = "0.2.13", features = ["full"] }
2121
tokio-util = { version = "0.2", features = ["codec"] }
2222
tracing = { version = "0.1", features = ["log"] }
2323
tracing-futures = "0.2"
24+
tinyvec = { version = "0.3", features = ["alloc"] }
2425

2526
[dev-dependencies]
2627
futures = { version = "0.3", default-features = false, features = ["alloc","std"] }

src/lib.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ use send_all_own::*;
155155
use sender::*;
156156
use stream_mover::*;
157157

158+
use std::iter::FromIterator;
159+
158160
type StreamId = usize;
159161

160162
/// Produced by the incoming stream
@@ -227,12 +229,16 @@ impl<V> std::fmt::Debug for IncomingPacket<V> {
227229
#[derive(Clone)]
228230
pub struct OutgoingMessage<V> {
229231
stream_ids: Vec<StreamId>,
230-
value: V,
232+
value: tinyvec::TinyVec<[Option<V>; 16]>,
231233
}
232234
impl<V> OutgoingMessage<V> {
233235
/// Creates a new message that is to be delivered to streams with `ids`.
234-
pub fn new(stream_ids: Vec<StreamId>, value: V) -> Self {
235-
Self { stream_ids, value }
236+
pub fn new(stream_ids: Vec<StreamId>, values: impl IntoIterator<Item = V>) -> Self {
237+
let values = tinyvec::TinyVec::from_iter(values.into_iter().map(Some));
238+
Self {
239+
stream_ids,
240+
value: values,
241+
}
236242
}
237243
}
238244

src/multiplexer_senders.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -138,30 +138,41 @@ where
138138
(futures_len, sender_pairs_len)
139139
}
140140

141-
fn handle_new_message(&mut self, message: OutgoingMessage<Item>) {
141+
fn handle_new_message(&mut self, mut message: OutgoingMessage<Item>) {
142142
for stream_id in message.stream_ids {
143143
match self.sender_pairs.entry(stream_id) {
144144
Entry::Vacant(_) => {
145145
tracing::warn!(stream_id, "Tring to send message to non-existent stream.");
146146
}
147147
Entry::Occupied(mut sender_pair_entry) => {
148-
let sender_pair = sender_pair_entry.get_mut();
149-
match sender_pair.try_send(message.value.clone()) {
150-
Ok(()) => {
151-
// Enqueue the message and move the sender into the FuturesUnordered
152-
tracing::trace!(stream_id, "Enqueued message.");
153-
if let Some(sender) = sender_pair.take() {
154-
self.senders_stream.push(sender.into_future());
148+
let mut should_remove = false;
149+
for value in message.value.drain(..).flatten() {
150+
let sender_pair = sender_pair_entry.get_mut();
151+
match sender_pair.try_send(value.clone()) {
152+
Ok(()) => {
153+
// Enqueue the message and move the sender into the FuturesUnordered
154+
tracing::trace!(stream_id, "Enqueued message.");
155+
if let Some(sender) = sender_pair.take() {
156+
self.senders_stream.push(sender.into_future());
157+
}
155158
}
156-
}
157-
Err(TrySendError::Full(_)) => {
158-
tracing::error!(stream_id, "Stream is full, shutting down sender.");
159-
sender_pair_entry.remove_entry();
160-
}
161-
Err(TrySendError::Closed(_)) => {
162-
tracing::error!(stream_id, "Stream is closed, shutting down sender.");
163-
sender_pair_entry.remove_entry();
164-
}
159+
Err(TrySendError::Full(_)) => {
160+
tracing::error!(stream_id, "Stream is full, shutting down sender.");
161+
should_remove = true;
162+
break;
163+
}
164+
Err(TrySendError::Closed(_)) => {
165+
tracing::error!(
166+
stream_id,
167+
"Stream is closed, shutting down sender."
168+
);
169+
should_remove = true;
170+
break;
171+
}
172+
};
173+
}
174+
if should_remove {
175+
sender_pair_entry.remove_entry();
165176
}
166177
}
167178
};
@@ -298,7 +309,7 @@ mod tests {
298309

299310
// Send some data to the stream
300311
for x in 0_u8..10 {
301-
let message = OutgoingMessage::new(vec![stream_id], x);
312+
let message = OutgoingMessage::new(vec![stream_id], vec![x]);
302313
message_channel.send(message).unwrap();
303314
}
304315

tests/integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async fn write_packets() {
135135
// send a message
136136
let data = Bytes::from("a message");
137137
data_write
138-
.send(OutgoingMessage::new(vec![client1_id], data.clone()).into())
138+
.send(OutgoingMessage::new(vec![client1_id], vec![data.clone()]).into())
139139
.unwrap();
140140

141141
let mut read_data = BytesMut::new();
@@ -250,7 +250,7 @@ async fn change_channel() {
250250
// send a message to the client (so that the client waits and we can change channels)
251251
let data = Bytes::from("a message from the server");
252252
data_write
253-
.send(OutgoingMessage::new(vec![client1_id], data.clone()).into())
253+
.send(OutgoingMessage::new(vec![client1_id], vec![data.clone()]).into())
254254
.unwrap();
255255

256256
// client reads data

0 commit comments

Comments
 (0)