Skip to content

Commit 27ba379

Browse files
authored
Merge pull request #1079 from 0xMimir/feat/tx-pool-candidate-update
Changed action flow for transaction in pubsub
2 parents 52c20c6 + 379a6db commit 27ba379

File tree

9 files changed

+162
-36
lines changed

9 files changed

+162
-36
lines changed

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ pub enum ActionKind {
607607
TransactionPoolCandidateFetchPending,
608608
TransactionPoolCandidateFetchSuccess,
609609
TransactionPoolCandidateInfoReceived,
610+
TransactionPoolCandidateLibp2pTransactionsReceived,
610611
TransactionPoolCandidatePeerPrune,
611612
TransactionPoolCandidateVerifyError,
612613
TransactionPoolCandidateVerifyNext,
@@ -719,7 +720,7 @@ pub enum ActionKind {
719720
}
720721

721722
impl ActionKind {
722-
pub const COUNT: u16 = 609;
723+
pub const COUNT: u16 = 610;
723724
}
724725

725726
impl std::fmt::Display for ActionKind {
@@ -1553,6 +1554,9 @@ impl ActionKindGet for TransactionPoolCandidateAction {
15531554
Self::FetchPending { .. } => ActionKind::TransactionPoolCandidateFetchPending,
15541555
Self::FetchError { .. } => ActionKind::TransactionPoolCandidateFetchError,
15551556
Self::FetchSuccess { .. } => ActionKind::TransactionPoolCandidateFetchSuccess,
1557+
Self::Libp2pTransactionsReceived { .. } => {
1558+
ActionKind::TransactionPoolCandidateLibp2pTransactionsReceived
1559+
}
15561560
Self::VerifyNext => ActionKind::TransactionPoolCandidateVerifyNext,
15571561
Self::VerifyPending { .. } => ActionKind::TransactionPoolCandidateVerifyPending,
15581562
Self::VerifyError { .. } => ActionKind::TransactionPoolCandidateVerifyError,

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
112112
}
113113
})
114114
}
115-
SnarkPoolCandidateAction::WorkVerifyNext => state.snark.work_verify.jobs.is_empty(),
115+
SnarkPoolCandidateAction::WorkVerifyNext => {
116+
state.snark.work_verify.jobs.is_empty()
117+
&& state.transition_frontier.sync.is_synced()
118+
}
116119
SnarkPoolCandidateAction::WorkVerifyPending {
117120
peer_id, job_ids, ..
118121
} => {

node/src/state.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ use std::time::Duration;
44
use malloc_size_of_derive::MallocSizeOf;
55
use mina_p2p_messages::v2;
66
use openmina_core::constants::PROTOCOL_VERSION;
7-
use openmina_core::transaction::{
8-
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
9-
};
7+
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
108
use p2p::P2pNetworkPubsubMessageCacheId;
119
use rand::prelude::*;
1210

@@ -59,7 +57,7 @@ pub use crate::transition_frontier::TransitionFrontierState;
5957
pub use crate::watched_accounts::WatchedAccountsState;
6058
pub use crate::Config;
6159
use crate::{config::GlobalConfig, SnarkPoolAction};
62-
use crate::{ActionWithMeta, RpcAction, TransactionPoolAction};
60+
use crate::{ActionWithMeta, RpcAction};
6361

6462
#[derive(Serialize, Deserialize, Debug, Clone)]
6563
pub struct State {
@@ -540,11 +538,12 @@ impl P2p {
540538
}
541539
}
542540
)),
543-
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
544-
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
545-
TransactionPoolAction::StartVerify {
546-
commands: transactions.into_iter().collect(),
547-
from_source: TransactionPoolMessageSource::pubsub(id),
541+
on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
542+
on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
543+
TransactionPoolCandidateAction::Libp2pTransactionsReceived {
544+
message_id,
545+
transactions,
546+
peer_id
548547
}
549548
}
550549
)),

node/src/transaction_pool/candidate/transaction_pool_candidate_actions.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
use openmina_core::transaction::{TransactionHash, TransactionInfo, TransactionWithHash};
1+
use openmina_core::transaction::{
2+
TransactionHash, TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
3+
};
24
use openmina_core::ActionEvent;
5+
use p2p::P2pNetworkPubsubMessageCacheId;
36
use serde::{Deserialize, Serialize};
47

58
use crate::p2p::channels::rpc::P2pRpcId;
@@ -37,12 +40,19 @@ pub enum TransactionPoolCandidateAction {
3740
peer_id: PeerId,
3841
transaction: TransactionWithHash,
3942
},
43+
/// Callback for transactions received over pubsub
44+
Libp2pTransactionsReceived {
45+
peer_id: PeerId,
46+
transactions: Vec<TransactionWithHash>,
47+
message_id: P2pNetworkPubsubMessageCacheId,
48+
},
4049
#[action_event(level = trace)]
4150
VerifyNext,
4251
VerifyPending {
4352
peer_id: PeerId,
4453
transaction_hashes: Vec<TransactionHash>,
4554
verify_id: (),
55+
from_source: TransactionPoolMessageSource,
4656
},
4757
VerifyError {
4858
peer_id: PeerId,
@@ -51,6 +61,7 @@ pub enum TransactionPoolCandidateAction {
5161
VerifySuccess {
5262
peer_id: PeerId,
5363
verify_id: (),
64+
from_source: TransactionPoolMessageSource,
5465
},
5566
PeerPrune {
5667
peer_id: PeerId,
@@ -100,7 +111,11 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolCandidateAction {
100111
.candidates
101112
.get(*peer_id, transaction.hash())
102113
.is_some(),
103-
TransactionPoolCandidateAction::VerifyNext => true,
114+
TransactionPoolCandidateAction::Libp2pTransactionsReceived { .. } => true,
115+
TransactionPoolCandidateAction::VerifyNext => {
116+
// TODO: if a block is being applied or produced, skip this action too
117+
state.transition_frontier.sync.is_synced()
118+
}
104119
TransactionPoolCandidateAction::VerifyPending {
105120
peer_id,
106121
transaction_hashes,

node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#![allow(clippy::unit_arg)]
22

33
use crate::{p2p_ready, TransactionPoolAction};
4-
use openmina_core::transaction::TransactionPoolMessageSource;
54
use p2p::{
65
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
76
PeerId,
@@ -90,38 +89,53 @@ impl TransactionPoolCandidatesState {
9089
} => {
9190
state.transaction_received(meta.time(), *peer_id, transaction.clone());
9291
}
92+
TransactionPoolCandidateAction::Libp2pTransactionsReceived {
93+
peer_id,
94+
transactions,
95+
message_id,
96+
} => {
97+
state.transactions_received(
98+
meta.time(),
99+
*peer_id,
100+
transactions.clone(),
101+
*message_id,
102+
);
103+
}
93104
TransactionPoolCandidateAction::VerifyNext => {
94105
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
95106

96107
let batch = global_state
97108
.transaction_pool
98109
.candidates
99110
.get_batch_to_verify();
100-
let Some((peer_id, batch)) = batch else {
111+
let Some((peer_id, batch, from_source)) = batch else {
101112
return;
102113
};
103114

104115
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
105116
dispatcher.push(TransactionPoolAction::StartVerify {
106117
commands: batch.into_iter().collect(),
107-
from_source: TransactionPoolMessageSource::None,
118+
from_source,
108119
});
109120
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
110121
peer_id,
111122
transaction_hashes,
112123
verify_id: (),
124+
from_source,
113125
});
114126
}
115127
TransactionPoolCandidateAction::VerifyPending {
116128
peer_id,
117129
transaction_hashes,
118130
verify_id,
131+
from_source,
119132
} => {
120133
state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
121134
let dispatcher = state_context.into_dispatcher();
122135
dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
123136
peer_id: *peer_id,
124137
verify_id: *verify_id,
138+
from_source: *from_source,
125139
});
126140
}
127141
TransactionPoolCandidateAction::VerifyError {
@@ -139,8 +153,12 @@ impl TransactionPoolCandidatesState {
139153
// reason: P2pDisconnectionReason::TransactionPoolVerifyError,
140154
// });
141155
}
142-
TransactionPoolCandidateAction::VerifySuccess { peer_id, verify_id } => {
143-
state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
156+
TransactionPoolCandidateAction::VerifySuccess {
157+
peer_id,
158+
verify_id,
159+
from_source,
160+
} => {
161+
state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
144162
}
145163
TransactionPoolCandidateAction::PeerPrune { peer_id } => {
146164
state.peer_remove(*peer_id);

node/src/transaction_pool/candidate/transaction_pool_candidate_state.rs

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
use std::collections::{BTreeMap, BTreeSet};
44

55
use mina_p2p_messages::v2;
6+
use openmina_core::transaction::TransactionPoolMessageSource;
7+
use p2p::P2pNetworkPubsubMessageCacheId;
68
use redux::Timestamp;
79
use serde::{Deserialize, Serialize};
810

@@ -15,10 +17,17 @@ use crate::p2p::PeerId;
1517
static EMPTY_PEER_TX_CANDIDATES: BTreeMap<TransactionHash, TransactionPoolCandidateState> =
1618
BTreeMap::new();
1719

20+
type NextBatch = (
21+
PeerId,
22+
Vec<TransactionWithHash>,
23+
TransactionPoolMessageSource,
24+
);
25+
1826
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1927
pub struct TransactionPoolCandidatesState {
2028
by_peer: BTreeMap<PeerId, BTreeMap<TransactionHash, TransactionPoolCandidateState>>,
2129
by_hash: BTreeMap<TransactionHash, BTreeSet<PeerId>>,
30+
by_message_id: BTreeMap<P2pNetworkPubsubMessageCacheId, (PeerId, Vec<TransactionHash>)>,
2231
}
2332

2433
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -68,6 +77,10 @@ impl TransactionPoolCandidatesState {
6877
self.by_hash.contains_key(hash)
6978
}
7079

80+
pub fn message_id_contains(&self, message_id: &P2pNetworkPubsubMessageCacheId) -> bool {
81+
self.by_message_id.contains_key(message_id)
82+
}
83+
7184
pub fn peer_contains(&self, peer_id: PeerId, hash: &TransactionHash) -> bool {
7285
self.by_peer
7386
.get(&peer_id)
@@ -203,10 +216,39 @@ impl TransactionPoolCandidatesState {
203216
self.by_peer.entry(peer_id).or_default().insert(hash, state);
204217
}
205218

206-
pub fn get_batch_to_verify(&self) -> Option<(PeerId, Vec<TransactionWithHash>)> {
207-
for hash in self.by_hash.keys() {
219+
pub fn transactions_received(
220+
&mut self,
221+
time: Timestamp,
222+
peer_id: PeerId,
223+
transactions: Vec<TransactionWithHash>,
224+
message_id: P2pNetworkPubsubMessageCacheId,
225+
) {
226+
let transaction_hashes = transactions
227+
.iter()
228+
.map(TransactionWithHash::hash)
229+
.cloned()
230+
.collect::<Vec<_>>();
231+
232+
self.by_message_id
233+
.insert(message_id, (peer_id, transaction_hashes));
234+
235+
transactions.into_iter().for_each(|transaction| {
236+
self.transaction_received(time, peer_id, transaction);
237+
})
238+
}
239+
240+
/// Get next batch of transactions to verify,
241+
/// first checks if there are any transactions to verify from pubsub
242+
/// after that checks for transactions from peers
243+
pub fn get_batch_to_verify(&self) -> Option<NextBatch> {
244+
self.next_batch_from_pubsub()
245+
.or_else(|| self.next_batch_from_peers())
246+
}
247+
248+
fn next_batch_from_peers(&self) -> Option<NextBatch> {
249+
for (hash, peers) in self.by_hash.iter() {
208250
if let Some(res) = None.or_else(|| {
209-
for peer_id in self.by_hash.get(hash)? {
251+
for peer_id in peers {
210252
let peer_transactions = self.by_peer.get(peer_id)?;
211253
if peer_transactions.get(hash)?.transaction().is_some() {
212254
let transactions = peer_transactions
@@ -219,17 +261,44 @@ impl TransactionPoolCandidatesState {
219261
})
220262
.cloned()
221263
.collect();
222-
return Some((*peer_id, transactions));
264+
return Some((*peer_id, transactions, TransactionPoolMessageSource::None));
223265
}
224266
}
225267
None
226268
}) {
227269
return Some(res);
228270
}
229271
}
272+
230273
None
231274
}
232275

276+
fn next_batch_from_pubsub(&self) -> Option<NextBatch> {
277+
let (message_id, (peer_id, transaction_hashes)) = self.by_message_id.iter().next()?;
278+
let transactions = self
279+
.by_peer
280+
.get(peer_id)?
281+
.iter()
282+
.filter_map(|(hash, state)| {
283+
let TransactionPoolCandidateState::Received { transaction, .. } = state else {
284+
return None;
285+
};
286+
if transaction_hashes.contains(hash) {
287+
Some(transaction)
288+
} else {
289+
None
290+
}
291+
})
292+
.cloned()
293+
.collect();
294+
295+
Some((
296+
*peer_id,
297+
transactions,
298+
TransactionPoolMessageSource::pubsub(*message_id),
299+
))
300+
}
301+
233302
pub fn verify_pending(
234303
&mut self,
235304
time: Timestamp,
@@ -259,17 +328,31 @@ impl TransactionPoolCandidatesState {
259328
_time: Timestamp,
260329
peer_id: &PeerId,
261330
verify_id: (),
331+
from_source: &TransactionPoolMessageSource,
262332
_result: Result<(), ()>,
263333
) {
264-
if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
265-
let txs_to_remove = peer_transactions
266-
.iter()
267-
.filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
268-
.map(|(hash, _)| hash.clone())
269-
.collect::<Vec<_>>();
270-
271-
for hash in txs_to_remove {
272-
self.transaction_remove(&hash);
334+
match from_source {
335+
TransactionPoolMessageSource::Pubsub { id } => {
336+
let Some((_, transactions)) = self.by_message_id.remove(id) else {
337+
return;
338+
};
339+
340+
for hash in transactions {
341+
self.transaction_remove(&hash);
342+
}
343+
}
344+
_ => {
345+
if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
346+
let txs_to_remove = peer_transactions
347+
.iter()
348+
.filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
349+
.map(|(hash, _)| hash.clone())
350+
.collect::<Vec<_>>();
351+
352+
for hash in txs_to_remove {
353+
self.transaction_remove(&hash);
354+
}
355+
}
273356
}
274357
}
275358
}

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ impl TransactionPoolState {
165165
}),
166166
peer_id: None,
167167
reason: "Transaction diff rejected".to_owned(),
168-
})
168+
});
169169
}
170170
TransactionPoolMessageSource::None => {}
171171
}

0 commit comments

Comments
 (0)