Skip to content
58 changes: 41 additions & 17 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,39 @@ impl<E: EthSpec> PeerDB<E> {
);
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(
/// Adds a connected peer to the PeerDB and sets the custody subnets.
/// WARNING: This updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_with_custody_subnets(
&mut self,
supernode: bool,
spec: &ChainSpec,
enr_key: CombinedKey,
) -> PeerId {
let peer_id = self.__add_connected_peer(supernode, enr_key, spec);

let subnets = if supernode {
(0..spec.data_column_sidecar_subnet_count)
.map(|subnet_id| subnet_id.into())
.collect()
} else {
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
.expect("should compute custody subnets")
};

let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
peer_info.set_custody_subnets(subnets);

peer_id
}

/// Adds a connected peer to the PeerDB and updates the connection state.
/// MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer(
&mut self,
supernode: bool,
enr_key: CombinedKey,
spec: &ChainSpec,
) -> PeerId {
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();
Expand Down Expand Up @@ -842,24 +869,21 @@ impl<E: EthSpec> PeerDB<E> {
},
);

if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
.map(|subnet_id| subnet_id.into())
.collect();
peer_info.set_custody_subnets(all_subnets);
} else {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
let subnets =
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
.expect("should compute custody subnets");
peer_info.set_custody_subnets(subnets);
}

peer_id
}

/// MUST ONLY BE USED IN TESTS.
pub fn __set_custody_subnets(
&mut self,
peer_id: &PeerId,
custody_subnets: HashSet<DataColumnSubnetId>,
) -> Result<(), String> {
self.peers
.get_mut(peer_id)
.map(|info| info.set_custody_subnets(custody_subnets))
.ok_or("Cannot set custody subnets, peer not found".to_string())
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
/// variables are in sync with libp2p.
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ mod tests {
let peer_id = network_globals
.peers
.write()
.__add_connected_peer_testing_only(
.__add_connected_peer_with_custody_subnets(
true,
&beacon_chain.spec,
k256::ecdsa::SigningKey::random(&mut rng).into(),
Expand Down
73 changes: 10 additions & 63 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_sampling_subnets(self.processing_target, network) {
// This is to handle the case where no batch was sent for the current processing
// target when there is no sampling peers available. This is a valid state and should not
// return an error.
return Ok(KeepChain);
} else {
// NOTE: It is possible that the batch doesn't exist for the processing id. This can happen
// when we complete a batch and attempt to download a new batch but there are:
Expand Down Expand Up @@ -969,18 +964,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.collect();
debug!(
?awaiting_downloads,
src, "Attempting to send batches awaiting downlaod"
src, "Attempting to send batches awaiting download"
);

for batch_id in awaiting_downloads {
if self.good_peers_on_sampling_subnets(batch_id, network) {
self.send_batch(network, batch_id)?;
} else {
debug!(
src = "attempt_send_awaiting_download_batches",
"Waiting for peers to be available on sampling column subnets"
);
}
self.send_batch(network, batch_id)?;
}
Ok(KeepChain)
}
Expand Down Expand Up @@ -1031,16 +1019,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return Ok(KeepChain);
}
Err(e) => match e {
// TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For
// sync to work properly it must be okay to have "stalled" batches in
// AwaitingDownload state. Currently it will error with invalid state if
// that happens. Sync manager must periodicatlly prune stalled batches like
// we do for lookup sync. Then we can deprecate the redundant
// `good_peers_on_sampling_subnets` checks.
e
@ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => {
RpcRequestSendError::NoPeer(err) => {
debug!(error = ?err, "Did not send batch request due to insufficient peers");
}
RpcRequestSendError::InternalError(err) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request");
warn!(%batch_id, error = ?err, "batch_id" = %batch_id, %batch, "Could not send batch request");
// register the failed download and check if the batch can be retried
batch.start_downloading(1)?; // fake request_id = 1 is not relevant
match batch.download_failed(None)? {
Expand Down Expand Up @@ -1123,6 +1107,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> Result<KeepChain, RemoveChain> {
let _guard = self.span.clone().entered();
debug!("Resuming chain");
// attempt to download any batches stuck in the `AwaitingDownload` state because of
// a lack of peers earlier
self.attempt_send_awaiting_download_batches(network, "start_syncing")?;
// Request more batches if needed.
self.request_batches(network)?;
// If there is any batch ready for processing, send it.
Expand All @@ -1140,14 +1127,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_sampling_subnets(epoch, network) {
debug!(
src = "request_batches_optimistic",
"Waiting for peers to be available on sampling column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
let batch_type = network.batch_type(epoch);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
Expand All @@ -1173,26 +1152,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
/// every sampling column subnet.
fn good_peers_on_sampling_subnets(
&self,
epoch: Epoch,
network: &SyncNetworkContext<T>,
) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all sampling column subnets before sending batches
let sampling_subnets = network.network_globals().sampling_subnets();
network
.network_globals()
.peers
.read()
.has_good_custody_range_sync_peer(&sampling_subnets, epoch)
} else {
true
}
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
Expand Down Expand Up @@ -1224,18 +1183,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on sampling subnets
// TODO(das): this is a workaround to avoid sending out excessive block requests because
// block and data column requests are currently coupled. This can be removed once we find a
// way to decouple the requests and do retries individually, see issue #6258.
if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
debug!(
src = "include_next_batch",
"Waiting for peers to be available on custody column subnets"
);
return None;
}

// If no batch needs a retry, attempt to send the batch of the next epoch to download
let next_batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
Expand Down
34 changes: 30 additions & 4 deletions beacon_node/network/src/sync/tests/lookups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::sync::{
SyncMessage,
manager::{BlockProcessType, BlockProcessingResult, SyncManager},
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -41,8 +42,8 @@ use slot_clock::{SlotClock, TestingSlotClock};
use tokio::sync::mpsc;
use tracing::info;
use types::{
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
data_column_sidecar::ColumnIndex,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
};
Expand Down Expand Up @@ -336,7 +337,7 @@ impl TestRig {
.network_globals
.peers
.write()
.__add_connected_peer_testing_only(false, &self.harness.spec, key);
.__add_connected_peer_with_custody_subnets(false, &self.harness.spec, key);
self.log(&format!("Added new peer for testing {peer_id:?}"));
peer_id
}
Expand All @@ -346,7 +347,32 @@ impl TestRig {
self.network_globals
.peers
.write()
.__add_connected_peer_testing_only(true, &self.harness.spec, key)
.__add_connected_peer_with_custody_subnets(true, &self.harness.spec, key)
}

/// Add a connected supernode peer, but without setting the peers' custody subnet.
/// This is to simulate the real behaviour where metadata is only received some time after
/// a connection is established.
pub fn new_connected_supernode_peer_no_metadata_custody_subnet(&mut self) -> PeerId {
let key = self.determinstic_key();
self.network_globals
.peers
.write()
.__add_connected_peer(true, key, &self.harness.spec)
}

/// Update the peer's custody subnet in PeerDB and send a `UpdatedPeerCgc` message to sync.
pub fn send_peer_cgc_update_to_sync(
&mut self,
peer_id: &PeerId,
subnets: HashSet<DataColumnSubnetId>,
) {
self.network_globals
.peers
.write()
.__set_custody_subnets(peer_id, subnets)
.unwrap();
self.send_sync_message(SyncMessage::UpdatedPeerCgc(*peer_id))
}

fn determinstic_key(&mut self) -> CombinedKey {
Expand Down
74 changes: 71 additions & 3 deletions beacon_node/network/src/sync/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
use crate::sync::network_context::RangeRequestId;
use crate::sync::range_sync::RangeSyncType;
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy, test_spec};
use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock};
use beacon_processor::WorkType;
use lighthouse_network::rpc::RequestType;
Expand All @@ -19,6 +19,7 @@ use lighthouse_network::service::api_types::{
SyncRequestId,
};
use lighthouse_network::{PeerId, SyncInfo};
use std::collections::HashSet;
use std::time::Duration;
use types::{
BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E,
Expand Down Expand Up @@ -98,7 +99,7 @@ impl TestRig {
finalized_root,
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
head_root: Hash256::random(),
earliest_available_slot: None,
earliest_available_slot: Some(Slot::new(0)),
})
}

Expand All @@ -110,7 +111,7 @@ impl TestRig {
finalized_root: Hash256::random(),
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
head_root: Hash256::random(),
earliest_available_slot: None,
earliest_available_slot: Some(Slot::new(0)),
}
}

Expand Down Expand Up @@ -601,3 +602,70 @@ fn finalized_sync_not_enough_custody_peers_on_start() {
let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
r.complete_and_process_range_sync_until(last_epoch, filter());
}

/// This is a regression test for the following race condition scenario:
/// 1. A node is connected to 3 supernode peers: peer 1 is synced, & peer 2 and 3 are advanced.
/// 2. No metadata has been received yet (i.e. no custody info), so the node cannot start data
/// column range sync yet.
/// 3. Now peer 1 sends the CGC via metadata response, we now have one peer on all custody subnets,
/// BUT not on the finalized syncing chain.
/// 4. The node tries to `send_batch` but fails repeatedly with `NoPeers`, as there's no peer
/// that is able to serve columns for the advanced epochs. The chain is removed after 5 failed attempts.
/// 5. Now peer 2 & 3 send CGC updates, BUT because there's no syncing chain, nothing happens -
/// sync is stuck until finding new peers.
///
/// The expected behaivour in this scenario should be:
/// 4. not finding suitable peers, chain is kept and batch remains in AwaitingDownload
/// 5. finalized sync should resume as soon as CGC updates are received from peer 2 or 3.
#[test]
fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() {
// Only run post-PeerDAS
if !test_spec::<E>().is_fulu_scheduled() {
return;
}
let mut r = TestRig::test_setup();

// GIVEN: the node is connected to 3 supernode peers:

// Peer 1 is synced (same finalized epoch)
let peer_1 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
let remote_info = r.local_info().clone();
r.send_sync_message(SyncMessage::AddPeer(peer_1, remote_info));

// Peer 2 is advanced (local finalized epoch + 2)
let advanced_epochs: u64 = 2;
let peer_2 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
r.send_sync_message(SyncMessage::AddPeer(peer_2, remote_info.clone()));
// We expect a finalized chain to be created with peer 2, but no requests sent out yet due to missing custody info.
r.assert_state(RangeSyncType::Finalized);
r.expect_empty_network();

// Peer 3 is connected and advanced
let peer_3 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
r.send_sync_message(SyncMessage::AddPeer(peer_3, remote_info));
// We are still in finalized sync state (now with peer 3 added)
r.assert_state(RangeSyncType::Finalized);

for (i, p) in [peer_1, peer_2, peer_3].iter().enumerate() {
let peer_idx = i + 1;
r.log(&format!("Peer {peer_idx}: {p:?}"));
}

// WHEN: peer 1 sends its CGC via metadata response
let all_custody_subnets = (0..r.spec.data_column_sidecar_subnet_count)
.map(|i| i.into())
.collect::<HashSet<_>>();
r.send_peer_cgc_update_to_sync(&peer_1, all_custody_subnets.clone());

// We still don't have any peers on the syncing chain with custody columns (peer 1 & 2)
// The node won't send the batch and will remain in the finalized sync state (this was failing before!)
r.assert_state(RangeSyncType::Finalized);

// Now we receive peer 2 & 3's CGC updates, the node will resume syncing from these two peers
r.send_peer_cgc_update_to_sync(&peer_2, all_custody_subnets.clone());
r.send_peer_cgc_update_to_sync(&peer_3, all_custody_subnets);

let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
r.complete_and_process_range_sync_until(last_epoch, filter());
}
Loading