41 changes: 40 additions & 1 deletion consensus/core/src/authority_node.rs
@@ -3,9 +3,12 @@

use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_config::{
AuthorityIndex, Committee, NetworkKeyPair, NetworkPublicKey, Parameters, ProtocolKeyPair,
};
use itertools::Itertools;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_network::Multiaddr;
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::ProtocolConfig;
@@ -92,6 +95,12 @@ impl ConsensusAuthority {
}
}

pub fn update_peer_address(&self, network_pubkey: NetworkPublicKey, addresses: Vec<Multiaddr>) {
match self {
Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, addresses),
}
}

pub fn transaction_client(&self) -> Arc<TransactionClient> {
match self {
Self::WithTonic(authority) => authority.transaction_client(),
@@ -402,6 +411,36 @@ where
pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
self.transaction_client.clone()
}

pub(crate) fn update_peer_address(
&self,
network_pubkey: NetworkPublicKey,
addresses: Vec<Multiaddr>,
) {
// Find the peer index for this network key
let Some(peer) = self
.context
.committee
.authorities()
.find(|(_, authority)| authority.network_key == network_pubkey)
.map(|(index, _)| index)
else {
warn!(
"Network public key {:?} not found in committee, ignoring address update",
network_pubkey
);
return;
};

// Update the address in the network manager
self.network_manager.update_peer_address(peer, addresses);

// Re-subscribe to the peer to force reconnection with new address
if peer != self.context.own_index {
info!("Re-subscribing to peer {} after address update", peer);
self.subscriber.subscribe(peer);
}
}
}

#[cfg(test)]
5 changes: 5 additions & 0 deletions consensus/core/src/network/mod.rs
@@ -23,6 +23,7 @@ use bytes::Bytes;
use consensus_config::{AuthorityIndex, NetworkKeyPair};
use consensus_types::block::{BlockRef, Round};
use futures::Stream;
use mysten_network::Multiaddr;

use crate::{
block::{ExtendedBlock, VerifiedBlock},
@@ -190,6 +191,10 @@

/// Stops the network service.
async fn stop(&mut self);

/// Updates the network address for a peer identified by their authority index.
/// If addresses is empty, any address override is cleared.
fn update_peer_address(&self, peer: AuthorityIndex, addresses: Vec<Multiaddr>);
}

/// Serialized block with extended information from the proposing authority.
177 changes: 176 additions & 1 deletion consensus/core/src/network/tonic_network.rs
@@ -85,6 +85,10 @@ impl TonicClient {
.accept_compressed(CompressionEncoding::Zstd);
Ok(client)
}

pub(crate) fn update_peer_address(&self, peer: AuthorityIndex, addresses: Vec<Multiaddr>) {
self.channel_pool.update_address(peer, addresses);
}
}

// TODO: make sure callsites do not send request to own index, and return error otherwise.
@@ -357,13 +361,50 @@ struct ChannelPool {
context: Arc<Context>,
// Size is limited by known authorities in the committee.
channels: RwLock<BTreeMap<AuthorityIndex, Channel>>,
// Address overrides for peers, indexed by AuthorityIndex
address_overrides: RwLock<BTreeMap<AuthorityIndex, Multiaddr>>,
Contributor:
This will probably need some refactoring to work with the source-based priorities.

Maybe it's worth moving the logic that tracks which address group currently has priority into EndpointManager, instead of duplicating it in both the p2p network code and the consensus code? But I will leave that up to you.

Contributor (Author):
Ah ok, let me rebase and see what I can do here.

}
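For illustration, a minimal sketch of the refactor suggested in the thread above: tracking address-group priority in one shared component that both the p2p and consensus code consume. Every name below is invented for the sketch, not an existing EndpointManager API.

```rust
use std::collections::BTreeMap;

// Hypothetical: address sources ordered from lowest to highest priority.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum AddressSource {
    OnChain,
    Discovery,
    AdminOverride,
}

#[derive(Default)]
struct PeerAddressBook {
    // Addresses reported for one peer, grouped by source.
    by_source: BTreeMap<AddressSource, Vec<String>>,
}

impl PeerAddressBook {
    fn set(&mut self, source: AddressSource, addresses: Vec<String>) {
        self.by_source.insert(source, addresses);
    }

    // The addresses from the highest-priority source that has any entries;
    // callers (p2p, consensus) would dial these without re-deriving priority.
    fn resolved(&self) -> &[String] {
        self.by_source
            .iter()
            .rev() // BTreeMap iterates in ascending order; highest priority last
            .find(|(_, addrs)| !addrs.is_empty())
            .map(|(_, addrs)| addrs.as_slice())
            .unwrap_or(&[])
    }
}
```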

impl ChannelPool {
fn new(context: Arc<Context>) -> Self {
Self {
context,
channels: RwLock::new(BTreeMap::new()),
address_overrides: RwLock::new(BTreeMap::new()),
}
}

// Update the address override for a peer. If the list of addresses is empty, the override is cleared.
fn update_address(&self, peer: AuthorityIndex, addresses: Vec<Multiaddr>) {
{
let mut overrides = self.address_overrides.write();

// Treat the empty list as a clear operation.
if addresses.is_empty() {
overrides.remove(&peer);
info!("Cleared address override for peer {}", peer);
return;
}

// Otherwise, set the first address as the override.
// TODO: support multiple addresses. For now, we only support one address per peer.
Contributor:
Let's file a task because this is a big missing piece.

let address = addresses.first().cloned().unwrap();
if let Some(previous_address) = overrides.insert(peer, address.clone()) {
info!(
"Updated address override for peer {} from {} to {}",
peer, previous_address, address
);
} else {
info!("Set address override for peer {} to {}", peer, address);
}
}

let mut channels = self.channels.write();
if channels.remove(&peer).is_some() {
info!(
"Cleared cached channel for peer {} due to address update",
peer
);
}
}
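For illustration of the TODO above, one possible shape for multi-address support: keep every provided address per peer so the dial path can fall back on connect failure. A simplified sketch (String in place of Multiaddr, invented names), not the PR's design.

```rust
use std::collections::BTreeMap;

struct MultiAddressOverrides {
    // Peer index -> addresses in preference order.
    overrides: BTreeMap<usize, Vec<String>>,
}

impl MultiAddressOverrides {
    // Candidate addresses to try for `peer`, falling back to the committee
    // address when no override is present.
    fn candidates(&self, peer: usize, committee_address: &str) -> Vec<String> {
        match self.overrides.get(&peer) {
            Some(addrs) if !addrs.is_empty() => addrs.clone(),
            _ => vec![committee_address.to_string()],
        }
    }
}
```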

@@ -381,7 +422,16 @@
}

let authority = self.context.committee.authority(peer);
let address = to_host_port_str(&authority.address).map_err(|e| {

let peer_address = {
let overrides = self.address_overrides.read();
overrides
.get(&peer)
.cloned()
.unwrap_or_else(|| authority.address.clone())
};

let address = to_host_port_str(&peer_address).map_err(|e| {
ConsensusError::NetworkConfig(format!("Cannot convert address to host:port: {e:?}"))
})?;
let address = format!("https://{address}");
@@ -712,6 +762,10 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
self.client.clone()
}

fn update_peer_address(&self, peer: AuthorityIndex, addresses: Vec<Multiaddr>) {
self.client.update_peer_address(peer, addresses);
}

async fn install_service(&mut self, service: Arc<S>) {
self.context
.metrics
@@ -1166,3 +1220,124 @@ fn chunk_blocks(blocks: Vec<Bytes>, chunk_limit: usize) -> Vec<Vec<Bytes>> {
}
chunks
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{context::Clock, metrics::initialise_metrics};
use consensus_config::{Parameters, local_committee_and_keys};
use prometheus::Registry;
use sui_protocol_config::ProtocolConfig;

fn create_test_context_and_client() -> (Arc<Context>, TonicClient) {
let (committee, mut keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]);
let parameters = Parameters::default();
let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
let metrics = initialise_metrics(Registry::new());

let context = Arc::new(Context::new(
0,
committee.to_authority_index(0).unwrap(),
committee,
parameters,
protocol_config,
metrics,
Arc::new(Clock::default()),
));

let (network_keypair, _protocol_keypair) = keypairs.remove(0);
let client = TonicClient::new(context.clone(), network_keypair);

(context, client)
}

#[test]
fn test_update_peer_address() {
let (context, client) = create_test_context_and_client();

let peer = context.committee.to_authority_index(1).unwrap();
let new_address: Multiaddr = "/ip4/127.0.0.1/udp/9000".parse().unwrap();

client.update_peer_address(peer, vec![new_address.clone()]);

// Verify the override was set
{
let overrides = client.channel_pool.address_overrides.read();
assert_eq!(overrides.get(&peer), Some(&new_address));
}

// Update with a different address
let newer_address: Multiaddr = "/ip4/127.0.0.1/udp/9001".parse().unwrap();
client.update_peer_address(peer, vec![newer_address.clone()]);

// Verify the override was updated
{
let overrides = client.channel_pool.address_overrides.read();
assert_eq!(overrides.get(&peer), Some(&newer_address));
}

// Verify channels map doesn't contain the peer
{
let channels = client.channel_pool.channels.read();
assert!(!channels.contains_key(&peer));
}
}

#[test]
fn test_clear_peer_address() {
let (context, client) = create_test_context_and_client();

let peer = context.committee.to_authority_index(1).unwrap();
let new_address: Multiaddr = "/ip4/127.0.0.1/udp/9000".parse().unwrap();

// Set address override
client.update_peer_address(peer, vec![new_address]);

// Verify the override was set
{
let overrides = client.channel_pool.address_overrides.read();
assert!(overrides.contains_key(&peer));
}

// Clear the override with empty list
client.update_peer_address(peer, vec![]);

// Verify the override was cleared
{
let overrides = client.channel_pool.address_overrides.read();
assert!(!overrides.contains_key(&peer));
}
}

#[test]
fn test_different_peers_independent() {
let (context, client) = create_test_context_and_client();

let peer1 = context.committee.to_authority_index(1).unwrap();
let peer2 = context.committee.to_authority_index(2).unwrap();

let address1: Multiaddr = "/ip4/127.0.0.1/udp/9000".parse().unwrap();
let address2: Multiaddr = "/ip4/127.0.0.1/udp/9001".parse().unwrap();

// Set different addresses for different peers
client.update_peer_address(peer1, vec![address1.clone()]);
client.update_peer_address(peer2, vec![address2.clone()]);

// Verify both overrides are set correctly
{
let overrides = client.channel_pool.address_overrides.read();
assert_eq!(overrides.get(&peer1), Some(&address1));
assert_eq!(overrides.get(&peer2), Some(&address2));
}

// Clear one peer's override
client.update_peer_address(peer1, vec![]);

// Verify only peer1's override was cleared
{
let overrides = client.channel_pool.address_overrides.read();
assert!(!overrides.contains_key(&peer1));
assert_eq!(overrides.get(&peer2), Some(&address2));
}
}
}
26 changes: 24 additions & 2 deletions crates/sui-core/src/consensus_manager/mod.rs
@@ -16,20 +16,23 @@ use consensus_core::{
use core::panic;
use fastcrypto::traits::KeyPair as _;
use mysten_metrics::{RegistryID, RegistryService};
use mysten_network::Multiaddr;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::{ConsensusConfig, NodeConfig};
use sui_network::endpoint_manager::ConsensusAddressUpdater;
use sui_protocol_config::ProtocolVersion;
use sui_types::error::SuiResult;
use sui_types::crypto::NetworkPublicKey;
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
use sui_types::{
committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
};
use tokio::sync::{Mutex, broadcast};
use tokio::time::{sleep, timeout};
use tracing::{error, info};
use tracing::{error, info, warn};

#[cfg(test)]
#[path = "../unit_tests/consensus_manager_tests.rs"]
@@ -316,6 +319,25 @@ impl ConsensusManager {
}
}

// Implementing the interface so we can update the consensus peer addresses when requested.
impl ConsensusAddressUpdater for ConsensusManager {
fn update(&self, network_pubkey: NetworkPublicKey, addresses: Vec<Multiaddr>) -> SuiResult<()> {
if let Some(authority) = self.authority.load_full() {
let network_pubkey = consensus_config::NetworkPublicKey::new(network_pubkey);
authority.0.update_peer_address(network_pubkey, addresses);
Ok(())
} else {
warn!(
"Consensus authority node is not running, ignoring update of peer addresses for network public key {network_pubkey:?} and addresses {addresses:?}"
            );
            Err(SuiErrorKind::GenericAuthorityError {
                error: "Consensus authority node is not running".to_string(),
            }
            .into())
        }
    }
}

Contributor:
Is it safe to ignore updates here? Could you get into a race where you have an update that you want to be applied once consensus starts, but it gets dropped here because it came too soon?

Contributor (Author):
The method returns an error, so it's the responsibility of the caller to retry or take further decisions. For the use cases we are looking at, I guess this will be quite an edge case.
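One way to close the startup race discussed above is a caller-side retry. A minimal sketch against the `update` signature shown here, using this file's imports; the attempt bound and delay are arbitrary, and it assumes the key and address types are Clone.

```rust
// Sketch: retry the update until the consensus authority is running,
// bounded by a few attempts. Not part of this PR.
async fn update_with_retry(
    updater: &dyn ConsensusAddressUpdater,
    pubkey: NetworkPublicKey,
    addresses: Vec<Multiaddr>,
) -> SuiResult<()> {
    let mut last_err = None;
    for _ in 0..5 {
        match updater.update(pubkey.clone(), addresses.clone()) {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = Some(e);
                sleep(Duration::from_secs(1)).await;
            }
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}
```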

/// A ConsensusClient that can be updated internally at any time. This usually happens during epoch
/// change, where a client is set after the new consensus is started for the new epoch.
#[derive(Default)]
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-network/src/discovery/tests.rs
@@ -239,7 +239,7 @@ async fn peers_are_added_from_endpoint_manager() -> Result<()> {
let peer2_addr: Multiaddr = format!("/dns/localhost/udp/{}", network_2.local_addr().port())
.parse()
.unwrap();
endpoint_manager_1.update_endpoint(
let _ = endpoint_manager_1.update_endpoint(
        EndpointId::P2p(PeerId(peer_2_network_pubkey.0.to_bytes())),
        vec![peer2_addr],
    );

Contributor:
we probably should not be ignoring errors here?
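Per the comment above, a sketch that surfaces the error instead of discarding it, reusing only what is visible in this test (assumes the returned error type implements Debug):

```rust
endpoint_manager_1
    .update_endpoint(
        EndpointId::P2p(PeerId(peer_2_network_pubkey.0.to_bytes())),
        vec![peer2_addr],
    )
    .expect("endpoint update should not fail in this test");
```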