Skip to content

Commit a467f56

Browse files
authored
Merge pull request #576 from openmina/feat/p2p/meshsub
Implement meshsub
2 parents 22cae91 + df2f681 commit a467f56

File tree

16 files changed

+440
-75
lines changed

16 files changed

+440
-75
lines changed

node/native/src/node/builder.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use node::{
1313
daemon_json::Daemon,
1414
p2p::{
1515
channels::ChannelId, connection::outgoing::P2pConnectionOutgoingInitOpts,
16-
identity::SecretKey as P2pSecretKey, P2pLimits, P2pTimeouts,
16+
identity::SecretKey as P2pSecretKey, P2pLimits, P2pMeshsubConfig, P2pTimeouts,
1717
},
1818
service::Recorder,
1919
snark::{get_srs, get_verifier_index, VerifierIndex, VerifierKind, VerifierSRS},
@@ -296,9 +296,12 @@ impl NodeBuilder {
296296
ask_initial_peers_interval: Duration::from_secs(3600),
297297
enabled_channels: ChannelId::iter_all().collect(),
298298
peer_discovery: !self.p2p_no_discovery,
299-
initial_time: initial_time
300-
.checked_sub(redux::Timestamp::ZERO)
301-
.unwrap_or_default(),
299+
meshsub: P2pMeshsubConfig {
300+
initial_time: initial_time
301+
.checked_sub(redux::Timestamp::ZERO)
302+
.unwrap_or_default(),
303+
..Default::default()
304+
},
302305
timeouts: P2pTimeouts::default(),
303306
limits: P2pLimits::default().with_max_peers(Some(100)),
304307
},

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,12 @@ pub enum ActionKind {
305305
P2pNetworkPnetTimeout,
306306
P2pNetworkPubsubBroadcast,
307307
P2pNetworkPubsubBroadcastSigned,
308+
P2pNetworkPubsubGraft,
308309
P2pNetworkPubsubIncomingData,
309310
P2pNetworkPubsubNewStream,
310311
P2pNetworkPubsubOutgoingData,
311312
P2pNetworkPubsubOutgoingMessage,
313+
P2pNetworkPubsubPrune,
312314
P2pNetworkPubsubSign,
313315
P2pNetworkRpcHeartbeatSend,
314316
P2pNetworkRpcIncomingData,
@@ -534,7 +536,7 @@ pub enum ActionKind {
534536
}
535537

536538
impl ActionKind {
537-
pub const COUNT: u16 = 442;
539+
pub const COUNT: u16 = 444;
538540
}
539541

540542
impl std::fmt::Display for ActionKind {
@@ -1456,6 +1458,8 @@ impl ActionKindGet for P2pNetworkPubsubAction {
14561458
match self {
14571459
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
14581460
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
1461+
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
1462+
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
14591463
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
14601464
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
14611465
Self::BroadcastSigned { .. } => ActionKind::P2pNetworkPubsubBroadcastSigned,

node/testing/src/cluster/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use node::core::constants::constraint_constants;
2424
use node::core::log::system_time;
2525
use node::core::requests::RpcId;
2626
use node::core::{thread, warn};
27-
use node::p2p::{P2pConnectionEvent, P2pEvent, P2pLimits, PeerId};
27+
use node::p2p::{P2pConnectionEvent, P2pEvent, P2pLimits, P2pMeshsubConfig, PeerId};
2828
use node::snark::{VerifierIndex, VerifierSRS};
2929
use node::{
3030
event_source::Event,
@@ -286,10 +286,13 @@ impl Cluster {
286286
peer_discovery: true,
287287
timeouts: testing_config.timeouts,
288288
limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)),
289-
initial_time: testing_config
290-
.initial_time
291-
.checked_sub(redux::Timestamp::ZERO)
292-
.unwrap_or_default(),
289+
meshsub: P2pMeshsubConfig {
290+
initial_time: testing_config
291+
.initial_time
292+
.checked_sub(redux::Timestamp::ZERO)
293+
.unwrap_or_default(),
294+
..Default::default()
295+
},
293296
},
294297
transition_frontier: TransitionFrontierConfig::new(testing_config.genesis),
295298
block_producer: block_producer_config,

node/testing/src/scenarios/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::scenario::{Scenario, ScenarioId, ScenarioStep};
2727

2828
use self::multi_node::basic_connectivity_initial_joining::MultiNodeBasicConnectivityInitialJoining;
2929
use self::multi_node::basic_connectivity_peer_discovery::MultiNodeBasicConnectivityPeerDiscovery;
30+
use self::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock;
3031
use self::multi_node::sync_4_block_producers::MultiNodeSync4BlockProducers;
3132
use self::multi_node::vrf_correct_ledgers::MultiNodeVrfGetCorrectLedgers;
3233
use self::multi_node::vrf_correct_slots::MultiNodeVrfGetCorrectSlots;
@@ -64,6 +65,7 @@ pub enum Scenarios {
6465
SimulationSmall(SimulationSmall),
6566
SimulationSmallForeverRealTime(SimulationSmallForeverRealTime),
6667
P2pReceiveBlock(P2pReceiveBlock),
68+
MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock),
6769
RecordReplayBootstrap(RecordReplayBootstrap),
6870
RecordReplayBlockProduction(RecordReplayBlockProduction),
6971
}
@@ -86,6 +88,7 @@ impl Scenarios {
8688
Self::MultiNodeBasicConnectivityPeerDiscovery(_) => cfg!(feature = "p2p-webrtc"),
8789
Self::SimulationSmall(_) => true,
8890
Self::SimulationSmallForeverRealTime(_) => true,
91+
Self::MultiNodePubsubPropagateBlock(_) => true, // in progress
8992
_ => false,
9093
}
9194
}
@@ -144,6 +147,7 @@ impl Scenarios {
144147
Self::SimulationSmall(_) => SimulationSmall::DOCS,
145148
Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS,
146149
Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS,
150+
Self::MultiNodePubsubPropagateBlock(_) => MultiNodePubsubPropagateBlock::DOCS,
147151
Self::RecordReplayBootstrap(_) => RecordReplayBootstrap::DOCS,
148152
Self::RecordReplayBlockProduction(_) => RecordReplayBlockProduction::DOCS,
149153
}
@@ -179,6 +183,7 @@ impl Scenarios {
179183
Self::SimulationSmall(v) => v.run(runner).await,
180184
Self::SimulationSmallForeverRealTime(v) => v.run(runner).await,
181185
Self::P2pReceiveBlock(v) => v.run(runner).await,
186+
Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await,
182187
Self::RecordReplayBootstrap(v) => v.run(runner).await,
183188
Self::RecordReplayBlockProduction(v) => v.run(runner).await,
184189
}

node/testing/src/scenarios/multi_node/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ pub mod basic_connectivity_peer_discovery;
55

66
#[cfg(feature = "p2p-libp2p")]
77
pub mod connection_discovery;
8+
#[cfg(feature = "p2p-libp2p")]
9+
pub mod pubsub_advanced;
810
pub mod vrf_correct_ledgers;
911
pub mod vrf_correct_slots;
1012
pub mod vrf_epoch_bounds_correct_ledgers;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::{iter, str::FromStr, time::Duration};
2+
3+
use node::{account::AccountSecretKey, BlockProducerConfig};
4+
5+
use crate::{
6+
node::{RustNodeBlockProducerTestingConfig, RustNodeTestingConfig},
7+
scenario::ListenerNode,
8+
scenarios::ClusterRunner,
9+
};
10+
11+
/// Create and Sync up 50 nodes, one amoung them is block producer.
12+
///
13+
/// 1. Create the nodes.
14+
/// 2. Connect them to each other.
15+
/// 3. Wait kademlia bootstrap is done, observe the connection graph.
16+
/// 4. Wait pubsub mesh construction is done, observe the mesh.
17+
/// 5. Wait block is produced and observe the propagation.
18+
#[derive(documented::Documented, Default, Clone, Copy)]
19+
pub struct MultiNodePubsubPropagateBlock;
20+
21+
impl MultiNodePubsubPropagateBlock {
22+
const WORKERS: usize = 4;
23+
24+
pub async fn run(self, mut runner: ClusterRunner<'_>) {
25+
// let seed_node = ListenerNode::Custom(
26+
// "/ip4/34.135.63.47/tcp/10001/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag"
27+
// .parse()
28+
// .unwrap(),
29+
// );
30+
31+
let seed_config = RustNodeTestingConfig::devnet_default()
32+
.max_peers(100)
33+
.ask_initial_peers_interval(Duration::from_secs(60 * 60));
34+
35+
let seed_node = ListenerNode::Rust(runner.add_rust_node(seed_config));
36+
37+
// for account B62qrztYfPinaKqpXaYGY6QJ3SSW2NNKs7SajBLF1iFNXW9BoALN2Aq
38+
let sec_key =
39+
AccountSecretKey::from_str("EKEEpMELfQkMbJDt2fB4cFXKwSf1x4t7YD4twREy5yuJ84HBZtF9")
40+
.unwrap();
41+
42+
let producer_node = runner.add_rust_node(RustNodeTestingConfig {
43+
initial_time: redux::Timestamp::ZERO,
44+
genesis: node::config::DEVNET_CONFIG.clone(),
45+
max_peers: 10,
46+
ask_initial_peers_interval: Duration::from_secs(60 * 60),
47+
initial_peers: vec![seed_node.clone()],
48+
peer_id: Default::default(),
49+
block_producer: Some(RustNodeBlockProducerTestingConfig {
50+
config: BlockProducerConfig {
51+
pub_key: sec_key.public_key().into(),
52+
custom_coinbase_receiver: None,
53+
proposed_protocol_version: None,
54+
},
55+
sec_key,
56+
}),
57+
snark_worker: None,
58+
timeouts: Default::default(),
59+
libp2p_port: None,
60+
recorder: Default::default(),
61+
});
62+
63+
tokio::time::sleep(Duration::from_secs(2)).await;
64+
65+
eprintln!("Producer node connected");
66+
67+
let worker_config = RustNodeTestingConfig::devnet_default()
68+
.initial_peers(vec![seed_node])
69+
.max_peers(10)
70+
.ask_initial_peers_interval(Duration::from_secs(60 * 60));
71+
let workers = iter::repeat(worker_config)
72+
.take(Self::WORKERS)
73+
.map(|config| runner.add_rust_node(config))
74+
.collect::<Vec<_>>();
75+
76+
runner
77+
.run_until_nodes_synced(Duration::from_secs(10 * 60), &workers)
78+
.await
79+
.unwrap();
80+
81+
let _ = producer_node;
82+
// TODO:
83+
}
84+
}

node/testing/tests/multi_node.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,10 @@ scenario_test!(
1616
MultiNodeBasicConnectivityInitialJoining,
1717
MultiNodeBasicConnectivityInitialJoining
1818
);
19+
20+
// #[cfg(feature = "p2p-libp2p")]
21+
// scenario_test!(
22+
// propagate_block,
23+
// openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock,
24+
// openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock
25+
// );

node/web/src/node/builder.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use node::{
99
core::{consensus::ConsensusConstants, constants::constraint_constants},
1010
p2p::{
1111
channels::ChannelId, connection::outgoing::P2pConnectionOutgoingInitOpts,
12-
identity::SecretKey as P2pSecretKey, P2pLimits, P2pTimeouts,
12+
identity::SecretKey as P2pSecretKey, P2pLimits, P2pMeshsubConfig, P2pTimeouts,
1313
},
1414
snark::{get_srs, get_verifier_index, VerifierIndex, VerifierKind, VerifierSRS},
1515
transition_frontier::genesis::GenesisConfig,
@@ -227,9 +227,12 @@ impl NodeBuilder {
227227
ask_initial_peers_interval: Duration::from_secs(3600),
228228
enabled_channels: ChannelId::iter_all().collect(),
229229
peer_discovery: !self.p2p_no_discovery,
230-
initial_time: initial_time
231-
.checked_sub(redux::Timestamp::ZERO)
232-
.unwrap_or_default(),
230+
meshsub: P2pMeshsubConfig {
231+
initial_time: initial_time
232+
.checked_sub(redux::Timestamp::ZERO)
233+
.unwrap_or_default(),
234+
..Default::default()
235+
},
233236
timeouts: P2pTimeouts::default(),
234237
limits: P2pLimits::default().with_max_peers(Some(100)),
235238
},

p2p/src/network/pubsub/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ mod p2p_network_pubsub_actions;
66
pub use self::p2p_network_pubsub_actions::P2pNetworkPubsubAction;
77

88
mod p2p_network_pubsub_state;
9-
pub use self::p2p_network_pubsub_state::{P2pNetworkPubsubClientState, P2pNetworkPubsubState};
9+
pub use self::p2p_network_pubsub_state::{
10+
P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState,
11+
};
1012

1113
#[cfg(feature = "p2p-libp2p")]
1214
mod p2p_network_pubsub_reducer;

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ pub enum P2pNetworkPubsubAction {
1818
addr: ConnectionAddr,
1919
stream_id: StreamId,
2020
data: Data,
21+
seen_limit: usize,
22+
},
23+
Graft {
24+
peer_id: PeerId,
25+
topic_id: String,
26+
},
27+
Prune {
28+
peer_id: PeerId,
29+
topic_id: String,
2130
},
2231
Broadcast {
2332
message: Box<GossipNetMessageV2>,

0 commit comments

Comments
 (0)