Skip to content

Commit 5baabe6

Browse files
authored
Merge pull request #950 from openmina/fix/pubsub-optimizations
fix(p2p): Pubsub optimizations
2 parents 97e7eed + 205e0aa commit 5baabe6

File tree

14 files changed

+340
-131
lines changed

14 files changed

+340
-131
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mina-p2p-messages/examples/mina-types-converter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
fs::File,
33
io::{self, Read, Write},
44
path::PathBuf,
5+
sync::Arc,
56
};
67

78
use anyhow::{bail, format_err, Result};
@@ -244,7 +245,7 @@ macro_rules! converter {
244245
fn converters() -> Vec<Converter> {
245246
vec![
246247
converter!("gossip", GossipNetMessageV2 =>
247-
("new-state", MinaBlockBlockStableV2),
248+
("new-state", Arc<MinaBlockBlockStableV2>),
248249
("snark-pool-diff", NetworkPoolSnarkPoolDiffVersionedStableV2),
249250
("tx-pool-diff", NetworkPoolTransactionPoolDiffVersionedStableV2),
250251
),

mina-p2p-messages/src/gossip.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use binprot_derive::{BinProtRead, BinProtWrite};
24
use derive_more::{From, TryInto};
35
use serde::{Deserialize, Serialize};
@@ -9,7 +11,7 @@ use crate::{number::Int32, v2};
911
)]
1012
#[serde(tag = "type", content = "message", rename_all = "snake_case")]
1113
pub enum GossipNetMessageV2 {
12-
NewState(v2::MinaBlockBlockStableV2),
14+
NewState(Arc<v2::MinaBlockBlockStableV2>),
1315
SnarkPoolDiff {
1416
message: v2::NetworkPoolSnarkPoolDiffVersionedStableV2,
1517
nonce: Int32,

node/common/src/service/p2p.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ impl webrtc::P2pServiceWebrtc for NodeService {
5757
self.p2p.sec_key.decrypt(other_pk, encrypted)
5858
}
5959

60+
#[cfg(not(feature = "p2p-webrtc"))]
61+
fn auth_encrypt_and_send(
62+
&mut self,
63+
peer_id: PeerId,
64+
other_pub_key: &PublicKey,
65+
auth: ConnectionAuth,
66+
) {
67+
let _ = (peer_id, other_pub_key, auth);
68+
}
69+
70+
#[cfg(feature = "p2p-webrtc")]
6071
fn auth_encrypt_and_send(
6172
&mut self,
6273
peer_id: PeerId,
@@ -65,7 +76,6 @@ impl webrtc::P2pServiceWebrtc for NodeService {
6576
) {
6677
let encrypted = auth.encrypt(&self.p2p.sec_key, other_pub_key, &mut self.rng);
6778
if let Some(peer) = self.peers().get(&peer_id) {
68-
#[cfg(feature = "p2p-webrtc")]
6979
let _ = peer
7080
.cmd_sender
7181
.send(webrtc::PeerCmd::ConnectionAuthorizationSend(encrypted));

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ pub enum ActionKind {
407407
P2pNetworkPubsubNewStream,
408408
P2pNetworkPubsubOutgoingData,
409409
P2pNetworkPubsubOutgoingMessage,
410+
P2pNetworkPubsubOutgoingMessageClear,
410411
P2pNetworkPubsubOutgoingMessageError,
411412
P2pNetworkPubsubPrune,
412413
P2pNetworkPubsubSign,
@@ -686,7 +687,7 @@ pub enum ActionKind {
686687
}
687688

688689
impl ActionKind {
689-
pub const COUNT: u16 = 577;
690+
pub const COUNT: u16 = 578;
690691
}
691692

692693
impl std::fmt::Display for ActionKind {
@@ -1898,6 +1899,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
18981899
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,
18991900
Self::BroadcastSigned { .. } => ActionKind::P2pNetworkPubsubBroadcastSigned,
19001901
Self::OutgoingMessage { .. } => ActionKind::P2pNetworkPubsubOutgoingMessage,
1902+
Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear,
19011903
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
19021904
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
19031905
}

node/src/block_producer/block_producer_reducer.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use openmina_core::{
99
global_sub_window, in_same_checkpoint_window, in_seed_update_range, relative_sub_window,
1010
},
1111
};
12+
use p2p::P2pNetworkPubsubAction;
1213
use redux::{callback, Dispatcher, Timestamp};
1314

1415
use crate::{
@@ -357,7 +358,11 @@ impl BlockProducerEnabled {
357358
bug_condition!("Invalid state for `BlockProducerAction::BlockInjected` expected: `BlockProducerCurrentState::Produced`, found: {:?}", state.current);
358359
}
359360

360-
let dispatcher = state_context.into_dispatcher();
361+
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
362+
363+
#[cfg(feature = "p2p-libp2p")]
364+
broadcast_injected_block(global_state, dispatcher);
365+
361366
dispatcher.push(BlockProducerAction::WonSlotSearch);
362367
}
363368
}
@@ -748,6 +753,24 @@ impl BlockProducerEnabled {
748753
}
749754
}
750755

756+
#[cfg(feature = "p2p-libp2p")]
757+
fn broadcast_injected_block(global_state: &State, dispatcher: &mut Dispatcher<Action, State>) {
758+
use mina_p2p_messages::gossip::GossipNetMessageV2;
759+
760+
let Some(block) = global_state
761+
.block_producer
762+
.as_ref()
763+
.and_then(|bp| bp.current.injected_block())
764+
.map(|pb| pb.block.clone())
765+
else {
766+
// Should be impossible, we call this immediately after having injected the block.
767+
return;
768+
};
769+
770+
let message = Box::new(GossipNetMessageV2::NewState(block));
771+
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
772+
}
773+
751774
fn can_apply_supercharged_coinbase(
752775
block_stake_winner: &v2::NonZeroCurvePoint,
753776
stake_proof_sparse_ledger: &v2::MinaBaseSparseLedgerBaseStableV2,

node/src/block_producer/block_producer_state.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,13 @@ impl BlockProducerCurrentState {
372372
}
373373
}
374374

375+
pub fn injected_block(&self) -> Option<&ArcBlockWithHash> {
376+
match self {
377+
Self::Injected { block, .. } => Some(block),
378+
_ => None,
379+
}
380+
}
381+
375382
pub fn produced_block_with_chain(&self) -> Option<(&ArcBlockWithHash, &[AppliedBlock])> {
376383
match self {
377384
Self::Produced { chain, block, .. } => Some((block, chain)),

node/testing/src/scenarios/multi_node/pubsub_advanced.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1-
use std::time::Duration;
1+
use std::{
2+
str,
3+
sync::{Arc, Mutex},
4+
time::Duration,
5+
};
26

3-
use mina_p2p_messages::v2;
4-
use node::transition_frontier::genesis::{GenesisConfig, NonStakers};
7+
use mina_p2p_messages::{binprot::BinProtRead, gossip, v2};
8+
use node::{
9+
p2p::{P2pNetworkAction, P2pNetworkPubsubAction, PeerId},
10+
transition_frontier::genesis::{GenesisConfig, NonStakers},
11+
Action, ActionWithMeta, P2pAction,
12+
};
513

614
use crate::{
715
node::Recorder,
816
scenarios::{ClusterRunner, RunCfgAdvanceTime},
17+
service::NodeTestingService,
918
simulator::{Simulator, SimulatorConfig, SimulatorRunUntil},
1019
};
1120

@@ -20,9 +29,58 @@ use crate::{
2029
pub struct MultiNodePubsubPropagateBlock;
2130

2231
impl MultiNodePubsubPropagateBlock {
23-
const WORKERS: usize = 20;
32+
const WORKERS: usize = 10;
2433

2534
pub async fn run(self, mut runner: ClusterRunner<'_>) {
35+
let graph = Arc::new(Mutex::new("digraph {\n".to_owned()));
36+
let factory = || {
37+
let graph = graph.clone();
38+
move |_id,
39+
state: &node::State,
40+
_service: &NodeTestingService,
41+
action: &ActionWithMeta| {
42+
let this = state.p2p.my_id();
43+
44+
let cut = |peer_id: &PeerId| {
45+
let st = peer_id.to_string();
46+
let len = st.len();
47+
st[(len - 6)..len].to_owned()
48+
};
49+
let this = cut(&this);
50+
51+
match action.action() {
52+
Action::P2p(P2pAction::Network(P2pNetworkAction::Pubsub(
53+
P2pNetworkPubsubAction::OutgoingMessage { peer_id },
54+
))) => {
55+
let pubsub_state =
56+
&state.p2p.ready().unwrap().network.scheduler.broadcast_state;
57+
let msg = &pubsub_state.clients.get(peer_id).unwrap().message;
58+
59+
for publish_message in &msg.publish {
60+
let mut slice = &publish_message.data()[8..];
61+
if let Ok(gossip::GossipNetMessageV2::NewState(block)) =
62+
gossip::GossipNetMessageV2::binprot_read(&mut slice)
63+
{
64+
let height = block
65+
.header
66+
.protocol_state
67+
.body
68+
.consensus_state
69+
.global_slot();
70+
let mut lock = graph.lock().unwrap();
71+
*lock = format!(
72+
"{lock} \"{this}\" -> \"{}\" [label=\"{height}\"];\n",
73+
cut(peer_id)
74+
);
75+
}
76+
}
77+
false
78+
}
79+
_ => false,
80+
}
81+
}
82+
};
83+
2684
let initial_time = redux::Timestamp::global_now();
2785
let mut constants = v2::PROTOCOL_CONSTANTS.clone();
2886
constants.genesis_state_timestamp =
@@ -40,11 +98,15 @@ impl MultiNodePubsubPropagateBlock {
4098
snark_workers: 1,
4199
block_producers: 1,
42100
advance_time: RunCfgAdvanceTime::Rand(1..=200),
43-
run_until: SimulatorRunUntil::BlockchainLength(3),
101+
run_until: SimulatorRunUntil::BlockchainLength(4),
44102
run_until_timeout: Duration::from_secs(10 * 60),
45103
recorder: Recorder::StateWithInputActions,
46104
};
47105
let mut simulator = Simulator::new(initial_time, config);
48-
simulator.setup_and_run(&mut runner).await;
106+
simulator
107+
.setup_and_run_with_listener(&mut runner, factory)
108+
.await;
109+
110+
println!("{}}}\n", graph.lock().unwrap());
49111
}
50112
}

node/testing/src/simulator/mod.rs

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ use mina_p2p_messages::v2::{
77

88
use std::{collections::BTreeSet, time::Duration};
99

10-
use node::{ActionKind, BlockProducerConfig, SnarkerConfig, SnarkerStrategy, State};
10+
use node::{
11+
ActionKind, ActionWithMeta, BlockProducerConfig, SnarkerConfig, SnarkerStrategy, State,
12+
};
1113

1214
use crate::{
1315
cluster::ClusterNodeId,
1416
node::{Node, RustNodeBlockProducerTestingConfig, RustNodeTestingConfig},
1517
scenario::ListenerNode,
1618
scenarios::{ClusterRunner, RunCfg},
19+
service::NodeTestingService,
1720
};
1821

1922
pub struct Simulator {
@@ -217,19 +220,46 @@ impl Simulator {
217220
self.wait_for_all_nodes_synced(runner).await;
218221
}
219222

223+
pub async fn setup_and_run_with_listener<'a, AL, ALF>(
224+
&mut self,
225+
runner: &mut ClusterRunner<'a>,
226+
listener: ALF,
227+
) where
228+
ALF: FnMut() -> AL,
229+
AL: 'static
230+
+ Send
231+
+ FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
232+
{
233+
self.setup(runner).await;
234+
self.run_with_listener(runner, listener).await;
235+
}
236+
237+
pub async fn setup_and_run<'a>(&mut self, runner: &mut ClusterRunner<'a>) {
238+
self.setup(runner).await;
239+
self.run_with_listener(runner, || |_, _, _, _| false).await;
240+
}
241+
220242
pub async fn setup<'a>(&mut self, runner: &mut ClusterRunner<'a>) {
221243
self.set_up_seed_nodes(runner).await;
222244
self.set_up_normal_nodes(runner).await;
223245
self.set_up_snark_worker_nodes(runner).await;
224246
self.set_up_block_producer_nodes(runner).await;
225247
}
226248

227-
pub async fn setup_and_run<'a>(&mut self, runner: &mut ClusterRunner<'a>) {
228-
self.setup(runner).await;
229-
self.run(runner).await;
249+
pub async fn run<'a>(&mut self, runner: &mut ClusterRunner<'a>) {
250+
self.run_with_listener(runner, || |_, _, _, _| false).await;
230251
}
231252

232-
pub async fn run<'a>(&mut self, runner: &mut ClusterRunner<'a>) {
253+
pub async fn run_with_listener<'a, AL, ALF>(
254+
&mut self,
255+
runner: &mut ClusterRunner<'a>,
256+
mut listener: ALF,
257+
) where
258+
ALF: FnMut() -> AL,
259+
AL: 'static
260+
+ Send
261+
+ FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
262+
{
233263
let run_until = self.config.run_until.clone();
234264
let advance_time = self.config.advance_time.clone();
235265
let start_t = *self.start_t.get_or_insert_with(redux::Instant::now);
@@ -238,13 +268,11 @@ impl Simulator {
238268

239269
while start_t.elapsed() < self.config.run_until_timeout {
240270
tokio::task::yield_now().await;
241-
let _ = runner
242-
.run(
243-
RunCfg::default()
244-
.advance_time(advance_time.clone())
245-
.timeout(Duration::ZERO),
246-
)
247-
.await;
271+
let cfg = RunCfg::default()
272+
.advance_time(advance_time.clone())
273+
.timeout(Duration::ZERO)
274+
.action_handler(listener());
275+
let _ = runner.run(cfg).await;
248276

249277
let printed_elapsed_time = {
250278
let state = runner.nodes_iter().next().unwrap().1.state();

p2p/src/channels/best_tip/p2p_channels_best_tip_reducer.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use redux::ActionWithMeta;
33

44
use crate::{
55
channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
6-
P2pNetworkPubsubAction, P2pPeerAction, P2pState, PeerId,
6+
P2pPeerAction, P2pState, PeerId,
77
};
88

99
use super::{
@@ -162,23 +162,6 @@ impl P2pChannelsBestTipState {
162162
return Ok(());
163163
}
164164

165-
#[cfg(feature = "p2p-libp2p")]
166-
{
167-
use mina_p2p_messages::gossip::GossipNetMessageV2;
168-
let block = (*best_tip.block).clone();
169-
let message = Box::new(GossipNetMessageV2::NewState(block));
170-
// TODO(vlad): `P2pChannelsBestTipAction::ResponseSend`
171-
// action is dispatched for each peer. So `P2pNetworkPubsubAction::Broadcast`
172-
// will be called many times causing many duplicate
173-
// broadcasts. Either in pubsub state machine, we
174-
// need to filter out duplicate messages, or better,
175-
// have a simple action to send pubsub message to a
176-
// specific peer instead of sending to everyone.
177-
// That way we can avoid duplicate state, since we
178-
// already store last sent best tip here and we make
179-
// sure we don't send same block to same peer again.
180-
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
181-
}
182165
Ok(())
183166
}
184167
}

0 commit comments

Comments
 (0)