Skip to content

Commit 7841313

Browse files
using multiple connections in propeller
1 parent 4b4ec10 commit 7841313

File tree

2 files changed

+81
-63
lines changed

2 files changed

+81
-63
lines changed

crates/apollo_network/src/discovery/behaviours/bootstrapping/bootstrap_peer.rs

Lines changed: 66 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,24 @@ use tracing::info;
2121

2222
use crate::discovery::{RetryConfig, ToOtherBehaviourEvent};
2323

24+
const NUMBER_OF_CONNECTIONS: usize = 20;
25+
2426
/// A stream that handles the bootstrapping with a bootstrap peer.
25-
/// This stream will automatically dial the bootstrap peer if not already connected.
27+
/// This stream will automatically dial the bootstrap peer to establish and maintain
28+
/// NUMBER_OF_CONNECTIONS connections.
2629
pub struct BootstrapPeerEventStream {
2730
dial_retry_config: RetryConfig,
2831
peer_address: Multiaddr,
2932
peer_id: PeerId,
30-
dial_mode: DialMode,
33+
established_connections: usize,
34+
pending_dials: usize,
3135
should_add_peer_to_kad_routing_table: bool,
3236
dial_retry_strategy: ExponentialBackoff,
3337
time_for_next_bootstrap_dial: Instant,
3438
waker: Option<Waker>,
3539
sleeper: Option<Pin<Box<Sleep>>>,
3640
}
3741

38-
#[derive(Debug, PartialEq, Eq)]
39-
enum DialMode {
40-
Dialing,
41-
Connected,
42-
Disconnected,
43-
}
44-
4542
impl BootstrapPeerEventStream {
4643
fn wake_if_needed(&mut self) {
4744
if let Some(waker) = self.waker.take() {
@@ -55,11 +52,12 @@ impl BootstrapPeerEventStream {
5552
FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), .. })
5653
if peer_id == self.peer_id =>
5754
{
58-
if self.dial_mode != DialMode::Dialing {
55+
if self.pending_dials > 0 {
56+
self.pending_dials -= 1;
57+
} else {
5958
// Not my dial
6059
return;
6160
}
62-
self.dial_mode = DialMode::Disconnected;
6361
// For the case that the reason for failure is consistent (e.g. the bootstrap peer
6462
// is down), we sleep before redialing
6563
let delta_duration = self
@@ -72,18 +70,23 @@ impl BootstrapPeerEventStream {
7270
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. })
7371
if peer_id == self.peer_id =>
7472
{
75-
self.dial_mode = DialMode::Connected;
73+
self.established_connections += 1;
74+
if self.pending_dials > 0 {
75+
self.pending_dials -= 1;
76+
}
7677
// Reset retry dial strategy to original values since we succeeded in dialing
7778
self.dial_retry_strategy = self.dial_retry_config.strategy();
7879
self.wake_if_needed();
7980
}
80-
FromSwarm::ConnectionClosed(ConnectionClosed {
81-
peer_id,
82-
remaining_established,
83-
..
84-
}) if peer_id == self.peer_id && remaining_established == 0 => {
85-
self.dial_mode = DialMode::Disconnected;
86-
self.should_add_peer_to_kad_routing_table = true;
81+
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. })
82+
if peer_id == self.peer_id =>
83+
{
84+
if self.established_connections > 0 {
85+
self.established_connections -= 1;
86+
}
87+
if self.established_connections == 0 {
88+
self.should_add_peer_to_kad_routing_table = true;
89+
}
8790
self.time_for_next_bootstrap_dial = now;
8891
self.wake_if_needed();
8992
}
@@ -104,7 +107,8 @@ impl BootstrapPeerEventStream {
104107
dial_retry_config: bootstrap_dial_retry_config,
105108
peer_id: bootstrap_peer_id,
106109
peer_address: bootstrap_peer_address,
107-
dial_mode: DialMode::Disconnected,
110+
established_connections: 0,
111+
pending_dials: 0,
108112
should_add_peer_to_kad_routing_table: true,
109113
dial_retry_strategy: bootstrap_dial_retry_strategy,
110114
time_for_next_bootstrap_dial: tokio::time::Instant::now(),
@@ -113,15 +117,16 @@ impl BootstrapPeerEventStream {
113117
}
114118
}
115119

116-
fn switch_to_dialing_mode<T, W>(&mut self) -> ToSwarm<T, W> {
120+
fn initiate_dial<T, W>(&mut self) -> ToSwarm<T, W> {
117121
self.sleeper = None;
118-
self.dial_mode = DialMode::Dialing;
122+
self.pending_dials += 1;
119123
info!(?self.peer_id, ?self.peer_address, "Performing bootstrap dial");
120124
ToSwarm::Dial {
121125
opts: DialOpts::peer_id(self.peer_id)
122126
.addresses(vec![self.peer_address.clone()])
123-
// The peer manager might also be dialing to the bootstrap node.
124-
.condition(PeerCondition::DisconnectedAndNotDialing)
127+
// Allow dialing even if already connected, to establish multiple connections.
128+
// But avoid dialing if we're already in the process of dialing.
129+
.condition(PeerCondition::NotDialing)
125130
.build(),
126131
}
127132
}
@@ -138,42 +143,48 @@ impl Stream for BootstrapPeerEventStream {
138143
// The future contract requires that we always awake the most recent waker.
139144
self.waker = Some(cx.waker().clone());
140145

141-
match self.dial_mode {
142-
DialMode::Connected => {
143-
if self.should_add_peer_to_kad_routing_table {
144-
self.should_add_peer_to_kad_routing_table = false;
145-
Poll::Ready(Some(ToSwarm::GenerateEvent(
146-
ToOtherBehaviourEvent::FoundListenAddresses {
147-
peer_id: self.peer_id,
148-
listen_addresses: vec![self.peer_address.clone()],
149-
},
150-
)))
151-
} else {
152-
// We are connected (and the peer is already in the routing table). Nothing for
153-
// us to do until something changes.
154-
Poll::Pending
155-
}
146+
// If we have at least one connection and need to add the peer to the routing table, do that
147+
// first.
148+
if self.established_connections > 0 && self.should_add_peer_to_kad_routing_table {
149+
self.should_add_peer_to_kad_routing_table = false;
150+
return Poll::Ready(Some(ToSwarm::GenerateEvent(
151+
ToOtherBehaviourEvent::FoundListenAddresses {
152+
peer_id: self.peer_id,
153+
listen_addresses: vec![self.peer_address.clone()],
154+
},
155+
)));
156+
}
157+
158+
// Check if we need to establish more connections.
159+
let total_connections = self.established_connections + self.pending_dials;
160+
if total_connections < NUMBER_OF_CONNECTIONS {
161+
// We need more connections. Check if it's time to dial.
162+
if self.time_for_next_bootstrap_dial <= now {
163+
return Poll::Ready(Some(self.initiate_dial()));
156164
}
157165

158-
DialMode::Disconnected => {
159-
if self.time_for_next_bootstrap_dial <= now {
160-
return Poll::Ready(Some(self.switch_to_dialing_mode()));
161-
}
162-
if self.sleeper.is_none() {
163-
let next_wake_up = self.time_for_next_bootstrap_dial;
164-
self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up)));
165-
}
166-
let sleeper = self
167-
.sleeper
168-
.as_mut()
169-
.expect("Sleeper cannot be None after being created above.");
166+
// Not time to dial yet. Set up or poll the sleeper.
167+
if self.sleeper.is_none() {
168+
let next_wake_up = self.time_for_next_bootstrap_dial;
169+
self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up)));
170+
}
171+
let sleeper =
172+
self.sleeper.as_mut().expect("Sleeper cannot be None after being created above.");
170173

171-
match sleeper.as_mut().poll(cx) {
172-
Poll::Ready(()) => Poll::Ready(Some(self.switch_to_dialing_mode())),
173-
Poll::Pending => Poll::Pending,
174+
match sleeper.as_mut().poll(cx) {
175+
Poll::Ready(()) => {
176+
info!(
177+
"Sleeper completed sleep in the time between checking it's not time to \
178+
dial yet, and polling the sleeper. This should be extremely rare/non \
179+
existent"
180+
);
181+
Poll::Ready(Some(self.initiate_dial()))
174182
}
183+
Poll::Pending => Poll::Pending,
175184
}
176-
DialMode::Dialing => Poll::Pending,
185+
} else {
186+
// We have enough connections (established + pending). Nothing to do.
187+
Poll::Pending
177188
}
178189
}
179190
}

crates/apollo_propeller/src/behaviour.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Propeller network behaviour implementation.
22
3-
use std::collections::{HashMap, HashSet, VecDeque};
3+
use std::collections::{HashMap, VecDeque};
44
use std::sync::{Arc, Mutex};
55
use std::task::{Context, Poll, Waker};
66
use std::time::Duration;
@@ -308,7 +308,7 @@ pub struct Behaviour {
308308
channel_manager: ChannelManager,
309309

310310
/// Currently connected peers.
311-
connected_peers: HashSet<PeerId>,
311+
connected_peers: HashMap<PeerId, Vec<ConnectionId>>,
312312

313313
/// Message authenticity configuration for signing/verification.
314314
message_authenticity: MessageAuthenticity,
@@ -362,7 +362,7 @@ impl Behaviour {
362362
channel_manager: ChannelManager::new(local_peer_id, config.finalized_message_ttl()),
363363
config,
364364
events: VecDeque::new(),
365-
connected_peers: HashSet::new(),
365+
connected_peers: HashMap::new(),
366366
message_authenticity,
367367
local_peer_id,
368368
processor_results_rx,
@@ -1117,7 +1117,7 @@ impl Behaviour {
11171117
}
11181118

11191119
fn emit_handler_event(&mut self, peer_id: PeerId, event: HandlerIn) {
1120-
if !self.connected_peers.contains(&peer_id) {
1120+
if !self.connected_peers.contains_key(&peer_id) {
11211121
if let Some(metrics) = &self.metrics {
11221122
metrics.increment_send_failure(
11231123
crate::metrics::ShardSendFailureReason::NotConnectedToPeer,
@@ -1131,9 +1131,11 @@ impl Behaviour {
11311131
return;
11321132
}
11331133

1134+
let connections = self.connected_peers.get(&peer_id).unwrap();
1135+
let random_connection = connections.choose(&mut rand::thread_rng()).unwrap();
11341136
self.events.push_back(ToSwarm::NotifyHandler {
11351137
peer_id,
1136-
handler: NotifyHandler::Any,
1138+
handler: NotifyHandler::One(*random_connection),
11371139
event,
11381140
});
11391141
self.update_collection_metrics();
@@ -1178,12 +1180,12 @@ impl NetworkBehaviour for Behaviour {
11781180
match event {
11791181
FromSwarm::ConnectionEstablished(ConnectionEstablished {
11801182
peer_id,
1181-
connection_id: _,
1183+
connection_id,
11821184
endpoint: _,
11831185
failed_addresses: _,
11841186
other_established: _,
11851187
}) => {
1186-
self.connected_peers.insert(peer_id);
1188+
self.connected_peers.entry(peer_id).or_default().push(connection_id);
11871189

11881190
// Update connected peers metric
11891191
if let Some(metrics) = &self.metrics {
@@ -1195,11 +1197,16 @@ impl NetworkBehaviour for Behaviour {
11951197
}
11961198
FromSwarm::ConnectionClosed(ConnectionClosed {
11971199
peer_id,
1198-
connection_id: _,
1200+
connection_id,
11991201
endpoint: _,
12001202
remaining_established,
12011203
cause: _,
12021204
}) => {
1205+
// Remove the connection ID from tracking
1206+
if let Some(connections) = self.connected_peers.get_mut(&peer_id) {
1207+
connections.retain(|&id| id != connection_id);
1208+
}
1209+
12031210
if remaining_established == 0 {
12041211
self.connected_peers.remove(&peer_id);
12051212

0 commit comments

Comments
 (0)