diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f085684442b..7b556b84676 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4726,6 +4726,11 @@ impl BeaconChain { // efficient packing of execution blocks. Err(Error::SkipProposerPreparation) } else { + debug!( + ?shuffling_decision_root, + epoch = %proposal_epoch, + "Proposer shuffling cache miss for proposer prep" + ); let head = self.canonical_head.cached_head(); Ok(( head.head_state_root(), @@ -6557,6 +6562,26 @@ impl BeaconChain { } } + /// This function provides safe and efficient multi-threaded access to the beacon proposer cache. + /// + /// The arguments are: + /// + /// - `shuffling_decision_block`: The block root of the decision block for the desired proposer + /// shuffling. This should be computed using one of the methods for computing proposer + /// shuffling decision roots, e.g. `BeaconState::proposer_shuffling_decision_root_at_epoch`. + /// - `proposal_epoch`: The epoch at which the proposer shuffling is required. + /// - `accessor`: A closure to run against the proposers for the selected epoch. Usually this + /// closure just grabs a single proposer, or takes the vec of proposers for the epoch. + /// - `state_provider`: A closure to compute a state suitable for determining the shuffling. + /// This closure is evaluated lazily ONLY in the case that a cache miss occurs. It is + /// recommended for code that wants to keep track of cache misses to produce a log and/or + /// increment a metric inside this closure . + /// + /// This function makes use of closures in order to efficiently handle concurrent accesses to + /// the cache. + /// + /// The error type is polymorphic, if in doubt you can use `BeaconChainError`. You might need + /// to use a turbofish if type inference can't work it out. pub fn with_proposer_cache + From>( &self, shuffling_decision_block: Hash256, @@ -6575,12 +6600,6 @@ impl BeaconChain { // If it is already initialised, then `get_or_try_init` will return immediately without // executing the initialisation code at all. let epoch_block_proposers = cache_entry.get_or_try_init(|| { - debug!( - ?shuffling_decision_block, - %proposal_epoch, - "Proposer shuffling cache miss" - ); - // Fetch the state on-demand if the required epoch was missing from the cache. // If the caller wants to not compute the state they must return an error here and then // catch it at the call site. @@ -6610,11 +6629,18 @@ impl BeaconChain { } let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?; - Ok::<_, E>(EpochBlockProposers::new( - proposal_epoch, - state.fork(), - proposers, - )) + + // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have + // advanced the state completely into the new epoch. + let fork = self.spec.fork_at_epoch(proposal_epoch); + + debug!( + ?shuffling_decision_block, + epoch = %proposal_epoch, + "Priming proposer shuffling cache" + ); + + Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers)) })?; // Run the accessor function on the computed epoch proposers. diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index a64b4981cc4..6effce49f8b 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -17,6 +17,7 @@ use smallvec::SmallVec; use state_processing::state_advance::partial_state_advance; use std::num::NonZeroUsize; use std::sync::Arc; +use tracing::instrument; use types::non_zero_usize::new_non_zero_usize; use types::{ BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned, @@ -199,11 +200,14 @@ pub fn compute_proposer_duties_from_head( .map_err(BeaconChainError::from)?; let dependent_root = state - // The only block which decides its own shuffling is the genesis block. - .proposer_shuffling_decision_root(chain.genesis_block_root, &chain.spec) + .proposer_shuffling_decision_root_at_epoch(request_epoch, head_block_root, &chain.spec) .map_err(BeaconChainError::from)?; - Ok((indices, dependent_root, execution_status, state.fork())) + // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have advanced + // the state completely into the new epoch. + let fork = chain.spec.fork_at_epoch(request_epoch); + + Ok((indices, dependent_root, execution_status, fork)) } /// If required, advance `state` to the epoch required to determine proposer indices in `target_epoch`. @@ -214,6 +218,7 @@ pub fn compute_proposer_duties_from_head( /// - No-op if `state.current_epoch() == target_epoch`. /// - It must be the case that `state.canonical_root() == state_root`, but this function will not /// check that. +#[instrument(skip_all, fields(?state_root, %target_epoch, state_slot = %state.slot()), level = "debug")] pub fn ensure_state_can_determine_proposers_for_epoch( state: &mut BeaconState, state_root: Hash256, @@ -234,14 +239,6 @@ pub fn ensure_state_can_determine_proposers_for_epoch( if state.current_epoch() > maximum_epoch { Err(BeaconStateError::SlotOutOfBounds.into()) } else if state.current_epoch() >= minimum_epoch { - if target_epoch > state.current_epoch() { - let target_slot = target_epoch.start_slot(E::slots_per_epoch()); - - // Advance the state into the same epoch as the block. Use the "partial" method since state - // roots are not important for proposer/attester shuffling. - partial_state_advance(state, Some(state_root), target_slot, spec) - .map_err(BeaconChainError::from)?; - } Ok(()) } else { // State's current epoch is less than the minimum epoch. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d0ed8258e55..691293b2000 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -950,8 +950,6 @@ impl GossipVerifiedBlock { let proposer_shuffling_decision_block = parent_block.proposer_shuffling_root_for_child_block(block_epoch, &chain.spec); - // We assign to a variable instead of using `if let Some` directly to ensure we drop the - // write lock before trying to acquire it again in the `else` clause. let block_slot = block.slot(); let mut opt_parent = None; let proposer = chain.with_proposer_cache::<_, BlockError>( diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 87348cb01be..b10edf23369 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -333,25 +333,54 @@ fn advance_head(beacon_chain: &Arc>) -> Resu .build_committee_cache(RelativeEpoch::Next, &beacon_chain.spec) .map_err(BeaconChainError::from)?; - // If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer shuffling - // for the state's current epoch and the committee cache for the state's next epoch. + // The state root is required to prime the proposer cache AND for writing it to disk. + let advanced_state_root = state.update_tree_hash_cache()?; + + // If the `pre_state` is in a later epoch than `state`, pre-emptively update the proposer + // shuffling and attester shuffling caches. if initial_epoch < state.current_epoch() { - // Update the proposer cache. - // - // We supply the `head_block_root` as the decision block since the prior `if` statement guarantees - // the head root is the latest block from the prior epoch. - beacon_chain - .beacon_proposer_cache - .lock() - .insert( - state.current_epoch(), - head_block_root, - state - .get_beacon_proposer_indices(state.current_epoch(), &beacon_chain.spec) - .map_err(BeaconChainError::from)?, - state.fork(), - ) - .map_err(BeaconChainError::from)?; + // Include the proposer shuffling from the current epoch, which is likely to be useful + // pre-Fulu, and probably redundant post-Fulu (it should already have been in the cache). + let current_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch( + state.current_epoch(), + head_block_root, + &beacon_chain.spec, + )?; + beacon_chain.with_proposer_cache( + current_epoch_decision_root, + state.current_epoch(), + |_| Ok(()), + || { + debug!( + shuffling_decision_root = ?current_epoch_decision_root, + epoch = %state.current_epoch(), + "Computing current epoch proposer shuffling in state advance" + ); + Ok::<_, Error>((advanced_state_root, state.clone())) + }, + )?; + + // For epochs *greater than* the Fulu fork epoch, we have also determined the proposer + // shuffling for the next epoch. + let next_epoch = state.next_epoch()?; + let next_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch( + next_epoch, + head_block_root, + &beacon_chain.spec, + )?; + beacon_chain.with_proposer_cache( + next_epoch_decision_root, + next_epoch, + |_| Ok(()), + || { + debug!( + shuffling_decision_root = ?next_epoch_decision_root, + epoch = %next_epoch, + "Computing next epoch proposer shuffling in state advance" + ); + Ok::<_, Error>((advanced_state_root, state.clone())) + }, + )?; // Update the attester cache. let shuffling_id = @@ -406,7 +435,6 @@ fn advance_head(beacon_chain: &Arc>) -> Resu // even if we race with the deletion of this state by the finalization pruning code, the worst // case is we end up with a finalized state stored, that will get pruned the next time pruning // runs. - let advanced_state_root = state.update_tree_hash_cache()?; beacon_chain.store.put_state(&advanced_state_root, &state)?; debug!( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index cd4032f55d9..b367b6d4374 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -13,7 +13,11 @@ use beacon_chain::test_utils::{ use beacon_chain::{ BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, - data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, + beacon_proposer_cache::{ + compute_proposer_duties_from_head, ensure_state_can_determine_proposers_for_epoch, + }, + data_availability_checker::MaybeAvailableBlock, + historical_blocks::HistoricalBlockError, migrate::MigratorConfig, }; use logging::create_test_tracing_subscriber; @@ -1273,19 +1277,34 @@ async fn proposer_shuffling_root_consistency_test( #[tokio::test] async fn proposer_shuffling_root_consistency_same_epoch() { let spec = test_spec::(); - proposer_shuffling_root_consistency_test(spec, 32, 39).await; + proposer_shuffling_root_consistency_test( + spec, + 4 * E::slots_per_epoch(), + 5 * E::slots_per_epoch() - 1, + ) + .await; } #[tokio::test] async fn proposer_shuffling_root_consistency_next_epoch() { let spec = test_spec::(); - proposer_shuffling_root_consistency_test(spec, 32, 47).await; + proposer_shuffling_root_consistency_test( + spec, + 4 * E::slots_per_epoch(), + 6 * E::slots_per_epoch() - 1, + ) + .await; } #[tokio::test] async fn proposer_shuffling_root_consistency_two_epochs() { let spec = test_spec::(); - proposer_shuffling_root_consistency_test(spec, 32, 55).await; + proposer_shuffling_root_consistency_test( + spec, + 4 * E::slots_per_epoch(), + 7 * E::slots_per_epoch() - 1, + ) + .await; } #[tokio::test] @@ -1501,6 +1520,120 @@ async fn proposer_shuffling_changing_with_lookahead() { ); } +#[tokio::test] +async fn proposer_duties_from_head_fulu() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, Default::default(), spec.clone()); + let validators_keypairs = + types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); + let harness = TestHarness::builder(MinimalEthSpec) + .spec(spec.into()) + .keypairs(validators_keypairs) + .fresh_disk_store(store) + .mock_execution_layer() + .build(); + let spec = &harness.chain.spec; + + let initial_blocks = E::slots_per_epoch() * 3; + + // Build chain out to parent block. + let initial_slots: Vec = (1..=initial_blocks).map(Into::into).collect(); + let (state, state_root) = harness.get_current_state_and_root(); + let all_validators = harness.get_all_validators(); + let (_, _, head_block_root, head_state) = harness + .add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators) + .await; + + // Compute the proposer duties at the next epoch from the head + let next_epoch = head_state.next_epoch().unwrap(); + let (_indices, dependent_root, _, fork) = + compute_proposer_duties_from_head(next_epoch, &harness.chain).unwrap(); + + assert_eq!( + dependent_root, + head_state + .proposer_shuffling_decision_root_at_epoch(next_epoch, head_block_root.into(), spec) + .unwrap() + ); + assert_eq!(fork, head_state.fork()); +} + +/// Test that we can compute the proposer shuffling for the Gloas fork epoch itself using lookahead! +#[tokio::test] +async fn proposer_lookahead_gloas_fork_epoch() { + let gloas_fork_epoch = Epoch::new(4); + let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, Default::default(), spec.clone()); + let validators_keypairs = + types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); + let harness = TestHarness::builder(MinimalEthSpec) + .spec(spec.into()) + .keypairs(validators_keypairs) + .fresh_disk_store(store) + .mock_execution_layer() + .build(); + let spec = &harness.chain.spec; + + let initial_blocks = (gloas_fork_epoch - 1) + .start_slot(E::slots_per_epoch()) + .as_u64(); + + // Build chain out to parent block. + let initial_slots: Vec = (1..=initial_blocks).map(Into::into).collect(); + let (state, state_root) = harness.get_current_state_and_root(); + let all_validators = harness.get_all_validators(); + let (_, _, head_block_root, mut head_state) = harness + .add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators) + .await; + let head_state_root = head_state.canonical_root().unwrap(); + + // Check that we have access to the next epoch shuffling according to + // `ensure_state_can_determine_proposers_for_epoch`. + ensure_state_can_determine_proposers_for_epoch( + &mut head_state, + head_state_root, + gloas_fork_epoch, + spec, + ) + .unwrap(); + assert_eq!(head_state.current_epoch(), gloas_fork_epoch - 1); + + // Compute the proposer duties at the fork epoch from the head. + let (indices, dependent_root, _, fork) = + compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap(); + + assert_eq!( + dependent_root, + head_state + .proposer_shuffling_decision_root_at_epoch( + gloas_fork_epoch, + head_block_root.into(), + spec + ) + .unwrap() + ); + assert_ne!(fork, head_state.fork()); + assert_eq!(fork, spec.fork_at_epoch(gloas_fork_epoch)); + + // Build a block in the Gloas fork epoch and assert that the shuffling does not change. + let gloas_slots = vec![gloas_fork_epoch.start_slot(E::slots_per_epoch())]; + let (_, _, _, _) = harness + .add_attested_blocks_at_slots(head_state, head_state_root, &gloas_slots, &all_validators) + .await; + + let (no_lookahead_indices, no_lookahead_dependent_root, _, no_lookahead_fork) = + compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap(); + + assert_eq!(no_lookahead_indices, indices); + assert_eq!(no_lookahead_dependent_root, dependent_root); + assert_eq!(no_lookahead_fork, fork); +} + // Ensure blocks from abandoned forks are pruned from the Hot DB #[tokio::test] async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { diff --git a/beacon_node/http_api/src/proposer_duties.rs b/beacon_node/http_api/src/proposer_duties.rs index ceac60cbad1..78f99c475ce 100644 --- a/beacon_node/http_api/src/proposer_duties.rs +++ b/beacon_node/http_api/src/proposer_duties.rs @@ -103,14 +103,6 @@ fn try_proposer_duties_from_cache( let head_block = &head.snapshot.beacon_block; let head_block_root = head.head_block_root(); let head_epoch = head_block.slot().epoch(T::EthSpec::slots_per_epoch()); - let head_decision_root = head - .snapshot - .beacon_state - .proposer_shuffling_decision_root(head_block_root, &chain.spec) - .map_err(warp_utils::reject::beacon_state_error)?; - let execution_optimistic = chain - .is_optimistic_or_invalid_head_block(head_block) - .map_err(warp_utils::reject::unhandled_error)?; // This code path can't handle requests for past epochs. if head_epoch > request_epoch { @@ -119,6 +111,15 @@ fn try_proposer_duties_from_cache( ))); } + let head_decision_root = head + .snapshot + .beacon_state + .proposer_shuffling_decision_root_at_epoch(request_epoch, head_block_root, &chain.spec) + .map_err(warp_utils::reject::beacon_state_error)?; + let execution_optimistic = chain + .is_optimistic_or_invalid_head_block(head_block) + .map_err(warp_utils::reject::unhandled_error)?; + chain .beacon_proposer_cache .lock() diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 1398d8c72fe..9732a62a2f0 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -940,3 +940,106 @@ async fn queue_attestations_from_http() { attestation_future.await.unwrap(); } + +// Test that a request for next epoch proposer duties suceeds when the current slot clock is within +// gossip clock disparity (500ms) of the new epoch. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn proposer_duties_with_gossip_tolerance() { + let validator_count = 24; + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; + let harness = &tester.harness; + let client = &tester.client; + + let num_initial = 4 * E::slots_per_epoch() - 1; + let next_epoch_start_slot = Slot::new(num_initial + 1); + + harness.advance_slot(); + harness + .extend_chain_with_sync( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + SyncCommitteeStrategy::NoValidators, + LightClientStrategy::Disabled, + ) + .await; + + assert_eq!(harness.chain.slot().unwrap(), num_initial); + + // Set the clock to just before the next epoch. + harness.chain.slot_clock.advance_time( + Duration::from_secs(spec.seconds_per_slot) - spec.maximum_gossip_clock_disparity(), + ); + assert_eq!( + harness + .chain + .slot_clock + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) + .unwrap(), + next_epoch_start_slot + ); + + let head_state = harness.get_current_state(); + let head_block_root = harness.head_block_root(); + let tolerant_current_epoch = next_epoch_start_slot.epoch(E::slots_per_epoch()); + + // This is a regression test for the bug described here: + // https://github.com/sigp/lighthouse/pull/8130/files#r2386594566 + // + // To trigger it, we need to prime the proposer shuffling cache with an incorrect entry which + // the previous code would be liable to lookup due to the bugs in its decision root calculation. + let wrong_decision_root = head_state + .proposer_shuffling_decision_root(head_block_root, &spec) + .unwrap(); + let wrong_proposer_indices = vec![0; E::slots_per_epoch() as usize]; + harness + .chain + .beacon_proposer_cache + .lock() + .insert( + tolerant_current_epoch, + wrong_decision_root, + wrong_proposer_indices.clone(), + head_state.fork(), + ) + .unwrap(); + + // Request the proposer duties. + let proposer_duties_tolerant_current_epoch = client + .get_validator_duties_proposer(tolerant_current_epoch) + .await + .unwrap(); + + assert_eq!( + proposer_duties_tolerant_current_epoch.dependent_root, + head_state + .proposer_shuffling_decision_root_at_epoch(tolerant_current_epoch, Hash256::ZERO, &spec) + .unwrap() + ); + assert_ne!( + proposer_duties_tolerant_current_epoch + .data + .iter() + .map(|data| data.validator_index as usize) + .collect::>(), + wrong_proposer_indices, + ); + + // We should get the exact same result after properly advancing into the epoch. + harness + .chain + .slot_clock + .advance_time(spec.maximum_gossip_clock_disparity()); + assert_eq!(harness.chain.slot().unwrap(), next_epoch_start_slot); + let proposer_duties_current_epoch = client + .get_validator_duties_proposer(tolerant_current_epoch) + .await + .unwrap(); + + assert_eq!( + proposer_duties_tolerant_current_epoch, + proposer_duties_current_epoch + ); +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 50a2f268e00..421655777e7 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -476,15 +476,23 @@ impl ChainSpec { /// Returns a full `Fork` struct for a given epoch. pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork { let current_fork_name = self.fork_name_at_epoch(epoch); - let previous_fork_name = current_fork_name.previous_fork().unwrap_or(ForkName::Base); - let epoch = self + + let fork_epoch = self .fork_epoch(current_fork_name) .unwrap_or_else(|| Epoch::new(0)); + // At genesis the Fork is initialised with two copies of the same value for both + // `previous_version` and `current_version` (see `initialize_beacon_state_from_eth1`). + let previous_fork_name = if fork_epoch == 0 { + current_fork_name + } else { + current_fork_name.previous_fork().unwrap_or(ForkName::Base) + }; + Fork { previous_version: self.fork_version_for_name(previous_fork_name), current_version: self.fork_version_for_name(current_fork_name), - epoch, + epoch: fork_epoch, } } @@ -3010,9 +3018,11 @@ mod yaml_tests { fn proposer_shuffling_decision_root_around_epoch_boundary() { type E = MainnetEthSpec; let fulu_fork_epoch = 5; + let gloas_fork_epoch = 10; let spec = { let mut spec = ForkName::Electra.make_genesis_spec(E::default_spec()); spec.fulu_fork_epoch = Some(Epoch::new(fulu_fork_epoch)); + spec.gloas_fork_epoch = Some(Epoch::new(gloas_fork_epoch)); Arc::new(spec) }; @@ -3026,7 +3036,7 @@ mod yaml_tests { } // For epochs after Fulu, the decision slot is the end of the epoch two epochs prior. - for epoch in ((fulu_fork_epoch + 1)..(fulu_fork_epoch + 10)).map(Epoch::new) { + for epoch in ((fulu_fork_epoch + 1)..=(gloas_fork_epoch + 1)).map(Epoch::new) { assert_eq!( spec.proposer_shuffling_decision_slot::(epoch), (epoch - 1).start_slot(E::slots_per_epoch()) - 1