Skip to content

Commit 7d40987

Browse files
committed
Added broadcast delay for snarks and transactions until they are fully vertified
1 parent 044ef77 commit 7d40987

13 files changed

+235
-68
lines changed

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub enum SnarkPoolCandidateAction {
4949
WorkVerifyError {
5050
peer_id: PeerId,
5151
verify_id: SnarkWorkVerifyId,
52+
batch: Vec<Snark>,
5253
},
5354
WorkVerifySuccess {
5455
peer_id: PeerId,

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use openmina_core::snark::Snark;
55
use p2p::{
66
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
77
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
8-
PeerId,
8+
BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
99
};
1010
use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
1111

@@ -124,10 +124,11 @@ impl SnarkPoolCandidatesState {
124124
}
125125
}),
126126
on_error: redux::callback!(
127-
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String)) -> crate::Action {
127+
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
128128
SnarkPoolCandidateAction::WorkVerifyError {
129129
peer_id: sender.parse().unwrap(),
130130
verify_id: req_id,
131+
batch
131132
}
132133
}),
133134
});
@@ -144,7 +145,11 @@ impl SnarkPoolCandidatesState {
144145
} => {
145146
state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
146147
}
147-
SnarkPoolCandidateAction::WorkVerifyError { peer_id, verify_id } => {
148+
SnarkPoolCandidateAction::WorkVerifyError {
149+
peer_id,
150+
verify_id,
151+
batch,
152+
} => {
148153
state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
149154

150155
// TODO(binier): blacklist peer
@@ -154,6 +159,16 @@ impl SnarkPoolCandidatesState {
154159
peer_id,
155160
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
156161
});
162+
163+
for snark in batch {
164+
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
165+
message_id: Some(BroadcastMessageId::Snark {
166+
job_id: snark.job_id(),
167+
}),
168+
peer_id: None,
169+
reason: "Snark work verification failed".to_string(),
170+
});
171+
}
157172
}
158173
SnarkPoolCandidateAction::WorkVerifySuccess {
159174
peer_id,

node/src/snark_pool/snark_pool_reducer.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::collections::BTreeMap;
22

33
use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
4-
use p2p::channels::{
5-
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
4+
use p2p::{
5+
channels::{
6+
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
7+
},
8+
BroadcastMessageId, P2pNetworkPubsubAction,
69
};
710

811
use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
@@ -202,13 +205,17 @@ impl SnarkPoolState {
202205
}
203206
}
204207

205-
// TODO: libp2p logic already broadcasts everything right now and doesn't
206-
// wait for validation, thad needs to be fixed. See #952
207208
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208209
snark: snark.clone(),
209210
nonce: 0,
210211
is_local: *is_sender_local,
211212
});
213+
214+
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
215+
message_id: BroadcastMessageId::Snark {
216+
job_id: snark.job_id(),
217+
},
218+
});
212219
}
213220
SnarkPoolAction::P2pSendAll { .. } => {
214221
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();

node/src/transaction_pool/transaction_pool_actions.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use ledger::{
88
},
99
Account, AccountId,
1010
};
11-
use mina_p2p_messages::{list::List, v2};
11+
use mina_p2p_messages::{
12+
list::List,
13+
v2::{self, TransactionHash},
14+
};
1215
use openmina_core::{requests::RpcId, transaction::TransactionWithHash, ActionEvent};
1316
use redux::Callback;
1417
use serde::{Deserialize, Serialize};
@@ -38,6 +41,7 @@ pub enum TransactionPoolAction {
3841
#[action_event(level = warn, fields(debug(errors)))]
3942
VerifyError {
4043
errors: Vec<String>,
44+
tx_hashes: Vec<TransactionHash>,
4145
},
4246
BestTipChanged {
4347
best_tip_hash: v2::LedgerHash,

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ use ledger::{
66
},
77
Account, AccountId,
88
};
9+
use mina_p2p_messages::v2::TransactionHash;
910
use openmina_core::{
1011
bug_condition,
1112
constants::constraint_constants,
1213
requests::RpcId,
1314
transaction::{Transaction, TransactionWithHash},
1415
};
15-
use p2p::channels::transaction::P2pChannelsTransactionAction;
16+
use p2p::{
17+
channels::transaction::P2pChannelsTransactionAction, BroadcastMessageId, P2pNetworkPubsubAction,
18+
};
1619
use redux::callback;
1720
use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyId};
1821
use std::collections::{BTreeMap, BTreeSet};
@@ -97,6 +100,12 @@ impl TransactionPoolState {
97100
panic!()
98101
};
99102

103+
let tx_hashes = commands
104+
.iter()
105+
.map(TransactionWithHash::hash)
106+
.cloned()
107+
.collect::<Vec<_>>();
108+
100109
// TODO: Convert those commands only once
101110
let Ok(commands) = commands
102111
.iter()
@@ -133,10 +142,11 @@ impl TransactionPoolState {
133142
),
134143
on_error: callback!(
135144
on_snark_user_command_verify_error(
136-
(req_id: SnarkUserCommandVerifyId, errors: Vec<String>)
145+
(req_id: SnarkUserCommandVerifyId, errors: Vec<String>, tx_hashes: Vec<TransactionHash>)
137146
) -> crate::Action {
138147
TransactionPoolAction::VerifyError {
139-
errors
148+
errors,
149+
tx_hashes
140150
}
141151
}
142152
)
@@ -147,6 +157,7 @@ impl TransactionPoolState {
147157
let dispatcher = state.into_dispatcher();
148158
dispatcher.push(TransactionPoolAction::VerifyError {
149159
errors: errors.clone(),
160+
tx_hashes,
150161
});
151162
if let Some(rpc_id) = from_rpc {
152163
dispatcher.push(RpcAction::TransactionInjectFailure {
@@ -186,8 +197,16 @@ impl TransactionPoolState {
186197
from_rpc: *from_rpc,
187198
});
188199
}
189-
TransactionPoolAction::VerifyError { .. } => {
200+
TransactionPoolAction::VerifyError { tx_hashes, .. } => {
190201
// just logging the errors
202+
let dispatcher = state.into_dispatcher();
203+
for tx in tx_hashes {
204+
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
205+
message_id: Some(BroadcastMessageId::Transaction { tx: tx.clone() }),
206+
peer_id: None,
207+
reason: "Transaction rejected".to_owned(),
208+
});
209+
}
191210
}
192211
TransactionPoolAction::BestTipChanged { best_tip_hash } => {
193212
let account_ids = substate.pool.get_accounts_to_revalidate_on_new_best_tip();
@@ -300,9 +319,22 @@ impl TransactionPoolState {
300319
if let Some(rpc_action) = rpc_action {
301320
dispatcher.push(rpc_action);
302321
}
303-
// TODO: libp2p logic already broadcasts everything right now and doesn't
304-
// wait for validation, thad needs to be fixed. See #952
322+
305323
if was_accepted {
324+
for tx in &accepted {
325+
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
326+
message_id: BroadcastMessageId::Transaction {
327+
tx: tx.hash.clone(),
328+
},
329+
});
330+
}
331+
for (tx, _) in &rejected {
332+
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
333+
message_id: BroadcastMessageId::Transaction {
334+
tx: tx.hash.clone(),
335+
},
336+
});
337+
}
306338
dispatcher.push(TransactionPoolAction::Rebroadcast {
307339
accepted,
308340
rejected,

p2p/src/network/pubsub/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ mod p2p_network_pubsub_reducer;
1818
const TOPIC: &str = "coda/consensus-messages/0.0.1";
1919

2020
pub mod pubsub_effectful;
21+
use mina_p2p_messages::v2::TransactionHash;
22+
use openmina_core::snark::SnarkJobId;
2123
pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction;
2224

2325
use binprot::BinProtWrite;
@@ -32,6 +34,12 @@ pub enum BroadcastMessageId {
3234
BlockHash {
3335
hash: mina_p2p_messages::v2::StateHash,
3436
},
37+
Snark {
38+
job_id: SnarkJobId,
39+
},
40+
Transaction {
41+
tx: TransactionHash,
42+
},
3543
MessageId {
3644
message_id: P2pNetworkPubsubMessageCacheId,
3745
},

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
use std::{collections::btree_map::Entry, time::Duration};
1+
use std::{
2+
collections::{btree_map::Entry, BTreeMap},
3+
time::Duration,
4+
};
25

36
use binprot::BinProtRead;
47
use mina_p2p_messages::{
58
gossip::{self, GossipNetMessageV2},
69
v2::NetworkPoolSnarkPoolDiffVersionedStableV2,
710
};
8-
use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate};
11+
use openmina_core::{
12+
block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark, Substate,
13+
};
914
use redux::{Dispatcher, Timestamp};
1015

1116
use crate::{
@@ -21,8 +26,8 @@ use super::{
2126
P2pNetworkPubsubMessageCacheMessage,
2227
},
2328
pb::{self, Message},
24-
P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction,
25-
P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
29+
BroadcastMessageId, P2pNetworkPubsubAction, P2pNetworkPubsubClientState,
30+
P2pNetworkPubsubEffectfulAction, P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
2631
};
2732

2833
const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300);
@@ -555,6 +560,39 @@ impl P2pNetworkPubsubState {
555560
time,
556561
}
557562
}
563+
GossipNetMessageV2::SnarkPoolDiff {
564+
message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(snark),
565+
..
566+
} => {
567+
let snark: Snark = snark.1.clone().into();
568+
let job_id = snark.job_id();
569+
P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark {
570+
job_id,
571+
message,
572+
peer_id,
573+
time,
574+
}
575+
}
576+
GossipNetMessageV2::TransactionPoolDiff {
577+
message: tx_message,
578+
..
579+
} => {
580+
let mut tx_hashes = BTreeMap::new();
581+
582+
for tx in &tx_message.0 {
583+
let Ok(hash) = tx.hash() else {
584+
continue;
585+
};
586+
tx_hashes.insert(hash, false);
587+
}
588+
589+
P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
590+
tx_hashes,
591+
message,
592+
peer_id,
593+
time,
594+
}
595+
}
558596
_ => P2pNetworkPubsubMessageCacheMessage::PreValidated {
559597
message,
560598
peer_id,
@@ -572,7 +610,6 @@ impl P2pNetworkPubsubState {
572610
GossipNetMessageV2::NewState(block) => {
573611
let best_tip = BlockWithHash::try_new(block.clone())?;
574612
dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
575-
return Ok(());
576613
}
577614
GossipNetMessageV2::TransactionPoolDiff { message, nonce } => {
578615
let nonce = nonce.as_u32();
@@ -588,18 +625,15 @@ impl P2pNetworkPubsubState {
588625
message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work),
589626
nonce,
590627
} => {
628+
let snark: Snark = work.1.into();
591629
dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived {
592630
peer_id,
593-
snark: Box::new(work.1.into()),
631+
snark: Box::new(snark),
594632
nonce: nonce.as_u32(),
595633
});
596634
}
597635
_ => {}
598636
}
599-
600-
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
601-
message_id: super::BroadcastMessageId::MessageId { message_id },
602-
});
603637
Ok(())
604638
}
605639
P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => {
@@ -609,8 +643,32 @@ impl P2pNetworkPubsubState {
609643
bug_condition!("Message with id: {:?} not found", message_id);
610644
return Ok(());
611645
};
646+
647+
if let BroadcastMessageId::Transaction { tx } = &message_id {
648+
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
649+
tx_hashes,
650+
..
651+
} = message
652+
{
653+
if let Some(value) = tx_hashes.get_mut(tx) {
654+
*value = true;
655+
} else {
656+
bug_condition!("Transaction with hash: {} not found", tx);
657+
return Ok(());
658+
}
659+
660+
let all_vertified = tx_hashes.values().all(|v| *v);
661+
if !all_vertified {
662+
return Ok(());
663+
}
664+
} else {
665+
bug_condition!("Invalid state for message id with type transaction");
666+
return Ok(());
667+
}
668+
}
669+
612670
let raw_message = message.message().clone();
613-
let peer_id = message.peer_id();
671+
let peer_id = *message.peer_id();
614672

615673
pubsub_state.reduce_incoming_validated_message(
616674
mcache_message_id,
@@ -628,7 +686,7 @@ impl P2pNetworkPubsubState {
628686
*message = P2pNetworkPubsubMessageCacheMessage::Validated {
629687
message: raw_message,
630688
peer_id,
631-
time: message.time(),
689+
time: *message.time(),
632690
};
633691

634692
let (dispatcher, state) = state_context.into_dispatcher_and_state();
@@ -641,7 +699,7 @@ impl P2pNetworkPubsubState {
641699
.map
642700
.iter()
643701
.filter_map(|(message_id, message)| {
644-
if message.time() + MAX_MESSAGE_KEEP_DURATION > time {
702+
if (*message.time() + MAX_MESSAGE_KEEP_DURATION) <= time {
645703
Some(message_id.to_owned())
646704
} else {
647705
None
@@ -669,7 +727,7 @@ impl P2pNetworkPubsubState {
669727
};
670728

671729
if peer_id.is_none() {
672-
peer_id = Some(message.peer_id());
730+
peer_id = Some(*message.peer_id());
673731
}
674732

675733
pubsub_state.mcache.remove_message(_message_id);

0 commit comments

Comments
 (0)