Skip to content

Commit 379a6db

Browse files
committed
Review fixes
1 parent c2ec5e5 commit 379a6db

File tree

3 files changed

+44
-55
lines changed

3 files changed

+44
-55
lines changed

node/src/transaction_pool/candidate/transaction_pool_candidate_actions.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use openmina_core::transaction::{TransactionHash, TransactionInfo, TransactionWithHash};
1+
use openmina_core::transaction::{
2+
TransactionHash, TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
3+
};
24
use openmina_core::ActionEvent;
35
use p2p::P2pNetworkPubsubMessageCacheId;
46
use serde::{Deserialize, Serialize};
@@ -50,7 +52,7 @@ pub enum TransactionPoolCandidateAction {
5052
peer_id: PeerId,
5153
transaction_hashes: Vec<TransactionHash>,
5254
verify_id: (),
53-
from_source: Option<P2pNetworkPubsubMessageCacheId>,
55+
from_source: TransactionPoolMessageSource,
5456
},
5557
VerifyError {
5658
peer_id: PeerId,
@@ -59,7 +61,7 @@ pub enum TransactionPoolCandidateAction {
5961
VerifySuccess {
6062
peer_id: PeerId,
6163
verify_id: (),
62-
from_source: Option<P2pNetworkPubsubMessageCacheId>,
64+
from_source: TransactionPoolMessageSource,
6365
},
6466
PeerPrune {
6567
peer_id: PeerId,

node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#![allow(clippy::unit_arg)]
22

33
use crate::{p2p_ready, TransactionPoolAction};
4-
use openmina_core::transaction::TransactionPoolMessageSource;
54
use p2p::{
65
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
76
PeerId,
@@ -116,9 +115,7 @@ impl TransactionPoolCandidatesState {
116115
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
117116
dispatcher.push(TransactionPoolAction::StartVerify {
118117
commands: batch.into_iter().collect(),
119-
from_source: from_source
120-
.map(TransactionPoolMessageSource::pubsub)
121-
.unwrap_or_default(),
118+
from_source,
122119
});
123120
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
124121
peer_id,

node/src/transaction_pool/candidate/transaction_pool_candidate_state.rs

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::collections::{BTreeMap, BTreeSet};
44

55
use mina_p2p_messages::v2;
6+
use openmina_core::transaction::TransactionPoolMessageSource;
67
use p2p::P2pNetworkPubsubMessageCacheId;
78
use redux::Timestamp;
89
use serde::{Deserialize, Serialize};
@@ -16,11 +17,11 @@ use crate::p2p::PeerId;
1617
static EMPTY_PEER_TX_CANDIDATES: BTreeMap<TransactionHash, TransactionPoolCandidateState> =
1718
BTreeMap::new();
1819

19-
type NextBatch = Option<(
20+
type NextBatch = (
2021
PeerId,
2122
Vec<TransactionWithHash>,
22-
Option<P2pNetworkPubsubMessageCacheId>,
23-
)>;
23+
TransactionPoolMessageSource,
24+
);
2425

2526
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
2627
pub struct TransactionPoolCandidatesState {
@@ -236,18 +237,15 @@ impl TransactionPoolCandidatesState {
236237
})
237238
}
238239

239-
/// Get next batch of transactions to verify,
240+
/// Get next batch of transactions to verify,
240241
/// first checks if there are any transactions to verify from pubsub
241242
/// after that checks for transactions from peers
242-
pub fn get_batch_to_verify(&self) -> NextBatch {
243-
if let Some(batch) = self.next_batch_from_pubsub() {
244-
return Some(batch);
245-
}
246-
247-
self.next_batch_from_peers()
243+
pub fn get_batch_to_verify(&self) -> Option<NextBatch> {
244+
self.next_batch_from_pubsub()
245+
.or_else(|| self.next_batch_from_peers())
248246
}
249247

250-
fn next_batch_from_peers(&self) -> NextBatch {
248+
fn next_batch_from_peers(&self) -> Option<NextBatch> {
251249
for (hash, peers) in self.by_hash.iter() {
252250
if let Some(res) = None.or_else(|| {
253251
for peer_id in peers {
@@ -263,7 +261,7 @@ impl TransactionPoolCandidatesState {
263261
})
264262
.cloned()
265263
.collect();
266-
return Some((*peer_id, transactions, None));
264+
return Some((*peer_id, transactions, TransactionPoolMessageSource::None));
267265
}
268266
}
269267
None
@@ -275,7 +273,7 @@ impl TransactionPoolCandidatesState {
275273
None
276274
}
277275

278-
fn next_batch_from_pubsub(&self) -> NextBatch {
276+
fn next_batch_from_pubsub(&self) -> Option<NextBatch> {
279277
let (message_id, (peer_id, transaction_hashes)) = self.by_message_id.iter().next()?;
280278
let transactions = self
281279
.by_peer
@@ -294,8 +292,13 @@ impl TransactionPoolCandidatesState {
294292
.cloned()
295293
.collect();
296294

297-
return Some((*peer_id, transactions, Some(*message_id)));
295+
Some((
296+
*peer_id,
297+
transactions,
298+
TransactionPoolMessageSource::pubsub(*message_id),
299+
))
298300
}
301+
299302
pub fn verify_pending(
300303
&mut self,
301304
time: Timestamp,
@@ -325,30 +328,31 @@ impl TransactionPoolCandidatesState {
325328
_time: Timestamp,
326329
peer_id: &PeerId,
327330
verify_id: (),
328-
from_source: &Option<P2pNetworkPubsubMessageCacheId>,
331+
from_source: &TransactionPoolMessageSource,
329332
_result: Result<(), ()>,
330333
) {
331-
if let Some(from_source) = from_source {
332-
let Some((_, transactions)) = self.by_message_id.remove(from_source) else {
333-
return;
334-
};
334+
match from_source {
335+
TransactionPoolMessageSource::Pubsub { id } => {
336+
let Some((_, transactions)) = self.by_message_id.remove(id) else {
337+
return;
338+
};
335339

336-
for hash in transactions {
337-
self.transaction_remove(&hash);
340+
for hash in transactions {
341+
self.transaction_remove(&hash);
342+
}
338343
}
339-
340-
return;
341-
}
342-
343-
if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
344-
let txs_to_remove = peer_transactions
345-
.iter()
346-
.filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
347-
.map(|(hash, _)| hash.clone())
348-
.collect::<Vec<_>>();
349-
350-
for hash in txs_to_remove {
351-
self.transaction_remove(&hash);
344+
_ => {
345+
if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
346+
let txs_to_remove = peer_transactions
347+
.iter()
348+
.filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
349+
.map(|(hash, _)| hash.clone())
350+
.collect::<Vec<_>>();
351+
352+
for hash in txs_to_remove {
353+
self.transaction_remove(&hash);
354+
}
355+
}
352356
}
353357
}
354358
}
@@ -423,20 +427,6 @@ impl TransactionPoolCandidatesState {
423427
!peers.is_empty()
424428
})
425429
}
426-
427-
pub fn remove_pubsub_transactions(&mut self, message_id: &P2pNetworkPubsubMessageCacheId) {
428-
let Some((peer_id, transactions)) = self.by_message_id.remove(message_id) else {
429-
return;
430-
};
431-
432-
let Some(peer_transactions) = self.by_peer.get_mut(&peer_id) else {
433-
return;
434-
};
435-
436-
transactions.into_iter().for_each(|tx| {
437-
peer_transactions.remove(&tx);
438-
});
439-
}
440430
}
441431

442432
impl TransactionPoolCandidateState {

0 commit comments

Comments
 (0)