Skip to content

Commit f2bf64c

Browse files
authored
Merge pull request #1065 from 0xMimir/feat/delayed-pubsub-broadcast
Added broadcast delay for snarks and transactions until they are fully vertified
2 parents 044ef77 + 4d49eb5 commit f2bf64c

29 files changed

+390
-231
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ mina-p2p-messages = { workspace = true }
3636
poseidon = { workspace = true }
3737
hex = "0.4.3"
3838
ark-ff = { workspace = true }
39+
libp2p-identity = { version = "=0.2.7", features = [
40+
"serde",
41+
"peerid"
42+
] }
3943

4044
[target.'cfg(not(target_family = "wasm"))'.dependencies]
4145
redux = { workspace = true, features = ["serializable_callbacks"] }

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod constants;
1717
pub mod dummy;
1818

1919
pub mod block;
20+
pub mod p2p;
2021
pub mod snark;
2122
pub mod transaction;
2223

core/src/p2p.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/// TODO: These types and methods should be moved to `p2p` crate, they are here because they are used in `snark` crates callbacks
2+
use serde::{Deserialize, Serialize};
3+
4+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
5+
pub struct P2pNetworkPubsubMessageCacheId {
6+
pub source: libp2p_identity::PeerId,
7+
pub seqno: u64,
8+
}
9+
10+
impl P2pNetworkPubsubMessageCacheId {
11+
pub fn to_raw_bytes(&self) -> Vec<u8> {
12+
let mut message_id = self.source.to_base58();
13+
message_id.push_str(&self.seqno.to_string());
14+
message_id.into_bytes()
15+
}
16+
}

core/src/transaction/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,36 @@ mod transaction_with_hash;
55
pub use transaction_with_hash::*;
66

77
pub use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2 as Transaction, TransactionHash};
8+
9+
use crate::{p2p::P2pNetworkPubsubMessageCacheId, requests::RpcId};
10+
11+
/// TODO: Types and methods bellow, should be moved to `node` crate, they are here because they are used in `snark` crates callbacks
12+
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Default)]
13+
pub enum TransactionPoolMessageSource {
14+
Rpc {
15+
id: RpcId,
16+
},
17+
Pubsub {
18+
id: P2pNetworkPubsubMessageCacheId,
19+
},
20+
#[default]
21+
None,
22+
}
23+
24+
impl TransactionPoolMessageSource {
25+
pub fn rpc(id: RpcId) -> Self {
26+
Self::Rpc { id }
27+
}
28+
29+
pub fn pubsub(id: P2pNetworkPubsubMessageCacheId) -> Self {
30+
Self::Pubsub { id }
31+
}
32+
33+
pub fn is_sender_local(&self) -> bool {
34+
matches!(self, Self::Rpc { .. })
35+
}
36+
37+
pub fn is_libp2p(&self) -> bool {
38+
matches!(self, Self::Pubsub { .. })
39+
}
40+
}

node/src/rpc/rpc_reducer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use openmina_core::{
22
block::AppliedBlock,
33
bug_condition,
44
requests::{RequestId, RpcId, RpcIdType},
5-
transaction::TransactionWithHash,
5+
transaction::{TransactionPoolMessageSource, TransactionWithHash},
66
};
77
use p2p::{
88
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
@@ -515,7 +515,7 @@ impl RpcState {
515515
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
516516
dispatcher.push(TransactionPoolAction::StartVerify {
517517
commands: commands_with_hash,
518-
from_rpc: Some(*rpc_id),
518+
from_source: TransactionPoolMessageSource::rpc(*rpc_id),
519519
});
520520
}
521521
RpcAction::TransactionInjectPending { rpc_id } => {

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub enum SnarkPoolCandidateAction {
4949
WorkVerifyError {
5050
peer_id: PeerId,
5151
verify_id: SnarkWorkVerifyId,
52+
batch: Vec<SnarkJobId>,
5253
},
5354
WorkVerifySuccess {
5455
peer_id: PeerId,

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::collections::BTreeMap;
22

33
use crate::{p2p_ready, SnarkPoolAction};
4-
use openmina_core::snark::Snark;
4+
use openmina_core::snark::{Snark, SnarkJobId};
55
use p2p::{
66
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
77
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
8-
PeerId,
8+
BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
99
};
1010
use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
1111

@@ -124,10 +124,11 @@ impl SnarkPoolCandidatesState {
124124
}
125125
}),
126126
on_error: redux::callback!(
127-
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String)) -> crate::Action {
127+
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
128128
SnarkPoolCandidateAction::WorkVerifyError {
129129
peer_id: sender.parse().unwrap(),
130130
verify_id: req_id,
131+
batch
131132
}
132133
}),
133134
});
@@ -144,7 +145,11 @@ impl SnarkPoolCandidatesState {
144145
} => {
145146
state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
146147
}
147-
SnarkPoolCandidateAction::WorkVerifyError { peer_id, verify_id } => {
148+
SnarkPoolCandidateAction::WorkVerifyError {
149+
peer_id,
150+
verify_id,
151+
batch,
152+
} => {
148153
state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
149154

150155
// TODO(binier): blacklist peer
@@ -154,6 +159,22 @@ impl SnarkPoolCandidatesState {
154159
peer_id,
155160
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
156161
});
162+
163+
// TODO: This is not correct. We are rejecting all snark messages, but the fact that the batch
164+
// failed to verify means that there is at least one invalid snark in the batch, not that all of them
165+
// are invalid.
166+
// Instead, what should happen here is that we split the batch in two and try to verify the two batches
167+
// again. Repeating until batches don't fail to verify anymore, or each batch is of size 1.
168+
// It may also be worth capping the batch sizes to 10.
169+
for snark_job_id in batch {
170+
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
171+
message_id: Some(BroadcastMessageId::Snark {
172+
job_id: snark_job_id.clone(),
173+
}),
174+
peer_id: None,
175+
reason: "Snark work verification failed".to_string(),
176+
});
177+
}
157178
}
158179
SnarkPoolCandidateAction::WorkVerifySuccess {
159180
peer_id,

node/src/snark_pool/snark_pool_reducer.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use std::collections::BTreeMap;
22

3+
use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
34
use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
45
use p2p::channels::{
56
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
67
};
78

8-
use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
9-
109
use super::{
1110
JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
1211
SnarkPoolState, SnarkWork,
@@ -202,8 +201,6 @@ impl SnarkPoolState {
202201
}
203202
}
204203

205-
// TODO: libp2p logic already broadcasts everything right now and doesn't
206-
// wait for validation, thad needs to be fixed. See #952
207204
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208205
snark: snark.clone(),
209206
nonce: 0,

node/src/state.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::time::Duration;
44
use malloc_size_of_derive::MallocSizeOf;
55
use mina_p2p_messages::v2;
66
use openmina_core::constants::PROTOCOL_VERSION;
7-
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
7+
use openmina_core::transaction::{
8+
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
9+
};
810
use p2p::P2pNetworkPubsubMessageCacheId;
911
use rand::prelude::*;
1012

@@ -539,10 +541,10 @@ impl P2p {
539541
}
540542
)),
541543
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
542-
on_p2p_channels_transaction_libp2p_received(transaction: Box<TransactionWithHash>) -> crate::Action {
544+
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
543545
TransactionPoolAction::StartVerify {
544-
commands: std::iter::once(*transaction).collect(),
545-
from_rpc: None
546+
commands: transactions.into_iter().collect(),
547+
from_source: TransactionPoolMessageSource::pubsub(id),
546548
}
547549
}
548550
)),

0 commit comments

Comments
 (0)