Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 43 additions & 65 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
MEMORY_POOL_PORT,
Worker,
events::{EventCodec, PrimaryPing},
helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
spawn_blocking,
};
use aleo_std::StorageMode;
Expand All @@ -43,7 +43,7 @@ use snarkos_node_bft_events::{
ValidatorsResponse,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_router::{NodeType, Peer, PeerPoolHandling};
use snarkos_node_router::{NodeType, Peer, PeerPoolHandling, Resolver};
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
use snarkos_node_tcp::{
Config,
Expand Down Expand Up @@ -161,11 +161,21 @@ pub struct InnerGateway<N: Network> {
}

impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
const MAXIMUM_POOL_SIZE: usize = 200;
const OWNER: &str = CONTEXT;
const PEER_SLASHING_COUNT: usize = 20;

fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
&self.peer_pool
}

fn resolver(&self) -> &RwLock<Resolver<N>> {
&self.resolver
}

fn is_dev(&self) -> bool {
self.dev.is_some()
}
}

impl<N: Network> Gateway<N> {
Expand Down Expand Up @@ -443,35 +453,14 @@ impl<N: Network> Gateway<N> {
#[cfg(test)]
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.write().insert_peer(peer_ip, peer_addr, address);
self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
// Add a transmission for this peer in the connected peers.
self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
peer.upgrade_to_connected(peer_addr, peer_ip.port(), address, NodeType::Validator, 0);
}
}

/// Removes the connected peer and adds them to the candidate peers.
fn remove_connected_peer(&self, peer_ip: SocketAddr) {
// Remove the peer from the sync module. Except for some tests, there is always a sync sender.
if let Some(sync_sender) = self.sync_sender.get() {
let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
tokio::spawn(async move {
if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
}
});
}
if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
if let Peer::Connected(connected_peer) = peer {
self.resolver.write().remove_peer(peer_ip, connected_peer.aleo_addr);
}
peer.downgrade_to_candidate(peer_ip);
}
#[cfg(feature = "metrics")]
self.update_metrics();
}

/// Sends the given event to specified peer.
///
/// This function returns as soon as the event is queued to be sent,
Expand Down Expand Up @@ -751,45 +740,24 @@ impl<N: Network> Gateway<N> {
// Decrement the number of validators requests for this peer.
self.cache.decrement_outbound_validators_requests(peer_ip);

// If the number of connected validators is less than the minimum, connect to more validators.
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
// Attempt to connect to any validators that are not already connected.
let self_ = self.clone();
tokio::spawn(async move {
for (validator_ip, validator_address) in validators {
if self_.dev.is_some() {
// Ensure the validator IP is not this node.
if self_.is_local_ip(validator_ip) {
continue;
}
} else {
// Ensure the validator IP is not this node and is well-formed.
if !self_.is_valid_peer_ip(validator_ip) {
continue;
}
}

// Ensure the validator address is not this node.
if self_.account.address() == validator_address {
continue;
}
// Ensure the validator IP is not already connected or connecting.
if self_.is_connected(validator_ip) || self_.is_connecting(validator_ip) {
continue;
}
// Ensure the validator address is not already connected.
if self_.is_connected_address(validator_address) {
continue;
}
// Ensure the validator address is an authorized validator.
if !self_.is_authorized_validator_address(validator_address) {
continue;
}
// Attempt to connect to the validator.
self_.connect(validator_ip);
}
});
// Add valid validators as candidates to the peer pool; only validator-related
// filters need to be applied, the rest is handled by `PeerPoolHandling`.
let valid_addrs = validators
.into_iter()
.filter_map(|(listener_addr, aleo_addr)| {
(self.account.address() != aleo_addr
&& !self.is_connected_address(aleo_addr)
&& self.is_authorized_validator_address(aleo_addr))
.then_some(listener_addr)
})
.collect::<Vec<_>>();
if !valid_addrs.is_empty() {
self.insert_candidate_peers(valid_addrs);
}

#[cfg(feature = "metrics")]
self.update_metrics();

Ok(true)
}
Event::WorkerPing(ping) => {
Expand Down Expand Up @@ -1147,13 +1115,23 @@ impl<N: Network> Disconnect for Gateway<N> {
/// Any extra operations to be performed during a disconnect.
async fn handle_disconnect(&self, peer_addr: SocketAddr) {
if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
self.remove_connected_peer(peer_ip);

self.downgrade_peer_to_candidate(peer_ip);
// Remove the peer from the sync module. Except for some tests, there is always a sync sender.
if let Some(sync_sender) = self.sync_sender.get() {
let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
tokio::spawn(async move {
if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
}
});
}
// We don't clear this map based on time but only on peer disconnect.
// This is sufficient to avoid infinite growth as the committee has a fixed number
// of members.
self.cache.clear_outbound_validators_requests(peer_ip);
self.cache.clear_outbound_block_requests(peer_ip);
#[cfg(feature = "metrics")]
self.update_metrics();
}
}
}
Expand Down Expand Up @@ -1218,7 +1196,7 @@ impl<N: Network> Handshake for Gateway<N> {
match handshake_result {
Ok(Some(ref cr)) => {
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
self.resolver.write().insert_peer(addr, peer_addr, cr.address);
self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
peer.upgrade_to_connected(
peer_addr,
cr.listener_port,
Expand Down
3 changes: 0 additions & 3 deletions node/bft/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub use proposal_cache::*;
pub mod ready;
pub use ready::*;

pub mod resolver;
pub use resolver::*;

pub mod signed_proposals;
pub use signed_proposals::*;

Expand Down
99 changes: 0 additions & 99 deletions node/bft/src/helpers/resolver.rs

This file was deleted.

18 changes: 9 additions & 9 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2231,7 +2231,7 @@ mod tests {
fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
// First account is primary, which doesn't need to resolve.
for (addr, acct) in accounts.iter().skip(1) {
primary.gateway.resolver().write().insert_peer(*addr, *addr, acct.address());
primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
}
}

Expand Down Expand Up @@ -2428,7 +2428,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

// The primary will only consider itself synced if we received
// block locators from a peer.
Expand Down Expand Up @@ -2467,7 +2467,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

// Add a high block locator to indicate we are not synced.
primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap();
Expand Down Expand Up @@ -2507,7 +2507,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

// The primary will only consider itself synced if we received
// block locators from a peer.
Expand Down Expand Up @@ -2544,7 +2544,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2589,7 +2589,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2645,7 +2645,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2692,8 +2692,8 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

// primary v4 must be considered synced.
primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<N: Network> Router<N> {
match handshake_result {
Ok(Some(ref cr)) => {
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
self.resolver.write().insert_peer(peer.listener_addr(), peer_addr);
self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, None);
peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version);
}
#[cfg(feature = "metrics")]
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod peer;
pub use peer::*;

mod resolver;
pub(crate) use resolver::*;
pub use resolver::*;
Loading