diff --git a/Cargo.lock b/Cargo.lock
index 30be5fa233a..1b506f62127 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -813,6 +813,8 @@ dependencies = [
  "maplit",
  "merkle_proof",
  "metrics",
+ "mockall",
+ "mockall_double",
  "once_cell",
  "oneshot_broadcast",
  "operation_pool",
@@ -2376,6 +2378,12 @@ dependencies = [
  "validator_store",
 ]
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "dtoa"
 version = "1.0.10"
@@ -3504,6 +3512,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
+
 [[package]]
 name = "fs2"
 version = "0.4.3"
@@ -5932,6 +5946,44 @@
 version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0"
 
+[[package]]
+name = "mockall"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
+[[package]]
+name = "mockall_double"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "mockito"
 version = "1.7.0"
@@ -6898,6 +6950,32 @@ dependencies = [
  "zerocopy",
 ]
 
+[[package]]
+name = "predicates"
+version = "3.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573"
+dependencies = [
+ "anstyle",
+ "predicates-core",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
 [[package]]
 name = "pretty_reqwest_error"
 version = "0.1.0"
@@ -8902,6 +8980,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "termtree"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
+
 [[package]]
 name = "test_random_derive"
 version = "0.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index 9d7407d9eef..952b43a66b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -188,6 +188,8 @@ maplit = "1"
 merkle_proof = { path = "consensus/merkle_proof" }
 metrics = { path = "common/metrics" }
 milhouse = "0.5"
+mockall = "0.13"
+mockall_double = "0.3"
 mockito = "1.5.0"
 monitoring_api = { path = "common/monitoring_api" }
 network = { path = "beacon_node/network" }
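Reviewer note: `mockall` and `mockall_double` are the only new direct dependencies; the other new lock-file entries (`downcast`, `fragile`, `predicates*`, `termtree`) come in transitively. A minimal, self-contained sketch of how the two crates combine — `adapter`, `Adapter` and `answer` are illustrative names, not items from this repository:

```rust
// Sketch only, mirroring the automock + double pattern used later in this diff.
#[cfg(test)]
use mockall_double::double;

mod adapter {
    #[cfg(test)]
    use mockall::automock;

    pub struct Adapter;

    // `automock` generates a `MockAdapter` next to `Adapter`, but only for tests.
    #[cfg_attr(test, automock)]
    impl Adapter {
        pub fn answer(&self) -> u64 {
            42
        }
    }
}

// In normal builds this resolves to `adapter::Adapter`; under `cfg(test)` the
// `double` attribute swaps it for the generated `adapter::MockAdapter`.
#[cfg_attr(test, double)]
use adapter::Adapter;

pub fn double_answer(a: &Adapter) -> u64 {
    a.answer() * 2
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn doubles_the_mocked_answer() {
        let mut mock = Adapter::default();
        mock.expect_answer().return_const(21u64);
        assert_eq!(double_answer(&mock), 42);
    }
}
```

This is the same trick `fetch_blobs/mod.rs` uses below to swap `FetchBlobsBeaconAdapter` for its mock in unit tests while keeping both crates as dev-dependencies.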
diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index bbe7fad6aff..1bf6ab43267 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -1,3 +1,4 @@
+
 [package]
 name = "beacon_chain"
 version = "0.2.0"
@@ -69,6 +70,8 @@ types = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
 maplit = { workspace = true }
+mockall = { workspace = true }
+mockall_double = { workspace = true }
 serde_json = { workspace = true }
 
 [[bench]]
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
new file mode 100644
index 00000000000..d5cef2624d2
--- /dev/null
+++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
@@ -0,0 +1,95 @@
+use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
+use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
+use crate::observed_data_sidecars::DoNotObserve;
+use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
+use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
+use kzg::Kzg;
+#[cfg(test)]
+use mockall::automock;
+use std::sync::Arc;
+use task_executor::TaskExecutor;
+use types::{BlobSidecar, ChainSpec, Hash256, Slot};
+
+/// An adapter around the `BeaconChain` functionality used by fetch blobs. It removes the direct
+/// dependency on `BeaconChain`, so the fetch blobs logic can be tested against a mock.
+pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
+    chain: Arc<BeaconChain<T>>,
+    spec: Arc<ChainSpec>,
+}
+
+#[cfg_attr(test, automock, allow(dead_code))]
+impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
+    pub(crate) fn new(chain: Arc<BeaconChain<T>>) -> Self {
+        let spec = chain.spec.clone();
+        Self { chain, spec }
+    }
+
+    pub(crate) fn spec(&self) -> &Arc<ChainSpec> {
+        &self.spec
+    }
+
+    pub(crate) fn kzg(&self) -> &Arc<Kzg> {
+        &self.chain.kzg
+    }
+
+    pub(crate) fn executor(&self) -> &TaskExecutor {
+        &self.chain.task_executor
+    }
+
+    pub(crate) async fn get_blobs_v1(
+        &self,
+        versioned_hashes: Vec<Hash256>,
+    ) -> Result<Vec<Option<BlobAndProofV1<T::EthSpec>>>, FetchEngineBlobError> {
+        let execution_layer = self
+            .chain
+            .execution_layer
+            .as_ref()
+            .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
+
+        execution_layer
+            .get_blobs_v1(versioned_hashes)
+            .await
+            .map_err(FetchEngineBlobError::RequestFailed)
+    }
+
+    pub(crate) async fn get_blobs_v2(
+        &self,
+        versioned_hashes: Vec<Hash256>,
+    ) -> Result<Option<Vec<BlobAndProofV2<T::EthSpec>>>, FetchEngineBlobError> {
+        let execution_layer = self
+            .chain
+            .execution_layer
+            .as_ref()
+            .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
+
+        execution_layer
+            .get_blobs_v2(versioned_hashes)
+            .await
+            .map_err(FetchEngineBlobError::RequestFailed)
+    }
+
+    pub(crate) fn verify_blob_for_gossip(
+        &self,
+        blob: &Arc<BlobSidecar<T::EthSpec>>,
+    ) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> {
+        GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
+    }
+
+    pub(crate) async fn process_engine_blobs(
+        &self,
+        slot: Slot,
+        block_root: Hash256,
+        blobs: EngineGetBlobsOutput<T::EthSpec>,
+    ) -> Result<AvailabilityProcessingStatus, FetchEngineBlobError> {
+        self.chain
+            .process_engine_blobs(slot, block_root, blobs)
+            .await
+            .map_err(FetchEngineBlobError::BlobProcessingError)
+    }
+
+    pub(crate) fn fork_choice_contains_block(&self, block_root: &Hash256) -> bool {
+        self.chain
+            .canonical_head
+            .fork_choice_read_lock()
+            .contains_block(block_root)
+    }
+}
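Reviewer note: caching `spec` on the adapter (instead of reaching through `chain.spec` on every call) keeps each accessor mockable with `return_const`, including the reference-returning ones. A small sketch with toy types — `Adapter` and `Spec` are illustrative, not from this diff — of how mockall serves a `&Arc<_>` return from an owned value:

```rust
// Sketch only: mockall can satisfy a reference-returning method via `return_const`,
// which is what the tests added later in this diff rely on for `spec()` and `kzg()`.
use std::sync::Arc;

pub struct Spec {
    pub max_blobs: usize,
}

pub struct Adapter {
    spec: Arc<Spec>,
}

#[cfg_attr(test, mockall::automock)]
impl Adapter {
    pub fn spec(&self) -> &Arc<Spec> {
        &self.spec
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn return_const_serves_a_reference() {
        let spec = Arc::new(Spec { max_blobs: 6 });
        let mut mock = MockAdapter::default();
        // The mock stores the owned Arc and hands out `&Arc<Spec>` on every call.
        mock.expect_spec().return_const(spec.clone());
        assert_eq!(mock.spec().max_blobs, 6);
    }
}
```

This is the pattern `mock_beacon_adapter()` in the new tests uses when it calls `expect_spec().return_const(spec.clone())`.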
diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
similarity index 78%
rename from beacon_node/beacon_chain/src/fetch_blobs.rs
rename to beacon_node/beacon_chain/src/fetch_blobs/mod.rs
index d91f103b9de..ba798137b01 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
@@ -8,7 +8,13 @@
 //! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
 //! supernodes.
 
+mod fetch_blobs_beacon_adapter;
+#[cfg(test)]
+mod tests;
+
 use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
+#[cfg_attr(test, double)]
+use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
 use crate::kzg_utils::blobs_to_data_column_sidecars;
 use crate::observed_data_sidecars::DoNotObserve;
 use crate::{
@@ -18,11 +24,13 @@ use crate::{
 use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
 use execution_layer::Error as ExecutionLayerError;
 use metrics::{inc_counter, TryExt};
+#[cfg(test)]
+use mockall_double::double;
 use ssz_types::FixedVector;
 use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
 use std::collections::HashSet;
 use std::sync::Arc;
-use tracing::debug;
+use tracing::{debug, warn};
 use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
 use types::data_column_sidecar::DataColumnSidecarError;
 use types::{
@@ -58,6 +66,7 @@ pub enum FetchEngineBlobError {
     GossipBlob(GossipBlobError),
     RequestFailed(ExecutionLayerError),
     RuntimeShutdown,
+    TokioJoin(tokio::task::JoinError),
 }
 
 /// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
@@ -68,6 +77,25 @@ pub async fn fetch_and_process_engine_blobs(
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
     custody_columns: HashSet<ColumnIndex>,
     publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
+) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
+    fetch_and_process_engine_blobs_inner(
+        FetchBlobsBeaconAdapter::new(chain),
+        block_root,
+        block,
+        custody_columns,
+        publish_fn,
+    )
+    .await
+}
+
+/// Internal implementation of fetch blobs, which uses `FetchBlobsBeaconAdapter` instead of
+/// `BeaconChain` for better testability.
+async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
+    block_root: Hash256,
+    block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
+    custody_columns: HashSet<ColumnIndex>,
+    publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let versioned_hashes = if let Some(kzg_commitments) = block
         .message()
@@ -90,9 +118,12 @@
         "Fetching blobs from the EL"
     );
 
-    if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
+    if chain_adapter
+        .spec()
+        .is_peer_das_enabled_for_epoch(block.epoch())
+    {
         fetch_and_process_blobs_v2(
-            chain,
+            chain_adapter,
             block_root,
             block,
             versioned_hashes,
@@ -101,32 +132,33 @@
         )
         .await
     } else {
-        fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await
+        fetch_and_process_blobs_v1(
+            chain_adapter,
+            block_root,
+            block,
+            versioned_hashes,
+            publish_fn,
+        )
+        .await
     }
 }
 
 async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
-    chain: Arc<BeaconChain<T>>,
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec>>,
     versioned_hashes: Vec<VersionedHash>,
     publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + Sized,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let num_expected_blobs = versioned_hashes.len();
 
-    let execution_layer = chain
-        .execution_layer
-        .as_ref()
-        .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
-
     metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
     debug!(num_expected_blobs, "Fetching blobs from the EL");
 
-    let response = execution_layer
+    let response = chain_adapter
         .get_blobs_v1(versioned_hashes)
         .await
         .inspect_err(|_| {
             inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
-        })
-        .map_err(FetchEngineBlobError::RequestFailed)?;
+        })?;
 
     let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count();
     metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
@@ -148,7 +180,7 @@
         response,
         signed_block_header,
         &kzg_commitments_proof,
-        &chain.spec,
+        chain_adapter.spec(),
     )?;
 
     // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
@@ -160,7 +192,7 @@
         .iter()
         .filter_map(|opt_blob| {
             let blob = opt_blob.as_ref()?;
-            match GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &chain) {
+            match chain_adapter.verify_blob_for_gossip(blob) {
                 Ok(verified) => Some(Ok(verified)),
                 // Ignore already seen blobs.
                 Err(GossipBlobError::RepeatBlob { .. }) => None,
@@ -176,20 +208,19 @@
     debug!(num_fetched_blobs, "Processing engine blobs");
 
-    let availability_processing_status = chain
+    let availability_processing_status = chain_adapter
         .process_engine_blobs(
             block.slot(),
             block_root,
             EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
         )
-        .await
-        .map_err(FetchEngineBlobError::BlobProcessingError)?;
+        .await?;
 
     Ok(Some(availability_processing_status))
 }
 
 async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
-    chain: Arc<BeaconChain<T>>,
+    chain_adapter: FetchBlobsBeaconAdapter<T>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec>>,
     versioned_hashes: Vec<VersionedHash>,
@@ -197,52 +228,49 @@
     publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let num_expected_blobs = versioned_hashes.len();
 
-    let execution_layer = chain
-        .execution_layer
-        .as_ref()
-        .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
 
     metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
     debug!(num_expected_blobs, "Fetching blobs from the EL");
 
-    let response = execution_layer
+    let response = chain_adapter
         .get_blobs_v2(versioned_hashes)
         .await
         .inspect_err(|_| {
             inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
-        })
-        .map_err(FetchEngineBlobError::RequestFailed)?;
+        })?;
 
-    let (blobs, proofs): (Vec<_>, Vec<_>) = response
+    let Some(blobs_and_proofs) = response else {
+        debug!(num_expected_blobs, "No blobs fetched from the EL");
+        inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
+        return Ok(None);
+    };
+
+    let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs
         .into_iter()
-        .filter_map(|blob_and_proof_opt| {
-            blob_and_proof_opt.map(|blob_and_proof| {
-                let BlobAndProofV2 { blob, proofs } = blob_and_proof;
-                (blob, proofs)
-            })
+        .map(|blob_and_proof| {
+            let BlobAndProofV2 { blob, proofs } = blob_and_proof;
+            (blob, proofs)
         })
         .unzip();
 
     let num_fetched_blobs = blobs.len();
     metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
 
-    // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
     if num_fetched_blobs != num_expected_blobs {
-        debug!(
-            info = "Unable to compute data columns",
-            num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
+        // This scenario is not supposed to happen if the EL is spec compliant.
+        // It should either return all requested blobs or none, but NOT partial responses.
+        // If we attempt to compute columns with partial blobs, we'd end up with invalid columns.
+        warn!(
+            num_fetched_blobs,
+            num_expected_blobs, "The EL did not return all requested blobs"
         );
         inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
         return Ok(None);
-    } else {
-        inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
     }
 
-    if chain
-        .canonical_head
-        .fork_choice_read_lock()
-        .contains_block(&block_root)
-    {
-        // Avoid computing columns if block has already been imported.
+    inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
+
+    if chain_adapter.fork_choice_contains_block(&block_root) {
+        // Avoid computing columns if the block has already been imported.
         debug!(
             info = "block has already been imported",
             "Ignoring EL blobs response"
         );
@@ -251,7 +279,7 @@
     }
 
     let custody_columns = compute_and_publish_data_columns(
-        &chain,
+        &chain_adapter,
         block.clone(),
         blobs,
         proofs,
@@ -262,29 +290,30 @@
     debug!(num_fetched_blobs, "Processing engine blobs");
 
-    let availability_processing_status = chain
+    let availability_processing_status = chain_adapter
         .process_engine_blobs(
             block.slot(),
             block_root,
             EngineGetBlobsOutput::CustodyColumns(custody_columns),
         )
-        .await
-        .map_err(FetchEngineBlobError::BlobProcessingError)?;
+        .await?;
 
     Ok(Some(availability_processing_status))
 }
 
 /// Offload the data column computation to a blocking task to avoid holding up the async runtime.
 async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
-    chain: &Arc<BeaconChain<T>>,
+    chain_adapter: &FetchBlobsBeaconAdapter<T>,
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
     blobs: Vec<Blob<T::EthSpec>>,
     proofs: Vec<KzgProofs<T::EthSpec>>,
     custody_columns_indices: HashSet<ColumnIndex>,
     publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
 ) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
-    let chain_cloned = chain.clone();
-    chain
+    let kzg = chain_adapter.kzg().clone();
+    let spec = chain_adapter.spec().clone();
+    chain_adapter
+        .executor()
         .spawn_blocking_handle(
             move || {
                 let mut timer = metrics::start_timer_vec(
@@ -294,14 +323,9 @@
                 let blob_refs = blobs.iter().collect::<Vec<_>>();
                 let cell_proofs = proofs.into_iter().flatten().collect();
-                let data_columns_result = blobs_to_data_column_sidecars(
-                    &blob_refs,
-                    cell_proofs,
-                    &block,
-                    &chain_cloned.kzg,
-                    &chain_cloned.spec,
-                )
-                .discard_timer_on_break(&mut timer);
+                let data_columns_result =
+                    blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec)
+                        .discard_timer_on_break(&mut timer);
                 drop(timer);
 
                 // This filtering ensures we only import and publish the custody columns.
@@ -319,9 +343,9 @@
             },
             "compute_and_publish_data_columns",
         )
+        .ok_or(FetchEngineBlobError::RuntimeShutdown)?
         .await
-        .map_err(|e| FetchEngineBlobError::BeaconChainError(Box::new(e)))
-        .and_then(|r| r)
+        .map_err(FetchEngineBlobError::TokioJoin)?
 }
 
 fn build_blob_sidecars<E: EthSpec>(
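Reviewer note: the tail of `compute_and_publish_data_columns` now peels three layers instead of flattening a nested `Result` with `and_then`: the executor's `Option` (runtime shutting down), the join result of the blocking task, and the closure's own `Result`. A standalone sketch of the same layering using plain tokio (assumes the `rt-multi-thread` and `macros` features; `Error`, `heavy_work` and `run` are illustrative names, not from this diff):

```rust
use tokio::task::JoinError;

#[derive(Debug)]
enum Error {
    RuntimeShutdown,
    TokioJoin(JoinError),
}

fn heavy_work() -> Result<u64, Error> {
    Ok(42)
}

async fn run() -> Result<u64, Error> {
    // Mimic an executor that returns `None` when it refuses to spawn (shutdown).
    let handle = Some(tokio::task::spawn_blocking(heavy_work));

    handle
        .ok_or(Error::RuntimeShutdown)? // Option -> JoinHandle, or bail on shutdown
        .await
        .map_err(Error::TokioJoin)? // JoinError -> Error; yields the closure's own Result
    // The remaining Result<u64, Error> is the tail expression, mirroring how the
    // patched function now ends without a trailing `?`.
}

#[tokio::main]
async fn main() {
    assert_eq!(run().await.unwrap(), 42);
}
```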
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
new file mode 100644
index 00000000000..be3d29e9c92
--- /dev/null
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -0,0 +1,278 @@
+use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
+use crate::fetch_blobs::{
+    fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError,
+};
+use crate::test_utils::{get_kzg, EphemeralHarnessType};
+use crate::AvailabilityProcessingStatus;
+use bls::Signature;
+use eth2::types::BlobsBundle;
+use execution_layer::json_structures::BlobAndProofV2;
+use execution_layer::test_utils::generate_blobs;
+use maplit::hashset;
+use std::sync::{Arc, Mutex};
+use task_executor::test_utils::TestRuntime;
+use types::{
+    BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
+    SignedBeaconBlockFulu,
+};
+
+type E = MainnetEthSpec;
+type T = EphemeralHarnessType<E>;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_no_blobs_in_block() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, _s) = mock_publish_fn();
+    let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
+        message: BeaconBlockFulu::empty(mock_adapter.spec()),
+        signature: Signature::empty(),
+    });
+    let block_root = block.canonical_root();
+
+    // Expectations: engine fetch blobs should not be triggered
+    mock_adapter.expect_get_blobs_v2().times(0);
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        Arc::new(block),
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_no_blobs_returned() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, _) = mock_publish_fn();
+    let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // No blobs in EL response
+    mock_get_blobs_v2_response(&mut mock_adapter, None);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_partial_blobs_returned() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // Missing blob in EL response
+    blobs_and_proofs.pop();
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    // No blobs should be processed
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+    assert_eq!(
+        publish_fn_args.lock().unwrap().len(),
+        0,
+        "no columns should be published"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_block_imported_after_el_response() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // All blobs returned, but fork choice already imported the block
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
+    // No blobs should be processed
+    mock_adapter.expect_process_engine_blobs().times(0);
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(processing_status, None);
+    assert_eq!(
+        publish_fn_args.lock().unwrap().len(),
+        0,
+        "no columns should be published"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_fetch_blobs_v2_success() {
+    let mut mock_adapter = mock_beacon_adapter();
+    let (publish_fn, publish_fn_args) = mock_publish_fn();
+    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
+    let block_root = block.canonical_root();
+
+    // All blobs returned, fork choice doesn't contain block
+    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
+    mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
+    mock_process_engine_blobs_result(
+        &mut mock_adapter,
+        Ok(AvailabilityProcessingStatus::Imported(block_root)),
+    );
+
+    // Trigger fetch blobs on the block
+    let custody_columns = hashset![0, 1, 2];
+    let processing_status = fetch_and_process_engine_blobs_inner(
+        mock_adapter,
+        block_root,
+        block,
+        custody_columns.clone(),
+        publish_fn,
+    )
+    .await
+    .expect("fetch blobs should succeed");
+
+    assert_eq!(
+        processing_status,
+        Some(AvailabilityProcessingStatus::Imported(block_root))
+    );
+
+    let published_columns = extract_published_blobs(publish_fn_args);
+    assert!(
+        matches!(
+            published_columns,
+            BlobsOrDataColumns::DataColumns(columns) if columns.len() == custody_columns.len()
+        ),
+        "should publish custody columns"
+    );
+}
+
+/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`.
+fn extract_published_blobs(
+    publish_fn_args: Arc<Mutex<Vec<BlobsOrDataColumns<E>>>>,
+) -> BlobsOrDataColumns<E> {
+    let mut calls = publish_fn_args.lock().unwrap();
+    assert_eq!(calls.len(), 1);
+    calls.pop().unwrap()
+}
+
+fn mock_process_engine_blobs_result(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    result: Result<AvailabilityProcessingStatus, FetchEngineBlobError>,
+) {
+    mock_adapter
+        .expect_process_engine_blobs()
+        .return_once(move |_, _, _| result);
+}
+
+fn mock_fork_choice_contains_block(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    block_roots: Vec<Hash256>,
+) {
+    mock_adapter
+        .expect_fork_choice_contains_block()
+        .returning(move |block_root| block_roots.contains(block_root));
+}
+
+fn mock_get_blobs_v2_response(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
+) {
+    mock_adapter
+        .expect_get_blobs_v2()
+        .return_once(move |_| Ok(blobs_and_proofs_opt));
+}
+
+fn create_test_block_and_blobs(
+    mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
+) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
+    let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
+        message: BeaconBlockFulu::empty(mock_adapter.spec()),
+        signature: Signature::empty(),
+    });
+    let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
+    let BlobsBundle {
+        commitments,
+        proofs,
+        blobs,
+    } = blobs_bundle;
+
+    *block
+        .message_mut()
+        .body_mut()
+        .blob_kzg_commitments_mut()
+        .unwrap() = commitments;
+
+    let proofs_len = proofs.len() / blobs.len();
+    let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
+        .into_iter()
+        .zip(proofs.chunks(proofs_len))
+        .map(|(blob, proofs)| BlobAndProofV2 {
+            blob,
+            proofs: proofs.to_vec().into(),
+        })
+        .collect();
+    (Arc::new(block), blob_and_proofs)
+}
+
+#[allow(clippy::type_complexity)]
+fn mock_publish_fn() -> (
+    impl Fn(BlobsOrDataColumns<E>) + Send + 'static,
+    Arc<Mutex<Vec<BlobsOrDataColumns<E>>>>,
+) {
+    // Keep track of the arguments captured by `publish_fn`.
+    let captured_args = Arc::new(Mutex::new(vec![]));
+    let captured_args_clone = captured_args.clone();
+    let publish_fn = move |args| {
+        let mut lock = captured_args_clone.lock().unwrap();
+        lock.push(args);
+    };
+    (publish_fn, captured_args)
+}
+
+fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
+    let test_runtime = TestRuntime::default();
+    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
+    let kzg = get_kzg(&spec);
+
+    let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
+    mock_adapter.expect_spec().return_const(spec.clone());
+    mock_adapter.expect_kzg().return_const(kzg.clone());
+    mock_adapter
+        .expect_executor()
+        .return_const(test_runtime.task_executor.clone());
+    mock_adapter
+}
diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs
index c79036ba614..300713fdca4 100644
--- a/beacon_node/execution_layer/src/engine_api/http.rs
+++ b/beacon_node/execution_layer/src/engine_api/http.rs
@@ -727,7 +727,7 @@ impl HttpJsonRpc {
     pub async fn get_blobs_v2<E: EthSpec>(
         &self,
         versioned_hashes: Vec<Hash256>,
-    ) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
+    ) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
         let params = json!([versioned_hashes]);
 
         self.rpc_request(
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 4761c47d41f..cf751138d63 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -1864,7 +1864,7 @@
     pub async fn get_blobs_v2(
         &self,
         query: Vec<Hash256>,
-    ) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
+    ) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
         let capabilities = self.get_engine_capabilities(None).await?;
 
         if capabilities.get_blobs_v2 {
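Reviewer note: the signature change in `http.rs` and `lib.rs` encodes the `engine_getBlobsV2` semantics in the type: V1 may miss individual hashes, so `Vec<Option<_>>` fits it, while V2 is all-or-nothing, so `Option<Vec<_>>` lets `fetch_and_process_blobs_v2` handle a miss with a single `let Some(..) = .. else`. A toy sketch of the difference — `Blob`, `count_v1` and `count_v2` are illustrative, not from this diff:

```rust
// Sketch only (toy types): contrasting the two response shapes.
struct Blob;

fn count_v1(response: Vec<Option<Blob>>) -> usize {
    // Per-hash misses: count whatever actually arrived.
    response.iter().filter(|blob| blob.is_some()).count()
}

fn count_v2(response: Option<Vec<Blob>>) -> Option<usize> {
    // All-or-nothing: either the complete set or a single miss.
    let Some(blobs) = response else {
        return None;
    };
    Some(blobs.len())
}

fn main() {
    assert_eq!(count_v1(vec![Some(Blob), None]), 1);
    assert_eq!(count_v2(None), None);
    assert_eq!(count_v2(Some(vec![Blob, Blob])), Some(2));
}
```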