Skip to content

Commit 85bc539

Browse files
committed
feat(swarm): smart dialing
1 parent b0edc68 commit 85bc539

File tree

9 files changed

+700
-41
lines changed

9 files changed

+700
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" }
102102
libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" }
103103
libp2p-server = { version = "0.12.7", path = "misc/server" }
104104
libp2p-stream = { version = "0.4.0-alpha", path = "protocols/stream" }
105-
libp2p-swarm = { version = "0.47.0", path = "swarm" }
105+
libp2p-swarm = { version = "0.47.1", path = "swarm" }
106106
libp2p-swarm-derive = { version = "=0.35.1", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
107107
libp2p-swarm-test = { version = "0.6.0", path = "swarm-test" }
108108
libp2p-tcp = { version = "0.44.0", path = "transports/tcp" }

swarm/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.47.1
2+
3+
- Add smart dialing support.
4+
15
## 0.47.0
26

37
- Remove `async-std` support.

swarm/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-swarm"
33
edition.workspace = true
44
rust-version = { workspace = true }
55
description = "The libp2p swarm"
6-
version = "0.47.0"
6+
version = "0.47.1"
77
authors = ["Parity Technologies <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"

swarm/src/connection/pool.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@
1818
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
1919
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2020
// DEALINGS IN THE SOFTWARE.
21+
2122
use std::{
2223
collections::HashMap,
2324
convert::Infallible,
2425
fmt,
2526
num::{NonZeroU8, NonZeroUsize},
2627
pin::Pin,
28+
sync::Arc,
2729
task::{Context, Poll, Waker},
2830
};
2931

30-
use concurrent_dial::ConcurrentDial;
32+
use concurrent_dial::{ConcurrentDial, Dial};
33+
use dial_ranker::DialRanker;
3134
use fnv::FnvHashMap;
3235
use futures::{
3336
channel::{mpsc, oneshot},
34-
future::{poll_fn, BoxFuture, Either},
37+
future::{poll_fn, Either},
3538
prelude::*,
3639
ready,
3740
stream::{FuturesUnordered, SelectAll},
@@ -44,16 +47,16 @@ use libp2p_core::{
4447
use tracing::Instrument;
4548
use web_time::{Duration, Instant};
4649

50+
use super::{
51+
Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
52+
PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
53+
};
4754
use crate::{
48-
connection::{
49-
Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
50-
PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
51-
},
52-
transport::TransportError,
53-
ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
55+
transport::TransportError, ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
5456
};
5557

5658
mod concurrent_dial;
59+
pub(crate) mod dial_ranker;
5760
mod task;
5861

5962
enum ExecSwitch {
@@ -142,6 +145,9 @@ where
142145

143146
/// How long a connection should be kept alive once it starts idling.
144147
idle_connection_timeout: Duration,
148+
149+
/// Ranker that determines the ranking of outgoing connection attempts.
150+
dial_ranker: Option<Arc<DialRanker>>,
145151
}
146152

147153
#[derive(Debug)]
@@ -333,6 +339,7 @@ where
333339
no_established_connections_waker: None,
334340
established_connection_events: Default::default(),
335341
new_connection_dropped_listeners: Default::default(),
342+
dial_ranker: config.dial_ranker,
336343
}
337344
}
338345

@@ -413,15 +420,7 @@ where
413420
/// that establishes and negotiates the connection.
414421
pub(crate) fn add_outgoing(
415422
&mut self,
416-
dials: Vec<
417-
BoxFuture<
418-
'static,
419-
(
420-
Multiaddr,
421-
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
422-
),
423-
>,
424-
>,
423+
dials: Vec<(Multiaddr, Dial)>,
425424
peer: Option<PeerId>,
426425
role_override: Endpoint,
427426
port_use: PortUse,
@@ -438,7 +437,7 @@ where
438437
self.executor.spawn(
439438
task::new_for_pending_outgoing_connection(
440439
connection_id,
441-
ConcurrentDial::new(dials, concurrency_factor),
440+
ConcurrentDial::new(dials, concurrency_factor, self.dial_ranker.clone()),
442441
abort_receiver,
443442
self.pending_connection_events_tx.clone(),
444443
)
@@ -979,6 +978,8 @@ pub(crate) struct PoolConfig {
979978
pub(crate) per_connection_event_buffer_size: usize,
980979
/// Number of addresses concurrently dialed for a single outbound connection attempt.
981980
pub(crate) dial_concurrency_factor: NonZeroU8,
981+
/// Ranker that determines the ranking of outgoing connection attempts.
982+
pub(crate) dial_ranker: Option<Arc<DialRanker>>,
982983
/// How long a connection should be kept alive once it is idling.
983984
pub(crate) idle_connection_timeout: Duration,
984985
/// The configured override for substream protocol upgrades, if any.
@@ -1000,6 +1001,7 @@ impl PoolConfig {
10001001
idle_connection_timeout: Duration::from_secs(10),
10011002
substream_upgrade_protocol_override: None,
10021003
max_negotiating_inbound_streams: 128,
1004+
dial_ranker: None,
10031005
}
10041006
}
10051007

swarm/src/connection/pool/concurrent_dial.rs

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@
1919
// DEALINGS IN THE SOFTWARE.
2020

2121
use std::{
22+
collections::HashMap,
2223
num::NonZeroU8,
2324
pin::Pin,
25+
sync::Arc,
2426
task::{Context, Poll},
27+
time::Duration,
2528
};
2629

2730
use futures::{
2831
future::{BoxFuture, Future},
2932
ready,
3033
stream::{FuturesUnordered, StreamExt},
34+
FutureExt,
3135
};
36+
use futures_timer::Delay;
3237
use libp2p_core::muxing::StreamMuxerBox;
3338
use libp2p_identity::PeerId;
3439

40+
use super::DialRanker;
3541
use crate::{transport::TransportError, Multiaddr};
3642

37-
type Dial = BoxFuture<
43+
pub(crate) type Dial = BoxFuture<
3844
'static,
3945
(
4046
Multiaddr,
@@ -43,29 +49,56 @@ type Dial = BoxFuture<
4349
>;
4450

4551
pub(crate) struct ConcurrentDial {
52+
concurrency_factor: NonZeroU8,
4653
dials: FuturesUnordered<Dial>,
47-
pending_dials: Box<dyn Iterator<Item = Dial> + Send>,
54+
pending_dials: Box<dyn Iterator<Item = (Multiaddr, Option<Duration>, Dial)> + Send>,
4855
errors: Vec<(Multiaddr, TransportError<std::io::Error>)>,
4956
}
5057

5158
impl Unpin for ConcurrentDial {}
5259

5360
impl ConcurrentDial {
54-
pub(crate) fn new(pending_dials: Vec<Dial>, concurrency_factor: NonZeroU8) -> Self {
55-
let mut pending_dials = pending_dials.into_iter();
56-
61+
pub(crate) fn new(
62+
pending_dials: Vec<(Multiaddr, Dial)>,
63+
concurrency_factor: NonZeroU8,
64+
dial_ranker: Option<Arc<DialRanker>>,
65+
) -> Self {
5766
let dials = FuturesUnordered::new();
58-
for dial in pending_dials.by_ref() {
59-
dials.push(dial);
60-
if dials.len() == concurrency_factor.get() as usize {
61-
break;
62-
}
63-
}
64-
67+
let pending_dials: Vec<_> = if let Some(dial_ranker) = dial_ranker {
68+
let addresses = pending_dials.iter().map(|(k, _)| k.clone()).collect();
69+
let mut dials: HashMap<Multiaddr, Dial> = HashMap::from_iter(pending_dials);
70+
dial_ranker(addresses)
71+
.into_iter()
72+
.filter_map(|(addr, delay)| dials.remove(&addr).map(|dial| (addr, delay, dial)))
73+
.collect()
74+
} else {
75+
pending_dials
76+
.into_iter()
77+
.map(|(addr, dial)| (addr, None, dial))
78+
.collect()
79+
};
6580
Self {
81+
concurrency_factor,
6682
dials,
6783
errors: Default::default(),
68-
pending_dials: Box::new(pending_dials),
84+
pending_dials: Box::new(pending_dials.into_iter()),
85+
}
86+
}
87+
88+
fn dial_pending(&mut self) -> bool {
89+
if let Some((_, delay, dial)) = self.pending_dials.next() {
90+
self.dials.push(
91+
async move {
92+
if let Some(delay) = delay {
93+
Delay::new(delay).await;
94+
}
95+
dial.await
96+
}
97+
.boxed(),
98+
);
99+
true
100+
} else {
101+
false
69102
}
70103
}
71104
}
@@ -92,12 +125,16 @@ impl Future for ConcurrentDial {
92125
}
93126
Some((addr, Err(e))) => {
94127
self.errors.push((addr, e));
95-
if let Some(dial) = self.pending_dials.next() {
96-
self.dials.push(dial)
97-
}
128+
self.dial_pending();
98129
}
99130
None => {
100-
return Poll::Ready(Err(std::mem::take(&mut self.errors)));
131+
while self.dials.len() < self.concurrency_factor.get() as usize
132+
&& self.dial_pending()
133+
{}
134+
135+
if self.dials.is_empty() {
136+
return Poll::Ready(Err(std::mem::take(&mut self.errors)));
137+
}
101138
}
102139
}
103140
}

0 commit comments

Comments
 (0)