Skip to content

Commit b5c83e8

Browse files
committed
feat(p2p/libp2p): rebroadcast gossip messages received on webrtc to libp2p peers
1 parent 60d28a9 commit b5c83e8

15 files changed

+185
-49
lines changed

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ pub enum ActionKind {
425425
P2pNetworkPubsubSignError,
426426
P2pNetworkPubsubValidateIncomingMessage,
427427
P2pNetworkPubsubValidateIncomingMessages,
428+
P2pNetworkPubsubWebRtcRebroadcast,
428429
P2pNetworkPubsubEffectfulSign,
429430
P2pNetworkPubsubEffectfulValidateIncomingMessages,
430431
P2pNetworkRpcHeartbeatSend,
@@ -717,7 +718,7 @@ pub enum ActionKind {
717718
}
718719

719720
impl ActionKind {
720-
pub const COUNT: u16 = 607;
721+
pub const COUNT: u16 = 608;
721722
}
722723

723724
impl std::fmt::Display for ActionKind {
@@ -1961,6 +1962,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
19611962
}
19621963
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
19631964
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
1965+
Self::WebRtcRebroadcast { .. } => ActionKind::P2pNetworkPubsubWebRtcRebroadcast,
19641966
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
19651967
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
19661968
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,

node/src/snark_pool/snark_pool_reducer.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -202,15 +202,13 @@ impl SnarkPoolState {
202202
}
203203
}
204204

205-
// TODO: we only rebroadcast locally produced snarks here.
206-
// libp2p logic already broadcasts everything right now and doesn't
205+
// TODO: libp2p logic already broadcasts everything right now and doesn't
207206
// wait for validation, thad needs to be fixed. See #952
208-
if *is_sender_local {
209-
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
210-
snark: snark.clone(),
211-
nonce: 0,
212-
});
213-
}
207+
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208+
snark: snark.clone(),
209+
nonce: 0,
210+
is_local: *is_sender_local,
211+
});
214212
}
215213
SnarkPoolAction::P2pSendAll { .. } => {
216214
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();

node/src/transaction_pool/transaction_pool_actions.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub enum TransactionPoolAction {
6969
Rebroadcast {
7070
accepted: Vec<ValidCommandWithHash>,
7171
rejected: Vec<(ValidCommandWithHash, diff::Error)>,
72+
is_local: bool,
7273
},
7374
CollectTransactionsByFee,
7475
#[action_event(level = trace)]
@@ -115,9 +116,9 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
115116
last_index,
116117
)
117118
}),
118-
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
119-
!(accepted.is_empty() && rejected.is_empty())
120-
}
119+
TransactionPoolAction::Rebroadcast {
120+
accepted, rejected, ..
121+
} => !(accepted.is_empty() && rejected.is_empty()),
121122
_ => true,
122123
}
123124
}

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,14 @@ impl TransactionPoolState {
300300
if let Some(rpc_action) = rpc_action {
301301
dispatcher.push(rpc_action);
302302
}
303-
// TODO: we only rebroadcast locally injected transactions here.
304-
// libp2p logic already broadcasts everything right now and doesn't
303+
// TODO: libp2p logic already broadcasts everything right now and doesn't
305304
// wait for validation, thad needs to be fixed. See #952
306-
if is_sender_local && was_accepted {
307-
dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected });
305+
if was_accepted {
306+
dispatcher.push(TransactionPoolAction::Rebroadcast {
307+
accepted,
308+
rejected,
309+
is_local: is_sender_local,
310+
});
308311
}
309312
}
310313
TransactionPoolAction::ApplyTransitionFrontierDiff {
@@ -372,7 +375,11 @@ impl TransactionPoolState {
372375
);
373376
}
374377
}
375-
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
378+
TransactionPoolAction::Rebroadcast {
379+
accepted,
380+
rejected,
381+
is_local,
382+
} => {
376383
let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check());
377384

378385
let all_commands = accepted
@@ -387,6 +394,7 @@ impl TransactionPoolState {
387394
dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast {
388395
transaction: Box::new((&cmd).into()),
389396
nonce: 0,
397+
is_local: *is_local,
390398
});
391399
}
392400
}

node/src/transition_frontier/transition_frontier_effects.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use mina_p2p_messages::gossip::GossipNetMessageV2;
12
use redux::Timestamp;
23

34
use crate::block_producer::BlockProducerAction;
45
use crate::consensus::ConsensusAction;
56
use crate::ledger::LEDGER_DEPTH;
67
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
8+
use crate::p2p::P2pNetworkPubsubAction;
79
use crate::snark_pool::{SnarkPoolAction, SnarkWork};
810
use crate::stats::sync::SyncingLedger;
911
use crate::{Store, TransactionPoolAction};
@@ -305,6 +307,9 @@ fn synced_effects<S: crate::Service>(
305307
best_tip: best_tip.block.clone(),
306308
});
307309
}
310+
store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast {
311+
message: GossipNetMessageV2::NewState(best_tip.block().clone()),
312+
});
308313

309314
let best_tip_hash = best_tip.merkle_root_hash().clone();
310315
store.dispatch(ConsensusAction::Prune);

p2p/src/channels/snark/p2p_channels_snark_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub enum P2pChannelsSnarkAction {
5454
Libp2pBroadcast {
5555
snark: Snark,
5656
nonce: u32,
57+
is_local: bool,
5758
},
5859
}
5960

p2p/src/channels/snark/p2p_channels_snark_reducer.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,26 @@ impl P2pChannelsSnarkState {
210210
}
211211
Ok(())
212212
}
213+
#[cfg(not(feature = "p2p-libp2p"))]
214+
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
213215
#[cfg(feature = "p2p-libp2p")]
214-
P2pChannelsSnarkAction::Libp2pBroadcast { snark, nonce } => {
216+
P2pChannelsSnarkAction::Libp2pBroadcast {
217+
snark,
218+
nonce,
219+
is_local,
220+
} => {
215221
let dispatcher = state_context.into_dispatcher();
216222
let message = Box::new((snark.statement(), (&snark).into()));
217223
let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message);
218224
let nonce = nonce.into();
219225
let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce };
220-
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
226+
if is_local {
227+
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
228+
} else {
229+
dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
230+
}
221231
Ok(())
222232
}
223-
#[cfg(not(feature = "p2p-libp2p"))]
224-
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
225233
P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
226234
let (dispatcher, state) = state_context.into_dispatcher_and_state();
227235
let p2p_state: &P2pState = state.substate()?;

p2p/src/channels/transaction/p2p_channels_transaction_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub enum P2pChannelsTransactionAction {
5252
Libp2pBroadcast {
5353
transaction: Box<Transaction>,
5454
nonce: u32,
55+
is_local: bool,
5556
},
5657
}
5758

p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,22 @@ impl P2pChannelsTransactionState {
230230
#[cfg(not(feature = "p2p-libp2p"))]
231231
P2pChannelsTransactionAction::Libp2pBroadcast { .. } => Ok(()),
232232
#[cfg(feature = "p2p-libp2p")]
233-
P2pChannelsTransactionAction::Libp2pBroadcast { transaction, nonce } => {
233+
P2pChannelsTransactionAction::Libp2pBroadcast {
234+
transaction,
235+
nonce,
236+
is_local,
237+
} => {
234238
let dispatcher = state_context.into_dispatcher();
235239
let message = v2::NetworkPoolTransactionPoolDiffVersionedStableV2(
236240
std::iter::once(*transaction).collect(),
237241
);
238242
let nonce = nonce.into();
239243
let message = GossipNetMessageV2::TransactionPoolDiff { message, nonce };
240-
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
244+
if is_local {
245+
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
246+
} else {
247+
dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
248+
}
241249
Ok(())
242250
}
243251
}

p2p/src/identity/secret_key.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ impl SecretKey {
178178
pub fn libp2p_pubsub_sign(&mut self, msg: &[u8]) -> Signature {
179179
self.sign(&[b"libp2p-pubsub:", msg].concat())
180180
}
181+
182+
pub fn libp2p_pubsub_pb_message_sign(
183+
&mut self,
184+
msg: &crate::pb::Message,
185+
) -> Result<Signature, prost::EncodeError> {
186+
let mut buf = Vec::new();
187+
prost::Message::encode(msg, &mut buf)?;
188+
Ok(self.libp2p_pubsub_sign(&buf))
189+
}
181190
}
182191

183192
pub trait EncryptableType: Serialize + for<'a> Deserialize<'a> {

0 commit comments

Comments
 (0)