Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ Test that node discovers peers another rust node and is able to bootstrap

Tests related to pubsub layer.

* `P2pReceiveBlock`
Test that node receives block over meshsub from node
* `P2pReceiveMessage`
Test that node receives message over meshsub from node

### [P2P Incoming](../../node/testing/tests/p2p_basic_incoming.rs)

Expand Down
22 changes: 21 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub enum ActionKind {
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
P2pCallbacksP2pChannelsStreamingRpcTimeout,
P2pCallbacksP2pDisconnection,
P2pCallbacksP2pPubsubValidateMessage,
P2pCallbacksRpcRespondBestTip,
P2pChannelsBestTipInit,
P2pChannelsBestTipPending,
Expand Down Expand Up @@ -404,7 +405,10 @@ pub enum ActionKind {
P2pNetworkPnetEffectfulSetupNonce,
P2pNetworkPubsubBroadcast,
P2pNetworkPubsubBroadcastSigned,
P2pNetworkPubsubBroadcastValidatedMessage,
P2pNetworkPubsubGraft,
P2pNetworkPubsubHandleIncomingMessage,
P2pNetworkPubsubIgnoreMessage,
P2pNetworkPubsubIncomingData,
P2pNetworkPubsubIncomingMessage,
P2pNetworkPubsubIncomingMessageCleanup,
Expand All @@ -414,8 +418,11 @@ pub enum ActionKind {
P2pNetworkPubsubOutgoingMessageClear,
P2pNetworkPubsubOutgoingMessageError,
P2pNetworkPubsubPrune,
P2pNetworkPubsubPruneMessages,
P2pNetworkPubsubRejectMessage,
P2pNetworkPubsubSign,
P2pNetworkPubsubSignError,
P2pNetworkPubsubValidateIncomingMessage,
P2pNetworkPubsubValidateIncomingMessages,
P2pNetworkPubsubEffectfulSign,
P2pNetworkPubsubEffectfulValidateIncomingMessages,
Expand Down Expand Up @@ -707,7 +714,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 597;
pub const COUNT: u16 = 604;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -809,6 +816,9 @@ impl ActionKindGet for P2pCallbacksAction {
}
Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection,
Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip,
Self::P2pPubsubValidateMessage { .. } => {
ActionKind::P2pCallbacksP2pPubsubValidateMessage
}
}
}
}
Expand Down Expand Up @@ -1953,6 +1963,16 @@ impl ActionKindGet for P2pNetworkPubsubAction {
Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear,
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage,
Self::ValidateIncomingMessage { .. } => {
ActionKind::P2pNetworkPubsubValidateIncomingMessage
}
Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages,
Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage,
Self::IgnoreMessage { .. } => ActionKind::P2pNetworkPubsubIgnoreMessage,
Self::BroadcastValidatedMessage { .. } => {
ActionKind::P2pNetworkPubsubBroadcastValidatedMessage
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/consensus/consensus_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl ConsensusState {
/// Ideally we would differentiate between requested blocks and blocks
/// received from gossip, but this difference doesn't really exist
/// in the WebRTC transport, hence this heuristic.
fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
pub fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
let (has_greater_blobal_slot, diff_with_best_tip) = state
.transition_frontier
.best_tip()
Expand Down
1 change: 1 addition & 0 deletions node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ mod consensus_actions;
pub use consensus_actions::*;

mod consensus_reducer;
pub use consensus_reducer::allow_block_too_late;
6 changes: 5 additions & 1 deletion node/src/p2p/callbacks/p2p_callbacks_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use p2p::{
rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
PeerId,
P2pNetworkPubsubMessageCacheId, PeerId,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -46,6 +46,9 @@ pub enum P2pCallbacksAction {
RpcRespondBestTip {
peer_id: PeerId,
},
P2pPubsubValidateMessage {
message_id: P2pNetworkPubsubMessageCacheId,
},
}

impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
Expand All @@ -63,6 +66,7 @@ impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
P2pCallbacksAction::RpcRespondBestTip { .. } => {
state.transition_frontier.best_tip().is_some()
}
P2pCallbacksAction::P2pPubsubValidateMessage { .. } => true,
}
}
}
92 changes: 89 additions & 3 deletions node/src/p2p/callbacks/p2p_callbacks_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use ark_ff::fields::arithmetic::InvalidBigInt;
use mina_p2p_messages::v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash};
use openmina_core::{block::BlockWithHash, bug_condition, transaction::TransactionWithHash};
use mina_p2p_messages::{
gossip::GossipNetMessageV2,
v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash},
};
use openmina_core::{block::BlockWithHash, bug_condition, log, transaction::TransactionWithHash};
use p2p::{
channels::{
best_tip::P2pChannelsBestTipAction,
rpc::{BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
PeerId,
P2pNetworkPubsubAction, PeerId,
};
use redux::{ActionMeta, ActionWithMeta, Dispatcher};

use crate::{
consensus::allow_block_too_late,
p2p_ready,
snark_pool::candidate::SnarkPoolCandidateAction,
state::BlockPrevalidationError,
transaction_pool::candidate::TransactionPoolCandidateAction,
transition_frontier::sync::{
ledger::{
Expand Down Expand Up @@ -48,6 +53,7 @@ impl crate::State {
action: ActionWithMeta<&P2pCallbacksAction>,
) {
let (action, meta) = action.split();
let time = meta.time();
let (dispatcher, state) = state_context.into_dispatcher_and_state();

match action {
Expand Down Expand Up @@ -290,6 +296,80 @@ impl crate::State {
best_tip: best_tip.clone(),
});
}
P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => {
let Some(message_content) = state.p2p.ready().and_then(|p2p| {
p2p.network
.scheduler
.broadcast_state
.mcache
.get_message(message_id)
}) else {
bug_condition!("Failed to find message for id: {:?}", message_id);
return;
};

let pre_validation_result = match message_content {
GossipNetMessageV2::NewState(new_best_tip) => {
match BlockWithHash::try_new(new_best_tip.clone()) {
Ok(block) => {
let allow_block_too_late = allow_block_too_late(state, &block);
match state.prevalidate_block(&block, allow_block_too_late) {
Ok(()) => PreValidationResult::Continue,
Err(error)
if matches!(
error,
BlockPrevalidationError::ReceivedTooEarly { .. }
) =>
{
PreValidationResult::Ignore {
reason: format!(
"Block prevalidation failed: {:?}",
error
),
}
}
Err(error) => PreValidationResult::Reject {
reason: format!("Block prevalidation failed: {:?}", error),
},
}
}
Err(_) => {
log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block");
PreValidationResult::Reject{reason: "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block".to_owned()}
}
}
}
_ => {
// TODO: add pre validation for Snark pool and Transaction pool diffs
PreValidationResult::Continue
}
};

match pre_validation_result {
PreValidationResult::Continue => {
dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage {
message_id: *message_id,
});
}
PreValidationResult::Reject { reason } => {
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
message_id: Some(p2p::BroadcastMessageId::MessageId {
message_id: *message_id,
}),
peer_id: None,
reason,
});
}
PreValidationResult::Ignore { reason } => {
dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage {
message_id: Some(p2p::BroadcastMessageId::MessageId {
message_id: *message_id,
}),
reason,
});
}
}
}
}
}

Expand Down Expand Up @@ -574,3 +654,9 @@ impl crate::State {
}
}
}

enum PreValidationResult {
Continue,
Reject { reason: String },
Ignore { reason: String },
}
11 changes: 8 additions & 3 deletions node/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::time::Duration;

use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use p2p::P2pNetworkPubsubMessageCacheId;
use rand::prelude::*;
use std::sync::Arc;
use std::time::Duration;

use openmina_core::block::BlockWithHash;
use openmina_core::requests::RpcId;
Expand Down Expand Up @@ -628,6 +628,11 @@ impl P2p {
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
}
)),
on_p2p_pubsub_message_received: Some(redux::callback!(
on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
}
)),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use mina_p2p_messages::v2::LedgerHash;
use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId};
use p2p::PeerId;
use p2p::{P2pNetworkPubsubAction, PeerId};
use redux::ActionMeta;

use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest};
Expand Down Expand Up @@ -304,6 +304,12 @@ impl TransitionFrontierSyncAction {
};
let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
// TODO this should be handled by a callback
store.dispatch(P2pNetworkPubsubAction::RejectMessage {
message_id: Some(p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }),
peer_id: None,
reason: "Failed to apply block".to_owned(),
});
}
TransitionFrontierSyncAction::BlocksNextApplySuccess {
hash,
Expand All @@ -316,6 +322,11 @@ impl TransitionFrontierSyncAction {
if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
}

// TODO this should be handled by a callback
store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() },
});
}
TransitionFrontierSyncAction::BlocksSuccess => {}
// Bootstrap/Catchup is practically complete at this point.
Expand Down
8 changes: 4 additions & 4 deletions node/testing/src/scenarios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use self::p2p::basic_outgoing_connections::{
MakeMultipleOutgoingConnections, MakeOutgoingConnection,
};
use self::p2p::kademlia::KademliaBootstrap;
use self::p2p::pubsub::P2pReceiveBlock;
use self::p2p::pubsub::P2pReceiveMessage;
use self::p2p::signaling::P2pSignaling;
use self::record_replay::block_production::RecordReplayBlockProduction;
use self::record_replay::bootstrap::RecordReplayBootstrap;
Expand Down Expand Up @@ -83,7 +83,7 @@ pub enum Scenarios {
MultiNodeBasicConnectivityPeerDiscovery(MultiNodeBasicConnectivityPeerDiscovery),
SimulationSmall(SimulationSmall),
SimulationSmallForeverRealTime(SimulationSmallForeverRealTime),
P2pReceiveBlock(P2pReceiveBlock),
P2pReceiveMessage(P2pReceiveMessage),
P2pSignaling(P2pSignaling),
P2pConnectionDiscoveryRustNodeAsSeed(P2pConnectionDiscoveryRustNodeAsSeed),
MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock),
Expand Down Expand Up @@ -189,7 +189,7 @@ impl Scenarios {
}
Self::SimulationSmall(_) => SimulationSmall::DOCS,
Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS,
Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS,
Self::P2pReceiveMessage(_) => P2pReceiveMessage::DOCS,
Self::P2pSignaling(_) => P2pSignaling::DOCS,
Self::P2pConnectionDiscoveryRustNodeAsSeed(_) => {
P2pConnectionDiscoveryRustNodeAsSeed::DOCS
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Scenarios {
Self::MultiNodeBasicConnectivityPeerDiscovery(v) => v.run(runner).await,
Self::SimulationSmall(v) => v.run(runner).await,
Self::SimulationSmallForeverRealTime(v) => v.run(runner).await,
Self::P2pReceiveBlock(v) => v.run(runner).await,
Self::P2pReceiveMessage(v) => v.run(runner).await,
Self::P2pSignaling(v) => v.run(runner).await,
Self::P2pConnectionDiscoveryRustNodeAsSeed(v) => v.run(runner).await,
Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await,
Expand Down
Loading
Loading