diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 0ccad8d0421..541d6364ca9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -800,12 +800,39 @@ impl PeerDB { ); } - /// Updates the connection state. MUST ONLY BE USED IN TESTS. - pub fn __add_connected_peer_testing_only( + /// Adds a connected peer to the PeerDB and sets the custody subnets. + /// WARNING: This updates the connection state. MUST ONLY BE USED IN TESTS. + pub fn __add_connected_peer_with_custody_subnets( &mut self, supernode: bool, spec: &ChainSpec, enr_key: CombinedKey, + ) -> PeerId { + let peer_id = self.__add_connected_peer(supernode, enr_key, spec); + + let subnets = if supernode { + (0..spec.data_column_sidecar_subnet_count) + .map(|subnet_id| subnet_id.into()) + .collect() + } else { + let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); + compute_subnets_for_node::(node_id.raw(), spec.custody_requirement, spec) + .expect("should compute custody subnets") + }; + + let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); + peer_info.set_custody_subnets(subnets); + + peer_id + } + + /// Adds a connected peer to the PeerDB and updates the connection state. + /// MUST ONLY BE USED IN TESTS. + pub fn __add_connected_peer( + &mut self, + supernode: bool, + enr_key: CombinedKey, + spec: &ChainSpec, ) -> PeerId { let mut enr = Enr::builder().build(&enr_key).unwrap(); let peer_id = enr.peer_id(); @@ -842,24 +869,21 @@ impl PeerDB { }, ); - if supernode { - let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); - let all_subnets = (0..spec.data_column_sidecar_subnet_count) - .map(|subnet_id| subnet_id.into()) - .collect(); - peer_info.set_custody_subnets(all_subnets); - } else { - let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); - let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); - let subnets = - compute_subnets_for_node::(node_id.raw(), spec.custody_requirement, spec) - .expect("should compute custody subnets"); - peer_info.set_custody_subnets(subnets); - } - peer_id } + /// MUST ONLY BE USED IN TESTS. + pub fn __set_custody_subnets( + &mut self, + peer_id: &PeerId, + custody_subnets: HashSet, + ) -> Result<(), String> { + self.peers + .get_mut(peer_id) + .map(|info| info.set_custody_subnets(custody_subnets)) + .ok_or("Cannot set custody subnets, peer not found".to_string()) + } + /// The connection state of the peer has been changed. Modify the peer in the db to ensure all /// variables are in sync with libp2p. /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index d5a4e9b73a8..fc04f6d4f1d 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1056,7 +1056,9 @@ impl BackFillSync { let in_buffer = |batch: &BatchInfo| { matches!( batch.state(), - BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) + BatchState::AwaitingDownload + | BatchState::Downloading(..) + | BatchState::AwaitingProcessing(..) 
) }; if self @@ -1222,7 +1224,7 @@ mod tests { let peer_id = network_globals .peers .write() - .__add_connected_peer_testing_only( + .__add_connected_peer_with_custody_subnets( true, &beacon_chain.spec, k256::ecdsa::SigningKey::random(&mut rng).into(), diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 3b816c09224..ebb1715c59b 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -424,11 +424,6 @@ impl SyncingChain { self.request_batches(network)?; } } - } else if !self.good_peers_on_sampling_subnets(self.processing_target, network) { - // This is to handle the case where no batch was sent for the current processing - // target when there is no sampling peers available. This is a valid state and should not - // return an error. - return Ok(KeepChain); } else { // NOTE: It is possible that the batch doesn't exist for the processing id. This can happen // when we complete a batch and attempt to download a new batch but there are: @@ -969,18 +964,11 @@ impl SyncingChain { .collect(); debug!( ?awaiting_downloads, - src, "Attempting to send batches awaiting downlaod" + src, "Attempting to send batches awaiting download" ); for batch_id in awaiting_downloads { - if self.good_peers_on_sampling_subnets(batch_id, network) { - self.send_batch(network, batch_id)?; - } else { - debug!( - src = "attempt_send_awaiting_download_batches", - "Waiting for peers to be available on sampling column subnets" - ); - } + self.send_batch(network, batch_id)?; } Ok(KeepChain) } @@ -1031,16 +1019,12 @@ impl SyncingChain { return Ok(KeepChain); } Err(e) => match e { - // TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For - // sync to work properly it must be okay to have "stalled" batches in - // AwaitingDownload state. Currently it will error with invalid state if - // that happens. Sync manager must periodicatlly prune stalled batches like - // we do for lookup sync. Then we can deprecate the redundant - // `good_peers_on_sampling_subnets` checks. - e - @ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => { + RpcRequestSendError::NoPeer(err) => { + debug!(error = ?err, "Did not send batch request due to insufficient peers"); + } + RpcRequestSendError::InternalError(err) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway - warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request"); + warn!(%batch_id, error = ?err, "batch_id" = %batch_id, %batch, "Could not send batch request"); // register the failed download and check if the batch can be retried batch.start_downloading(1)?; // fake request_id = 1 is not relevant match batch.download_failed(None)? { @@ -1123,6 +1107,9 @@ impl SyncingChain { ) -> Result { let _guard = self.span.clone().entered(); debug!("Resuming chain"); + // attempt to download any batches stuck in the `AwaitingDownload` state because of + // a lack of peers earlier + self.attempt_send_awaiting_download_batches(network, "start_syncing")?; // Request more batches if needed. self.request_batches(network)?; // If there is any batch ready for processing, send it. @@ -1140,14 +1127,6 @@ impl SyncingChain { // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. 
if let Some(epoch) = self.optimistic_start { - if !self.good_peers_on_sampling_subnets(epoch, network) { - debug!( - src = "request_batches_optimistic", - "Waiting for peers to be available on sampling column subnets" - ); - return Ok(KeepChain); - } - if let Entry::Vacant(entry) = self.batches.entry(epoch) { let batch_type = network.batch_type(epoch); let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); @@ -1173,26 +1152,6 @@ impl SyncingChain { Ok(KeepChain) } - /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in - /// every sampling column subnet. - fn good_peers_on_sampling_subnets( - &self, - epoch: Epoch, - network: &SyncNetworkContext, - ) -> bool { - if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { - // Require peers on all sampling column subnets before sending batches - let sampling_subnets = network.network_globals().sampling_subnets(); - network - .network_globals() - .peers - .read() - .has_good_custody_range_sync_peer(&sampling_subnets, epoch) - } else { - true - } - } - /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { @@ -1211,7 +1170,9 @@ impl SyncingChain { let in_buffer = |batch: &BatchInfo| { matches!( batch.state(), - BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) + BatchState::AwaitingDownload + | BatchState::Downloading(..) + | BatchState::AwaitingProcessing(..) ) }; if self @@ -1224,18 +1185,6 @@ impl SyncingChain { return None; } - // don't send batch requests until we have peers on sampling subnets - // TODO(das): this is a workaround to avoid sending out excessive block requests because - // block and data column requests are currently coupled. This can be removed once we find a - // way to decouple the requests and do retries individually, see issue #6258. 
- if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { - debug!( - src = "include_next_batch", - "Waiting for peers to be available on custody column subnets" - ); - return None; - } - // If no batch needs a retry, attempt to send the batch of the next epoch to download let next_batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 2edcd12f019..ddee2c9b40c 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -7,6 +7,7 @@ use crate::sync::{ SyncMessage, manager::{BlockProcessType, BlockProcessingResult, SyncManager}, }; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -41,8 +42,8 @@ use slot_clock::{SlotClock, TestingSlotClock}; use tokio::sync::mpsc; use tracing::info; use types::{ - BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, data_column_sidecar::ColumnIndex, test_utils::{SeedableRng, TestRandom, XorShiftRng}, }; @@ -336,7 +337,7 @@ impl TestRig { .network_globals .peers .write() - .__add_connected_peer_testing_only(false, &self.harness.spec, key); + .__add_connected_peer_with_custody_subnets(false, &self.harness.spec, key); self.log(&format!("Added new peer for testing {peer_id:?}")); peer_id } @@ -346,7 +347,32 @@ impl TestRig { self.network_globals .peers .write() - .__add_connected_peer_testing_only(true, &self.harness.spec, key) + .__add_connected_peer_with_custody_subnets(true, &self.harness.spec, key) + } + + /// Add a connected supernode peer, but without setting the peer's custody subnets. + /// This is to simulate the real behaviour where metadata is only received some time after + /// a connection is established. + pub fn new_connected_supernode_peer_no_metadata_custody_subnet(&mut self) -> PeerId { + let key = self.determinstic_key(); + self.network_globals + .peers + .write() + .__add_connected_peer(true, key, &self.harness.spec) + } + + /// Update the peer's custody subnets in the PeerDB and send an `UpdatedPeerCgc` message to sync. 
+ pub fn send_peer_cgc_update_to_sync( + &mut self, + peer_id: &PeerId, + subnets: HashSet<DataColumnSubnetId>, + ) { + self.network_globals + .peers + .write() + .__set_custody_subnets(peer_id, subnets) + .unwrap(); + self.send_sync_message(SyncMessage::UpdatedPeerCgc(*peer_id)) } fn determinstic_key(&mut self) -> CombinedKey { diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index cb728a90c1b..580eafb9090 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -6,7 +6,7 @@ use crate::sync::manager::SLOT_IMPORT_TOLERANCE; use crate::sync::network_context::RangeRequestId; use crate::sync::range_sync::RangeSyncType; use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy, test_spec}; use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock}; use beacon_processor::WorkType; use lighthouse_network::rpc::RequestType; @@ -19,6 +19,7 @@ use lighthouse_network::service::api_types::{ SyncRequestId, }; use lighthouse_network::{PeerId, SyncInfo}; +use std::collections::HashSet; use std::time::Duration; use types::{ BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, @@ -98,7 +99,7 @@ impl TestRig { finalized_root, head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), head_root: Hash256::random(), - earliest_available_slot: None, + earliest_available_slot: Some(Slot::new(0)), }) } @@ -110,7 +111,7 @@ impl TestRig { finalized_root: Hash256::random(), head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), head_root: Hash256::random(), - earliest_available_slot: None, + earliest_available_slot: Some(Slot::new(0)), } } @@ -601,3 +602,70 @@ fn finalized_sync_not_enough_custody_peers_on_start() { let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; r.complete_and_process_range_sync_until(last_epoch, filter()); } + +/// This is a regression test for the following race condition scenario: +/// 1. A node is connected to 3 supernode peers: peer 1 is synced, and peers 2 and 3 are advanced. +/// 2. No metadata has been received yet (i.e. no custody info), so the node cannot start data +/// column range sync yet. +/// 3. Now peer 1 sends its CGC via a metadata response; we now have one peer on all custody subnets, +/// BUT not on the finalized syncing chain. +/// 4. The node tries to `send_batch` but fails repeatedly with `NoPeers`, as there's no peer +/// that is able to serve columns for the advanced epochs. The chain is removed after 5 failed attempts. +/// 5. Now peers 2 & 3 send CGC updates, BUT because there's no syncing chain, nothing happens - +/// sync is stuck until new peers are found. +/// +/// The expected behaviour in this scenario should be: +/// 4. Not finding suitable peers, the chain is kept and the batch remains in `AwaitingDownload`. +/// 5. Finalized sync should resume as soon as CGC updates are received from peer 2 or 3. 
+#[test] +fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() { + // Only run post-PeerDAS + if !test_spec::<E>().is_fulu_scheduled() { + return; + } + let mut r = TestRig::test_setup(); + + // GIVEN: the node is connected to 3 supernode peers: + + // Peer 1 is synced (same finalized epoch) + let peer_1 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + let remote_info = r.local_info().clone(); + r.send_sync_message(SyncMessage::AddPeer(peer_1, remote_info)); + + // Peer 2 is advanced (local finalized epoch + 2) + let advanced_epochs: u64 = 2; + let peer_2 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); + r.send_sync_message(SyncMessage::AddPeer(peer_2, remote_info.clone())); + // We expect a finalized chain to be created with peer 2, but no requests sent out yet due to missing custody info. + r.assert_state(RangeSyncType::Finalized); + r.expect_empty_network(); + + // Peer 3 is connected and advanced + let peer_3 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + r.send_sync_message(SyncMessage::AddPeer(peer_3, remote_info)); + // We are still in finalized sync state (now with peer 3 added) + r.assert_state(RangeSyncType::Finalized); + + for (i, p) in [peer_1, peer_2, peer_3].iter().enumerate() { + let peer_idx = i + 1; + r.log(&format!("Peer {peer_idx}: {p:?}")); + } + + // WHEN: peer 1 sends its CGC via metadata response + let all_custody_subnets = (0..r.spec.data_column_sidecar_subnet_count) + .map(|i| i.into()) + .collect::<HashSet<_>>(); + r.send_peer_cgc_update_to_sync(&peer_1, all_custody_subnets.clone()); + + // We still don't have any peers with custody columns on the syncing chain (only peer 1 has sent its CGC, and it is not on this chain) + // The node won't send the batch and will remain in the finalized sync state (this was failing before!) + r.assert_state(RangeSyncType::Finalized); + + // Now we receive peer 2 & 3's CGC updates; the node will resume syncing from these two peers + r.send_peer_cgc_update_to_sync(&peer_2, all_custody_subnets.clone()); + r.send_peer_cgc_update_to_sync(&peer_3, all_custody_subnets); + + let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; + r.complete_and_process_range_sync_until(last_epoch, filter()); +}
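
Reviewer note: the behavioural core of this patch is that range sync no longer pre-checks `good_peers_on_sampling_subnets` before issuing batch requests. Instead, a `send_batch` that fails with `RpcRequestSendError::NoPeer` leaves the batch parked in the `AwaitingDownload` state, and resuming the chain retries parked batches via `attempt_send_awaiting_download_batches` once a peer's CGC update arrives. The sketch below is a minimal, self-contained model of that park-and-retry flow under simplified assumptions; `Chain`, `BatchState`, `NoPeer`, `send_batch` and `resume` here are illustrative stand-ins, not the Lighthouse types or functions touched by this patch.

use std::collections::BTreeMap;

// Simplified stand-in for the batch state machine; only the variants relevant here.
#[derive(Debug, PartialEq)]
enum BatchState {
    AwaitingDownload,
    Downloading,
}

// Toy error mirroring `RpcRequestSendError::NoPeer`: no connected peer custodies the
// required column subnets yet.
#[derive(Debug)]
struct NoPeer;

struct Chain {
    batches: BTreeMap<u64, BatchState>,
    // Whether any connected peer has advertised custody of the required column subnets.
    have_custody_peer: bool,
}

impl Chain {
    // Try to send one batch. Without a custody peer the batch is *kept* in
    // `AwaitingDownload` instead of being counted as a failed download attempt.
    fn send_batch(&mut self, id: u64) -> Result<(), NoPeer> {
        if !self.have_custody_peer {
            return Err(NoPeer); // park the batch; `resume` will retry it later
        }
        if let Some(state) = self.batches.get_mut(&id) {
            *state = BatchState::Downloading;
        }
        Ok(())
    }

    // Called when e.g. a peer's CGC update arrives: retry everything that stalled.
    fn resume(&mut self) {
        let awaiting: Vec<u64> = self
            .batches
            .iter()
            .filter(|(_, state)| **state == BatchState::AwaitingDownload)
            .map(|(id, _)| *id)
            .collect();
        for id in awaiting {
            // A failure here just leaves the batch parked again.
            let _ = self.send_batch(id);
        }
    }
}

fn main() {
    let mut chain = Chain {
        batches: BTreeMap::from([(0u64, BatchState::AwaitingDownload)]),
        have_custody_peer: false,
    };

    // No custody peers yet: the batch stalls, but the chain is not torn down.
    assert!(chain.send_batch(0).is_err());
    assert_eq!(chain.batches[&0], BatchState::AwaitingDownload);

    // A CGC update gives us a custody peer; resuming retries the parked batch.
    chain.have_custody_peer = true;
    chain.resume();
    assert_eq!(chain.batches[&0], BatchState::Downloading);
}

The point of the model is the design choice itself: a `NoPeer` failure is not counted as a failed download, so a chain with stalled batches survives until custody information arrives, which is exactly the situation the regression test above exercises.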