Skip to content

Commit c1ef4bf

Browse files
core/: Redesign upgrade::{write_one, write_with_len_prefix, read_one} (#2111)
1. Deprecating the `write_one` function Semantically, this function is a composition of `write_with_len_prefix` and `io.close()`. This represents a footgun because the `close` functionality is not obvious and only mentioned in the docs. Using this function multiple times on a single substream will produces hard to debug behaviour. 2. Deprecating `read_one` and `write_with_len_prefix` functions 3. Introducing `write_length_prefixed` and `read_length_prefixed` - These functions are symmetric and do exactly what you would expect, just writing to the socket without closing - They also have a symmetric interface (no more custom errors, just `io::Error`) Co-authored-by: Max Inden <[email protected]>
1 parent 4eb0659 commit c1ef4bf

File tree

8 files changed

+116
-38
lines changed

8 files changed

+116
-38
lines changed

core/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66

77
- Implement `Keypair::from_protobuf_encoding` for ed25519 keys (see [PR 2090]).
88

9+
- Deprecate `upgrade::write_one`.
10+
Deprecate `upgrade::write_with_len_prefix`.
11+
Deprecate `upgrade::read_one`.
12+
Introduce `upgrade::read_length_prefixed` and `upgrade::write_length_prefixed`.
13+
See [PR 2111](https://github.com/libp2p/rust-libp2p/pull/2111).
14+
915
[PR 2090]: https://github.com/libp2p/rust-libp2p/pull/2090
1016

1117
# 0.28.3 [2021-04-26]

core/src/upgrade.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ pub use self::{
8080
map::{MapInboundUpgrade, MapOutboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgradeErr},
8181
optional::OptionalUpgrade,
8282
select::SelectUpgrade,
83-
transfer::{write_one, write_with_len_prefix, write_varint, read_one, ReadOneError, read_varint},
83+
transfer::{write_length_prefixed, write_varint, read_length_prefixed, read_varint},
8484
};
85+
#[allow(deprecated)]
86+
pub use self::transfer::ReadOneError;
8587

8688
/// Types serving as protocol names.
8789
///

core/src/upgrade/from_fn.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,20 @@ use std::iter;
2828
/// # Example
2929
///
3030
/// ```
31-
/// # use libp2p_core::transport::{Transport, MemoryTransport};
32-
/// # use libp2p_core::upgrade;
31+
/// # use libp2p_core::transport::{Transport, MemoryTransport, memory::Channel};
32+
/// # use libp2p_core::{upgrade, Negotiated};
3333
/// # use std::io;
34+
/// # use futures::AsyncWriteExt;
3435
/// let _transport = MemoryTransport::default()
3536
/// .and_then(move |out, cp| {
36-
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move {
37+
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock: Negotiated<Channel<Vec<u8>>>, endpoint| async move {
3738
/// if endpoint.is_dialer() {
38-
/// upgrade::write_one(&mut sock, "some handshake data").await?;
39+
/// upgrade::write_length_prefixed(&mut sock, "some handshake data").await?;
40+
/// sock.close().await?;
3941
/// } else {
40-
/// let handshake_data = upgrade::read_one(&mut sock, 1024).await?;
42+
/// let handshake_data = upgrade::read_length_prefixed(&mut sock, 1024).await?;
4143
/// if handshake_data != b"some handshake data" {
42-
/// return Err(upgrade::ReadOneError::from(io::Error::from(io::ErrorKind::Other)));
44+
/// return Err(io::Error::new(io::ErrorKind::Other, "bad handshake"));
4345
/// }
4446
/// }
4547
/// Ok(sock)

core/src/upgrade/transfer.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,27 @@ use std::{error, fmt, io};
2525

2626
// TODO: these methods could be on an Ext trait to AsyncWrite
2727

28+
/// Writes a message to the given socket with a length prefix appended to it. Also flushes the socket.
29+
///
30+
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
31+
/// > compatible with what [`read_length_prefixed`] expects.
32+
pub async fn write_length_prefixed(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
33+
-> Result<(), io::Error>
34+
{
35+
write_varint(socket, data.as_ref().len()).await?;
36+
socket.write_all(data.as_ref()).await?;
37+
socket.flush().await?;
38+
39+
Ok(())
40+
}
41+
2842
/// Send a message to the given socket, then shuts down the writing side.
2943
///
3044
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
3145
/// > compatible with what `read_one` expects.
46+
///
47+
#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead. You will need to manually close the stream using `socket.close().await`.")]
48+
#[allow(dead_code)]
3249
pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
3350
-> Result<(), io::Error>
3451
{
@@ -42,6 +59,8 @@ pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<
4259
///
4360
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
4461
/// > compatible with what `read_one` expects.
62+
#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead.")]
63+
#[allow(dead_code)]
4564
pub async fn write_with_len_prefix(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
4665
-> Result<(), io::Error>
4766
{
@@ -60,6 +79,7 @@ pub async fn write_varint(socket: &mut (impl AsyncWrite + Unpin), len: usize)
6079
let mut len_data = unsigned_varint::encode::usize_buffer();
6180
let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len();
6281
socket.write_all(&len_data[..encoded_len]).await?;
82+
6383
Ok(())
6484
}
6585

@@ -106,6 +126,27 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result<usize,
106126
}
107127
}
108128

129+
/// Reads a length-prefixed message from the given socket.
130+
///
131+
/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is
132+
/// necessary in order to avoid DoS attacks where the remote sends us a message of several
133+
/// gigabytes.
134+
///
135+
/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is
136+
/// > compatible with what [`write_length_prefixed`] does.
137+
pub async fn read_length_prefixed(socket: &mut (impl AsyncRead + Unpin), max_size: usize) -> io::Result<Vec<u8>>
138+
{
139+
let len = read_varint(socket).await?;
140+
if len > max_size {
141+
return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Received data size ({} bytes) exceeds maximum ({} bytes)", len, max_size)))
142+
}
143+
144+
let mut buf = vec![0; len];
145+
socket.read_exact(&mut buf).await?;
146+
147+
Ok(buf)
148+
}
149+
109150
/// Reads a length-prefixed message from the given socket.
110151
///
111152
/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is
@@ -114,6 +155,8 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result<usize,
114155
///
115156
/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is
116157
/// > compatible with what `write_one` does.
158+
#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead.")]
159+
#[allow(dead_code, deprecated)]
117160
pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize)
118161
-> Result<Vec<u8>, ReadOneError>
119162
{
@@ -132,6 +175,7 @@ pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize)
132175

133176
/// Error while reading one message.
134177
#[derive(Debug)]
178+
#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead of `read_one` to avoid depending on this type.")]
135179
pub enum ReadOneError {
136180
/// Error on the socket.
137181
Io(std::io::Error),
@@ -144,12 +188,14 @@ pub enum ReadOneError {
144188
},
145189
}
146190

191+
#[allow(deprecated)]
147192
impl From<std::io::Error> for ReadOneError {
148193
fn from(err: std::io::Error) -> ReadOneError {
149194
ReadOneError::Io(err)
150195
}
151196
}
152197

198+
#[allow(deprecated)]
153199
impl fmt::Display for ReadOneError {
154200
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155201
match *self {
@@ -159,6 +205,7 @@ impl fmt::Display for ReadOneError {
159205
}
160206
}
161207

208+
#[allow(deprecated)]
162209
impl error::Error for ReadOneError {
163210
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
164211
match *self {
@@ -173,15 +220,18 @@ mod tests {
173220
use super::*;
174221

175222
#[test]
176-
fn write_one_works() {
223+
fn write_length_prefixed_works() {
177224
let data = (0..rand::random::<usize>() % 10_000)
178225
.map(|_| rand::random::<u8>())
179226
.collect::<Vec<_>>();
180-
181227
let mut out = vec![0; 10_000];
182-
futures::executor::block_on(
183-
write_one(&mut futures::io::Cursor::new(&mut out[..]), data.clone())
184-
).unwrap();
228+
229+
futures::executor::block_on(async {
230+
let mut socket = futures::io::Cursor::new(&mut out[..]);
231+
232+
write_length_prefixed(&mut socket, &data).await.unwrap();
233+
socket.close().await.unwrap();
234+
});
185235

186236
let (out_len, out_data) = unsigned_varint::decode::usize(&out).unwrap();
187237
assert_eq!(out_len, data.len());

protocols/floodsub/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
- Update dependencies.
44

5+
- Change `FloodsubDecodeError::ReadError` from a `upgrade::ReadOneError` to
6+
`std::io::Error`. See [PR 2111].
7+
8+
[PR 2111]: https://github.com/libp2p/rust-libp2p/pull/2111
9+
510
# 0.29.0 [2021-04-13]
611

712
- Update `libp2p-swarm`.

protocols/floodsub/src/protocol.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::topic::Topic;
2323
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
2424
use prost::Message;
2525
use std::{error, fmt, io, iter, pin::Pin};
26-
use futures::{Future, io::{AsyncRead, AsyncWrite}};
26+
use futures::{Future, io::{AsyncRead, AsyncWrite}, AsyncWriteExt};
2727

2828
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
2929
#[derive(Debug, Clone, Default)]
@@ -55,7 +55,7 @@ where
5555

5656
fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
5757
Box::pin(async move {
58-
let packet = upgrade::read_one(&mut socket, 2048).await?;
58+
let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?;
5959
let rpc = rpc_proto::Rpc::decode(&packet[..])?;
6060

6161
let mut messages = Vec::with_capacity(rpc.publish.len());
@@ -95,15 +95,15 @@ where
9595
#[derive(Debug)]
9696
pub enum FloodsubDecodeError {
9797
/// Error when reading the packet from the socket.
98-
ReadError(upgrade::ReadOneError),
98+
ReadError(io::Error),
9999
/// Error when decoding the raw buffer into a protobuf.
100100
ProtobufError(prost::DecodeError),
101101
/// Error when parsing the `PeerId` in the message.
102102
InvalidPeerId,
103103
}
104104

105-
impl From<upgrade::ReadOneError> for FloodsubDecodeError {
106-
fn from(err: upgrade::ReadOneError) -> Self {
105+
impl From<io::Error> for FloodsubDecodeError {
106+
fn from(err: io::Error) -> Self {
107107
FloodsubDecodeError::ReadError(err)
108108
}
109109
}
@@ -166,7 +166,10 @@ where
166166
fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
167167
Box::pin(async move {
168168
let bytes = self.into_bytes();
169-
upgrade::write_one(&mut socket, bytes).await?;
169+
170+
upgrade::write_length_prefixed(&mut socket, bytes).await?;
171+
socket.close().await?;
172+
170173
Ok(())
171174
})
172175
}

protocols/identify/src/protocol.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,11 @@ where
186186

187187
let mut bytes = Vec::with_capacity(message.encoded_len());
188188
message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
189-
upgrade::write_one(&mut io, &bytes).await
189+
190+
upgrade::write_length_prefixed(&mut io, bytes).await?;
191+
io.close().await?;
192+
193+
Ok(())
190194
}
191195

192196
async fn recv<T>(mut socket: T) -> io::Result<IdentifyInfo>
@@ -195,7 +199,7 @@ where
195199
{
196200
socket.close().await?;
197201

198-
let msg = upgrade::read_one(&mut socket, 4096)
202+
let msg = upgrade::read_length_prefixed(&mut socket, 4096)
199203
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
200204
.await?;
201205

protocols/request-response/tests/ping.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use libp2p_core::{
2727
identity,
2828
muxing::StreamMuxerBox,
2929
transport::{self, Transport},
30-
upgrade::{self, read_one, write_one}
30+
upgrade::{self, read_length_prefixed, write_length_prefixed}
3131
};
3232
use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
3333
use libp2p_request_response::*;
3434
use libp2p_swarm::{Swarm, SwarmEvent};
3535
use libp2p_tcp::TcpConfig;
36-
use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt};
36+
use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt, AsyncWriteExt};
3737
use rand::{self, Rng};
3838
use std::{io, iter};
3939
use std::{collections::HashSet, num::NonZeroU16};
@@ -421,42 +421,48 @@ impl RequestResponseCodec for PingCodec {
421421
where
422422
T: AsyncRead + Unpin + Send
423423
{
424-
read_one(io, 1024)
425-
.map(|res| match res {
426-
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
427-
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
428-
Ok(vec) => Ok(Ping(vec))
429-
})
430-
.await
424+
let vec = read_length_prefixed(io, 1024).await?;
425+
426+
if vec.is_empty() {
427+
return Err(io::ErrorKind::UnexpectedEof.into())
428+
}
429+
430+
Ok(Ping(vec))
431431
}
432432

433433
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T)
434434
-> io::Result<Self::Response>
435435
where
436436
T: AsyncRead + Unpin + Send
437437
{
438-
read_one(io, 1024)
439-
.map(|res| match res {
440-
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
441-
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
442-
Ok(vec) => Ok(Pong(vec))
443-
})
444-
.await
438+
let vec = read_length_prefixed(io, 1024).await?;
439+
440+
if vec.is_empty() {
441+
return Err(io::ErrorKind::UnexpectedEof.into())
442+
}
443+
444+
Ok(Pong(vec))
445445
}
446446

447447
async fn write_request<T>(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping)
448448
-> io::Result<()>
449449
where
450450
T: AsyncWrite + Unpin + Send
451451
{
452-
write_one(io, data).await
452+
write_length_prefixed(io, data).await?;
453+
io.close().await?;
454+
455+
Ok(())
453456
}
454457

455458
async fn write_response<T>(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong)
456459
-> io::Result<()>
457460
where
458461
T: AsyncWrite + Unpin + Send
459462
{
460-
write_one(io, data).await
463+
write_length_prefixed(io, data).await?;
464+
io.close().await?;
465+
466+
Ok(())
461467
}
462468
}

0 commit comments

Comments
 (0)