Skip to content

Commit b89d5c9

Browse files
committed
Changed action flow for transaction in pubsub
1 parent 7e58931 commit b89d5c9

File tree

9 files changed

+161
-25
lines changed

9 files changed

+161
-25
lines changed

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ pub enum ActionKind {
607607
TransactionPoolCandidateFetchPending,
608608
TransactionPoolCandidateFetchSuccess,
609609
TransactionPoolCandidateInfoReceived,
610+
TransactionPoolCandidateLibp2pTransactionsReceived,
610611
TransactionPoolCandidatePeerPrune,
611612
TransactionPoolCandidateVerifyError,
612613
TransactionPoolCandidateVerifyNext,
@@ -719,7 +720,7 @@ pub enum ActionKind {
719720
}
720721

721722
impl ActionKind {
722-
pub const COUNT: u16 = 609;
723+
pub const COUNT: u16 = 610;
723724
}
724725

725726
impl std::fmt::Display for ActionKind {
@@ -1553,6 +1554,9 @@ impl ActionKindGet for TransactionPoolCandidateAction {
15531554
Self::FetchPending { .. } => ActionKind::TransactionPoolCandidateFetchPending,
15541555
Self::FetchError { .. } => ActionKind::TransactionPoolCandidateFetchError,
15551556
Self::FetchSuccess { .. } => ActionKind::TransactionPoolCandidateFetchSuccess,
1557+
Self::Libp2pTransactionsReceived { .. } => {
1558+
ActionKind::TransactionPoolCandidateLibp2pTransactionsReceived
1559+
}
15561560
Self::VerifyNext => ActionKind::TransactionPoolCandidateVerifyNext,
15571561
Self::VerifyPending { .. } => ActionKind::TransactionPoolCandidateVerifyPending,
15581562
Self::VerifyError { .. } => ActionKind::TransactionPoolCandidateVerifyError,

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
112112
}
113113
})
114114
}
115-
SnarkPoolCandidateAction::WorkVerifyNext => state.snark.work_verify.jobs.is_empty(),
115+
SnarkPoolCandidateAction::WorkVerifyNext => {
116+
state.snark.work_verify.jobs.is_empty()
117+
&& state.transition_frontier.sync.is_synced()
118+
}
116119
SnarkPoolCandidateAction::WorkVerifyPending {
117120
peer_id, job_ids, ..
118121
} => {

node/src/state.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ use std::time::Duration;
44
use malloc_size_of_derive::MallocSizeOf;
55
use mina_p2p_messages::v2;
66
use openmina_core::constants::PROTOCOL_VERSION;
7-
use openmina_core::transaction::{
8-
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
9-
};
7+
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
108
use p2p::P2pNetworkPubsubMessageCacheId;
119
use rand::prelude::*;
1210

@@ -59,7 +57,7 @@ pub use crate::transition_frontier::TransitionFrontierState;
5957
pub use crate::watched_accounts::WatchedAccountsState;
6058
pub use crate::Config;
6159
use crate::{config::GlobalConfig, SnarkPoolAction};
62-
use crate::{ActionWithMeta, RpcAction, TransactionPoolAction};
60+
use crate::{ActionWithMeta, RpcAction};
6361

6462
#[derive(Serialize, Deserialize, Debug, Clone)]
6563
pub struct State {
@@ -540,11 +538,12 @@ impl P2p {
540538
}
541539
}
542540
)),
543-
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
544-
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
545-
TransactionPoolAction::StartVerify {
546-
commands: transactions.into_iter().collect(),
547-
from_source: TransactionPoolMessageSource::pubsub(id),
541+
on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
542+
on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
543+
TransactionPoolCandidateAction::Libp2pTransactionsReceived {
544+
message_id,
545+
transactions,
546+
peer_id
548547
}
549548
}
550549
)),

node/src/transaction_pool/candidate/transaction_pool_candidate_actions.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use openmina_core::transaction::{TransactionHash, TransactionInfo, TransactionWithHash};
22
use openmina_core::ActionEvent;
3+
use p2p::P2pNetworkPubsubMessageCacheId;
34
use serde::{Deserialize, Serialize};
45

56
use crate::p2p::channels::rpc::P2pRpcId;
@@ -37,12 +38,19 @@ pub enum TransactionPoolCandidateAction {
3738
peer_id: PeerId,
3839
transaction: TransactionWithHash,
3940
},
41+
/// Callback for transactions received over pubsub
42+
Libp2pTransactionsReceived {
43+
peer_id: PeerId,
44+
transactions: Vec<TransactionWithHash>,
45+
message_id: P2pNetworkPubsubMessageCacheId,
46+
},
4047
#[action_event(level = trace)]
4148
VerifyNext,
4249
VerifyPending {
4350
peer_id: PeerId,
4451
transaction_hashes: Vec<TransactionHash>,
4552
verify_id: (),
53+
from_source: Option<P2pNetworkPubsubMessageCacheId>,
4654
},
4755
VerifyError {
4856
peer_id: PeerId,
@@ -51,6 +59,7 @@ pub enum TransactionPoolCandidateAction {
5159
VerifySuccess {
5260
peer_id: PeerId,
5361
verify_id: (),
62+
from_source: Option<P2pNetworkPubsubMessageCacheId>,
5463
},
5564
PeerPrune {
5665
peer_id: PeerId,
@@ -100,7 +109,10 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolCandidateAction {
100109
.candidates
101110
.get(*peer_id, transaction.hash())
102111
.is_some(),
103-
TransactionPoolCandidateAction::VerifyNext => true,
112+
TransactionPoolCandidateAction::Libp2pTransactionsReceived { .. } => true,
113+
TransactionPoolCandidateAction::VerifyNext => {
114+
state.transition_frontier.sync.is_synced()
115+
}
104116
TransactionPoolCandidateAction::VerifyPending {
105117
peer_id,
106118
transaction_hashes,

node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,38 +90,55 @@ impl TransactionPoolCandidatesState {
9090
} => {
9191
state.transaction_received(meta.time(), *peer_id, transaction.clone());
9292
}
93+
TransactionPoolCandidateAction::Libp2pTransactionsReceived {
94+
peer_id,
95+
transactions,
96+
message_id,
97+
} => {
98+
state.transactions_received(
99+
meta.time(),
100+
*peer_id,
101+
transactions.clone(),
102+
*message_id,
103+
);
104+
}
93105
TransactionPoolCandidateAction::VerifyNext => {
94106
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
95107

96108
let batch = global_state
97109
.transaction_pool
98110
.candidates
99111
.get_batch_to_verify();
100-
let Some((peer_id, batch)) = batch else {
112+
let Some((peer_id, batch, from_source)) = batch else {
101113
return;
102114
};
103115

104116
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
105117
dispatcher.push(TransactionPoolAction::StartVerify {
106118
commands: batch.into_iter().collect(),
107-
from_source: TransactionPoolMessageSource::None,
119+
from_source: from_source
120+
.map(TransactionPoolMessageSource::pubsub)
121+
.unwrap_or_default(),
108122
});
109123
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
110124
peer_id,
111125
transaction_hashes,
112126
verify_id: (),
127+
from_source,
113128
});
114129
}
115130
TransactionPoolCandidateAction::VerifyPending {
116131
peer_id,
117132
transaction_hashes,
118133
verify_id,
134+
from_source,
119135
} => {
120136
state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
121137
let dispatcher = state_context.into_dispatcher();
122138
dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
123139
peer_id: *peer_id,
124140
verify_id: *verify_id,
141+
from_source: *from_source,
125142
});
126143
}
127144
TransactionPoolCandidateAction::VerifyError {
@@ -139,8 +156,12 @@ impl TransactionPoolCandidatesState {
139156
// reason: P2pDisconnectionReason::TransactionPoolVerifyError,
140157
// });
141158
}
142-
TransactionPoolCandidateAction::VerifySuccess { peer_id, verify_id } => {
143-
state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
159+
TransactionPoolCandidateAction::VerifySuccess {
160+
peer_id,
161+
verify_id,
162+
from_source,
163+
} => {
164+
state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
144165
}
145166
TransactionPoolCandidateAction::PeerPrune { peer_id } => {
146167
state.peer_remove(*peer_id);

node/src/transaction_pool/candidate/transaction_pool_candidate_state.rs

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::collections::{BTreeMap, BTreeSet};
44

55
use mina_p2p_messages::v2;
6+
use p2p::P2pNetworkPubsubMessageCacheId;
67
use redux::Timestamp;
78
use serde::{Deserialize, Serialize};
89

@@ -15,10 +16,17 @@ use crate::p2p::PeerId;
1516
static EMPTY_PEER_TX_CANDIDATES: BTreeMap<TransactionHash, TransactionPoolCandidateState> =
1617
BTreeMap::new();
1718

19+
type NextBatch = Option<(
20+
PeerId,
21+
Vec<TransactionWithHash>,
22+
Option<P2pNetworkPubsubMessageCacheId>,
23+
)>;
24+
1825
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1926
pub struct TransactionPoolCandidatesState {
2027
by_peer: BTreeMap<PeerId, BTreeMap<TransactionHash, TransactionPoolCandidateState>>,
2128
by_hash: BTreeMap<TransactionHash, BTreeSet<PeerId>>,
29+
by_message_id: BTreeMap<P2pNetworkPubsubMessageCacheId, (PeerId, Vec<TransactionHash>)>,
2230
}
2331

2432
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -68,6 +76,10 @@ impl TransactionPoolCandidatesState {
6876
self.by_hash.contains_key(hash)
6977
}
7078

79+
pub fn message_id_contains(&self, message_id: &P2pNetworkPubsubMessageCacheId) -> bool {
80+
self.by_message_id.contains_key(message_id)
81+
}
82+
7183
pub fn peer_contains(&self, peer_id: PeerId, hash: &TransactionHash) -> bool {
7284
self.by_peer
7385
.get(&peer_id)
@@ -203,10 +215,42 @@ impl TransactionPoolCandidatesState {
203215
self.by_peer.entry(peer_id).or_default().insert(hash, state);
204216
}
205217

206-
pub fn get_batch_to_verify(&self) -> Option<(PeerId, Vec<TransactionWithHash>)> {
207-
for hash in self.by_hash.keys() {
218+
pub fn transactions_received(
219+
&mut self,
220+
time: Timestamp,
221+
peer_id: PeerId,
222+
transactions: Vec<TransactionWithHash>,
223+
message_id: P2pNetworkPubsubMessageCacheId,
224+
) {
225+
let transaction_hashes = transactions
226+
.iter()
227+
.map(TransactionWithHash::hash)
228+
.cloned()
229+
.collect::<Vec<_>>();
230+
231+
self.by_message_id
232+
.insert(message_id, (peer_id, transaction_hashes));
233+
234+
transactions.into_iter().for_each(|transaction| {
235+
self.transaction_received(time, peer_id, transaction);
236+
})
237+
}
238+
239+
/// Get next batch of transactions to verify,
240+
/// first checks if there are any transactions to verify from pubsub
241+
/// after that checks for transactions from peers
242+
pub fn get_batch_to_verify(&self) -> NextBatch {
243+
if let Some(batch) = self.next_batch_from_pubsub() {
244+
return Some(batch);
245+
}
246+
247+
self.next_batch_from_peers()
248+
}
249+
250+
fn next_batch_from_peers(&self) -> NextBatch {
251+
for (hash, peers) in self.by_hash.iter() {
208252
if let Some(res) = None.or_else(|| {
209-
for peer_id in self.by_hash.get(hash)? {
253+
for peer_id in peers {
210254
let peer_transactions = self.by_peer.get(peer_id)?;
211255
if peer_transactions.get(hash)?.transaction().is_some() {
212256
let transactions = peer_transactions
@@ -219,17 +263,39 @@ impl TransactionPoolCandidatesState {
219263
})
220264
.cloned()
221265
.collect();
222-
return Some((*peer_id, transactions));
266+
return Some((*peer_id, transactions, None));
223267
}
224268
}
225269
None
226270
}) {
227271
return Some(res);
228272
}
229273
}
274+
230275
None
231276
}
232277

278+
fn next_batch_from_pubsub(&self) -> NextBatch {
279+
let (message_id, (peer_id, transaction_hashes)) = self.by_message_id.iter().next()?;
280+
let transactions = self
281+
.by_peer
282+
.get(peer_id)?
283+
.iter()
284+
.filter_map(|(hash, state)| {
285+
let TransactionPoolCandidateState::Received { transaction, .. } = state else {
286+
return None;
287+
};
288+
if transaction_hashes.contains(hash) {
289+
Some(transaction)
290+
} else {
291+
None
292+
}
293+
})
294+
.cloned()
295+
.collect();
296+
297+
return Some((*peer_id, transactions, Some(*message_id)));
298+
}
233299
pub fn verify_pending(
234300
&mut self,
235301
time: Timestamp,
@@ -259,8 +325,21 @@ impl TransactionPoolCandidatesState {
259325
_time: Timestamp,
260326
peer_id: &PeerId,
261327
verify_id: (),
328+
from_source: &Option<P2pNetworkPubsubMessageCacheId>,
262329
_result: Result<(), ()>,
263330
) {
331+
if let Some(from_source) = from_source {
332+
let Some((_, transactions)) = self.by_message_id.remove(from_source) else {
333+
return;
334+
};
335+
336+
for hash in transactions {
337+
self.transaction_remove(&hash);
338+
}
339+
340+
return;
341+
}
342+
264343
if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
265344
let txs_to_remove = peer_transactions
266345
.iter()
@@ -344,6 +423,20 @@ impl TransactionPoolCandidatesState {
344423
!peers.is_empty()
345424
})
346425
}
426+
427+
pub fn remove_pubsub_transactions(&mut self, message_id: &P2pNetworkPubsubMessageCacheId) {
428+
let Some((peer_id, transactions)) = self.by_message_id.remove(message_id) else {
429+
return;
430+
};
431+
432+
let Some(peer_transactions) = self.by_peer.get_mut(&peer_id) else {
433+
return;
434+
};
435+
436+
transactions.into_iter().for_each(|tx| {
437+
peer_transactions.remove(&tx);
438+
});
439+
}
347440
}
348441

349442
impl TransactionPoolCandidateState {

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ impl TransactionPoolState {
165165
}),
166166
peer_id: None,
167167
reason: "Transaction diff rejected".to_owned(),
168-
})
168+
});
169169
}
170170
TransactionPoolMessageSource::None => {}
171171
}

p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,22 +215,23 @@ impl P2pChannelsTransactionState {
215215
P2pChannelsTransactionAction::Libp2pReceived {
216216
transactions,
217217
message_id,
218+
peer_id,
218219
..
219220
} => {
220221
let (dispatcher, state) = state_context.into_dispatcher_and_state();
221222
let p2p_state: &P2pState = state.substate()?;
222223

223224
if let Some(callback) = &p2p_state
224225
.callbacks
225-
.on_p2p_channels_transaction_libp2p_received
226+
.on_p2p_channels_transactions_libp2p_received
226227
{
227228
let transactions = transactions
228229
.into_iter()
229230
.map(TransactionWithHash::try_new)
230231
.filter_map(Result::ok)
231232
.collect();
232233

233-
dispatcher.push_callback(callback.clone(), (transactions, message_id));
234+
dispatcher.push_callback(callback.clone(), (peer_id, transactions, message_id));
234235
}
235236

236237
Ok(())

0 commit comments

Comments
 (0)