Skip to content

Commit 3e31ea9

Browse files
authored
[multistream-select] Fix panic with V1Lazy (regression) and more convenient transport boxing. (#1783)
* [multistream-select] Fix panic with V1Lazy and add integration tests. Fixes a panic when using the `V1Lazy` negotiation protocol, a regression introduced in #1484. Thereby adds integration tests for a transport upgrade with both `V1` and `V1Lazy` to the `multistream-select` crate to prevent future regressions. * Cleanup. * Update changelog.
1 parent 2a5c183 commit 3e31ea9

File tree

13 files changed

+193
-78
lines changed

13 files changed

+193
-78
lines changed

core/src/transport.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@
2525
//! any desired protocols. The rest of the module defines combinators for
2626
//! modifying a transport through composition with other transports or protocol upgrades.
2727
28-
use crate::ConnectedPoint;
28+
use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}};
2929
use futures::prelude::*;
3030
use multiaddr::Multiaddr;
3131
use std::{error::Error, fmt};
3232
use std::time::Duration;
3333

3434
pub mod and_then;
35-
pub mod boxed;
3635
pub mod choice;
3736
pub mod dummy;
3837
pub mod map;
@@ -41,8 +40,10 @@ pub mod memory;
4140
pub mod timeout;
4241
pub mod upgrade;
4342

43+
mod boxed;
4444
mod optional;
4545

46+
pub use self::boxed::Boxed;
4647
pub use self::choice::OrTransport;
4748
pub use self::memory::MemoryTransport;
4849
pub use self::optional::OptionalTransport;
@@ -128,14 +129,24 @@ pub trait Transport {
128129
where
129130
Self: Sized;
130131

131-
/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
132-
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
133-
where Self: Sized + Clone + Send + Sync + 'static,
134-
Self::Dial: Send + 'static,
135-
Self::Listener: Send + 'static,
136-
Self::ListenerUpgrade: Send + 'static,
132+
/// Boxes an authenticated, multiplexed transport, including the
133+
/// `StreamMuxer` and transport errors.
134+
fn boxed<I, M>(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error>
135+
where
136+
Self: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
137+
Self::Dial: Send + 'static,
138+
Self::Listener: Send + 'static,
139+
Self::ListenerUpgrade: Send + 'static,
140+
Self::Error: Send + Sync,
141+
I: ConnectionInfo,
142+
M: StreamMuxer + Send + Sync + 'static,
143+
M::Substream: Send + 'static,
144+
M::OutboundSubstream: Send + 'static
145+
137146
{
138-
boxed::boxed(self)
147+
boxed::boxed(
148+
self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))
149+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
139150
}
140151

141152
/// Applies a function on the connections created by the transport.

core/src/transport/boxed.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use multiaddr::Multiaddr;
2424
use std::{error, fmt, pin::Pin, sync::Arc};
2525

2626
/// See the `Transport::boxed` method.
27-
#[inline]
2827
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
2928
where
3029
T: Transport + Clone + Send + Sync + 'static,
@@ -37,9 +36,14 @@ where
3736
}
3837
}
3938

40-
pub type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
41-
pub type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
42-
pub type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
39+
/// See the `Transport::boxed` method.
40+
pub struct Boxed<O, E> {
41+
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
42+
}
43+
44+
type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
45+
type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
46+
type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
4347

4448
trait Abstract<O, E> {
4549
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
@@ -68,11 +72,6 @@ where
6872
}
6973
}
7074

71-
/// See the `Transport::boxed` method.
72-
pub struct Boxed<O, E> {
73-
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
74-
}
75-
7675
impl<O, E> fmt::Debug for Boxed<O, E> {
7776
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7877
write!(f, "BoxedTransport")

core/tests/network_dial_error.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,11 @@ use libp2p_core::{
3636
use libp2p_noise as noise;
3737
use rand::Rng;
3838
use rand::seq::SliceRandom;
39-
use std::{io, error::Error, fmt, task::Poll};
39+
use std::{io, task::Poll};
4040
use util::TestHandler;
4141

4242
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
43-
type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>;
44-
45-
#[derive(Debug)]
46-
struct BoxError(Box<dyn Error + Send + 'static>);
47-
48-
impl Error for BoxError {}
49-
50-
impl fmt::Display for BoxError {
51-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52-
write!(f, "Transport error: {}", self.0)
53-
}
54-
}
43+
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;
5544

5645
fn new_network(cfg: NetworkConfig) -> TestNetwork {
5746
let local_key = identity::Keypair::generate_ed25519();
@@ -61,13 +50,11 @@ fn new_network(cfg: NetworkConfig) -> TestNetwork {
6150
.upgrade(upgrade::Version::V1)
6251
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
6352
.multiplex(libp2p_mplex::MplexConfig::new())
64-
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
6553
.and_then(|(peer, mplex), _| {
6654
// Gracefully close the connection to allow protocol
6755
// negotiation to complete.
6856
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
6957
})
70-
.map_err(|e| BoxError(Box::new(e)))
7158
.boxed();
7259
TestNetwork::new(transport, local_public_key.into(), cfg)
7360
}

misc/multistream-select/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# 0.8.3 [unreleased]
22

3+
- Fix a regression resulting in a panic with the `V1Lazy` protocol.
4+
[PR 1783](https://github.com/libp2p/rust-libp2p/pull/1783).
5+
36
- Fix a potential deadlock during protocol negotiation due
47
to a missing flush, potentially resulting in sporadic protocol
58
upgrade timeouts.

misc/multistream-select/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ unsigned-varint = "0.5"
1919

2020
[dev-dependencies]
2121
async-std = "1.6.2"
22+
env_logger = "*"
23+
libp2p-core = { path = "../../core" }
24+
libp2p-mplex = { path = "../../muxers/mplex" }
25+
libp2p-plaintext = { path = "../../protocols/plaintext" }
2226
quickcheck = "0.9.0"
2327
rand = "0.7.2"
2428
rw-stream-sink = "0.2.1"

misc/multistream-select/src/negotiated.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ impl<TInner> Negotiated<TInner> {
127127

128128
if let Message::Header(v) = &msg {
129129
if *v == version {
130+
*this.state = State::Expecting { io, protocol, version };
130131
continue
131132
}
132133
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2020 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use libp2p_core::{
22+
connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint},
23+
identity,
24+
muxing::StreamMuxerBox,
25+
upgrade,
26+
multiaddr::Protocol,
27+
Multiaddr,
28+
Network,
29+
network::{NetworkEvent, NetworkConfig},
30+
PeerId,
31+
Transport,
32+
transport::{self, MemoryTransport}
33+
};
34+
use libp2p_mplex::MplexConfig;
35+
use libp2p_plaintext::PlainText2Config;
36+
use futures::{channel::oneshot, ready, prelude::*};
37+
use rand::random;
38+
use std::{io, task::{Context, Poll}};
39+
40+
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;
41+
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
42+
43+
fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) {
44+
let keys = identity::Keypair::generate_ed25519();
45+
let id = keys.public().into_peer_id();
46+
(id, MemoryTransport::default()
47+
.upgrade(up)
48+
.authenticate(PlainText2Config { local_public_key: keys.public() })
49+
.multiplex(MplexConfig::default())
50+
.boxed())
51+
}
52+
53+
/// Tests the transport upgrade process with all supported
54+
/// upgrade protocol versions.
55+
#[test]
56+
fn transport_upgrade() {
57+
let _ = env_logger::try_init();
58+
59+
fn run(up: upgrade::Version) {
60+
let (dialer_id, dialer_transport) = mk_transport(up);
61+
let (listener_id, listener_transport) = mk_transport(up);
62+
63+
let listen_addr = Multiaddr::from(Protocol::Memory(random::<u64>()));
64+
65+
let mut dialer = TestNetwork::new(dialer_transport, dialer_id, NetworkConfig::default());
66+
let mut listener = TestNetwork::new(listener_transport, listener_id, NetworkConfig::default());
67+
68+
listener.listen_on(listen_addr).unwrap();
69+
let (addr_sender, addr_receiver) = oneshot::channel();
70+
71+
let client = async move {
72+
let addr = addr_receiver.await.unwrap();
73+
dialer.dial(&addr, TestHandler()).unwrap();
74+
futures::future::poll_fn(move |cx| {
75+
loop {
76+
match ready!(dialer.poll(cx)) {
77+
NetworkEvent::ConnectionEstablished { .. } => {
78+
return Poll::Ready(())
79+
}
80+
_ => {}
81+
}
82+
}
83+
}).await
84+
};
85+
86+
let mut addr_sender = Some(addr_sender);
87+
let server = futures::future::poll_fn(move |cx| {
88+
loop {
89+
match ready!(listener.poll(cx)) {
90+
NetworkEvent::NewListenerAddress { listen_addr, .. } => {
91+
addr_sender.take().unwrap().send(listen_addr).unwrap();
92+
}
93+
NetworkEvent::IncomingConnection { connection, .. } => {
94+
listener.accept(connection, TestHandler()).unwrap();
95+
}
96+
NetworkEvent::ConnectionEstablished { .. } => {
97+
return Poll::Ready(())
98+
}
99+
_ => {}
100+
}
101+
}
102+
});
103+
104+
async_std::task::block_on(future::select(Box::pin(server), Box::pin(client)));
105+
}
106+
107+
run(upgrade::Version::V1);
108+
run(upgrade::Version::V1Lazy);
109+
}
110+
111+
struct TestHandler();
112+
113+
impl ConnectionHandler for TestHandler {
114+
type InEvent = ();
115+
type OutEvent = ();
116+
type Error = io::Error;
117+
type Substream = Substream<StreamMuxerBox>;
118+
type OutboundOpenInfo = ();
119+
120+
fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint<Self::OutboundOpenInfo>)
121+
{}
122+
123+
fn inject_event(&mut self, _: Self::InEvent)
124+
{}
125+
126+
fn inject_address_change(&mut self, _: &Multiaddr)
127+
{}
128+
129+
fn poll(&mut self, _: &mut Context<'_>)
130+
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>
131+
{
132+
Poll::Pending
133+
}
134+
}

protocols/gossipsub/tests/smoke.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,13 @@ use log::debug;
2323
use quickcheck::{QuickCheck, TestResult};
2424
use rand::{random, seq::SliceRandom, SeedableRng};
2525
use std::{
26-
io::Error,
2726
pin::Pin,
2827
task::{Context, Poll},
2928
time::Duration,
3029
};
3130

3231
use libp2p_core::{
33-
identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade,
32+
identity, multiaddr::Protocol, transport::MemoryTransport, upgrade,
3433
Multiaddr, Transport,
3534
};
3635
use libp2p_gossipsub::{
@@ -151,10 +150,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
151150
.authenticate(PlainText2Config {
152151
local_public_key: public_key.clone(),
153152
})
154-
.multiplex(yamux::Config::default())
155-
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
156-
.map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) })
157-
.boxed();
153+
.multiplex(yamux::Config::default());
158154

159155
let peer_id = public_key.clone().into_peer_id();
160156

protocols/kad/src/behaviour/test.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ use libp2p_core::{
3838
identity,
3939
transport::MemoryTransport,
4040
multiaddr::{Protocol, Multiaddr, multiaddr},
41-
muxing::StreamMuxerBox,
4241
upgrade
4342
};
4443
use libp2p_noise as noise;
4544
use libp2p_swarm::Swarm;
4645
use libp2p_yamux as yamux;
4746
use quickcheck::*;
4847
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
49-
use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64};
48+
use std::{collections::{HashSet, HashMap}, time::Duration, num::NonZeroUsize, u64};
5049
use multihash::{wrap, Code, Multihash};
5150

5251
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
@@ -62,10 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
6261
let transport = MemoryTransport::default()
6362
.upgrade(upgrade::Version::V1)
6463
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
65-
.multiplex(yamux::Config::default())
66-
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
67-
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
68-
.boxed();
64+
.multiplex(yamux::Config::default());
6965

7066
let local_id = local_public_key.clone().into_peer_id();
7167
let store = MemoryStore::new(local_id.clone());

protocols/ping/tests/ping.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use libp2p_core::{
2525
PeerId,
2626
identity,
2727
muxing::StreamMuxerBox,
28-
transport::{Transport, boxed::Boxed},
28+
transport::{self, Transport},
2929
upgrade
3030
};
3131
use libp2p_mplex as mplex;
@@ -196,16 +196,15 @@ fn max_failures() {
196196

197197
fn mk_transport(muxer: MuxerChoice) -> (
198198
PeerId,
199-
Boxed<
199+
transport::Boxed<
200200
(PeerId, StreamMuxerBox),
201201
io::Error
202202
>
203203
) {
204204
let id_keys = identity::Keypair::generate_ed25519();
205205
let peer_id = id_keys.public().into_peer_id();
206206
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
207-
208-
let transport = TcpConfig::new()
207+
(peer_id, TcpConfig::new()
209208
.nodelay(true)
210209
.upgrade(upgrade::Version::V1)
211210
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
@@ -215,11 +214,7 @@ fn mk_transport(muxer: MuxerChoice) -> (
215214
MuxerChoice::Mplex =>
216215
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
217216
})
218-
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
219-
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
220-
.boxed();
221-
222-
(peer_id, transport)
217+
.boxed())
223218
}
224219

225220
#[derive(Debug, Copy, Clone)]

0 commit comments

Comments
 (0)