Skip to content

Commit da83a61

Browse files
committed
Updated sources for transaction pool
1 parent 0fee577 commit da83a61

25 files changed

+241
-248
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ mina-p2p-messages = { workspace = true }
3636
poseidon = { workspace = true }
3737
hex = "0.4.3"
3838
ark-ff = { workspace = true }
39+
libp2p-identity = { version = "=0.2.7", features = [
40+
"serde",
41+
"peerid"
42+
] }
3943

4044
[target.'cfg(not(target_family = "wasm"))'.dependencies]
4145
redux = { workspace = true, features = ["serializable_callbacks"] }

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod constants;
1717
pub mod dummy;
1818

1919
pub mod block;
20+
pub mod p2p;
2021
pub mod snark;
2122
pub mod transaction;
2223

core/src/p2p.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/// TODO: These types and methods should be moved to `p2p` crate, they are here because they are used in `snark` crates callbacks
2+
use serde::{Deserialize, Serialize};
3+
4+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
5+
pub struct P2pNetworkPubsubMessageCacheId {
6+
pub source: libp2p_identity::PeerId,
7+
pub seqno: u64,
8+
}
9+
10+
impl P2pNetworkPubsubMessageCacheId {
11+
pub fn to_raw_bytes(&self) -> Vec<u8> {
12+
let mut message_id = self.source.to_base58();
13+
message_id.push_str(&self.seqno.to_string());
14+
message_id.into_bytes()
15+
}
16+
}

core/src/transaction/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,36 @@ mod transaction_with_hash;
55
pub use transaction_with_hash::*;
66

77
pub use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2 as Transaction, TransactionHash};
8+
9+
use crate::{p2p::P2pNetworkPubsubMessageCacheId, requests::RpcId};
10+
11+
/// TODO: Types and methods bellow, should be moved to `node` crate, they are here because they are used in `snark` crates callbacks
12+
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Default)]
13+
pub enum TransactionPoolMessageSource {
14+
Rpc {
15+
id: RpcId,
16+
},
17+
Pubsub {
18+
id: P2pNetworkPubsubMessageCacheId,
19+
},
20+
#[default]
21+
None,
22+
}
23+
24+
impl TransactionPoolMessageSource {
25+
pub fn rpc(id: RpcId) -> Self {
26+
Self::Rpc { id }
27+
}
28+
29+
pub fn pubsub(id: P2pNetworkPubsubMessageCacheId) -> Self {
30+
Self::Pubsub { id }
31+
}
32+
33+
pub fn is_sender_local(&self) -> bool {
34+
matches!(self, Self::Rpc { .. })
35+
}
36+
37+
pub fn is_libp2p(&self) -> bool {
38+
matches!(self, Self::Pubsub { .. })
39+
}
40+
}

node/src/rpc/rpc_reducer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use openmina_core::{
22
block::AppliedBlock,
33
bug_condition,
44
requests::{RequestId, RpcId, RpcIdType},
5-
transaction::TransactionWithHash,
5+
transaction::{TransactionPoolMessageSource, TransactionWithHash},
66
};
77
use p2p::{
88
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
@@ -515,7 +515,7 @@ impl RpcState {
515515
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
516516
dispatcher.push(TransactionPoolAction::StartVerify {
517517
commands: commands_with_hash,
518-
from_rpc: Some(*rpc_id),
518+
from_source: TransactionPoolMessageSource::rpc(*rpc_id),
519519
});
520520
}
521521
RpcAction::TransactionInjectPending { rpc_id } => {

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ impl SnarkPoolCandidatesState {
160160
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
161161
});
162162

163+
// TODO: this is incorrect, only one snark in batch could be invalid but we would reject that whole batch
163164
for snark_job_id in batch {
164165
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
165166
message_id: Some(BroadcastMessageId::Snark {

node/src/snark_pool/snark_pool_reducer.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
use std::collections::BTreeMap;
22

3+
use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
34
use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
4-
use p2p::{
5-
channels::{
6-
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
7-
},
8-
BroadcastMessageId, P2pNetworkPubsubAction,
5+
use p2p::channels::{
6+
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
97
};
108

11-
use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
12-
139
use super::{
1410
JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
1511
SnarkPoolState, SnarkWork,
@@ -210,12 +206,6 @@ impl SnarkPoolState {
210206
nonce: 0,
211207
is_local: *is_sender_local,
212208
});
213-
214-
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
215-
message_id: BroadcastMessageId::Snark {
216-
job_id: snark.job_id(),
217-
},
218-
});
219209
}
220210
SnarkPoolAction::P2pSendAll { .. } => {
221211
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();

node/src/state.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::time::Duration;
44
use malloc_size_of_derive::MallocSizeOf;
55
use mina_p2p_messages::v2;
66
use openmina_core::constants::PROTOCOL_VERSION;
7-
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
7+
use openmina_core::transaction::{
8+
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
9+
};
810
use p2p::P2pNetworkPubsubMessageCacheId;
911
use rand::prelude::*;
1012

@@ -539,10 +541,10 @@ impl P2p {
539541
}
540542
)),
541543
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
542-
on_p2p_channels_transaction_libp2p_received(transactions: Vec<TransactionWithHash>) -> crate::Action {
544+
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
543545
TransactionPoolAction::StartVerify {
544546
commands: transactions.into_iter().collect(),
545-
from_rpc: None
547+
from_source: TransactionPoolMessageSource::pubsub(id),
546548
}
547549
}
548550
)),

node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![allow(clippy::unit_arg)]
22

33
use crate::{p2p_ready, TransactionPoolAction};
4+
use openmina_core::transaction::TransactionPoolMessageSource;
45
use p2p::{
56
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
67
PeerId,
@@ -103,7 +104,7 @@ impl TransactionPoolCandidatesState {
103104
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
104105
dispatcher.push(TransactionPoolAction::StartVerify {
105106
commands: batch.into_iter().collect(),
106-
from_rpc: None,
107+
from_source: TransactionPoolMessageSource::None,
107108
});
108109
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
109110
peer_id,

0 commit comments

Comments
 (0)