Skip to content

Commit dd3bca3

Browse files
authored
Merge pull request #611 from openmina/feat/p2p/meshsub
Pubsub Bugs
2 parents 7160931 + a6f0234 commit dd3bca3

File tree

5 files changed

+66
-74
lines changed

5 files changed

+66
-74
lines changed
Lines changed: 30 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use std::{iter, str::FromStr, time::Duration};
1+
use std::time::Duration;
22

3-
use node::{account::AccountSecretKey, BlockProducerConfig};
3+
use mina_p2p_messages::v2;
4+
use node::transition_frontier::genesis::{GenesisConfig, NonStakers};
45

56
use crate::{
6-
node::{RustNodeBlockProducerTestingConfig, RustNodeTestingConfig},
7-
scenario::ListenerNode,
8-
scenarios::ClusterRunner,
7+
node::Recorder,
8+
scenarios::{ClusterRunner, RunCfgAdvanceTime},
9+
simulator::{Simulator, SimulatorConfig, SimulatorRunUntil},
910
};
1011

1112
/// Create and Sync up 50 nodes, one amoung them is block producer.
@@ -19,66 +20,31 @@ use crate::{
1920
pub struct MultiNodePubsubPropagateBlock;
2021

2122
impl MultiNodePubsubPropagateBlock {
22-
const WORKERS: usize = 4;
23+
const WORKERS: usize = 20;
2324

2425
pub async fn run(self, mut runner: ClusterRunner<'_>) {
25-
// let seed_node = ListenerNode::Custom(
26-
// "/ip4/34.135.63.47/tcp/10001/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag"
27-
// .parse()
28-
// .unwrap(),
29-
// );
30-
31-
let seed_config = RustNodeTestingConfig::devnet_default()
32-
.max_peers(100)
33-
.ask_initial_peers_interval(Duration::from_secs(60 * 60));
34-
35-
let seed_node = ListenerNode::Rust(runner.add_rust_node(seed_config));
36-
37-
// for account B62qrztYfPinaKqpXaYGY6QJ3SSW2NNKs7SajBLF1iFNXW9BoALN2Aq
38-
let sec_key =
39-
AccountSecretKey::from_str("EKEEpMELfQkMbJDt2fB4cFXKwSf1x4t7YD4twREy5yuJ84HBZtF9")
40-
.unwrap();
41-
42-
let producer_node = runner.add_rust_node(RustNodeTestingConfig {
43-
initial_time: redux::Timestamp::ZERO,
44-
genesis: node::config::DEVNET_CONFIG.clone(),
45-
max_peers: 10,
46-
ask_initial_peers_interval: Duration::from_secs(60 * 60),
47-
initial_peers: vec![seed_node.clone()],
48-
peer_id: Default::default(),
49-
block_producer: Some(RustNodeBlockProducerTestingConfig {
50-
config: BlockProducerConfig {
51-
pub_key: sec_key.public_key().into(),
52-
custom_coinbase_receiver: None,
53-
proposed_protocol_version: None,
54-
},
55-
sec_key,
56-
}),
57-
snark_worker: None,
58-
timeouts: Default::default(),
59-
libp2p_port: None,
60-
recorder: Default::default(),
61-
});
62-
63-
tokio::time::sleep(Duration::from_secs(2)).await;
64-
65-
eprintln!("Producer node connected");
66-
67-
let worker_config = RustNodeTestingConfig::devnet_default()
68-
.initial_peers(vec![seed_node])
69-
.max_peers(10)
70-
.ask_initial_peers_interval(Duration::from_secs(60 * 60));
71-
let workers = iter::repeat(worker_config)
72-
.take(Self::WORKERS)
73-
.map(|config| runner.add_rust_node(config))
74-
.collect::<Vec<_>>();
75-
76-
runner
77-
.run_until_nodes_synced(Duration::from_secs(10 * 60), &workers)
78-
.await
79-
.unwrap();
80-
81-
let _ = producer_node;
82-
// TODO:
26+
let initial_time = redux::Timestamp::global_now();
27+
let mut constants = v2::PROTOCOL_CONSTANTS.clone();
28+
constants.genesis_state_timestamp =
29+
v2::BlockTimeTimeStableV1((u64::from(initial_time) / 1_000_000).into());
30+
let genesis_cfg = GenesisConfig::Counts {
31+
whales: 1,
32+
fish: 0,
33+
non_stakers: NonStakers::None,
34+
constants,
35+
};
36+
let config = SimulatorConfig {
37+
genesis: genesis_cfg.into(),
38+
seed_nodes: 1,
39+
normal_nodes: Self::WORKERS,
40+
snark_workers: 1,
41+
block_producers: 1,
42+
advance_time: RunCfgAdvanceTime::Rand(1..=200),
43+
run_until: SimulatorRunUntil::BlockchainLength(3),
44+
run_until_timeout: Duration::from_secs(10 * 60),
45+
recorder: Recorder::StateWithInputActions,
46+
};
47+
let mut simulator = Simulator::new(initial_time, config);
48+
simulator.run(&mut runner).await;
8349
}
8450
}

node/testing/tests/multi_node.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ scenario_test!(
1717
MultiNodeBasicConnectivityInitialJoining
1818
);
1919

20-
// #[cfg(feature = "p2p-libp2p")]
21-
// scenario_test!(
22-
// propagate_block,
23-
// openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock,
24-
// openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock
25-
// );
20+
#[cfg(feature = "p2p-libp2p")]
21+
scenario_test!(
22+
propagate_block,
23+
openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock,
24+
openmina_node_testing::scenarios::multi_node::pubsub_advanced::MultiNodePubsubPropagateBlock
25+
);

p2p/src/network/pubsub/p2p_network_pubsub_effects.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ impl P2pNetworkPubsubAction {
2323
let state = &store.state().network.scheduler.broadcast_state;
2424
let config = &store.state().config;
2525

26+
// let this = config.identity_pub_key.peer_id();
27+
2628
match self {
2729
P2pNetworkPubsubAction::NewStream {
2830
peer_id, incoming, ..
2931
} => {
32+
// println!("(pubsub) {this} new stream {peer_id} {incoming}");
3033
if !incoming {
3134
let subscrption = {
3235
let msg = pb::Rpc {
@@ -65,7 +68,7 @@ impl P2pNetworkPubsubAction {
6568
ihave: vec![],
6669
iwant: vec![],
6770
graft: vec![pb::ControlGraft {
68-
topic_id: Some(dbg!(topic_id.clone())),
71+
topic_id: Some(topic_id.clone()),
6972
}],
7073
prune: vec![],
7174
}),
@@ -82,7 +85,7 @@ impl P2pNetworkPubsubAction {
8285
iwant: vec![],
8386
graft: vec![],
8487
prune: vec![pb::ControlPrune {
85-
topic_id: Some(dbg!(topic_id.clone())),
88+
topic_id: Some(topic_id.clone()),
8689
peers: vec![pb::PeerInfo {
8790
peer_id: None,
8891
signed_peer_record: None,
@@ -95,6 +98,7 @@ impl P2pNetworkPubsubAction {
9598
store.dispatch(P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id });
9699
}
97100
P2pNetworkPubsubAction::Broadcast { message } => {
101+
// println!("(pubsub) {this} broadcast");
98102
let mut buffer = vec![0; 8];
99103
binprot::BinProtWrite::binprot_write(&message, &mut buffer).expect("msg");
100104
let len = buffer.len() - 8;
@@ -117,6 +121,8 @@ impl P2pNetworkPubsubAction {
117121
}
118122
P2pNetworkPubsubAction::BroadcastSigned { .. } => broadcast(store),
119123
P2pNetworkPubsubAction::IncomingData { peer_id, .. } => {
124+
// println!("(pubsub) {this} <- {peer_id}");
125+
120126
let incoming_block = state.incoming_block.as_ref().cloned();
121127
let incoming_transactions = state.incoming_transactions.clone();
122128
let incoming_snarks = state.incoming_snarks.clone();
@@ -162,6 +168,16 @@ impl P2pNetworkPubsubAction {
162168
}
163169
P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id } => {
164170
if !message_is_empty(&msg) {
171+
// println!(
172+
// "(pubsub) {this} -> {peer_id}, {:?}, {:?}, {}",
173+
// msg.subscriptions,
174+
// msg.control,
175+
// msg.publish.len()
176+
// );
177+
// for ele in &msg.publish {
178+
// let id = super::p2p_network_pubsub_state::compute_message_id(ele);
179+
// println!("{}", std::str::from_utf8(&id).unwrap());
180+
// }
165181
let mut data = vec![];
166182
if prost::Message::encode_length_delimited(&msg, &mut data).is_ok() {
167183
store.dispatch(P2pNetworkPubsubAction::OutgoingData {

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ impl P2pNetworkPubsubState {
3737
});
3838
state.protocol = *protocol;
3939
state.addr = *addr;
40+
41+
self.topics
42+
.insert(super::TOPIC.to_owned(), Default::default());
4043
}
4144
P2pNetworkPubsubAction::NewStream {
4245
incoming: false,
@@ -83,6 +86,13 @@ impl P2pNetworkPubsubState {
8386
match <pb::Rpc as prost::Message>::decode_length_delimited(slice) {
8487
Ok(v) => {
8588
state.buffer.clear();
89+
// println!(
90+
// "(pubsub) this <- {peer_id}, {:?}, {:?}, {}",
91+
// v.subscriptions,
92+
// v.control,
93+
// v.publish.len()
94+
// );
95+
8696
for subscription in v.subscriptions {
8797
let topic_id = subscription.topic_id().to_owned();
8898
let topic = self.topics.entry(topic_id).or_default();
@@ -169,7 +179,7 @@ impl P2pNetworkPubsubState {
169179
for graft in &control.graft {
170180
if let Some(mesh_state) = self
171181
.topics
172-
.get_mut(dbg!(graft.topic_id()))
182+
.get_mut(graft.topic_id())
173183
.and_then(|m| m.get_mut(peer_id))
174184
{
175185
mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;

p2p/src/p2p_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl Default for P2pMeshsubConfig {
5757
outbound_degree_desired: 6,
5858
outbound_degree_low: 4,
5959
outbound_degree_high: 12,
60-
mcache_len: 5,
60+
mcache_len: 256,
6161
}
6262
}
6363
}

0 commit comments

Comments
 (0)