Skip to content

Commit bc2a99a

Browse files
authored
Merge of #6849
2 parents a088b0b + 43ae790 commit bc2a99a

File tree

5 files changed

+62
-50
lines changed

5 files changed

+62
-50
lines changed

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,11 +708,17 @@ impl<E: EthSpec> Network<E> {
708708
}
709709

710710
// Subscribe to core topics for the new fork
711-
for kind in fork_core_topics::<E>(&new_fork, &self.fork_context.spec) {
711+
for kind in fork_core_topics::<E>(
712+
&new_fork,
713+
&self.fork_context.spec,
714+
&self.network_globals.as_topic_config(),
715+
) {
712716
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
713717
self.subscribe(topic);
714718
}
715719

720+
// TODO(das): unsubscribe from blob topics at the Fulu fork
721+
716722
// Register the new topics for metrics
717723
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
718724
.map(|gossip_kind| {

beacon_node/lighthouse_network/src/types/globals.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//! A collection of variables that are accessible outside of the network thread itself.
2+
use super::TopicConfig;
23
use crate::peer_manager::peerdb::PeerDB;
34
use crate::rpc::{MetaData, MetaDataV3};
45
use crate::types::{BackFillState, SyncState};
@@ -183,6 +184,14 @@ impl<E: EthSpec> NetworkGlobals<E> {
183184
.collect::<Vec<_>>()
184185
}
185186

187+
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
188+
pub fn as_topic_config(&self) -> TopicConfig {
189+
TopicConfig {
190+
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
191+
sampling_subnets: &self.sampling_subnets,
192+
}
193+
}
194+
186195
/// TESTING ONLY. Build a dummy NetworkGlobals instance.
187196
pub fn new_test_globals(
188197
trusted_peers: Vec<PeerId>,

beacon_node/lighthouse_network/src/types/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery};
1717
pub use sync_state::{BackFillState, SyncState};
1818
pub use topics::{
1919
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
20-
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
21-
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
20+
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
21+
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
2222
};

beacon_node/lighthouse_network/src/types/topics.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use gossipsub::{IdentTopic as Topic, TopicHash};
22
use serde::{Deserialize, Serialize};
3+
use std::collections::HashSet;
34
use strum::AsRefStr;
45
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
56

@@ -41,8 +42,18 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
4142
GossipKind::LightClientOptimisticUpdate,
4243
];
4344

45+
#[derive(Debug)]
46+
pub struct TopicConfig<'a> {
47+
pub subscribe_all_data_column_subnets: bool,
48+
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
49+
}
50+
4451
/// Returns the core topics associated with each fork that are new to the previous fork
45-
pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
52+
pub fn fork_core_topics<E: EthSpec>(
53+
fork_name: &ForkName,
54+
spec: &ChainSpec,
55+
topic_config: &TopicConfig,
56+
) -> Vec<GossipKind> {
4657
match fork_name {
4758
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
4859
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
@@ -64,7 +75,21 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
6475
}
6576
electra_blob_topics
6677
}
67-
ForkName::Fulu => vec![],
78+
ForkName::Fulu => {
79+
let mut topics = vec![];
80+
if topic_config.subscribe_all_data_column_subnets {
81+
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
82+
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
83+
column_subnet,
84+
)));
85+
}
86+
} else {
87+
for column_subnet in topic_config.sampling_subnets {
88+
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
89+
}
90+
}
91+
topics
92+
}
6893
}
6994
}
7095

@@ -84,10 +109,11 @@ pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = G
84109
pub fn core_topics_to_subscribe<E: EthSpec>(
85110
mut current_fork: ForkName,
86111
spec: &ChainSpec,
112+
topic_config: &TopicConfig,
87113
) -> Vec<GossipKind> {
88-
let mut topics = fork_core_topics::<E>(&current_fork, spec);
114+
let mut topics = fork_core_topics::<E>(&current_fork, spec, topic_config);
89115
while let Some(previous_fork) = current_fork.previous_fork() {
90-
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec);
116+
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
91117
topics.extend(previous_fork_topics);
92118
current_fork = previous_fork;
93119
}
@@ -475,16 +501,23 @@ mod tests {
475501
type E = MainnetEthSpec;
476502
let spec = E::default_spec();
477503
let mut all_topics = Vec::new();
478-
let mut electra_core_topics = fork_core_topics::<E>(&ForkName::Electra, &spec);
479-
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec);
504+
let topic_config = TopicConfig {
505+
subscribe_all_data_column_subnets: false,
506+
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
507+
};
508+
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
509+
let mut electra_core_topics =
510+
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
511+
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
512+
all_topics.append(&mut fulu_core_topics);
480513
all_topics.append(&mut electra_core_topics);
481514
all_topics.append(&mut deneb_core_topics);
482515
all_topics.extend(CAPELLA_CORE_TOPICS);
483516
all_topics.extend(ALTAIR_CORE_TOPICS);
484517
all_topics.extend(BASE_CORE_TOPICS);
485518

486519
let latest_fork = *ForkName::list_all().last().unwrap();
487-
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec);
520+
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
488521
// Need to check all the topics exist in an order independent manner
489522
for topic in all_topics {
490523
assert!(core_topics.contains(&topic));

beacon_node/network/src/service.rs

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use task_executor::ShutdownReason;
3333
use tokio::sync::mpsc;
3434
use tokio::time::Sleep;
3535
use types::{
36-
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
37-
SyncSubnetId, Unsigned, ValidatorSubscription,
36+
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
37+
Unsigned, ValidatorSubscription,
3838
};
3939

4040
mod tests;
@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
181181
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
182182
/// A delay that expires when we need to unsubscribe from old fork topics.
183183
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
184-
/// Subscribe to all the data column subnets.
185-
subscribe_all_data_column_subnets: bool,
186184
/// Subscribe to all the subnets once synced.
187185
subscribe_all_subnets: bool,
188186
/// Shutdown beacon node after sync is complete.
@@ -349,7 +347,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
349347
next_fork_update,
350348
next_fork_subscriptions,
351349
next_unsubscribe,
352-
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
353350
subscribe_all_subnets: config.subscribe_all_subnets,
354351
shutdown_after_sync: config.shutdown_after_sync,
355352
metrics_enabled: config.metrics_enabled,
@@ -717,6 +714,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
717714
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
718715
self.fork_context.current_fork(),
719716
&self.fork_context.spec,
717+
&self.network_globals.as_topic_config(),
720718
) {
721719
for fork_digest in self.required_gossip_fork_digests() {
722720
let topic = GossipTopic::new(
@@ -751,10 +749,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
751749
}
752750
}
753751

754-
if self.fork_context.spec.is_peer_das_scheduled() {
755-
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
756-
}
757-
758752
// If we are to subscribe to all subnets we do it here
759753
if self.subscribe_all_subnets {
760754
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
@@ -801,37 +795,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
801795
}
802796
}
803797

804-
/// Keeping these separate from core topics because it has custom logic:
805-
/// 1. Data column subscription logic depends on subscription configuration.
806-
/// 2. Data column topic subscriptions will be dynamic based on validator balances due to
807-
/// validator custody.
808-
///
809-
/// TODO(das): The downside with not including it in core fork topic is - we subscribe to
810-
/// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork.
811-
/// If this is an issue we could potentially consider adding the logic to
812-
/// `network.subscribe_new_fork_topics()`.
813-
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
814-
let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets {
815-
&(0..self.fork_context.spec.data_column_sidecar_subnet_count)
816-
.map(DataColumnSubnetId::new)
817-
.collect()
818-
} else {
819-
&self.network_globals.sampling_subnets
820-
};
821-
822-
for column_subnet in column_subnets_to_subscribe.iter() {
823-
for fork_digest in self.required_gossip_fork_digests() {
824-
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
825-
let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
826-
if self.libp2p.subscribe(topic.clone()) {
827-
subscribed_topics.push(topic);
828-
} else {
829-
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
830-
}
831-
}
832-
}
833-
}
834-
835798
/// Handle a message sent to the network service.
836799
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
837800
match msg {
@@ -947,6 +910,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
947910
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
948911
self.fork_context.current_fork(),
949912
&self.fork_context.spec,
913+
&self.network_globals.as_topic_config(),
950914
);
951915
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
952916
let subscriptions = self.network_globals.gossipsub_subscriptions.read();

0 commit comments

Comments
 (0)