Skip to content

Commit 0fee577

Browse files
committed
Review fixes
1 parent 7d40987 commit 0fee577

15 files changed

+135
-132
lines changed

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ pub enum ActionKind {
391391
P2pNetworkPnetEffectfulOutgoingData,
392392
P2pNetworkPnetEffectfulSetupNonce,
393393
P2pNetworkPubsubBroadcast,
394+
P2pNetworkPubsubBroadcastMessage,
394395
P2pNetworkPubsubBroadcastSigned,
395396
P2pNetworkPubsubBroadcastValidatedMessage,
396397
P2pNetworkPubsubGraft,
@@ -719,7 +720,7 @@ pub enum ActionKind {
719720
}
720721

721722
impl ActionKind {
722-
pub const COUNT: u16 = 609;
723+
pub const COUNT: u16 = 610;
723724
}
724725

725726
impl std::fmt::Display for ActionKind {
@@ -2003,6 +2004,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
20032004
Self::BroadcastValidatedMessage { .. } => {
20042005
ActionKind::P2pNetworkPubsubBroadcastValidatedMessage
20052006
}
2007+
Self::BroadcastMessage { .. } => ActionKind::P2pNetworkPubsubBroadcastMessage,
20062008
}
20072009
}
20082010
}

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub enum SnarkPoolCandidateAction {
4949
WorkVerifyError {
5050
peer_id: PeerId,
5151
verify_id: SnarkWorkVerifyId,
52-
batch: Vec<Snark>,
52+
batch: Vec<SnarkJobId>,
5353
},
5454
WorkVerifySuccess {
5555
peer_id: PeerId,

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::BTreeMap;
22

33
use crate::{p2p_ready, SnarkPoolAction};
4-
use openmina_core::snark::Snark;
4+
use openmina_core::snark::{Snark, SnarkJobId};
55
use p2p::{
66
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
77
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
@@ -124,7 +124,7 @@ impl SnarkPoolCandidatesState {
124124
}
125125
}),
126126
on_error: redux::callback!(
127-
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
127+
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
128128
SnarkPoolCandidateAction::WorkVerifyError {
129129
peer_id: sender.parse().unwrap(),
130130
verify_id: req_id,
@@ -160,10 +160,10 @@ impl SnarkPoolCandidatesState {
160160
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
161161
});
162162

163-
for snark in batch {
163+
for snark_job_id in batch {
164164
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
165165
message_id: Some(BroadcastMessageId::Snark {
166-
job_id: snark.job_id(),
166+
job_id: snark_job_id.clone(),
167167
}),
168168
peer_id: None,
169169
reason: "Snark work verification failed".to_string(),

node/src/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,9 +539,9 @@ impl P2p {
539539
}
540540
)),
541541
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
542-
on_p2p_channels_transaction_libp2p_received(transaction: Box<TransactionWithHash>) -> crate::Action {
542+
on_p2p_channels_transaction_libp2p_received(transactions: Vec<TransactionWithHash>) -> crate::Action {
543543
TransactionPoolAction::StartVerify {
544-
commands: std::iter::once(*transaction).collect(),
544+
commands: transactions.into_iter().collect(),
545545
from_rpc: None
546546
}
547547
}

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -198,15 +198,14 @@ impl TransactionPoolState {
198198
});
199199
}
200200
TransactionPoolAction::VerifyError { tx_hashes, .. } => {
201-
// just logging the errors
202201
let dispatcher = state.into_dispatcher();
203-
for tx in tx_hashes {
204-
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
205-
message_id: Some(BroadcastMessageId::Transaction { tx: tx.clone() }),
206-
peer_id: None,
207-
reason: "Transaction rejected".to_owned(),
208-
});
209-
}
202+
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
203+
message_id: Some(BroadcastMessageId::TransactionDiff {
204+
txs: tx_hashes.clone(),
205+
}),
206+
peer_id: None,
207+
reason: "Transaction rejected".to_owned(),
208+
});
210209
}
211210
TransactionPoolAction::BestTipChanged { best_tip_hash } => {
212211
let account_ids = substate.pool.get_accounts_to_revalidate_on_new_best_tip();
@@ -321,20 +320,17 @@ impl TransactionPoolState {
321320
}
322321

323322
if was_accepted {
324-
for tx in &accepted {
325-
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
326-
message_id: BroadcastMessageId::Transaction {
327-
tx: tx.hash.clone(),
328-
},
329-
});
330-
}
331-
for (tx, _) in &rejected {
332-
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
333-
message_id: BroadcastMessageId::Transaction {
334-
tx: tx.hash.clone(),
335-
},
336-
});
337-
}
323+
let rejected_map = rejected.iter().map(|(cmd, _)| cmd.hash.clone());
324+
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
325+
message_id: BroadcastMessageId::TransactionDiff {
326+
txs: accepted
327+
.iter()
328+
.map(|cmd| cmd.hash.clone())
329+
.chain(rejected_map)
330+
.collect(),
331+
},
332+
});
333+
338334
dispatcher.push(TransactionPoolAction::Rebroadcast {
339335
accepted,
340336
rejected,

p2p/src/channels/transaction/p2p_channels_transaction_actions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub enum P2pChannelsTransactionAction {
4646
},
4747
Libp2pReceived {
4848
peer_id: PeerId,
49-
transaction: Box<Transaction>,
49+
transactions: Vec<Transaction>,
5050
nonce: u32,
5151
},
5252
Libp2pBroadcast {

p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,17 +212,21 @@ impl P2pChannelsTransactionState {
212212
}
213213
Ok(())
214214
}
215-
P2pChannelsTransactionAction::Libp2pReceived { transaction, .. } => {
215+
P2pChannelsTransactionAction::Libp2pReceived { transactions, .. } => {
216216
let (dispatcher, state) = state_context.into_dispatcher_and_state();
217217
let p2p_state: &P2pState = state.substate()?;
218218

219219
if let Some(callback) = &p2p_state
220220
.callbacks
221221
.on_p2p_channels_transaction_libp2p_received
222222
{
223-
if let Ok(transaction) = TransactionWithHash::try_new(*transaction) {
224-
dispatcher.push_callback(callback.clone(), Box::new(transaction));
225-
}
223+
let transactions = transactions
224+
.into_iter()
225+
.map(TransactionWithHash::try_new)
226+
.filter_map(Result::ok)
227+
.collect();
228+
229+
dispatcher.push_callback(callback.clone(), transactions);
226230
}
227231

228232
Ok(())

p2p/src/network/pubsub/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ pub enum BroadcastMessageId {
3737
Snark {
3838
job_id: SnarkJobId,
3939
},
40-
Transaction {
41-
tx: TransactionHash,
40+
TransactionDiff {
41+
txs: Vec<TransactionHash>,
4242
},
4343
MessageId {
4444
message_id: P2pNetworkPubsubMessageCacheId,

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ pub enum P2pNetworkPubsubAction {
182182
BroadcastValidatedMessage {
183183
message_id: BroadcastMessageId,
184184
},
185+
186+
BroadcastMessage {
187+
message_id: P2pNetworkPubsubMessageCacheId,
188+
},
185189
}
186190

187191
impl From<P2pNetworkPubsubAction> for crate::P2pAction {

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 58 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
use std::{
2-
collections::{btree_map::Entry, BTreeMap},
3-
time::Duration,
4-
};
1+
use std::{collections::btree_map::Entry, time::Duration};
52

63
use binprot::BinProtRead;
74
use mina_p2p_messages::{
85
gossip::{self, GossipNetMessageV2},
96
v2::NetworkPoolSnarkPoolDiffVersionedStableV2,
107
};
118
use openmina_core::{
12-
block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark, Substate,
9+
block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark,
10+
transaction::Transaction, Substate,
1311
};
1412
use redux::{Dispatcher, Timestamp};
1513

@@ -26,8 +24,8 @@ use super::{
2624
P2pNetworkPubsubMessageCacheMessage,
2725
},
2826
pb::{self, Message},
29-
BroadcastMessageId, P2pNetworkPubsubAction, P2pNetworkPubsubClientState,
30-
P2pNetworkPubsubEffectfulAction, P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
27+
P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction,
28+
P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
3129
};
3230

3331
const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300);
@@ -577,14 +575,12 @@ impl P2pNetworkPubsubState {
577575
message: tx_message,
578576
..
579577
} => {
580-
let mut tx_hashes = BTreeMap::new();
581-
582-
for tx in &tx_message.0 {
583-
let Ok(hash) = tx.hash() else {
584-
continue;
585-
};
586-
tx_hashes.insert(hash, false);
587-
}
578+
let tx_hashes = tx_message
579+
.0
580+
.iter()
581+
.map(Transaction::hash)
582+
.filter_map(Result::ok)
583+
.collect();
588584

589585
P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
590586
tx_hashes,
@@ -613,13 +609,11 @@ impl P2pNetworkPubsubState {
613609
}
614610
GossipNetMessageV2::TransactionPoolDiff { message, nonce } => {
615611
let nonce = nonce.as_u32();
616-
for transaction in message.0 {
617-
dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived {
618-
peer_id,
619-
transaction: Box::new(transaction),
620-
nonce,
621-
});
622-
}
612+
dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived {
613+
peer_id,
614+
transactions: message.0.into_iter().collect(),
615+
nonce,
616+
});
623617
}
624618
GossipNetMessageV2::SnarkPoolDiff {
625619
message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work),
@@ -637,61 +631,18 @@ impl P2pNetworkPubsubState {
637631
Ok(())
638632
}
639633
P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => {
640-
let Some((mcache_message_id, message)) =
634+
let Some((mcache_message_id, _)) =
641635
pubsub_state.mcache.get_message_id_and_message(&message_id)
642636
else {
643637
bug_condition!("Message with id: {:?} not found", message_id);
644638
return Ok(());
645639
};
646640

647-
if let BroadcastMessageId::Transaction { tx } = &message_id {
648-
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
649-
tx_hashes,
650-
..
651-
} = message
652-
{
653-
if let Some(value) = tx_hashes.get_mut(tx) {
654-
*value = true;
655-
} else {
656-
bug_condition!("Transaction with hash: {} not found", tx);
657-
return Ok(());
658-
}
659-
660-
let all_vertified = tx_hashes.values().all(|v| *v);
661-
if !all_vertified {
662-
return Ok(());
663-
}
664-
} else {
665-
bug_condition!("Invalid state for message id with type transaction");
666-
return Ok(());
667-
}
668-
}
669-
670-
let raw_message = message.message().clone();
671-
let peer_id = *message.peer_id();
672-
673-
pubsub_state.reduce_incoming_validated_message(
674-
mcache_message_id,
675-
peer_id,
676-
&raw_message,
677-
);
678-
679-
let Some((_message_id, message)) =
680-
pubsub_state.mcache.get_message_id_and_message(&message_id)
681-
else {
682-
bug_condition!("Message with id: {:?} not found", message_id);
683-
return Ok(());
684-
};
685-
686-
*message = P2pNetworkPubsubMessageCacheMessage::Validated {
687-
message: raw_message,
688-
peer_id,
689-
time: *message.time(),
690-
};
691-
692-
let (dispatcher, state) = state_context.into_dispatcher_and_state();
693-
694-
Self::broadcast(dispatcher, state)
641+
let dispatcher = state_context.into_dispatcher();
642+
dispatcher.push(P2pNetworkPubsubAction::BroadcastMessage {
643+
message_id: mcache_message_id,
644+
});
645+
Ok(())
695646
}
696647
P2pNetworkPubsubAction::PruneMessages {} => {
697648
let messages = pubsub_state
@@ -717,25 +668,28 @@ impl P2pNetworkPubsubState {
717668
peer_id,
718669
..
719670
} => {
720-
let mut peer_id = peer_id;
671+
let mut involved_peers = peer_id.into_iter().collect::<Vec<_>>();
672+
let mut add_peer = |peer: &PeerId| {
673+
if !involved_peers.contains(peer) {
674+
involved_peers.push(*peer);
675+
}
676+
};
677+
721678
if let Some(message_id) = message_id {
722-
let Some((_message_id, message)) =
679+
let Some((message_id, message)) =
723680
pubsub_state.mcache.get_message_id_and_message(&message_id)
724681
else {
725682
bug_condition!("Message not found for id: {:?}", message_id);
726683
return Ok(());
727684
};
728685

729-
if peer_id.is_none() {
730-
peer_id = Some(*message.peer_id());
731-
}
732-
733-
pubsub_state.mcache.remove_message(_message_id);
686+
add_peer(message.peer_id());
687+
pubsub_state.mcache.remove_message(message_id);
734688
}
735689

736690
let dispatcher = state_context.into_dispatcher();
737691

738-
if let Some(peer_id) = peer_id {
692+
for peer_id in involved_peers {
739693
dispatcher.push(P2pDisconnectionAction::Init {
740694
peer_id,
741695
reason: P2pDisconnectionReason::InvalidMessage,
@@ -745,6 +699,31 @@ impl P2pNetworkPubsubState {
745699
Ok(())
746700
}
747701
P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()),
702+
P2pNetworkPubsubAction::BroadcastMessage { message_id } => {
703+
let Some(message) = pubsub_state.mcache.map.get(&message_id) else {
704+
bug_condition!("Message with id: {:?} not found", message_id);
705+
return Ok(());
706+
};
707+
708+
let raw_message = message.message().clone();
709+
let peer_id = *message.peer_id();
710+
711+
pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message);
712+
713+
let Some(message) = pubsub_state.mcache.map.get_mut(&message_id) else {
714+
bug_condition!("Message with id: {:?} not found", message_id);
715+
return Ok(());
716+
};
717+
718+
*message = P2pNetworkPubsubMessageCacheMessage::Validated {
719+
message: raw_message,
720+
peer_id,
721+
time: *message.time(),
722+
};
723+
724+
let (dispatcher, state) = state_context.into_dispatcher_and_state();
725+
Self::broadcast(dispatcher, state)
726+
}
748727
}
749728
}
750729

0 commit comments

Comments
 (0)