diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 434a73420dc..415462ab548 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3075,14 +3075,35 @@ impl BeaconChain { publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { let block_root = execution_proof.block_root(); + let proof_id = execution_proof.subnet_id(); + let slot = execution_proof.slot(); + + info!( + ?block_root, + ?proof_id, + %slot, + "[ZKVM-DEBUG] process_gossip_execution_proof called" + ); // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its execution proofs again. - if self + let in_fork_choice = self .canonical_head .fork_choice_read_lock() - .contains_block(&block_root) - { + .contains_block(&block_root); + + info!( + ?block_root, + in_fork_choice = in_fork_choice, + "[ZKVM-DEBUG] process_gossip_execution_proof: fork choice check" + ); + + if in_fork_choice { + warn!( + ?block_root, + ?proof_id, + "[ZKVM-DEBUG] process_gossip_execution_proof: REJECTING - block already in fork choice" + ); return Err(BlockError::DuplicateFullyImported(block_root)); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 8359de354d9..56f57f78b41 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -18,7 +18,7 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; -use tracing::{debug, error, instrument, warn}; +use tracing::{debug, error, info, instrument, warn}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, @@ -388,17 +388,17 @@ impl DataAvailabilityChecker { block_root: Hash256, proofs: Vec>, ) -> Result, AvailabilityCheckError> { - debug!( + info!( ?block_root, num_proofs = proofs.len(), - "Verifying and storing execution proofs in DA checker" + "[ZKVM-DEBUG] put_rpc_execution_proofs called" ); // If no verifier registry is configured, skip verification let Some(verifier_registry) = &self.verifier_registry else { - debug!( + info!( ?block_root, - "No verifier registry configured, storing proofs without verification" + "[ZKVM-DEBUG] No verifier registry configured, storing proofs without verification" ); let owned_proofs = proofs.iter().map(|p| (**p).clone()); return self @@ -406,6 +406,27 @@ impl DataAvailabilityChecker { .put_verified_execution_proofs(block_root, owned_proofs); }; + // Check what's in the cache for this block + let cache_status = self + .availability_cache + .peek_pending_components(&block_root, |components| { + match components { + None => "no_entry".to_string(), + Some(c) => { + let has_block = c.block.is_some(); + let has_exec_hash = c.block.as_ref().and_then(|b| b.execution_payload_hash()).is_some(); + let num_proofs = c.get_cached_execution_proofs().len(); + format!("has_block={}, has_exec_hash={}, cached_proofs={}", has_block, has_exec_hash, num_proofs) + } + } + }); + + info!( + ?block_root, + cache_status = %cache_status, + "[ZKVM-DEBUG] DA cache status for block" + ); + // Get the execution payload hash from the block let execution_payload_hash = self .availability_cache @@ -415,21 +436,29 @@ impl DataAvailabilityChecker { .ok_or_else(|| { warn!( ?block_root, - "Cannot verify proofs: block not in cache or has no execution payload" + "[ZKVM-DEBUG] Cannot verify proofs: block not in cache or has no execution payload" ); AvailabilityCheckError::MissingExecutionPayload })?; - debug!( + info!( ?block_root, ?execution_payload_hash, - "Got execution payload hash for proof verification" + "[ZKVM-DEBUG] Got execution payload hash for proof verification" ); let mut verified_proofs = Vec::new(); for proof in proofs { let proof_id = proof.proof_id; + info!( + ?block_root, + ?proof_id, + proof_block_hash = ?proof.block_hash, + ?execution_payload_hash, + "[ZKVM-DEBUG] Checking proof hash match" + ); + // Check that the proof's block_hash matches the execution payload hash if proof.block_hash != execution_payload_hash { warn!( @@ -437,7 +466,7 @@ impl DataAvailabilityChecker { ?proof_id, proof_hash = ?proof.block_hash, ?execution_payload_hash, - "Proof execution payload hash mismatch" + "[ZKVM-DEBUG] Proof execution payload hash mismatch" ); return Err(AvailabilityCheckError::ExecutionPayloadHashMismatch { proof_hash: proof.block_hash, @@ -446,21 +475,21 @@ impl DataAvailabilityChecker { } let verifier = verifier_registry.get_verifier(proof_id).ok_or_else(|| { - warn!(?proof_id, "No verifier registered for proof ID"); + warn!(?proof_id, "[ZKVM-DEBUG] No verifier registered for proof ID"); AvailabilityCheckError::UnsupportedProofID(proof_id) })?; // Verify the proof (proof contains block_hash internally) match verifier.verify(&proof) { Ok(true) => { - debug!(?proof_id, ?block_root, "Proof verification succeeded"); + info!(?proof_id, ?block_root, "[ZKVM-DEBUG] Proof verification succeeded"); verified_proofs.push((*proof).clone()); } Ok(false) => { warn!( ?proof_id, ?block_root, - "Proof verification failed: proof is invalid" + "[ZKVM-DEBUG] Proof verification failed: proof is invalid" ); return Err(AvailabilityCheckError::InvalidProof { proof_id, @@ -472,7 +501,7 @@ impl DataAvailabilityChecker { ?proof_id, ?block_root, error = ?e, - "Proof verification error" + "[ZKVM-DEBUG] Proof verification error" ); return Err(AvailabilityCheckError::ProofVerificationError( e.to_string(), @@ -481,14 +510,22 @@ impl DataAvailabilityChecker { } } - debug!( + info!( ?block_root, verified_count = verified_proofs.len(), - "All proofs verified successfully" + "[ZKVM-DEBUG] All proofs verified, calling put_verified_execution_proofs" ); - self.availability_cache - .put_verified_execution_proofs(block_root, verified_proofs) + let result = self.availability_cache + .put_verified_execution_proofs(block_root, verified_proofs); + + info!( + ?block_root, + result = ?result.as_ref().map(|a| format!("{:?}", a)), + "[ZKVM-DEBUG] put_verified_execution_proofs result" + ); + + result } /// Check if we've cached other blobs for this block. If it completes a set and we also diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 5eebffdd2c4..a50ab6eab5b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -16,12 +16,12 @@ use ssz_types::{RuntimeFixedVector, RuntimeVariableList}; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; -use tracing::{Span, debug, debug_span}; +use tracing::{Span, debug, debug_span, info}; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::BlobIdentifier; use types::{ BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, + DataColumnSidecarList, Epoch, EthSpec, ExecPayload, Hash256, SignedBeaconBlock, }; #[derive(Clone)] @@ -280,8 +280,20 @@ impl PendingComponents { &Span, ) -> Result, AvailabilityCheckError>, { + info!( + block_root = ?self.block_root, + has_block = self.block.is_some(), + num_proofs = self.verified_execution_proofs.len(), + num_columns = self.verified_data_columns.len(), + "[ZKVM-DEBUG] make_available called" + ); + let Some(CachedBlock::Executed(block)) = &self.block else { // Block not available yet + info!( + block_root = ?self.block_root, + "[ZKVM-DEBUG] make_available: block not executed yet, returning None" + ); return Ok(None); }; @@ -352,13 +364,37 @@ impl PendingComponents { // Check if this node needs execution proofs to validate blocks. let needs_execution_proofs = spec.zkvm_min_proofs_required().is_some(); + info!( + block_root = ?self.block_root, + needs_execution_proofs = needs_execution_proofs, + zkvm_min_proofs_required = ?spec.zkvm_min_proofs_required(), + "[ZKVM-DEBUG] make_available: checking execution proof requirements" + ); + + if needs_execution_proofs { let min_proofs = spec.zkvm_min_proofs_required().unwrap(); let num_proofs = self.execution_proof_subnet_count(); + info!( + block_root = ?self.block_root, + min_proofs = min_proofs, + num_proofs = num_proofs, + "[ZKVM-DEBUG] make_available: execution proof count check" + ); if num_proofs < min_proofs { // Not enough execution proofs yet + info!( + block_root = ?self.block_root, + min_proofs = min_proofs, + num_proofs = num_proofs, + "[ZKVM-DEBUG] make_available: NOT ENOUGH PROOFS, returning None" + ); return Ok(None); } + info!( + block_root = ?self.block_root, + "[ZKVM-DEBUG] make_available: execution proof requirement satisfied!" + ); } // Block is available, construct `AvailableExecutedBlock` @@ -661,10 +697,30 @@ impl DataAvailabilityCheckerInner { let mut execution_proofs = execution_proofs.into_iter().peekable(); if execution_proofs.peek().is_none() { - // No proofs to process + info!( + ?block_root, + "[ZKVM-DEBUG] put_verified_execution_proofs: no proofs to process" + ); return Ok(Availability::MissingComponents(block_root)); } + // Check if block already exists in cache + let existing_status = self + .critical + .read() + .peek(&block_root) + .map(|pending| { + let has_block = pending.block.is_some(); + let existing_proofs = pending.execution_proof_subnet_count(); + format!("has_block={}, existing_proofs={}", has_block, existing_proofs) + }); + + info!( + ?block_root, + existing_status = ?existing_status, + "[ZKVM-DEBUG] put_verified_execution_proofs: checking existing cache entry" + ); + // Try to get epoch from existing pending components (if block already arrived) // Otherwise use Epoch::new(0) as placeholder (will be corrected when block arrives) // Also the component cannot be marked as available, if the block is missing @@ -683,6 +739,14 @@ impl DataAvailabilityCheckerInner { let num_expected_columns_opt = self.get_num_expected_columns(epoch); + info!( + ?block_root, + has_block = pending_components.block.is_some(), + num_proofs_after_merge = pending_components.execution_proof_subnet_count(), + num_columns = pending_components.verified_data_columns.len(), + "[ZKVM-DEBUG] put_verified_execution_proofs: after merge, calling check_availability" + ); + pending_components.span.in_scope(|| { debug!( component = "execution_proofs", @@ -692,11 +756,19 @@ impl DataAvailabilityCheckerInner { ); }); - self.check_availability_and_cache_components( + let result = self.check_availability_and_cache_components( block_root, pending_components, num_expected_columns_opt, - ) + ); + + info!( + ?block_root, + result = ?result.as_ref().map(|a| format!("{:?}", a)), + "[ZKVM-DEBUG] put_verified_execution_proofs: check_availability result" + ); + + result } fn check_availability_and_cache_components( @@ -863,6 +935,35 @@ impl DataAvailabilityCheckerInner { ) -> Result, AvailabilityCheckError> { let epoch = executed_block.as_block().epoch(); let block_root = executed_block.import_data.block_root; + let slot = executed_block.as_block().slot(); + let exec_payload_hash = executed_block.as_block() + .message() + .body() + .execution_payload() + .ok() + .map(|p| p.block_hash()); + + info!( + ?block_root, + %slot, + %epoch, + execution_payload_hash = ?exec_payload_hash, + "[ZKVM-DEBUG] put_executed_block called" + ); + + // Check if we already have proofs waiting for this block + let existing_proofs = self + .critical + .read() + .peek(&block_root) + .map(|pending| pending.execution_proof_subnet_count()) + .unwrap_or(0); + + info!( + ?block_root, + existing_proofs = existing_proofs, + "[ZKVM-DEBUG] put_executed_block: proofs already cached for this block" + ); // register the block to get the diet block let diet_executed_block = self @@ -877,6 +978,14 @@ impl DataAvailabilityCheckerInner { let num_expected_columns_opt = self.get_num_expected_columns(epoch); + info!( + ?block_root, + has_block = pending_components.block.is_some(), + num_proofs = pending_components.execution_proof_subnet_count(), + num_columns = pending_components.verified_data_columns.len(), + "[ZKVM-DEBUG] put_executed_block: after merge, checking availability" + ); + pending_components.span.in_scope(|| { debug!( component = "block", @@ -885,11 +994,19 @@ impl DataAvailabilityCheckerInner { ); }); - self.check_availability_and_cache_components( + let result = self.check_availability_and_cache_components( block_root, pending_components, num_expected_columns_opt, - ) + ); + + info!( + ?block_root, + result = ?result.as_ref().map(|a| format!("{:?}", a)), + "[ZKVM-DEBUG] put_executed_block: availability check result" + ); + + result } fn get_num_expected_columns(&self, epoch: Epoch) -> Option { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 1b280d54035..52b98d4d3c7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -606,6 +606,7 @@ impl PeerManager { Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::ExecutionProofsByRoot => PeerAction::MidToleranceError, + Protocol::ExecutionProofsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -627,6 +628,7 @@ impl PeerManager { Protocol::DataColumnsByRoot => return, Protocol::DataColumnsByRange => return, Protocol::ExecutionProofsByRoot => return, + Protocol::ExecutionProofsByRange => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, @@ -651,6 +653,7 @@ impl PeerManager { Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::ExecutionProofsByRoot => PeerAction::MidToleranceError, + Protocol::ExecutionProofsByRange => PeerAction::MidToleranceError, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index b3401038df8..6679885ecbe 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -81,6 +81,7 @@ impl SSZSnappyInboundCodec { RpcSuccessResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::DataColumnsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::ExecutionProofsByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::ExecutionProofsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RpcSuccessResponse::LightClientOptimisticUpdate(res) => res.as_ssz_bytes(), RpcSuccessResponse::LightClientFinalityUpdate(res) => res.as_ssz_bytes(), @@ -362,6 +363,7 @@ impl Encoder> for SSZSnappyOutboundCodec { RequestType::DataColumnsByRange(req) => req.as_ssz_bytes(), RequestType::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), RequestType::ExecutionProofsByRoot(req) => req.as_ssz_bytes(), + RequestType::ExecutionProofsByRange(req) => req.as_ssz_bytes(), RequestType::Ping(req) => req.as_ssz_bytes(), RequestType::LightClientBootstrap(req) => req.as_ssz_bytes(), RequestType::LightClientUpdatesByRange(req) => req.as_ssz_bytes(), @@ -578,6 +580,11 @@ fn handle_rpc_request( Ok(Some(RequestType::ExecutionProofsByRoot(request))) } + SupportedProtocol::ExecutionProofsByRangeV1 => { + Ok(Some(RequestType::ExecutionProofsByRange( + ExecutionProofsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))) + } SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -746,6 +753,11 @@ fn handle_rpc_response( ExecutionProof::from_ssz_bytes(decoded_buffer)?, )))) } + SupportedProtocol::ExecutionProofsByRangeV1 => { + Ok(Some(RpcSuccessResponse::ExecutionProofsByRange(Arc::new( + ExecutionProof::from_ssz_bytes(decoded_buffer)?, + )))) + } SupportedProtocol::PingV1 => Ok(Some(RpcSuccessResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index d23c16f8fa1..a7bc9b69bec 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -94,6 +94,7 @@ pub struct RateLimiterConfig { pub(super) data_columns_by_root_quota: Quota, pub(super) data_columns_by_range_quota: Quota, pub(super) execution_proofs_by_root_quota: Quota, + pub(super) execution_proofs_by_range_quota: Quota, pub(super) light_client_bootstrap_quota: Quota, pub(super) light_client_optimistic_update_quota: Quota, pub(super) light_client_finality_update_quota: Quota, @@ -126,6 +127,9 @@ impl RateLimiterConfig { // TODO(zkproofs): Configure this to be less arbitrary pub const DEFAULT_EXECUTION_PROOFS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + // TODO(zkproofs): Configure this to be less arbitrary + pub const DEFAULT_EXECUTION_PROOFS_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); @@ -146,6 +150,7 @@ impl Default for RateLimiterConfig { data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, data_columns_by_range_quota: Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA, execution_proofs_by_root_quota: Self::DEFAULT_EXECUTION_PROOFS_BY_ROOT_QUOTA, + execution_proofs_by_range_quota: Self::DEFAULT_EXECUTION_PROOFS_BY_RANGE_QUOTA, light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, light_client_optimistic_update_quota: Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA, @@ -207,6 +212,7 @@ impl FromStr for RateLimiterConfig { let mut data_columns_by_root_quota = None; let mut data_columns_by_range_quota = None; let mut execution_proofs_by_root_quota = None; + let mut execution_proofs_by_range_quota = None; let mut light_client_bootstrap_quota = None; let mut light_client_optimistic_update_quota = None; let mut light_client_finality_update_quota = None; @@ -231,6 +237,9 @@ impl FromStr for RateLimiterConfig { Protocol::ExecutionProofsByRoot => { execution_proofs_by_root_quota = execution_proofs_by_root_quota.or(quota) } + Protocol::ExecutionProofsByRange => { + execution_proofs_by_range_quota = execution_proofs_by_range_quota.or(quota) + } Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), Protocol::LightClientBootstrap => { @@ -268,6 +277,8 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA), execution_proofs_by_root_quota: execution_proofs_by_root_quota .unwrap_or(Self::DEFAULT_EXECUTION_PROOFS_BY_ROOT_QUOTA), + execution_proofs_by_range_quota: execution_proofs_by_range_quota + .unwrap_or(Self::DEFAULT_EXECUTION_PROOFS_BY_RANGE_QUOTA), light_client_bootstrap_quota: light_client_bootstrap_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), light_client_optimistic_update_quota: light_client_optimistic_update_quota diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 9ba8f66dafa..00cdc32d451 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -603,6 +603,25 @@ impl ExecutionProofsByRootRequest { } } +/// Request execution proofs for a range of slots. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct ExecutionProofsByRangeRequest { + /// The starting slot to request execution proofs. + pub start_slot: Slot, + /// The number of slots to request proofs for. + pub count: u64, +} + +impl ExecutionProofsByRangeRequest { + pub fn new(start_slot: Slot, count: u64) -> Self { + Self { start_slot, count } + } + + pub fn max_requested(&self) -> u64 { + self.count + } +} + /// Request a number of beacon data columns from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct LightClientUpdatesByRangeRequest { @@ -673,6 +692,9 @@ pub enum RpcSuccessResponse { /// A response to a get EXECUTION_PROOFS_BY_ROOT request. ExecutionProofsByRoot(Arc), + /// A response to a get EXECUTION_PROOFS_BY_RANGE request. + ExecutionProofsByRange(Arc), + /// A PONG response to a PING request. Pong(Ping), @@ -704,6 +726,9 @@ pub enum ResponseTermination { /// Execution proofs by root stream termination. ExecutionProofsByRoot, + /// Execution proofs by range stream termination. + ExecutionProofsByRange, + /// Light client updates by range stream termination. LightClientUpdatesByRange, } @@ -718,6 +743,7 @@ impl ResponseTermination { ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, ResponseTermination::ExecutionProofsByRoot => Protocol::ExecutionProofsByRoot, + ResponseTermination::ExecutionProofsByRange => Protocol::ExecutionProofsByRange, ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange, } } @@ -814,6 +840,7 @@ impl RpcSuccessResponse { RpcSuccessResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, RpcSuccessResponse::DataColumnsByRange(_) => Protocol::DataColumnsByRange, RpcSuccessResponse::ExecutionProofsByRoot(_) => Protocol::ExecutionProofsByRoot, + RpcSuccessResponse::ExecutionProofsByRange(_) => Protocol::ExecutionProofsByRange, RpcSuccessResponse::Pong(_) => Protocol::Ping, RpcSuccessResponse::MetaData(_) => Protocol::MetaData, RpcSuccessResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -840,6 +867,7 @@ impl RpcSuccessResponse { Self::LightClientUpdatesByRange(r) => Some(r.attested_header_slot()), // TODO(zkproofs): Change this when we add Slot to ExecutionProof Self::ExecutionProofsByRoot(_) + | Self::ExecutionProofsByRange(_) | Self::MetaData(_) | Self::Status(_) | Self::Pong(_) => None, @@ -905,6 +933,9 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::ExecutionProofsByRoot(proof) => { write!(f, "ExecutionProofsByRoot: Block root: {}", proof.block_root) } + RpcSuccessResponse::ExecutionProofsByRange(proof) => { + write!(f, "ExecutionProofsByRange: Block root: {}", proof.block_root) + } RpcSuccessResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RpcSuccessResponse::MetaData(metadata) => { write!(f, "Metadata: {}", metadata.seq_number()) @@ -1027,3 +1058,13 @@ impl std::fmt::Display for ExecutionProofsByRootRequest { ) } } + +impl std::fmt::Display for ExecutionProofsByRangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Request: ExecutionProofsByRange: Start Slot: {}, Count: {}", + self.start_slot, self.count + ) + } +} diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index dfa44976390..d1025d88918 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -254,6 +254,9 @@ pub enum Protocol { /// The `ExecutionProofsByRoot` protocol name. #[strum(serialize = "execution_proofs_by_root")] ExecutionProofsByRoot, + /// The `ExecutionProofsByRange` protocol name. + #[strum(serialize = "execution_proofs_by_range")] + ExecutionProofsByRange, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. @@ -285,6 +288,7 @@ impl Protocol { Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), Protocol::DataColumnsByRange => Some(ResponseTermination::DataColumnsByRange), Protocol::ExecutionProofsByRoot => Some(ResponseTermination::ExecutionProofsByRoot), + Protocol::ExecutionProofsByRange => Some(ResponseTermination::ExecutionProofsByRange), Protocol::Ping => None, Protocol::MetaData => None, Protocol::LightClientBootstrap => None, @@ -316,6 +320,7 @@ pub enum SupportedProtocol { DataColumnsByRootV1, DataColumnsByRangeV1, ExecutionProofsByRootV1, + ExecutionProofsByRangeV1, PingV1, MetaDataV1, MetaDataV2, @@ -341,6 +346,7 @@ impl SupportedProtocol { SupportedProtocol::DataColumnsByRootV1 => "1", SupportedProtocol::DataColumnsByRangeV1 => "1", SupportedProtocol::ExecutionProofsByRootV1 => "1", + SupportedProtocol::ExecutionProofsByRangeV1 => "1", SupportedProtocol::PingV1 => "1", SupportedProtocol::MetaDataV1 => "1", SupportedProtocol::MetaDataV2 => "2", @@ -366,6 +372,7 @@ impl SupportedProtocol { SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, SupportedProtocol::DataColumnsByRangeV1 => Protocol::DataColumnsByRange, SupportedProtocol::ExecutionProofsByRootV1 => Protocol::ExecutionProofsByRoot, + SupportedProtocol::ExecutionProofsByRangeV1 => Protocol::ExecutionProofsByRange, SupportedProtocol::PingV1 => Protocol::Ping, SupportedProtocol::MetaDataV1 => Protocol::MetaData, SupportedProtocol::MetaDataV2 => Protocol::MetaData, @@ -417,10 +424,10 @@ impl SupportedProtocol { ]); } if fork_context.spec.is_zkvm_enabled() { - supported.push(ProtocolId::new( - SupportedProtocol::ExecutionProofsByRootV1, - Encoding::SSZSnappy, - )); + supported.extend_from_slice(&[ + ProtocolId::new(SupportedProtocol::ExecutionProofsByRootV1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::ExecutionProofsByRangeV1, Encoding::SSZSnappy), + ]); } supported } @@ -535,6 +542,10 @@ impl ProtocolId { DataColumnsByRangeRequest::ssz_max_len::(), ), Protocol::ExecutionProofsByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::ExecutionProofsByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -572,6 +583,7 @@ impl ProtocolId { rpc_data_column_limits::(fork_context.current_fork_epoch(), &fork_context.spec) } Protocol::ExecutionProofsByRoot => rpc_execution_proof_limits(), + Protocol::ExecutionProofsByRange => rpc_execution_proof_limits(), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -605,6 +617,7 @@ impl ProtocolId { | SupportedProtocol::BlobsByRootV1 | SupportedProtocol::DataColumnsByRootV1 | SupportedProtocol::DataColumnsByRangeV1 + | SupportedProtocol::ExecutionProofsByRangeV1 | SupportedProtocol::LightClientBootstrapV1 | SupportedProtocol::LightClientOptimisticUpdateV1 | SupportedProtocol::LightClientFinalityUpdateV1 @@ -748,6 +761,7 @@ pub enum RequestType { DataColumnsByRoot(DataColumnsByRootRequest), DataColumnsByRange(DataColumnsByRangeRequest), ExecutionProofsByRoot(ExecutionProofsByRootRequest), + ExecutionProofsByRange(ExecutionProofsByRangeRequest), LightClientBootstrap(LightClientBootstrapRequest), LightClientOptimisticUpdate, LightClientFinalityUpdate, @@ -772,6 +786,7 @@ impl RequestType { RequestType::DataColumnsByRoot(req) => req.max_requested() as u64, RequestType::DataColumnsByRange(req) => req.max_requested::(), RequestType::ExecutionProofsByRoot(req) => req.max_requested() as u64, + RequestType::ExecutionProofsByRange(req) => req.count, RequestType::Ping(_) => 1, RequestType::MetaData(_) => 1, RequestType::LightClientBootstrap(_) => 1, @@ -802,6 +817,7 @@ impl RequestType { RequestType::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, RequestType::DataColumnsByRange(_) => SupportedProtocol::DataColumnsByRangeV1, RequestType::ExecutionProofsByRoot(_) => SupportedProtocol::ExecutionProofsByRootV1, + RequestType::ExecutionProofsByRange(_) => SupportedProtocol::ExecutionProofsByRangeV1, RequestType::Ping(_) => SupportedProtocol::PingV1, RequestType::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -834,6 +850,7 @@ impl RequestType { RequestType::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, RequestType::DataColumnsByRange(_) => ResponseTermination::DataColumnsByRange, RequestType::ExecutionProofsByRoot(_) => ResponseTermination::ExecutionProofsByRoot, + RequestType::ExecutionProofsByRange(_) => ResponseTermination::ExecutionProofsByRange, RequestType::Status(_) => unreachable!(), RequestType::Goodbye(_) => unreachable!(), RequestType::Ping(_) => unreachable!(), @@ -884,6 +901,10 @@ impl RequestType { SupportedProtocol::ExecutionProofsByRootV1, Encoding::SSZSnappy, )], + RequestType::ExecutionProofsByRange(_) => vec![ProtocolId::new( + SupportedProtocol::ExecutionProofsByRangeV1, + Encoding::SSZSnappy, + )], RequestType::Ping(_) => vec![ProtocolId::new( SupportedProtocol::PingV1, Encoding::SSZSnappy, @@ -923,6 +944,7 @@ impl RequestType { RequestType::DataColumnsByRoot(_) => false, RequestType::DataColumnsByRange(_) => false, RequestType::ExecutionProofsByRoot(_) => false, + RequestType::ExecutionProofsByRange(_) => false, RequestType::Ping(_) => true, RequestType::MetaData(_) => true, RequestType::LightClientBootstrap(_) => true, @@ -1039,6 +1061,9 @@ impl std::fmt::Display for RequestType { RequestType::ExecutionProofsByRoot(req) => { write!(f, "Execution proofs by root: {:?}", req) } + RequestType::ExecutionProofsByRange(req) => { + write!(f, "Execution proofs by range: {:?}", req) + } RequestType::Ping(ping) => write!(f, "Ping: {}", ping.data), RequestType::MetaData(_) => write!(f, "MetaData request"), RequestType::LightClientBootstrap(bootstrap) => { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index f70b29cfe45..9dfbc668c89 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -107,6 +107,8 @@ pub struct RPCRateLimiter { dcbrange_rl: Limiter, /// ExecutionProofsByRoot rate limiter. execution_proofs_by_root_rl: Limiter, + /// ExecutionProofsByRange rate limiter. + execution_proofs_by_range_rl: Limiter, /// LightClientBootstrap rate limiter. lc_bootstrap_rl: Limiter, /// LightClientOptimisticUpdate rate limiter. @@ -152,6 +154,8 @@ pub struct RPCRateLimiterBuilder { dcbrange_quota: Option, /// Quota for the ExecutionProofsByRoot protocol. execution_proofs_by_root_quota: Option, + /// Quota for the ExecutionProofsByRange protocol. + execution_proofs_by_range_quota: Option, /// Quota for the LightClientBootstrap protocol. lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. @@ -178,6 +182,7 @@ impl RPCRateLimiterBuilder { Protocol::DataColumnsByRoot => self.dcbroot_quota = q, Protocol::DataColumnsByRange => self.dcbrange_quota = q, Protocol::ExecutionProofsByRoot => self.execution_proofs_by_root_quota = q, + Protocol::ExecutionProofsByRange => self.execution_proofs_by_range_quota = q, Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, Protocol::LightClientOptimisticUpdate => self.lc_optimistic_update_quota = q, Protocol::LightClientFinalityUpdate => self.lc_finality_update_quota = q, @@ -230,6 +235,10 @@ impl RPCRateLimiterBuilder { .execution_proofs_by_root_quota .ok_or("ExecutionProofsByRoot quota not specified")?; + let execution_proofs_by_range_quota = self + .execution_proofs_by_range_quota + .ok_or("ExecutionProofsByRange quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -242,6 +251,7 @@ impl RPCRateLimiterBuilder { let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; let dcbrange_rl = Limiter::from_quota(dcbrange_quota)?; let execution_proofs_by_root_rl = Limiter::from_quota(execution_proofs_by_root_quota)?; + let execution_proofs_by_range_rl = Limiter::from_quota(execution_proofs_by_range_quota)?; let lc_bootstrap_rl = Limiter::from_quota(lc_bootstrap_quota)?; let lc_optimistic_update_rl = Limiter::from_quota(lc_optimistic_update_quota)?; let lc_finality_update_rl = Limiter::from_quota(lc_finality_update_quota)?; @@ -266,6 +276,7 @@ impl RPCRateLimiterBuilder { dcbroot_rl, dcbrange_rl, execution_proofs_by_root_rl, + execution_proofs_by_range_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -320,6 +331,7 @@ impl RPCRateLimiter { data_columns_by_root_quota, data_columns_by_range_quota, execution_proofs_by_root_quota, + execution_proofs_by_range_quota, light_client_bootstrap_quota, light_client_optimistic_update_quota, light_client_finality_update_quota, @@ -341,6 +353,10 @@ impl RPCRateLimiter { Protocol::ExecutionProofsByRoot, execution_proofs_by_root_quota, ) + .set_quota( + Protocol::ExecutionProofsByRange, + execution_proofs_by_range_quota, + ) .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) .set_quota( Protocol::LightClientOptimisticUpdate, @@ -389,6 +405,7 @@ impl RPCRateLimiter { Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, Protocol::DataColumnsByRange => &mut self.dcbrange_rl, Protocol::ExecutionProofsByRoot => &mut self.execution_proofs_by_root_rl, + Protocol::ExecutionProofsByRange => &mut self.execution_proofs_by_range_rl, Protocol::LightClientBootstrap => &mut self.lc_bootstrap_rl, Protocol::LightClientOptimisticUpdate => &mut self.lc_optimistic_update_rl, Protocol::LightClientFinalityUpdate => &mut self.lc_finality_update_rl, @@ -414,6 +431,7 @@ impl RPCRateLimiter { dcbroot_rl, dcbrange_rl, execution_proofs_by_root_rl, + execution_proofs_by_range_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -432,6 +450,7 @@ impl RPCRateLimiter { dcbrange_rl.prune(time_since_start); dcbroot_rl.prune(time_since_start); execution_proofs_by_root_rl.prune(time_since_start); + execution_proofs_by_range_rl.prune(time_since_start); lc_bootstrap_rl.prune(time_since_start); lc_optimistic_update_rl.prune(time_since_start); lc_finality_update_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index d97506653b5..6ac7d84ea30 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -32,6 +32,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Execution proofs by range request + ExecutionProofsByRange(ExecutionProofsByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -71,6 +73,14 @@ pub struct DataColumnsByRangeRequestId { pub peer: PeerId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct ExecutionProofsByRangeRequestId { + /// Id to identify this attempt at an execution_proofs_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRangeRequester { ComponentsByRange(ComponentsByRangeRequestId), @@ -168,6 +178,8 @@ pub enum Response { DataColumnsByRoot(Option>>), /// A response to a get EXECUTION_PROOFS_BY_ROOT request. ExecutionProofsByRoot(Option>), + /// A response to a get EXECUTION_PROOFS_BY_RANGE request. + ExecutionProofsByRange(Option>), /// A response to a LightClientUpdate request. LightClientBootstrap(Arc>), /// A response to a LightClientOptimisticUpdate request. @@ -209,6 +221,10 @@ impl std::convert::From> for RpcResponse { Some(p) => RpcResponse::Success(RpcSuccessResponse::ExecutionProofsByRoot(p)), None => RpcResponse::StreamTermination(ResponseTermination::ExecutionProofsByRoot), }, + Response::ExecutionProofsByRange(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::ExecutionProofsByRange(p)), + None => RpcResponse::StreamTermination(ResponseTermination::ExecutionProofsByRange), + }, Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)), Response::LightClientBootstrap(b) => { RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b)) @@ -245,6 +261,7 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!(ExecutionProofsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 9f1530ec732..d934131b647 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1580,6 +1580,17 @@ impl Network { request_type, }) } + RequestType::ExecutionProofsByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["execution_proofs_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } RequestType::LightClientBootstrap(_) => { metrics::inc_counter_vec( &metrics::TOTAL_RPC_REQUESTS, @@ -1670,6 +1681,11 @@ impl Network { peer_id, Response::ExecutionProofsByRoot(Some(resp)), ), + RpcSuccessResponse::ExecutionProofsByRange(resp) => self.build_response( + id, + peer_id, + Response::ExecutionProofsByRange(Some(resp)), + ), // Should never be reached RpcSuccessResponse::LightClientBootstrap(bootstrap) => { self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap)) @@ -1702,6 +1718,9 @@ impl Network { ResponseTermination::ExecutionProofsByRoot => { Response::ExecutionProofsByRoot(None) } + ResponseTermination::ExecutionProofsByRange => { + Response::ExecutionProofsByRange(None) + } ResponseTermination::LightClientUpdatesByRange => { Response::LightClientUpdatesByRange(None) } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index eb02ddad921..6bdead59728 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -323,6 +323,9 @@ impl Router { Response::ExecutionProofsByRoot(execution_proof) => { self.on_execution_proofs_by_root_response(peer_id, app_request_id, execution_proof); } + Response::ExecutionProofsByRange(execution_proof) => { + self.on_execution_proofs_by_range_response(peer_id, app_request_id, execution_proof); + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) @@ -727,6 +730,22 @@ impl Router { }); } + /// Handle an `ExecutionProofsByRange` response from the peer. + /// + /// TODO(zkproofs): The full implementation should integrate + /// with range sync to couple proofs with blocks during batch sync. + pub fn on_execution_proofs_by_range_response( + &mut self, + peer_id: PeerId, + _app_request_id: AppRequestId, + _execution_proof: Option>, + ) { + trace!( + %peer_id, + "Received ExecutionProofsByRange Response" + ); + } + /// Handle a `DataColumnsByRoot` response from the peer. pub fn on_data_columns_by_root_response( &mut self, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ed9a11a03de..9bb00a2d70a 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -5,6 +5,7 @@ use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + ExecutionProofsByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -12,7 +13,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + ExecutionProof, Hash256, SignedBeaconBlock, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -33,6 +34,9 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// Execution proofs request (optional, only when zkVM verification is enabled). + execution_proofs_request: + Option>>>, /// Span to track the range request and all children range requests. pub(crate) request_span: Span, } @@ -76,6 +80,7 @@ impl RangeBlockComponentsRequest { /// * `blocks_req_id` - Request ID for the blocks /// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork) /// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork + /// * `execution_proofs_req_id` - Optional request ID for execution proofs (zkVM verification) #[allow(clippy::type_complexity)] pub fn new( blocks_req_id: BlocksByRangeRequestId, @@ -84,6 +89,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + execution_proofs_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -103,9 +109,13 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::NoData }; + let execution_proofs_request = + execution_proofs_req_id.map(ByRangeRequest::Active); + Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + execution_proofs_request, request_span, } } @@ -187,6 +197,29 @@ impl RangeBlockComponentsRequest { } } + /// Adds received execution proofs to the request. + /// + /// Returns an error if this request does not expect execution proofs, + /// or if the request ID doesn't match. + pub fn add_execution_proofs( + &mut self, + req_id: ExecutionProofsByRangeRequestId, + proofs: Vec>, + ) -> Result<(), String> { + match &mut self.execution_proofs_request { + None => Err("received execution proofs but none expected".to_owned()), + Some(req) => req.finish(req_id, proofs), + } + } + + /// Returns the execution proofs if the request is complete. + /// TODO(zkproofs): currently unused because everything is not hooked up + pub fn get_execution_proofs(&self) -> Option<&Vec>> { + self.execution_proofs_request + .as_ref() + .and_then(|req| req.to_finished()) + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -200,6 +233,13 @@ impl RangeBlockComponentsRequest { return None; }; + // If execution proofs are expected, check if they have been received + if let Some(proofs_request) = &self.execution_proofs_request { + if proofs_request.to_finished().is_none() { + return None; + } + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => { @@ -529,7 +569,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -557,6 +597,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -567,7 +608,7 @@ mod tests { // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. // This makes sure we don't expect blobs here when they have expired. Checking this logic should - // be hendled elsewhere. + // be handled elsewhere. info.responses(&test_spec::()).unwrap().unwrap(); } @@ -606,6 +647,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -674,6 +716,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); @@ -762,6 +805,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + None, Span::none(), ); @@ -848,6 +892,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + None, Span::none(), ); @@ -941,6 +986,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c0af69d7a40..b5cfd21ad6f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -61,7 +61,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, ExecutionProofsByRangeRequestId, Id, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -518,6 +519,9 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::ExecutionProofsByRange(req_id) => { + self.on_execution_proofs_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) + } } } @@ -1352,6 +1356,24 @@ impl SyncManager { } } + fn on_execution_proofs_by_range_response( + &mut self, + id: ExecutionProofsByRangeRequestId, + peer_id: PeerId, + execution_proof: RpcEvent>, + ) { + if let Some(resp) = self + .network + .on_execution_proofs_by_range_response(id, peer_id, execution_proof) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::ExecutionProofs(id, resp), + ); + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0943787c925..ca357c8e00c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -28,7 +28,8 @@ use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, ExecutionProofsByRangeRequestId, Id, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Subnet}; use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST}; @@ -37,7 +38,8 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, - ExecutionProofsByRootRequestItems, ExecutionProofsByRootSingleBlockRequest, + ExecutionProofsByRangeRequestItems, ExecutionProofsByRootRequestItems, + ExecutionProofsByRootSingleBlockRequest, }; #[cfg(test)] use slot_clock::SlotClock; @@ -217,6 +219,9 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active ExecutionProofsByRange requests + execution_proofs_by_range_requests: + ActiveRequests, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -254,6 +259,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + ExecutionProofs( + ExecutionProofsByRangeRequestId, + RpcResponseResult>>, + ), } #[cfg(test)] @@ -303,6 +312,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + execution_proofs_by_range_requests: ActiveRequests::new("execution_proofs_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -332,6 +342,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + execution_proofs_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -371,6 +382,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let execution_proofs_by_range_ids = execution_proofs_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::ExecutionProofsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) @@ -378,6 +393,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(execution_proofs_by_range_ids) .collect() } @@ -435,6 +451,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + execution_proofs_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -458,6 +475,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(execution_proofs_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -672,6 +690,8 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + // TODO(zkproofs): Add execution proofs by range request when zkVM verification is enabled + let execution_proofs_req_id: Option = None; let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -681,6 +701,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + execution_proofs_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -783,6 +804,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::ExecutionProofs(req_id, resp) => { + resp.and_then(|(proofs, _)| { + request + .add_execution_proofs(req_id, proofs) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1640,6 +1672,18 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) } + pub(crate) fn on_execution_proofs_by_range_response( + &mut self, + id: ExecutionProofsByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>, + ) -> Option>>> { + let resp = self + .execution_proofs_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(id, "ExecutionProofsByRange", resp, peer_id, |d| d.len()) + } + fn on_rpc_response_result usize>( &mut self, id: I, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 63249ed2a4b..238e551659d 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -15,6 +15,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use execution_proofs_by_range::ExecutionProofsByRangeRequestItems; pub use execution_proofs_by_root::{ ExecutionProofsByRootRequestItems, ExecutionProofsByRootSingleBlockRequest, }; @@ -29,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod execution_proofs_by_range; mod execution_proofs_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] diff --git a/beacon_node/network/src/sync/network_context/requests/execution_proofs_by_range.rs b/beacon_node/network/src/sync/network_context/requests/execution_proofs_by_range.rs new file mode 100644 index 00000000000..4444dfd2c7e --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/execution_proofs_by_range.rs @@ -0,0 +1,51 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::ExecutionProofsByRangeRequest; +use std::sync::Arc; +use types::ExecutionProof; + +/// Accumulates results of an execution_proofs_by_range request. Only returns items after receiving +/// the stream termination. +pub struct ExecutionProofsByRangeRequestItems { + request: ExecutionProofsByRangeRequest, + items: Vec>, +} + +impl ExecutionProofsByRangeRequestItems { + pub fn new(request: ExecutionProofsByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for ExecutionProofsByRangeRequestItems { + type Item = Arc; + + fn add(&mut self, proof: Self::Item) -> Result { + // TODO(zkproofs): Add proper validation + // For now, just check the slot is within the requested range + if proof.slot < self.request.start_slot + || proof.slot >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(proof.slot)); + } + + // Check for duplicate proofs + if self.items.iter().any(|existing| { + existing.slot == proof.slot && existing.proof_id == proof.proof_id + }) { + return Err(LookupVerifyError::DuplicatedProofIDs(proof.proof_id)); + } + + self.items.push(proof); + + // We can't know exactly how many proofs to expect, so we never return true here. + // The stream termination will signal completion. + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c4137191744..e38c90e0dd2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -961,6 +961,58 @@ impl, Cold: ItemStore> HotColdDB )); } + /// Store execution proofs for a block. + pub fn put_execution_proofs( + &self, + block_root: &Hash256, + proofs: &[ExecutionProof], + ) -> Result<(), Error> { + let proofs_vec: Vec = proofs.to_vec(); + self.blobs_db.put_bytes( + DBColumn::BeaconExecutionProof, + block_root.as_slice(), + &proofs_vec.as_ssz_bytes(), + ) + } + + /// Fetch execution proofs for a given block from the store. + pub fn get_execution_proofs( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + match self + .blobs_db + .get_bytes(DBColumn::BeaconExecutionProof, block_root.as_slice())? + { + Some(ref bytes) => { + let proofs = Vec::::from_ssz_bytes(bytes)?; + Ok(Some(proofs)) + } + None => Ok(None), + } + } + + /// Generate key-value store ops for execution proofs (for batch operations). + pub fn execution_proofs_as_kv_store_ops( + &self, + key: &Hash256, + proofs: &[ExecutionProof], + ops: &mut Vec, + ) { + let proofs_vec: Vec = proofs.to_vec(); + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconExecutionProof, + key.as_slice().to_vec(), + proofs_vec.as_ssz_bytes(), + )); + } + + /// Delete execution proofs for a given block root. + pub fn delete_execution_proofs(&self, block_root: &Hash256) -> Result<(), Error> { + self.blobs_db + .key_delete(DBColumn::BeaconExecutionProof, block_root.as_slice()) + } + pub fn data_column_as_kv_store_ops( &self, block_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a3d4e4a8cea..851c9b03ae6 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -266,6 +266,12 @@ pub enum DBColumn { BeaconDataColumn, #[strum(serialize = "bdi")] BeaconDataColumnCustodyInfo, + /// Execution proofs for zkVM validation. + /// + /// - Key: `Hash256` block root. + /// - Value: SSZ-encoded list of execution proofs. + #[strum(serialize = "bep")] + BeaconExecutionProof, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). /// /// DEPRECATED. @@ -409,6 +415,7 @@ impl DBColumn { | Self::BeaconBlock | Self::BeaconState | Self::BeaconBlob + | Self::BeaconExecutionProof | Self::BeaconStateSummary | Self::BeaconStateHotDiff | Self::BeaconStateHotSnapshot