Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,24 @@ use tracing::info;

use crate::discovery::{RetryConfig, ToOtherBehaviourEvent};

const NUMBER_OF_CONNECTIONS: usize = 20;

/// A stream that handles the bootstrapping with a bootstrap peer.
/// This stream will automatically dial the bootstrap peer if not already connected.
/// This stream will automatically dial the bootstrap peer to establish and maintain
/// NUMBER_OF_CONNECTIONS connections.
pub struct BootstrapPeerEventStream {
dial_retry_config: RetryConfig,
peer_address: Multiaddr,
peer_id: PeerId,
dial_mode: DialMode,
established_connections: usize,
pending_dials: usize,
should_add_peer_to_kad_routing_table: bool,
dial_retry_strategy: ExponentialBackoff,
time_for_next_bootstrap_dial: Instant,
waker: Option<Waker>,
sleeper: Option<Pin<Box<Sleep>>>,
}

#[derive(Debug, PartialEq, Eq)]
enum DialMode {
Dialing,
Connected,
Disconnected,
}

impl BootstrapPeerEventStream {
fn wake_if_needed(&mut self) {
if let Some(waker) = self.waker.take() {
Expand All @@ -55,11 +52,12 @@ impl BootstrapPeerEventStream {
FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), .. })
if peer_id == self.peer_id =>
{
if self.dial_mode != DialMode::Dialing {
if self.pending_dials > 0 {
self.pending_dials -= 1;
} else {
// Not my dial
return;
}
self.dial_mode = DialMode::Disconnected;
// For the case that the reason for failure is consistent (e.g the bootstrap peer
// is down), we sleep before redialing
let delta_duration = self
Expand All @@ -72,18 +70,23 @@ impl BootstrapPeerEventStream {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. })
if peer_id == self.peer_id =>
{
self.dial_mode = DialMode::Connected;
self.established_connections += 1;
if self.pending_dials > 0 {
self.pending_dials -= 1;
}
// Reset retry dial strategy to original values since we succeeded in dialing
self.dial_retry_strategy = self.dial_retry_config.strategy();
self.wake_if_needed();
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) if peer_id == self.peer_id && remaining_established == 0 => {
self.dial_mode = DialMode::Disconnected;
self.should_add_peer_to_kad_routing_table = true;
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. })
if peer_id == self.peer_id =>
{
if self.established_connections > 0 {
self.established_connections -= 1;
}
if self.established_connections == 0 {
self.should_add_peer_to_kad_routing_table = true;
}
self.time_for_next_bootstrap_dial = now;
self.wake_if_needed();
}
Expand All @@ -104,7 +107,8 @@ impl BootstrapPeerEventStream {
dial_retry_config: bootstrap_dial_retry_config,
peer_id: bootstrap_peer_id,
peer_address: bootstrap_peer_address,
dial_mode: DialMode::Disconnected,
established_connections: 0,
pending_dials: 0,
should_add_peer_to_kad_routing_table: true,
dial_retry_strategy: bootstrap_dial_retry_strategy,
time_for_next_bootstrap_dial: tokio::time::Instant::now(),
Expand All @@ -113,15 +117,16 @@ impl BootstrapPeerEventStream {
}
}

fn switch_to_dialing_mode<T, W>(&mut self) -> ToSwarm<T, W> {
fn initiate_dial<T, W>(&mut self) -> ToSwarm<T, W> {
self.sleeper = None;
self.dial_mode = DialMode::Dialing;
self.pending_dials += 1;
info!(?self.peer_id, ?self.peer_address, "Performing bootstrap dial");
ToSwarm::Dial {
opts: DialOpts::peer_id(self.peer_id)
.addresses(vec![self.peer_address.clone()])
// The peer manager might also be dialing to the bootstrap node.
.condition(PeerCondition::DisconnectedAndNotDialing)
// Allow dialing even if already connected, to establish multiple connections.
// But avoid dialing if we're already in the process of dialing.
.condition(PeerCondition::NotDialing)
.build(),
}
}
Expand All @@ -138,42 +143,48 @@ impl Stream for BootstrapPeerEventStream {
// The future contract requires that we always awake the most recent waker.
self.waker = Some(cx.waker().clone());

match self.dial_mode {
DialMode::Connected => {
if self.should_add_peer_to_kad_routing_table {
self.should_add_peer_to_kad_routing_table = false;
Poll::Ready(Some(ToSwarm::GenerateEvent(
ToOtherBehaviourEvent::FoundListenAddresses {
peer_id: self.peer_id,
listen_addresses: vec![self.peer_address.clone()],
},
)))
} else {
// We are connected (and the peer is already in the routing table). Nothing for
// us to do until something changes.
Poll::Pending
}
// If we have at least one connection and need to add the peer to the routing table, do that
// first.
if self.established_connections > 0 && self.should_add_peer_to_kad_routing_table {
self.should_add_peer_to_kad_routing_table = false;
return Poll::Ready(Some(ToSwarm::GenerateEvent(
ToOtherBehaviourEvent::FoundListenAddresses {
peer_id: self.peer_id,
listen_addresses: vec![self.peer_address.clone()],
},
)));
}

// Check if we need to establish more connections.
let total_connections = self.established_connections + self.pending_dials;
if total_connections < NUMBER_OF_CONNECTIONS {
// We need more connections. Check if it's time to dial.
if self.time_for_next_bootstrap_dial <= now {
return Poll::Ready(Some(self.initiate_dial()));
}

DialMode::Disconnected => {
if self.time_for_next_bootstrap_dial <= now {
return Poll::Ready(Some(self.switch_to_dialing_mode()));
}
if self.sleeper.is_none() {
let next_wake_up = self.time_for_next_bootstrap_dial;
self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up)));
}
let sleeper = self
.sleeper
.as_mut()
.expect("Sleeper cannot be None after being created above.");
// Not time to dial yet. Set up or poll the sleeper.
if self.sleeper.is_none() {
let next_wake_up = self.time_for_next_bootstrap_dial;
self.sleeper = Some(Box::pin(tokio::time::sleep_until(next_wake_up)));
}
let sleeper =
self.sleeper.as_mut().expect("Sleeper cannot be None after being created above.");

match sleeper.as_mut().poll(cx) {
Poll::Ready(()) => Poll::Ready(Some(self.switch_to_dialing_mode())),
Poll::Pending => Poll::Pending,
match sleeper.as_mut().poll(cx) {
Poll::Ready(()) => {
info!(
"Sleeper completed sleep in the time between checking it's not time to \
dial yet, and polling the sleeper. This should be extremely rare/non \
existent"
);
Poll::Ready(Some(self.initiate_dial()))
}
Poll::Pending => Poll::Pending,
}
DialMode::Dialing => Poll::Pending,
} else {
// We have enough connections (established + pending). Nothing to do.
Poll::Pending
}
}
}
23 changes: 15 additions & 8 deletions crates/apollo_propeller/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Propeller network behaviour implementation.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
Expand Down Expand Up @@ -308,7 +308,7 @@ pub struct Behaviour {
channel_manager: ChannelManager,

/// Currently connected peers.
connected_peers: HashSet<PeerId>,
connected_peers: HashMap<PeerId, Vec<ConnectionId>>,

/// Message authenticity configuration for signing/verification.
message_authenticity: MessageAuthenticity,
Expand Down Expand Up @@ -362,7 +362,7 @@ impl Behaviour {
channel_manager: ChannelManager::new(local_peer_id, config.finalized_message_ttl()),
config,
events: VecDeque::new(),
connected_peers: HashSet::new(),
connected_peers: HashMap::new(),
message_authenticity,
local_peer_id,
processor_results_rx,
Expand Down Expand Up @@ -1117,7 +1117,7 @@ impl Behaviour {
}

fn emit_handler_event(&mut self, peer_id: PeerId, event: HandlerIn) {
if !self.connected_peers.contains(&peer_id) {
if !self.connected_peers.contains_key(&peer_id) {
if let Some(metrics) = &self.metrics {
metrics.increment_send_failure(
crate::metrics::ShardSendFailureReason::NotConnectedToPeer,
Expand All @@ -1131,9 +1131,11 @@ impl Behaviour {
return;
}

let connections = self.connected_peers.get(&peer_id).unwrap();
let random_connection = connections.choose(&mut rand::thread_rng()).unwrap();
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
handler: NotifyHandler::One(*random_connection),
event,
});
self.update_collection_metrics();
Expand Down Expand Up @@ -1178,12 +1180,12 @@ impl NetworkBehaviour for Behaviour {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id: _,
connection_id,
endpoint: _,
failed_addresses: _,
other_established: _,
}) => {
self.connected_peers.insert(peer_id);
self.connected_peers.entry(peer_id).or_default().push(connection_id);

// Update connected peers metric
if let Some(metrics) = &self.metrics {
Expand All @@ -1195,11 +1197,16 @@ impl NetworkBehaviour for Behaviour {
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: _,
connection_id,
endpoint: _,
remaining_established,
cause: _,
}) => {
// Remove the connection ID from tracking
if let Some(connections) = self.connected_peers.get_mut(&peer_id) {
connections.retain(|&id| id != connection_id);
}

if remaining_established == 0 {
self.connected_peers.remove(&peer_id);

Expand Down
Loading