From ef88e87680c8824a5fb229ab6daa3830005a74e3 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Wed, 21 May 2025 14:03:37 +1000
Subject: [PATCH 1/6] Update `getBlobsV2` response type and update handling of
 missing blobs.

---
 beacon_node/beacon_chain/src/fetch_blobs.rs   | 32 +++++++++++--------
 .../execution_layer/src/engine_api/http.rs    |  2 +-
 beacon_node/execution_layer/src/lib.rs        |  2 +-
 3 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs
index d91f103b9de..77611282bf7 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs.rs
@@ -22,7 +22,7 @@ use ssz_types::FixedVector;
 use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
 use std::collections::HashSet;
 use std::sync::Arc;
-use tracing::debug;
+use tracing::{debug, warn};
 use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
 use types::data_column_sidecar::DataColumnSidecarError;
 use types::{
@@ -212,31 +212,37 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
         })
         .map_err(FetchEngineBlobError::RequestFailed)?;
 
-    let (blobs, proofs): (Vec<_>, Vec<_>) = response
+    let Some(blobs_and_proofs) = response else {
+        debug!(num_expected_blobs, "No blobs fetched from the EL");
+        inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
+        return Ok(None);
+    };
+
+    let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs
         .into_iter()
-        .filter_map(|blob_and_proof_opt| {
-            blob_and_proof_opt.map(|blob_and_proof| {
-                let BlobAndProofV2 { blob, proofs } = blob_and_proof;
-                (blob, proofs)
-            })
+        .map(|blob_and_proof| {
+            let BlobAndProofV2 { blob, proofs } = blob_and_proof;
+            (blob, proofs)
         })
         .unzip();
 
     let num_fetched_blobs = blobs.len();
     metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
 
-    // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
     if num_fetched_blobs != num_expected_blobs {
-        debug!(
-            info = "Unable to compute data columns",
-            num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
+        // This scenario is not supposed to happen if the EL is spec compliant.
+        // It should either return all requested blobs or none, but NOT partial responses.
+        // If we attempt to compute columns with partial blobs, we'd end up with invalid columns.
+
+        warn!(
+            num_fetched_blobs,
+            num_expected_blobs, "The EL did not return all requested blobs"
         );
         inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
         return Ok(None);
-    } else {
-        inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
     }
 
+    inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
+
     if chain
         .canonical_head
         .fork_choice_read_lock()
diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs
index bf4c391a8d8..cce6a0effe2 100644
--- a/beacon_node/execution_layer/src/engine_api/http.rs
+++ b/beacon_node/execution_layer/src/engine_api/http.rs
@@ -727,7 +727,7 @@ impl HttpJsonRpc {
     pub async fn get_blobs_v2<E: EthSpec>(
         &self,
         versioned_hashes: Vec<VersionedHash>,
-    ) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
+    ) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
         let params = json!([versioned_hashes]);
 
         self.rpc_request(
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index ddd8bb50088..d649eae0662 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -1861,7 +1861,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
     pub async fn get_blobs_v2(
         &self,
         query: Vec<VersionedHash>,
-    ) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
+    ) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
         let capabilities = self.get_engine_capabilities(None).await?;
 
         if capabilities.get_blobs_v2 {

From c303d3a590e91bada210e9c86c84cff2a4bf8a40 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Wed, 21 May 2025 17:44:52 +1000
Subject: [PATCH 2/6] Refactor fetch blobs to be testable and add tests.

---
 Cargo.lock                                    |  84 +++++++
 Cargo.toml                                    |   2 +
 beacon_node/beacon_chain/Cargo.toml           |   2 +
 .../fetch_blobs/fetch_blobs_beacon_adapter.rs |  89 ++++++++
 .../{fetch_blobs.rs => fetch_blobs/mod.rs}    |  90 +++++---
 .../beacon_chain/src/fetch_blobs/tests.rs     | 216 ++++++++++++++++++
 6 files changed, 448 insertions(+), 35 deletions(-)
 create mode 100644 beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
 rename beacon_node/beacon_chain/src/{fetch_blobs.rs => fetch_blobs/mod.rs} (87%)
 create mode 100644 beacon_node/beacon_chain/src/fetch_blobs/tests.rs

diff --git a/Cargo.lock b/Cargo.lock
index 30be5fa233a..1b506f62127 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -813,6 +813,8 @@ dependencies = [
  "maplit",
  "merkle_proof",
  "metrics",
+ "mockall",
+ "mockall_double",
  "once_cell",
  "oneshot_broadcast",
  "operation_pool",
@@ -2376,6 +2378,12 @@ dependencies = [
  "validator_store",
 ]
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "dtoa"
 version = "1.0.10"
@@ -3504,6 +3512,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
+
 [[package]]
 name = "fs2"
 version = "0.4.3"
@@ -5932,6 +5946,44 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0"
 
+[[package]]
+name = "mockall"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum =
"25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "mockito" version = "1.7.0" @@ -6898,6 +6950,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_reqwest_error" version = "0.1.0" @@ -8902,6 +8980,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "test_random_derive" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 86cca0a2598..15f83167151 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,6 +163,8 @@ logroller = "0.1.4" lru = "0.12" maplit = "1" milhouse = "0.5" +mockall = "0.13" +mockall_double = "0.3" mockito = "1.5.0" num_cpus = "1" once_cell = "1.17.1" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 18b40cab7ef..319c6cb655a 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -21,6 +21,8 @@ test_backfill = [] criterion = { workspace = true } maplit = { workspace = true } serde_json = { workspace = true } +mockall = { workspace = true } +mockall_double = { workspace = true } [dependencies] alloy-primitives = { workspace = true } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs new file mode 100644 index 00000000000..5af4fc3fba0 --- /dev/null +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -0,0 +1,89 @@ +use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; +use crate::observed_data_sidecars::DoNotObserve; +use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; +use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; +#[cfg(test)] +use mockall::automock; +use std::sync::Arc; +use types::{BlobSidecar, ChainSpec, Hash256, Slot}; + +/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. 
+pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
+    chain: Arc<BeaconChain<T>>,
+    spec: Arc<ChainSpec>,
+}
+
+#[cfg_attr(test, automock, allow(dead_code))]
+impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
+    pub(crate) fn new(chain: Arc<BeaconChain<T>>) -> Self {
+        let spec = chain.spec.clone();
+        Self { chain, spec }
+    }
+
+    pub(crate) fn chain(&self) -> &Arc<BeaconChain<T>> {
+        &self.chain
+    }
+
+    pub(crate) fn spec(&self) -> &Arc<ChainSpec> {
+        &self.spec
+    }
+
+    pub(crate) async fn get_blobs_v1(
+        &self,
+        versioned_hashes: Vec<Hash256>,
+    ) -> Result<Vec<Option<BlobAndProofV1<T::EthSpec>>>, FetchEngineBlobError> {
+        let execution_layer = self
+            .chain
+            .execution_layer
+            .as_ref()
+            .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
+
+        execution_layer
+            .get_blobs_v1(versioned_hashes)
+            .await
+            .map_err(FetchEngineBlobError::RequestFailed)
+    }
+
+    pub(crate) async fn get_blobs_v2(
+        &self,
+        versioned_hashes: Vec<Hash256>,
+    ) -> Result<Option<Vec<BlobAndProofV2<T::EthSpec>>>, FetchEngineBlobError> {
+        let execution_layer = self
+            .chain
+            .execution_layer
+            .as_ref()
+            .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
+
+        execution_layer
+            .get_blobs_v2(versioned_hashes)
+            .await
+            .map_err(FetchEngineBlobError::RequestFailed)
+    }
+
+    pub(crate) fn verify_blob_for_gossip(
+        &self,
+        blob: &Arc<BlobSidecar<T::EthSpec>>,
+    ) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> {
+        GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
+    }
+
+    pub(crate) async fn process_engine_blobs(
+        &self,
+        slot: Slot,
+        block_root: Hash256,
+        blobs: EngineGetBlobsOutput<T::EthSpec>,
+    ) -> Result<AvailabilityProcessingStatus, FetchEngineBlobError> {
+        self.chain
+            .process_engine_blobs(slot, block_root, blobs)
+            .await
+            .map_err(FetchEngineBlobError::BlobProcessingError)
+    }
+
+    pub(crate) fn fork_choice_contains_block(&self, block_root: &Hash256) -> bool {
+        self.chain
+            .canonical_head
+            .fork_choice_read_lock()
+            .contains_block(block_root)
+    }
+}
diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
similarity index 87%
rename from beacon_node/beacon_chain/src/fetch_blobs.rs
rename to beacon_node/beacon_chain/src/fetch_blobs/mod.rs
index 77611282bf7..247773d7284 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
@@ -8,7 +8,13 @@
 //! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
 //! supernodes.
 
+mod fetch_blobs_beacon_adapter;
+#[cfg(test)]
+mod tests;
+
 use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
+#[cfg_attr(test, double)]
+use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
 use crate::kzg_utils::blobs_to_data_column_sidecars;
 use crate::observed_data_sidecars::DoNotObserve;
 use crate::{
@@ -18,6 +24,8 @@ use crate::{
 use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
 use execution_layer::Error as ExecutionLayerError;
 use metrics::{inc_counter, TryExt};
+#[cfg(test)]
+use mockall_double::double;
 use ssz_types::FixedVector;
 use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
 use std::collections::HashSet;
@@ -68,6 +76,25 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
     custody_columns: HashSet<ColumnIndex>,
     publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
+) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
+    fetch_and_process_engine_blobs_inner(
+        FetchBlobsBeaconAdapter::new(chain),
+        block_root,
+        block,
+        custody_columns,
+        publish_fn,
+    )
+    .await
+}
+
+/// Internal implementation of fetch blobs, which uses `FetchBlobsBeaconAdapter` instead of
+/// `BeaconChain` for better testability.
+async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
+    block_root: Hash256,
+    block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
+    custody_columns: HashSet<ColumnIndex>,
+    publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let versioned_hashes = if let Some(kzg_commitments) = block
         .message()
@@ -90,9 +117,12 @@
         "Fetching blobs from the EL"
     );
 
-    if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
+    if chain_adapter
+        .spec()
+        .is_peer_das_enabled_for_epoch(block.epoch())
+    {
         fetch_and_process_blobs_v2(
-            chain,
+            chain_adapter,
             block_root,
             block,
             versioned_hashes,
@@ -101,32 +131,33 @@
         )
         .await
     } else {
-        fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await
+        fetch_and_process_blobs_v1(
+            chain_adapter,
+            block_root,
+            block,
+            versioned_hashes,
+            publish_fn,
+        )
+        .await
     }
 }
 
 async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
-    chain: Arc<BeaconChain<T>>,
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec>>,
     versioned_hashes: Vec<VersionedHash>,
     publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + Sized,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let num_expected_blobs = versioned_hashes.len();
 
-    let execution_layer = chain
-        .execution_layer
-        .as_ref()
-        .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
-
     metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
     debug!(num_expected_blobs, "Fetching blobs from the EL");
 
-    let response = execution_layer
+    let response = chain_adapter
         .get_blobs_v1(versioned_hashes)
         .await
         .inspect_err(|_| {
             inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
-        })
-        .map_err(FetchEngineBlobError::RequestFailed)?;
+        })?;
 
     let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count();
     metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
@@ -148,7 +179,7 @@
         response,
         signed_block_header,
         &kzg_commitments_proof,
-        &chain.spec,
+        chain_adapter.spec(),
     )?;
 
     // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
@@ -160,7 +191,7 @@
         .iter()
         .filter_map(|opt_blob| {
             let blob = opt_blob.as_ref()?;
-            match GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &chain) {
+            match chain_adapter.verify_blob_for_gossip(blob) {
                 Ok(verified) => Some(Ok(verified)),
                 // Ignore already seen blobs.
                 Err(GossipBlobError::RepeatBlob { .. }) => None,
@@ -176,20 +207,19 @@
 
     debug!(num_fetched_blobs, "Processing engine blobs");
 
-    let availability_processing_status = chain
+    let availability_processing_status = chain_adapter
         .process_engine_blobs(
             block.slot(),
             block_root,
             EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
         )
-        .await
-        .map_err(FetchEngineBlobError::BlobProcessingError)?;
+        .await?;
 
     Ok(Some(availability_processing_status))
 }
 
 async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
-    chain: Arc<BeaconChain<T>>,
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec>>,
     versioned_hashes: Vec<VersionedHash>,
@@ -197,20 +227,15 @@
     publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let num_expected_blobs = versioned_hashes.len();
-    let execution_layer = chain
-        .execution_layer
-        .as_ref()
-        .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
 
     metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
     debug!(num_expected_blobs, "Fetching blobs from the EL");
-    let response = execution_layer
+    let response = chain_adapter
        .get_blobs_v2(versioned_hashes)
         .await
         .inspect_err(|_| {
             inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
-        })
-        .map_err(FetchEngineBlobError::RequestFailed)?;
+        })?;
 
     let Some(blobs_and_proofs) = response else {
         debug!(num_expected_blobs, "No blobs fetched from the EL");
         inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
         return Ok(None);
@@ -243,12 +268,8 @@
 
     inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
 
-    if chain
-        .canonical_head
-        .fork_choice_read_lock()
-        .contains_block(&block_root)
-    {
-        // Avoid computing columns if block has already been imported.
+    if chain_adapter.fork_choice_contains_block(&block_root) {
+        // Avoid computing columns if the block has already been imported.
        debug!(
             info = "block has already been imported",
             "Ignoring EL blobs response"
         );
         return Ok(None);
     }
 
     let custody_columns = compute_and_publish_data_columns(
-        &chain,
+        chain_adapter.chain(),
         block.clone(),
         blobs,
         proofs,
@@ -268,14 +289,13 @@
 
     debug!(num_fetched_blobs, "Processing engine blobs");
 
-    let availability_processing_status = chain
+    let availability_processing_status = chain_adapter
         .process_engine_blobs(
             block.slot(),
             block_root,
             EngineGetBlobsOutput::CustodyColumns(custody_columns),
         )
-        .await
-        .map_err(FetchEngineBlobError::BlobProcessingError)?;
+        .await?;
 
     Ok(Some(availability_processing_status))
 }
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
new file mode 100644
index 00000000000..abaa47bf965
--- /dev/null
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -0,0 +1,216 @@
+use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
+use crate::fetch_blobs::{
+    fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError,
+};
+use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
+use crate::AvailabilityProcessingStatus;
+use bls::Signature;
+use eth2::types::BlobsBundle;
+use execution_layer::json_structures::BlobAndProofV2;
+use execution_layer::test_utils::generate_blobs;
+use maplit::hashset;
+use std::sync::{Arc, Mutex};
+use types::{
+    BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
+    SignedBeaconBlockFulu,
+};
+
+type E = MainnetEthSpec;
+type T = EphemeralHarnessType<E>;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_no_blobs_in_block() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, _s) = mock_publish_fn();
+    let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
+        message: BeaconBlockFulu::empty(mock_adapter.spec()),
+        signature: Signature::empty(),
+    });
+    let block_root = block.canonical_root();
+
+    // Expectations: engine fetch blobs should not be triggered
+    mock_adapter.expect_get_blobs_v2().times(0);
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        Arc::new(block),
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_no_blobs_returned() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, _) = mock_publish_fn();
+    let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // Set up expectations: no blobs in EL resposne
+    expect_get_blobs_v2_response(&mut mock_adapter, None);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_success() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // Set up expectations: all blobs returned
+    expect_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    expect_fork_choice_contains_block(&mut mock_adapter, vec![]);
+    expect_process_engine_blobs_result(
+        &mut mock_adapter,
+        Ok(AvailabilityProcessingStatus::Imported(block_root)),
+    );
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(
+        processing_status,
+        Some(AvailabilityProcessingStatus::Imported(block_root))
+    );
+
+    let published_columns = extract_published_blobs_or_columns(publish_fn_args);
+    assert!(
+        matches!(
+            published_columns,
+            BlobsOrDataColumns::DataColumns(columns) if columns.len() == custody_columns.len()
+        ),
+        "should publish custody columns"
+    );
+}
+
+/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`.
+fn extract_published_blobs_or_columns(
+    publish_fn_args: Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
+) -> BlobsOrDataColumns<T> {
+    let mut calls = publish_fn_args.lock().unwrap();
+    assert_eq!(calls.len(), 1);
+    calls.pop().unwrap()
+}
+
+fn expect_process_engine_blobs_result(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    result: Result<AvailabilityProcessingStatus, FetchEngineBlobError>,
+) {
+    mock_adapter
+        .expect_process_engine_blobs()
+        .return_once(move |_, _, _| result);
+}
+
+fn expect_fork_choice_contains_block(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    block_roots: Vec<Hash256>,
+) {
+    mock_adapter
+        .expect_fork_choice_contains_block()
+        .returning(move |block_root| block_roots.contains(&block_root));
+}
+
+fn expect_get_blobs_v2_response(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
+) {
+    mock_adapter
+        .expect_get_blobs_v2()
+        .return_once(move |_| Ok(blobs_and_proofs_opt));
+}
+
+fn create_test_block_and_blobs(
+    mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
+) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
+    let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
+        message: BeaconBlockFulu::empty(mock_adapter.spec()),
+        signature: Signature::empty(),
+    });
+    let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
+    let BlobsBundle {
+        commitments,
+        proofs,
+        blobs,
+    } = blobs_bundle;
+
+    *block
+        .message_mut()
+        .body_mut()
+        .blob_kzg_commitments_mut()
+        .unwrap() = commitments;
+
+    let proofs_len = proofs.len() / blobs.len();
+    let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
+        .into_iter()
+        .zip(proofs.chunks(proofs_len))
+        .map(|(blob, proofs)| BlobAndProofV2 {
+            blob,
+            proofs: proofs.to_vec().into(),
+        })
+        .collect();
+    (Arc::new(block), blob_and_proofs)
+}
+
+fn mock_publish_fn() -> (
+    impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
+    Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
+) {
+    // Keep track of the arguments captured by `publish_fn`.
+    let captured_args = Arc::new(Mutex::new(vec![]));
+    let captured_args_clone = captured_args.clone();
+    let publish_fn = move |args| {
+        let mut lock = captured_args_clone.lock().unwrap();
+        lock.push(args);
+    };
+    (publish_fn, captured_args)
+}
+
+fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
+    let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
+    let validator_count = 1;
+    // Set up a minimal beacon chain - we don't need a real chain for this test.
+    let harness = BeaconChainHarness::builder(E::default())
+        .spec(Arc::new(spec))
+        .deterministic_keypairs(validator_count)
+        .fresh_ephemeral_store()
+        .build();
+
+    let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
+    mock_adapter
+        .expect_spec()
+        .return_const(harness.spec.clone());
+    mock_adapter
+        .expect_chain()
+        .return_const(harness.chain.clone());
+    mock_adapter
+}

From f456e562ab280c51167c7d155f853563844ba7d0 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Wed, 21 May 2025 17:51:15 +1000
Subject: [PATCH 3/6] Fix lint and cargo sort.

---
 beacon_node/beacon_chain/Cargo.toml               | 2 +-
 beacon_node/beacon_chain/src/fetch_blobs/tests.rs | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 319c6cb655a..66f4425f5b3 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -20,9 +20,9 @@ test_backfill = []
 [dev-dependencies]
 criterion = { workspace = true }
 maplit = { workspace = true }
-serde_json = { workspace = true }
 mockall = { workspace = true }
 mockall_double = { workspace = true }
+serde_json = { workspace = true }
 
 [dependencies]
 alloy-primitives = { workspace = true }
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
index abaa47bf965..9863685f987 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -137,7 +137,7 @@ fn expect_fork_choice_contains_block(
 ) {
     mock_adapter
         .expect_fork_choice_contains_block()
-        .returning(move |block_root| block_roots.contains(&block_root));
+        .returning(move |block_root| block_roots.contains(block_root));
 }
 
 fn expect_get_blobs_v2_response(
@@ -181,6 +181,7 @@ fn create_test_block_and_blobs(
     (Arc::new(block), blob_and_proofs)
 }
 
+#[allow(clippy::type_complexity)]
 fn mock_publish_fn() -> (
     impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
     Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
 )

From 0c4fb3ef6e0f5408c6907f6eb91df9517e170b68 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 23 May 2025 12:53:36 +1000
Subject: [PATCH 4/6] Remove `BeaconChainHarness` from the fetch blob unit
 tests.

---
 .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 10 +++++++
 .../beacon_chain/src/fetch_blobs/mod.rs       | 26 +++++++++----------
 .../beacon_chain/src/fetch_blobs/tests.rs     | 23 +++++++---------
 3 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
index 5af4fc3fba0..6dfdf47cd12 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
@@ -3,9 +3,11 @@ use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
 use crate::observed_data_sidecars::DoNotObserve;
 use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
 use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
+use kzg::Kzg;
 #[cfg(test)]
 use mockall::automock;
 use std::sync::Arc;
+use task_executor::TaskExecutor;
 use types::{BlobSidecar, ChainSpec, Hash256, Slot};
 
 /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
@@ -29,6 +31,14 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
         &self.spec
     }
 
+    pub(crate) fn kzg(&self) -> &Arc<Kzg> {
+        &self.chain.kzg
+    }
+
+    pub(crate) fn executor(&self) -> &TaskExecutor {
+        &self.chain.task_executor
+    }
+
     pub(crate) async fn get_blobs_v1(
         &self,
         versioned_hashes: Vec<Hash256>,
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
index 247773d7284..ba798137b01 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
@@ -66,6 +66,7 @@ pub enum FetchEngineBlobError {
     GossipBlob(GossipBlobError),
     RequestFailed(ExecutionLayerError),
     RuntimeShutdown,
+    TokioJoin(tokio::task::JoinError),
 }
 
 /// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
@@ -278,7 +279,7 @@
     }
 
     let custody_columns = compute_and_publish_data_columns(
-        chain_adapter.chain(),
+        &chain_adapter,
         block.clone(),
         blobs,
         proofs,
@@ -302,15 +303,17 @@
 
 /// Offload the data column computation to a blocking task to avoid holding up the async runtime.
 async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
-    chain: &Arc<BeaconChain<T>>,
+    chain_adapter: &FetchBlobsBeaconAdapter<T>,
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
     blobs: Vec<Blob<T::EthSpec>>,
     proofs: Vec<KzgProofs<T::EthSpec>>,
     custody_columns_indices: HashSet<ColumnIndex>,
     publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
 ) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
-    let chain_cloned = chain.clone();
-    chain
+    let kzg = chain_adapter.kzg().clone();
+    let spec = chain_adapter.spec().clone();
+    chain_adapter
+        .executor()
         .spawn_blocking_handle(
             move || {
                 let mut timer = metrics::start_timer_vec(
@@ -320,14 +323,9 @@
                 );
                 let blob_refs = blobs.iter().collect::<Vec<_>>();
                 let cell_proofs = proofs.into_iter().flatten().collect();
-                let data_columns_result = blobs_to_data_column_sidecars(
-                    &blob_refs,
-                    cell_proofs,
-                    &block,
-                    &chain_cloned.kzg,
-                    &chain_cloned.spec,
-                )
-                .discard_timer_on_break(&mut timer);
+                let data_columns_result =
+                    blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec)
+                        .discard_timer_on_break(&mut timer);
                 drop(timer);
 
                 // This filtering ensures we only import and publish the custody columns.
@@ -345,9 +343,9 @@
             },
             "compute_and_publish_data_columns",
         )
-        .await
-        .map_err(|e| FetchEngineBlobError::BeaconChainError(Box::new(e)))
-        .and_then(|r| r)
+        .ok_or(FetchEngineBlobError::RuntimeShutdown)?
+        .await
+        .map_err(FetchEngineBlobError::TokioJoin)?
 }
 
 fn build_blob_sidecars<E: EthSpec>(
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
index 9863685f987..5e0d4af0366 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -2,7 +2,7 @@ use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
 use crate::fetch_blobs::{
     fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError,
 };
-use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
+use crate::test_utils::{get_kzg, EphemeralHarnessType};
 use crate::AvailabilityProcessingStatus;
 use bls::Signature;
 use eth2::types::BlobsBundle;
@@ -10,6 +10,7 @@ use execution_layer::json_structures::BlobAndProofV2;
 use execution_layer::test_utils::generate_blobs;
 use maplit::hashset;
 use std::sync::{Arc, Mutex};
+use task_executor::test_utils::TestRuntime;
 use types::{
     BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
     SignedBeaconBlockFulu,
@@ -197,21 +198,15 @@
 }
 
 fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
-    let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
-    let validator_count = 1;
-    // Set up a minimal beacon chain - we don't need a real chain for this test.
-    let harness = BeaconChainHarness::builder(E::default())
-        .spec(Arc::new(spec))
-        .deterministic_keypairs(validator_count)
-        .fresh_ephemeral_store()
-        .build();
+    let test_runtime = TestRuntime::default();
+    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
+    let kzg = get_kzg(&spec);
 
     let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
+    mock_adapter.expect_spec().return_const(spec.clone());
+    mock_adapter.expect_kzg().return_const(kzg.clone());
     mock_adapter
-        .expect_spec()
-        .return_const(harness.spec.clone());
-    mock_adapter
-        .expect_chain()
-        .return_const(harness.chain.clone());
+        .expect_executor()
+        .return_const(test_runtime.task_executor.clone());
     mock_adapter
 }

From 40e95aa2714929000b00d2002735109240e81541 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 23 May 2025 14:22:59 +1000
Subject: [PATCH 5/6] Fix lint.

---
 .../src/fetch_blobs/fetch_blobs_beacon_adapter.rs      | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
index 6dfdf47cd12..d5cef2624d2 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
@@ -23,10 +23,6 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
         Self { chain, spec }
     }
 
-    pub(crate) fn chain(&self) -> &Arc<BeaconChain<T>> {
-        &self.chain
-    }
-
     pub(crate) fn spec(&self) -> &Arc<ChainSpec> {
         &self.spec
     }

From 6967ce5d57bcb260fccbca81a53559376ee73df2 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 23 May 2025 14:38:32 +1000
Subject: [PATCH 6/6] Add more fetch blobs tests.
---
 .../beacon_chain/src/fetch_blobs/tests.rs     | 88 ++++++++++++++++---
 1 file changed, 77 insertions(+), 11 deletions(-)

diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
index 5e0d4af0366..be3d29e9c92 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -54,8 +54,8 @@ async fn test_fetch_blobs_v2_no_blobs_returned() {
     let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
     let block_root = block.canonical_root();
 
-    // Set up expectations: no blobs in EL resposne
-    expect_get_blobs_v2_response(&mut mock_adapter, None);
+    // No blobs in EL response
+    mock_get_blobs_v2_response(&mut mock_adapter, None);
 
     // Trigger fetch blobs on the block
     let custody_columns = hashset![0, 1, 2];
@@ -72,6 +72,72 @@ async fn test_fetch_blobs_v2_no_blobs_returned() {
     assert_eq!(processing_status, None);
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_partial_blobs_returned() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // Missing blob in EL response
+    blobs_and_proofs.pop();
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    // No blobs should be processed
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+    assert_eq!(
+        publish_fn_args.lock().unwrap().len(),
+        0,
+        "no columns should be published"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_block_imported_after_el_response() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // All blobs returned, but fork choice already imported the block
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
+    // No blobs should be processed
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+    assert_eq!(
+        publish_fn_args.lock().unwrap().len(),
+        0,
+        "no columns should be published"
+    );
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn test_fetch_blobs_v2_success() {
     let mut mock_adapter = mock_beacon_adapter();
@@ -79,10 +145,10 @@ async fn test_fetch_blobs_v2_success() {
     let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
     let block_root = block.canonical_root();
 
-    // Set up expectations: all blobs returned
-    expect_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
-    expect_fork_choice_contains_block(&mut mock_adapter, vec![]);
-    expect_process_engine_blobs_result(
+    // All blobs returned, fork choice doesn't contain block
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
+    mock_process_engine_blobs_result(
         &mut mock_adapter,
         Ok(AvailabilityProcessingStatus::Imported(block_root)),
     );
@@ -104,7 +170,7 @@ async fn test_fetch_blobs_v2_success() {
         Some(AvailabilityProcessingStatus::Imported(block_root))
     );
 
-    let published_columns = extract_published_blobs_or_columns(publish_fn_args);
+    let published_columns = extract_published_blobs(publish_fn_args);
     assert!(
         matches!(
             published_columns,
@@ -115,7 +181,7 @@ async fn test_fetch_blobs_v2_success() {
 }
 
 /// Extract the `BlobsOrDataColumns` passed to the `publish_fn`.
-fn extract_published_blobs_or_columns(
+fn extract_published_blobs(
     publish_fn_args: Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
 ) -> BlobsOrDataColumns<T> {
     let mut calls = publish_fn_args.lock().unwrap();
@@ -123,7 +189,7 @@ fn extract_published_blobs(
     calls.pop().unwrap()
 }
 
-fn expect_process_engine_blobs_result(
+fn mock_process_engine_blobs_result(
     mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
     result: Result<AvailabilityProcessingStatus, FetchEngineBlobError>,
 ) {
@@ -132,7 +198,7 @@
         .return_once(move |_, _, _| result);
 }
 
-fn expect_fork_choice_contains_block(
+fn mock_fork_choice_contains_block(
     mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
     block_roots: Vec<Hash256>,
 ) {
@@ -141,7 +207,7 @@
         .returning(move |block_root| block_roots.contains(block_root));
 }
 
-fn expect_get_blobs_v2_response(
+fn mock_get_blobs_v2_response(
     mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
     blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
 ) {
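
The testability refactor in this series rests on one pattern: `mockall::automock` generates a `MockFetchBlobsBeaconAdapter` from the adapter's inherent impl, and `mockall_double::double` swaps that mock in for the concrete adapter in test builds only. Below is a minimal, self-contained sketch of the same wiring; the `adapter` module, `Fetcher` type, `blob_count` method and `all_blobs_returned` function are hypothetical stand-ins, not code from this PR.

pub mod adapter {
    #[cfg(test)]
    use mockall::automock;

    pub struct Fetcher;

    // Like `FetchBlobsBeaconAdapter`, the impl is automocked only in test
    // builds; `automock` generates a `MockFetcher` with an
    // `expect_blob_count()` expectation builder.
    #[cfg_attr(test, automock)]
    impl Fetcher {
        pub fn blob_count(&self, requested: usize) -> usize {
            // A real implementation would query the EL; this hypothetical
            // stub pretends every requested blob is available.
            requested
        }
    }
}

#[cfg(test)]
use mockall_double::double;

// In test builds `double` rewrites this import so that `Fetcher` resolves to
// `MockFetcher`; production builds keep the concrete type. This mirrors the
// `#[cfg_attr(test, double)]` import of `FetchBlobsBeaconAdapter` in mod.rs.
#[cfg_attr(test, double)]
use adapter::Fetcher;

/// All-or-nothing check in the spirit of the `getBlobsV2` handling in patch 1:
/// a partial response counts as a miss.
pub fn all_blobs_returned(fetcher: &Fetcher, requested: usize) -> bool {
    fetcher.blob_count(requested) == requested
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detects_partial_response() {
        let mut mock = Fetcher::default();
        // Script the mock the way `mock_get_blobs_v2_response` does: the EL
        // returns fewer blobs than requested.
        mock.expect_blob_count().return_once(|_requested| 1);
        assert!(!all_blobs_returned(&mock, 2));
    }
}

Because production code is written once against a single name, the tests can script every EL behaviour that matters (all blobs, a partial response, no response, or a block already in fork choice) without spinning up a `BeaconChainHarness`, which is exactly the dependency patch 4 removes.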