Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4726,6 +4726,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// efficient packing of execution blocks.
Err(Error::SkipProposerPreparation)
} else {
debug!(
?shuffling_decision_root,
epoch = %proposal_epoch,
"Proposer shuffling cache miss for proposer prep"
);
let head = self.canonical_head.cached_head();
Ok((
head.head_state_root(),
Expand Down Expand Up @@ -6557,6 +6562,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// This function provides safe and efficient multi-threaded access to the beacon proposer cache.
///
/// The arguments are:
///
/// - `shuffling_decision_block`: The block root of the decision block for the desired proposer
/// shuffling. This should be computed using one of the methods for computing proposer
/// shuffling decision roots, e.g. `BeaconState::proposer_shuffling_decision_root_at_epoch`.
/// - `proposal_epoch`: The epoch at which the proposer shuffling is required.
/// - `accessor`: A closure to run against the proposers for the selected epoch. Usually this
/// closure just grabs a single proposer, or takes the vec of proposers for the epoch.
/// - `state_provider`: A closure to compute a state suitable for determining the shuffling.
/// This closure is evaluated lazily ONLY in the case that a cache miss occurs. It is
/// recommended for code that wants to keep track of cache misses to produce a log and/or
/// increment a metric inside this closure .
///
/// This function makes use of closures in order to efficiently handle concurrent accesses to
/// the cache.
///
/// The error type is polymorphic, if in doubt you can use `BeaconChainError`. You might need
/// to use a turbofish if type inference can't work it out.
pub fn with_proposer_cache<V, E: From<BeaconChainError> + From<BeaconStateError>>(
&self,
shuffling_decision_block: Hash256,
Expand All @@ -6575,12 +6600,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If it is already initialised, then `get_or_try_init` will return immediately without
// executing the initialisation code at all.
let epoch_block_proposers = cache_entry.get_or_try_init(|| {
debug!(
?shuffling_decision_block,
%proposal_epoch,
"Proposer shuffling cache miss"
);

// Fetch the state on-demand if the required epoch was missing from the cache.
// If the caller wants to not compute the state they must return an error here and then
// catch it at the call site.
Expand Down Expand Up @@ -6610,11 +6629,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?;
Ok::<_, E>(EpochBlockProposers::new(
proposal_epoch,
state.fork(),
proposers,
))

// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
// advanced the state completely into the new epoch.
let fork = self.spec.fork_at_epoch(proposal_epoch);

debug!(
?shuffling_decision_block,
epoch = %proposal_epoch,
"Priming proposer shuffling cache"
);

Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
})?;

// Run the accessor function on the computed epoch proposers.
Expand Down
19 changes: 8 additions & 11 deletions beacon_node/beacon_chain/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::instrument;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
Expand Down Expand Up @@ -199,11 +200,14 @@ pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
.map_err(BeaconChainError::from)?;

let dependent_root = state
// The only block which decides its own shuffling is the genesis block.
.proposer_shuffling_decision_root(chain.genesis_block_root, &chain.spec)
.proposer_shuffling_decision_root_at_epoch(request_epoch, head_block_root, &chain.spec)
.map_err(BeaconChainError::from)?;

Ok((indices, dependent_root, execution_status, state.fork()))
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have advanced
// the state completely into the new epoch.
let fork = chain.spec.fork_at_epoch(request_epoch);

Ok((indices, dependent_root, execution_status, fork))
}

/// If required, advance `state` to the epoch required to determine proposer indices in `target_epoch`.
Expand All @@ -214,6 +218,7 @@ pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
/// - No-op if `state.current_epoch() == target_epoch`.
/// - It must be the case that `state.canonical_root() == state_root`, but this function will not
/// check that.
#[instrument(skip_all, fields(?state_root, %target_epoch, state_slot = %state.slot()), level = "debug")]
pub fn ensure_state_can_determine_proposers_for_epoch<E: EthSpec>(
state: &mut BeaconState<E>,
state_root: Hash256,
Expand All @@ -234,14 +239,6 @@ pub fn ensure_state_can_determine_proposers_for_epoch<E: EthSpec>(
if state.current_epoch() > maximum_epoch {
Err(BeaconStateError::SlotOutOfBounds.into())
} else if state.current_epoch() >= minimum_epoch {
if target_epoch > state.current_epoch() {
let target_slot = target_epoch.start_slot(E::slots_per_epoch());

// Advance the state into the same epoch as the block. Use the "partial" method since state
// roots are not important for proposer/attester shuffling.
partial_state_advance(state, Some(state_root), target_slot, spec)
.map_err(BeaconChainError::from)?;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not necessary anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pawan added this to work around the fact that we weren't advancing into the Fulu fork epoch to compute its shuffling (we were wrongly assuming we had lookahead). Now that we've fixed the function to calculate the decision slot, this is no longer a concern.

In this PR we've also started using fork_at_epoch so that the shuffling calculation works at future fork epochs (i.e. Gloas). This could be a good test case as well actually.

Removing the advance means we can take advantage of the lookahead post-Fulu. We avoid doing up to 1 epoch of unnecessary state advance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gloas fork epoch test added in 3171c12

Ok(())
} else {
// State's current epoch is less than the minimum epoch.
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,8 +950,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let proposer_shuffling_decision_block =
parent_block.proposer_shuffling_root_for_child_block(block_epoch, &chain.spec);

// We assign to a variable instead of using `if let Some` directly to ensure we drop the
// write lock before trying to acquire it again in the `else` clause.
let block_slot = block.slot();
let mut opt_parent = None;
let proposer = chain.with_proposer_cache::<_, BlockError>(
Expand Down
66 changes: 47 additions & 19 deletions beacon_node/beacon_chain/src/state_advance_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,25 +333,54 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
.build_committee_cache(RelativeEpoch::Next, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;

// If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer shuffling
// for the state's current epoch and the committee cache for the state's next epoch.
// The state root is required to prime the proposer cache AND for writing it to disk.
let advanced_state_root = state.update_tree_hash_cache()?;

// If the `pre_state` is in a later epoch than `state`, pre-emptively update the proposer
// shuffling and attester shuffling caches.
if initial_epoch < state.current_epoch() {
// Update the proposer cache.
//
// We supply the `head_block_root` as the decision block since the prior `if` statement guarantees
// the head root is the latest block from the prior epoch.
beacon_chain
.beacon_proposer_cache
.lock()
.insert(
state.current_epoch(),
head_block_root,
state
.get_beacon_proposer_indices(state.current_epoch(), &beacon_chain.spec)
.map_err(BeaconChainError::from)?,
state.fork(),
)
.map_err(BeaconChainError::from)?;
// Include the proposer shuffling from the current epoch, which is likely to be useful
// pre-Fulu, and probably redundant post-Fulu (it should already have been in the cache).
let current_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch(
state.current_epoch(),
head_block_root,
&beacon_chain.spec,
)?;
beacon_chain.with_proposer_cache(
current_epoch_decision_root,
state.current_epoch(),
|_| Ok(()),
|| {
debug!(
shuffling_decision_root = ?current_epoch_decision_root,
epoch = %state.current_epoch(),
"Computing current epoch proposer shuffling in state advance"
);
Ok::<_, Error>((advanced_state_root, state.clone()))
},
)?;

// For epochs *greater than* the Fulu fork epoch, we have also determined the proposer
// shuffling for the next epoch.
let next_epoch = state.next_epoch()?;
let next_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch(
next_epoch,
head_block_root,
&beacon_chain.spec,
)?;
beacon_chain.with_proposer_cache(
next_epoch_decision_root,
next_epoch,
|_| Ok(()),
|| {
debug!(
shuffling_decision_root = ?next_epoch_decision_root,
epoch = %next_epoch,
"Computing next epoch proposer shuffling in state advance"
);
Ok::<_, Error>((advanced_state_root, state.clone()))
},
)?;

// Update the attester cache.
let shuffling_id =
Expand Down Expand Up @@ -406,7 +435,6 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
// even if we race with the deletion of this state by the finalization pruning code, the worst
// case is we end up with a finalized state stored, that will get pruned the next time pruning
// runs.
let advanced_state_root = state.update_tree_hash_cache()?;
beacon_chain.store.put_state(&advanced_state_root, &state)?;

debug!(
Expand Down
141 changes: 137 additions & 4 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use beacon_chain::test_utils::{
use beacon_chain::{
BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig,
NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
beacon_proposer_cache::{
compute_proposer_duties_from_head, ensure_state_can_determine_proposers_for_epoch,
},
data_availability_checker::MaybeAvailableBlock,
historical_blocks::HistoricalBlockError,
migrate::MigratorConfig,
};
use logging::create_test_tracing_subscriber;
Expand Down Expand Up @@ -1273,19 +1277,34 @@ async fn proposer_shuffling_root_consistency_test(
#[tokio::test]
async fn proposer_shuffling_root_consistency_same_epoch() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 39).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
5 * E::slots_per_epoch() - 1,
)
.await;
}

#[tokio::test]
async fn proposer_shuffling_root_consistency_next_epoch() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 47).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
6 * E::slots_per_epoch() - 1,
)
.await;
}

#[tokio::test]
async fn proposer_shuffling_root_consistency_two_epochs() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 55).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
7 * E::slots_per_epoch() - 1,
)
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -1501,6 +1520,120 @@ async fn proposer_shuffling_changing_with_lookahead() {
);
}

#[tokio::test]
async fn proposer_duties_from_head_fulu() {
let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());

let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, Default::default(), spec.clone());
let validators_keypairs =
types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
let harness = TestHarness::builder(MinimalEthSpec)
.spec(spec.into())
.keypairs(validators_keypairs)
.fresh_disk_store(store)
.mock_execution_layer()
.build();
let spec = &harness.chain.spec;

let initial_blocks = E::slots_per_epoch() * 3;

// Build chain out to parent block.
let initial_slots: Vec<Slot> = (1..=initial_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
let (_, _, head_block_root, head_state) = harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;

// Compute the proposer duties at the next epoch from the head
let next_epoch = head_state.next_epoch().unwrap();
let (_indices, dependent_root, _, fork) =
compute_proposer_duties_from_head(next_epoch, &harness.chain).unwrap();

assert_eq!(
dependent_root,
head_state
.proposer_shuffling_decision_root_at_epoch(next_epoch, head_block_root.into(), spec)
.unwrap()
);
assert_eq!(fork, head_state.fork());
}

/// Test that we can compute the proposer shuffling for the Gloas fork epoch itself using lookahead!
#[tokio::test]
async fn proposer_lookahead_gloas_fork_epoch() {
let gloas_fork_epoch = Epoch::new(4);
let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
spec.gloas_fork_epoch = Some(gloas_fork_epoch);

let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, Default::default(), spec.clone());
let validators_keypairs =
types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
let harness = TestHarness::builder(MinimalEthSpec)
.spec(spec.into())
.keypairs(validators_keypairs)
.fresh_disk_store(store)
.mock_execution_layer()
.build();
let spec = &harness.chain.spec;

let initial_blocks = (gloas_fork_epoch - 1)
.start_slot(E::slots_per_epoch())
.as_u64();

// Build chain out to parent block.
let initial_slots: Vec<Slot> = (1..=initial_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
let (_, _, head_block_root, mut head_state) = harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;
let head_state_root = head_state.canonical_root().unwrap();

// Check that we have access to the next epoch shuffling according to
// `ensure_state_can_determine_proposers_for_epoch`.
ensure_state_can_determine_proposers_for_epoch(
&mut head_state,
head_state_root,
gloas_fork_epoch,
spec,
)
.unwrap();
assert_eq!(head_state.current_epoch(), gloas_fork_epoch - 1);

// Compute the proposer duties at the fork epoch from the head.
let (indices, dependent_root, _, fork) =
compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap();

assert_eq!(
dependent_root,
head_state
.proposer_shuffling_decision_root_at_epoch(
gloas_fork_epoch,
head_block_root.into(),
spec
)
.unwrap()
);
assert_ne!(fork, head_state.fork());
assert_eq!(fork, spec.fork_at_epoch(gloas_fork_epoch));

// Build a block in the Gloas fork epoch and assert that the shuffling does not change.
let gloas_slots = vec![gloas_fork_epoch.start_slot(E::slots_per_epoch())];
let (_, _, _, _) = harness
.add_attested_blocks_at_slots(head_state, head_state_root, &gloas_slots, &all_validators)
.await;

let (no_lookahead_indices, no_lookahead_dependent_root, _, no_lookahead_fork) =
compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap();

assert_eq!(no_lookahead_indices, indices);
assert_eq!(no_lookahead_dependent_root, dependent_root);
assert_eq!(no_lookahead_fork, fork);
}

// Ensure blocks from abandoned forks are pruned from the Hot DB
#[tokio::test]
async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
Expand Down
Loading
Loading