Skip to content

Commit b766d45

Browse files
romanbmxinden
andauthored
[mplex] Split the receive buffer per substream. (#1784)
* Split the receive buffer per substream. This split allows more efficient reading from the buffer for a particular substream and to reset only the offending substream if it reaches its buffer limit with `MaxBufferBehaviour::ResetStream`. Previously this was implemented as `MaxBufferBehaviour::CloseAll` and resulted in the entire connection closing. The buffer split should be advantageous whenever not all substreams are read at the same pace and some temporarily fall behind in consuming inbound data frames. * Tweak logging. * Oops. * Update muxers/mplex/src/io.rs Co-authored-by: Max Inden <[email protected]> * Rename field as per review suggestion. * Adjust and clarify max-buffer-behaviour. * Set max_buffer_len to 32. Since the limit is now per substream and the default `max_substreams` is `128`, this new limit retains the previous overall resource bounds for the buffers. * Expand tests and small cleanup. Co-authored-by: Max Inden <[email protected]>
1 parent 6ed92ab commit b766d45

File tree

4 files changed

+589
-226
lines changed

4 files changed

+589
-226
lines changed

muxers/mplex/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ futures_codec = "0.4"
1717
libp2p-core = { version = "0.22.0", path = "../../core" }
1818
log = "0.4"
1919
parking_lot = "0.11"
20+
smallvec = "1.4"
2021
unsigned-varint = { version = "0.5", features = ["futures-codec"] }
2122

2223
[dev-dependencies]
2324
async-std = "1.6.2"
25+
env_logger = "0.6"
26+
futures = "0.3"
2427
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
28+
quickcheck = "0.9"
29+
rand = "0.7"

muxers/mplex/src/codec.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,25 @@ impl LocalStreamId {
7777
Self { num, role: Endpoint::Dialer }
7878
}
7979

80+
#[cfg(test)]
81+
pub fn listener(num: u32) -> Self {
82+
Self { num, role: Endpoint::Listener }
83+
}
84+
8085
pub fn next(self) -> Self {
8186
Self {
8287
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
8388
.. self
8489
}
8590
}
91+
92+
#[cfg(test)]
93+
pub fn into_remote(self) -> RemoteStreamId {
94+
RemoteStreamId {
95+
num: self.num,
96+
role: !self.role,
97+
}
98+
}
8699
}
87100

88101
impl RemoteStreamId {
@@ -105,7 +118,7 @@ impl RemoteStreamId {
105118
}
106119

107120
/// An Mplex protocol frame.
108-
#[derive(Debug, Clone)]
121+
#[derive(Debug, Clone, PartialEq, Eq)]
109122
pub enum Frame<T> {
110123
Open { stream_id: T },
111124
Data { stream_id: T, data: Bytes },
@@ -114,20 +127,14 @@ pub enum Frame<T> {
114127
}
115128

116129
impl Frame<RemoteStreamId> {
117-
fn remote_id(&self) -> RemoteStreamId {
130+
pub fn remote_id(&self) -> RemoteStreamId {
118131
match *self {
119132
Frame::Open { stream_id } => stream_id,
120133
Frame::Data { stream_id, .. } => stream_id,
121134
Frame::Close { stream_id, .. } => stream_id,
122135
Frame::Reset { stream_id, .. } => stream_id,
123136
}
124137
}
125-
126-
/// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
127-
/// received with this frame.
128-
pub fn local_id(&self) -> LocalStreamId {
129-
self.remote_id().into_local()
130-
}
131138
}
132139

133140
pub struct Codec {

muxers/mplex/src/config.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use std::cmp;
2424
/// Configuration for the multiplexer.
2525
#[derive(Debug, Clone)]
2626
pub struct MplexConfig {
27-
/// Maximum number of simultaneously-open substreams.
27+
/// Maximum number of simultaneously used substreams.
2828
pub(crate) max_substreams: usize,
29-
/// Maximum number of frames in the internal buffer.
29+
/// Maximum number of frames buffered per substream.
3030
pub(crate) max_buffer_len: usize,
31-
/// Behaviour when the buffer size limit is reached.
31+
/// Behaviour when the buffer size limit is reached for a substream.
3232
pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
3333
/// When sending data, split it into frames whose maximum size is this value
3434
/// (max 1MByte, as per the Mplex spec).
@@ -41,32 +41,37 @@ impl MplexConfig {
4141
Default::default()
4242
}
4343

44-
/// Sets the maximum number of simultaneously open substreams.
44+
/// Sets the maximum number of simultaneously used substreams.
45+
///
46+
/// A substream is used as long as it has not been dropped,
47+
/// even if it may already be closed or reset at the protocol
48+
/// level (in which case it may still have buffered data that
49+
/// can be read before the `StreamMuxer` API signals EOF).
4550
///
4651
/// When the limit is reached, opening of outbound substreams
47-
/// is delayed until another substream closes, whereas new
52+
/// is delayed until another substream is dropped, whereas new
4853
/// inbound substreams are immediately answered with a `Reset`.
4954
/// If the number of inbound substreams that need to be reset
5055
/// accumulates too quickly (judged by internal bounds), the
51-
/// connection is closed, the connection is closed with an error
52-
/// due to the misbehaved remote.
56+
/// connection is closed with an error due to the misbehaved
57+
/// remote.
5358
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
5459
self.max_substreams = max;
5560
self
5661
}
5762

58-
/// Sets the maximum number of frames buffered that have
59-
/// not yet been consumed.
63+
/// Sets the maximum number of frames buffered per substream.
6064
///
6165
/// A limit is necessary in order to avoid DoS attacks.
6266
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
6367
self.max_buffer_len = max;
6468
self
6569
}
6670

67-
/// Sets the behaviour when the maximum buffer length has been reached.
71+
/// Sets the behaviour when the maximum buffer size is reached
72+
/// for a substream.
6873
///
69-
/// See the documentation of `MaxBufferBehaviour`.
74+
/// See the documentation of [`MaxBufferBehaviour`].
7075
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
7176
self.max_buffer_behaviour = behaviour;
7277
self
@@ -84,21 +89,24 @@ impl MplexConfig {
8489
/// Behaviour when the maximum length of the buffer is reached.
8590
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
8691
pub enum MaxBufferBehaviour {
87-
/// Produce an error on all the substreams.
88-
CloseAll,
89-
/// No new message will be read from the underlying connection if the buffer is full.
92+
/// Reset the substream whose frame buffer overflowed.
93+
ResetStream,
94+
/// No new message can be read from any substream as long as the buffer
95+
/// for a single substream is full.
9096
///
91-
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
92-
/// before processing the messages received on another substream.
97+
/// This can potentially introduce a deadlock if you are waiting for a
98+
/// message from a substream before processing the messages received
99+
/// on another substream, i.e. if there are data dependencies across
100+
/// substreams.
93101
Block,
94102
}
95103

96104
impl Default for MplexConfig {
97105
fn default() -> MplexConfig {
98106
MplexConfig {
99107
max_substreams: 128,
100-
max_buffer_len: 4096,
101-
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
108+
max_buffer_len: 32,
109+
max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
102110
split_send_size: 1024,
103111
}
104112
}

0 commit comments

Comments
 (0)