diff --git a/crates/apollo_network/src/discovery/behaviours/bootstrapping/bootstrap_peer.rs b/crates/apollo_network/src/discovery/behaviours/bootstrapping/bootstrap_peer.rs index 8ddea59849a..c9d236b2c1b 100644 --- a/crates/apollo_network/src/discovery/behaviours/bootstrapping/bootstrap_peer.rs +++ b/crates/apollo_network/src/discovery/behaviours/bootstrapping/bootstrap_peer.rs @@ -21,13 +21,17 @@ use tracing::info; use crate::discovery::{RetryConfig, ToOtherBehaviourEvent}; +const NUMBER_OF_CONNECTIONS: usize = 20; + /// A stream that handles the bootstrapping with a bootstrap peer. -/// This stream will automatically dial the bootstrap peer if not already connected. +/// This stream will automatically dial the bootstrap peer to establish and maintain +/// NUMBER_OF_CONNECTIONS connections. pub struct BootstrapPeerEventStream { dial_retry_config: RetryConfig, peer_address: Multiaddr, peer_id: PeerId, - dial_mode: DialMode, + established_connections: usize, + pending_dials: usize, should_add_peer_to_kad_routing_table: bool, dial_retry_strategy: ExponentialBackoff, time_for_next_bootstrap_dial: Instant, @@ -35,13 +39,6 @@ pub struct BootstrapPeerEventStream { sleeper: Option>>, } -#[derive(Debug, PartialEq, Eq)] -enum DialMode { - Dialing, - Connected, - Disconnected, -} - impl BootstrapPeerEventStream { fn wake_if_needed(&mut self) { if let Some(waker) = self.waker.take() { @@ -55,11 +52,12 @@ impl BootstrapPeerEventStream { FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), .. }) if peer_id == self.peer_id => { - if self.dial_mode != DialMode::Dialing { + if self.pending_dials > 0 { + self.pending_dials -= 1; + } else { // Not my dial return; } - self.dial_mode = DialMode::Disconnected; // For the case that the reason for failure is consistent (e.g the bootstrap peer // is down), we sleep before redialing let delta_duration = self @@ -72,18 +70,23 @@ impl BootstrapPeerEventStream { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) if peer_id == self.peer_id => { - self.dial_mode = DialMode::Connected; + self.established_connections += 1; + if self.pending_dials > 0 { + self.pending_dials -= 1; + } // Reset retry dial strategy to original values since we succeeded in dialing self.dial_retry_strategy = self.dial_retry_config.strategy(); self.wake_if_needed(); } - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - remaining_established, - .. - }) if peer_id == self.peer_id && remaining_established == 0 => { - self.dial_mode = DialMode::Disconnected; - self.should_add_peer_to_kad_routing_table = true; + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) + if peer_id == self.peer_id => + { + if self.established_connections > 0 { + self.established_connections -= 1; + } + if self.established_connections == 0 { + self.should_add_peer_to_kad_routing_table = true; + } self.time_for_next_bootstrap_dial = now; self.wake_if_needed(); } @@ -104,7 +107,8 @@ impl BootstrapPeerEventStream { dial_retry_config: bootstrap_dial_retry_config, peer_id: bootstrap_peer_id, peer_address: bootstrap_peer_address, - dial_mode: DialMode::Disconnected, + established_connections: 0, + pending_dials: 0, should_add_peer_to_kad_routing_table: true, dial_retry_strategy: bootstrap_dial_retry_strategy, time_for_next_bootstrap_dial: tokio::time::Instant::now(), @@ -113,15 +117,16 @@ impl BootstrapPeerEventStream { } } - fn switch_to_dialing_mode(&mut self) -> ToSwarm { + fn initiate_dial(&mut self) -> ToSwarm { self.sleeper = None; - self.dial_mode = DialMode::Dialing; + self.pending_dials += 1; info!(?self.peer_id, ?self.peer_address, "Performing bootstrap dial"); ToSwarm::Dial { opts: DialOpts::peer_id(self.peer_id) .addresses(vec![self.peer_address.clone()]) - // The peer manager might also be dialing to the bootstrap node. - .condition(PeerCondition::DisconnectedAndNotDialing) + // Allow dialing even if already connected, to establish multiple connections. + // But avoid dialing if we're already in the process of dialing. + .condition(PeerCondition::NotDialing) .build(), } } @@ -138,42 +143,48 @@ impl Stream for BootstrapPeerEventStream { // The future contract requires that we always awake the most recent waker. self.waker = Some(cx.waker().clone()); - match self.dial_mode { - DialMode::Connected => { - if self.should_add_peer_to_kad_routing_table { - self.should_add_peer_to_kad_routing_table = false; - Poll::Ready(Some(ToSwarm::GenerateEvent( - ToOtherBehaviourEvent::FoundListenAddresses { - peer_id: self.peer_id, - listen_addresses: vec![self.peer_address.clone()], - }, - ))) - } else { - // We are connected (and the peer is already in the routing table). Nothing for - // us to do until something changes. - Poll::Pending - } + // If we have at least one connection and need to add the peer to the routing table, do that + // first. + if self.established_connections > 0 && self.should_add_peer_to_kad_routing_table { + self.should_add_peer_to_kad_routing_table = false; + return Poll::Ready(Some(ToSwarm::GenerateEvent( + ToOtherBehaviourEvent::FoundListenAddresses { + peer_id: self.peer_id, + listen_addresses: vec![self.peer_address.clone()], + }, + ))); + } + + // Check if we need to establish more connections. + let total_connections = self.established_connections + self.pending_dials; + if total_connections < NUMBER_OF_CONNECTIONS { + // We need more connections. Check if it's time to dial. + if self.time_for_next_bootstrap_dial <= now { + return Poll::Ready(Some(self.initiate_dial())); } - DialMode::Disconnected => { - if self.time_for_next_bootstrap_dial <= now { - return Poll::Ready(Some(self.switch_to_dialing_mode())); - } - if self.sleeper.is_none() { - let next_wake_up = self.time_for_next_bootstrap_dial; - self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up))); - } - let sleeper = self - .sleeper - .as_mut() - .expect("Sleeper cannot be None after being created above."); + // Not time to dial yet. Set up or poll the sleeper. + if self.sleeper.is_none() { + let next_wake_up = self.time_for_next_bootstrap_dial; + self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up))); + } + let sleeper = + self.sleeper.as_mut().expect("Sleeper cannot be None after being created above."); - match sleeper.as_mut().poll(cx) { - Poll::Ready(()) => Poll::Ready(Some(self.switch_to_dialing_mode())), - Poll::Pending => Poll::Pending, + match sleeper.as_mut().poll(cx) { + Poll::Ready(()) => { + info!( + "Sleeper completed sleep in the time between checking it's not time to \ + dial yet, and polling the sleeper. This should be extremely rare/non \ + existent" + ); + Poll::Ready(Some(self.initiate_dial())) } + Poll::Pending => Poll::Pending, } - DialMode::Dialing => Poll::Pending, + } else { + // We have enough connections (established + pending). Nothing to do. + Poll::Pending } } } diff --git a/crates/apollo_propeller/src/behaviour.rs b/crates/apollo_propeller/src/behaviour.rs index 04568043884..dc085df9735 100644 --- a/crates/apollo_propeller/src/behaviour.rs +++ b/crates/apollo_propeller/src/behaviour.rs @@ -1,6 +1,6 @@ //! Propeller network behaviour implementation. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::time::Duration; @@ -308,7 +308,7 @@ pub struct Behaviour { channel_manager: ChannelManager, /// Currently connected peers. - connected_peers: HashSet, + connected_peers: HashMap>, /// Message authenticity configuration for signing/verification. message_authenticity: MessageAuthenticity, @@ -362,7 +362,7 @@ impl Behaviour { channel_manager: ChannelManager::new(local_peer_id, config.finalized_message_ttl()), config, events: VecDeque::new(), - connected_peers: HashSet::new(), + connected_peers: HashMap::new(), message_authenticity, local_peer_id, processor_results_rx, @@ -1117,7 +1117,7 @@ impl Behaviour { } fn emit_handler_event(&mut self, peer_id: PeerId, event: HandlerIn) { - if !self.connected_peers.contains(&peer_id) { + if !self.connected_peers.contains_key(&peer_id) { if let Some(metrics) = &self.metrics { metrics.increment_send_failure( crate::metrics::ShardSendFailureReason::NotConnectedToPeer, @@ -1131,9 +1131,11 @@ impl Behaviour { return; } + let connections = self.connected_peers.get(&peer_id).unwrap(); + let random_connection = connections.choose(&mut rand::thread_rng()).unwrap(); self.events.push_back(ToSwarm::NotifyHandler { peer_id, - handler: NotifyHandler::Any, + handler: NotifyHandler::One(*random_connection), event, }); self.update_collection_metrics(); @@ -1178,12 +1180,12 @@ impl NetworkBehaviour for Behaviour { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, - connection_id: _, + connection_id, endpoint: _, failed_addresses: _, other_established: _, }) => { - self.connected_peers.insert(peer_id); + self.connected_peers.entry(peer_id).or_default().push(connection_id); // Update connected peers metric if let Some(metrics) = &self.metrics { @@ -1195,11 +1197,16 @@ impl NetworkBehaviour for Behaviour { } FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, - connection_id: _, + connection_id, endpoint: _, remaining_established, cause: _, }) => { + // Remove the connection ID from tracking + if let Some(connections) = self.connected_peers.get_mut(&peer_id) { + connections.retain(|&id| id != connection_id); + } + if remaining_established == 0 { self.connected_peers.remove(&peer_id);