Skip to content

Commit 64ad2af

Browse files
Subscribe to altair gossip topics 2 slots before fork (#2532)
## Issue Addressed N/A ## Proposed Changes Add a fork_digest to `ForkContext` only if it is set in the config. Reject gossip messages on post fork topics before the fork happens. Edit: Instead of rejecting gossip messages on post fork topics, we now subscribe to post fork topics 2 slots before the fork. Co-authored-by: Age Manning <[email protected]>
1 parent acdcea9 commit 64ad2af

File tree

7 files changed

+145
-24
lines changed

7 files changed

+145
-24
lines changed

beacon_node/eth2_libp2p/src/behaviour/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
326326
self.unsubscribe(gossip_topic)
327327
}
328328

329+
/// Subscribe to all currently subscribed topics with the new fork digest.
330+
pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) {
331+
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
332+
for mut topic in subscriptions.into_iter() {
333+
topic.fork_digest = new_fork_digest;
334+
self.subscribe(topic);
335+
}
336+
}
337+
329338
/// Unsubscribe from all topics that doesn't have the given fork_digest
330339
pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) {
331340
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();

beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,11 @@ mod tests {
615615
type Spec = types::MainnetEthSpec;
616616

617617
fn fork_context() -> ForkContext {
618-
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &Spec::default_spec())
618+
let mut chain_spec = Spec::default_spec();
619+
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
620+
// includes altair in the list of forks
621+
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
622+
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &chain_spec)
619623
}
620624

621625
fn base_block() -> SignedBeaconBlock<Spec> {

beacon_node/eth2_libp2p/tests/common/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@ use std::sync::Arc;
1111
use std::sync::Weak;
1212
use std::time::Duration;
1313
use tokio::runtime::Runtime;
14-
use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec};
14+
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec};
1515

1616
type E = MinimalEthSpec;
1717
use tempfile::Builder as TempBuilder;
1818

1919
/// Returns a dummy fork context
2020
fn fork_context() -> ForkContext {
21-
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &ChainSpec::minimal())
21+
let mut chain_spec = E::default_spec();
22+
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
23+
// includes altair in the list of forks
24+
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
25+
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &chain_spec)
2226
}
2327

2428
pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);

beacon_node/eth2_libp2p/tests/rpc_tests.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,16 @@ fn test_blocks_by_range_chunked_rpc() {
138138
step: 0,
139139
});
140140

141-
// BlocksByRange Response
142141
let spec = E::default_spec();
143-
let empty_block = BeaconBlock::empty(&spec);
144-
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
145-
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));
142+
143+
// BlocksByRange Response
144+
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
145+
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
146+
let rpc_response_base = Response::BlocksByRange(Some(Box::new(signed_full_block)));
147+
148+
let full_block = BeaconBlock::Altair(BeaconBlockAltair::<E>::full(&spec));
149+
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
150+
let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block)));
146151

147152
// keep count of the number of messages received
148153
let mut messages_received = 0;
@@ -167,7 +172,11 @@ fn test_blocks_by_range_chunked_rpc() {
167172
warn!(log, "Sender received a response");
168173
match response {
169174
Response::BlocksByRange(Some(_)) => {
170-
assert_eq!(response, rpc_response.clone());
175+
if messages_received < 5 {
176+
assert_eq!(response, rpc_response_base.clone());
177+
} else {
178+
assert_eq!(response, rpc_response_altair.clone());
179+
}
171180
messages_received += 1;
172181
warn!(log, "Chunk received");
173182
}
@@ -197,7 +206,14 @@ fn test_blocks_by_range_chunked_rpc() {
197206
if request == rpc_request {
198207
// send the response
199208
warn!(log, "Receiver got request");
200-
for _ in 1..=messages_to_send {
209+
for i in 0..messages_to_send {
210+
// Send first half of responses as base blocks and
211+
// second half as altair blocks.
212+
let rpc_response = if i < 5 {
213+
rpc_response_base.clone()
214+
} else {
215+
rpc_response_altair.clone()
216+
};
201217
receiver.swarm.behaviour_mut().send_successful_response(
202218
peer_id,
203219
id,
@@ -481,7 +497,7 @@ fn test_blocks_by_root_chunked_rpc() {
481497
let log_level = Level::Debug;
482498
let enable_logging = false;
483499

484-
let messages_to_send = 3;
500+
let messages_to_send = 10;
485501

486502
let log = common::build_log(log_level, enable_logging);
487503
let spec = E::default_spec();
@@ -497,6 +513,13 @@ fn test_blocks_by_root_chunked_rpc() {
497513
Hash256::from_low_u64_be(0),
498514
Hash256::from_low_u64_be(0),
499515
Hash256::from_low_u64_be(0),
516+
Hash256::from_low_u64_be(0),
517+
Hash256::from_low_u64_be(0),
518+
Hash256::from_low_u64_be(0),
519+
Hash256::from_low_u64_be(0),
520+
Hash256::from_low_u64_be(0),
521+
Hash256::from_low_u64_be(0),
522+
Hash256::from_low_u64_be(0),
500523
]),
501524
});
502525

beacon_node/network/src/service.rs

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ use task_executor::ShutdownReason;
2525
use tokio::sync::mpsc;
2626
use tokio::time::Sleep;
2727
use types::{
28-
EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription,
29-
SyncSubnetId, Unsigned, ValidatorSubscription,
28+
ChainSpec, EthSpec, ForkContext, ForkName, RelativeEpoch, Slot, SubnetId,
29+
SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription,
3030
};
3131

3232
mod tests;
3333

3434
/// The interval (in seconds) that various network metrics will update.
3535
const METRIC_UPDATE_INTERVAL: u64 = 1;
36+
/// Number of slots before the fork when we should subscribe to the new fork topics.
37+
const SUBSCRIBE_DELAY_SLOTS: u64 = 2;
3638
/// Delay after a fork where we unsubscribe from pre-fork topics.
3739
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
3840

@@ -129,6 +131,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
129131
discovery_auto_update: bool,
130132
/// A delay that expires when a new fork takes place.
131133
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
134+
/// A delay that expires when we need to subscribe to a new fork's topics.
135+
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
132136
/// A delay that expires when we need to unsubscribe from old fork topics.
133137
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
134138
/// Subscribe to all the subnets once synced.
@@ -179,6 +183,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
179183

180184
// keep track of when our fork_id needs to be updated
181185
let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
186+
let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
182187
let next_unsubscribe = Box::pin(None.into());
183188

184189
let current_slot = beacon_chain
@@ -192,6 +197,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
192197
&beacon_chain.spec,
193198
));
194199

200+
debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork());
201+
195202
// launch libp2p service
196203
let (network_globals, mut libp2p) = LibP2PService::new(
197204
executor.clone(),
@@ -254,6 +261,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
254261
upnp_mappings: (None, None),
255262
discovery_auto_update: config.discv5_config.enr_update,
256263
next_fork_update,
264+
next_fork_subscriptions,
257265
next_unsubscribe,
258266
subscribe_all_subnets: config.subscribe_all_subnets,
259267
shutdown_after_sync: config.shutdown_after_sync,
@@ -274,12 +282,23 @@ impl<T: BeaconChainTypes> NetworkService<T> {
274282
/// digests since we should be subscribed to post fork topics before the fork.
275283
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
276284
let fork_context = &self.fork_context;
285+
let spec = &self.beacon_chain.spec;
277286
match fork_context.current_fork() {
278287
ForkName::Base => {
279-
if fork_context.fork_exists(ForkName::Altair) {
280-
fork_context.all_fork_digests()
281-
} else {
282-
vec![fork_context.genesis_context_bytes()]
288+
// If we are SUBSCRIBE_DELAY_SLOTS before the fork slot, subscribe only to Base,
289+
// else subscribe to Base and Altair.
290+
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
291+
match spec.next_fork_epoch::<T::EthSpec>(current_slot) {
292+
Some((_, fork_epoch)) => {
293+
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
294+
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
295+
{
296+
fork_context.all_fork_digests()
297+
} else {
298+
vec![fork_context.genesis_context_bytes()]
299+
}
300+
}
301+
None => vec![fork_context.genesis_context_bytes()],
283302
}
284303
}
285304
ForkName::Altair => vec![fork_context
@@ -619,6 +638,7 @@ fn spawn_service<T: BeaconChainTypes>(
619638
} => {
620639
// Update prometheus metrics.
621640
metrics::expose_receive_metrics(&message);
641+
622642
match message {
623643
// attestation information gets processed in the attestation service
624644
PubsubMessage::Attestation(ref subnet_and_attestation) => {
@@ -671,7 +691,7 @@ fn spawn_service<T: BeaconChainTypes>(
671691
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
672692
info!(
673693
service.log,
674-
"Updating enr fork version";
694+
"Transitioned to new fork";
675695
"old_fork" => ?fork_context.current_fork(),
676696
"new_fork" => ?new_fork_name,
677697
);
@@ -701,6 +721,18 @@ fn spawn_service<T: BeaconChainTypes>(
701721
info!(service.log, "Unsubscribed from old fork topics");
702722
service.next_unsubscribe = Box::pin(None.into());
703723
}
724+
Some(_) = &mut service.next_fork_subscriptions => {
725+
if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() {
726+
let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name);
727+
let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root);
728+
info!(service.log, "Subscribing to new fork topics");
729+
service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
730+
}
731+
else {
732+
error!(service.log, "Fork subscription scheduled but no fork scheduled");
733+
}
734+
service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into());
735+
}
704736
}
705737
metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone());
706738
}
@@ -717,6 +749,22 @@ fn next_fork_delay<T: BeaconChainTypes>(
717749
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
718750
}
719751

752+
/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork.
753+
/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`.
754+
fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
755+
beacon_chain: &BeaconChain<T>,
756+
) -> Option<tokio::time::Sleep> {
757+
if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() {
758+
let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs(
759+
beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS,
760+
));
761+
if !duration_to_subscription.is_zero() {
762+
return Some(tokio::time::sleep(duration_to_subscription));
763+
}
764+
}
765+
None
766+
}
767+
720768
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
721769
fn drop(&mut self) {
722770
// network thread is terminating

consensus/types/src/chain_spec.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::*;
22
use eth2_serde_utils::quoted_u64::MaybeQuoted;
33
use int_to_bytes::int_to_bytes4;
4-
use serde_derive::{Deserialize, Serialize};
4+
use serde::{Deserializer, Serialize, Serializer};
5+
use serde_derive::Deserialize;
56
use std::fs::File;
67
use std::path::Path;
78
use tree_hash::TreeHash;
@@ -467,7 +468,7 @@ impl ChainSpec {
467468
domain_sync_committee_selection_proof: 8,
468469
domain_contribution_and_proof: 9,
469470
altair_fork_version: [0x01, 0x00, 0x00, 0x00],
470-
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
471+
altair_fork_epoch: None,
471472

472473
/*
473474
* Network specific
@@ -506,7 +507,7 @@ impl ChainSpec {
506507
// Altair
507508
epochs_per_sync_committee_period: Epoch::new(8),
508509
altair_fork_version: [0x01, 0x00, 0x00, 0x01],
509-
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
510+
altair_fork_epoch: None,
510511
// Other
511512
network_id: 2, // lighthouse testnet network id
512513
deposit_chain_id: 5,
@@ -544,7 +545,9 @@ pub struct Config {
544545

545546
#[serde(with = "eth2_serde_utils::bytes_4_hex")]
546547
altair_fork_version: [u8; 4],
547-
altair_fork_epoch: Option<MaybeQuoted<Epoch>>,
548+
#[serde(serialize_with = "serialize_fork_epoch")]
549+
#[serde(deserialize_with = "deserialize_fork_epoch")]
550+
pub altair_fork_epoch: Option<MaybeQuoted<Epoch>>,
548551

549552
#[serde(with = "eth2_serde_utils::quoted_u64")]
550553
seconds_per_slot: u64,
@@ -582,6 +585,35 @@ impl Default for Config {
582585
}
583586
}
584587

588+
/// Util function to serialize a `None` fork epoch value
589+
/// as `Epoch::max_value()`.
590+
fn serialize_fork_epoch<S>(val: &Option<MaybeQuoted<Epoch>>, s: S) -> Result<S::Ok, S::Error>
591+
where
592+
S: Serializer,
593+
{
594+
match val {
595+
None => MaybeQuoted {
596+
value: Epoch::max_value(),
597+
}
598+
.serialize(s),
599+
Some(epoch) => epoch.serialize(s),
600+
}
601+
}
602+
603+
/// Util function to deserialize a u64::max() fork epoch as `None`.
604+
fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result<Option<MaybeQuoted<Epoch>>, D::Error>
605+
where
606+
D: Deserializer<'de>,
607+
{
608+
let decoded: Option<MaybeQuoted<Epoch>> = serde::de::Deserialize::deserialize(deserializer)?;
609+
if let Some(fork_epoch) = decoded {
610+
if fork_epoch.value != Epoch::max_value() {
611+
return Ok(Some(fork_epoch));
612+
}
613+
}
614+
Ok(None)
615+
}
616+
585617
impl Config {
586618
/// Maps `self` to an identifier for an `EthSpec` instance.
587619
///
@@ -606,7 +638,7 @@ impl Config {
606638
altair_fork_version: spec.altair_fork_version,
607639
altair_fork_epoch: spec
608640
.altair_fork_epoch
609-
.map(|slot| MaybeQuoted { value: slot }),
641+
.map(|epoch| MaybeQuoted { value: epoch }),
610642

611643
seconds_per_slot: spec.seconds_per_slot,
612644
seconds_per_eth1_block: spec.seconds_per_eth1_block,

consensus/types/src/fork_context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ impl ForkContext {
2626
ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root),
2727
)];
2828

29-
// Only add Altair to list of forks if it's enabled (i.e. spec.altair_fork_epoch != None)
29+
// Only add Altair to list of forks if it's enabled
30+
// Note: `altair_fork_epoch == None` implies altair hasn't been activated yet on the config.
3031
if spec.altair_fork_epoch.is_some() {
3132
fork_to_digest.push((
3233
ForkName::Altair,
3334
ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root),
34-
))
35+
));
3536
}
3637

3738
let fork_to_digest: HashMap<ForkName, [u8; 4]> = fork_to_digest.into_iter().collect();

0 commit comments

Comments
 (0)