Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ Test that node discovers peers another rust node and is able to bootstrap

Tests related to pubsub layer.

* `P2pReceiveBlock`
Test that node receives block over meshsub from node
* `P2pReceiveMessage`
Test that node receives message over meshsub from node

### [P2P Incoming](../../node/testing/tests/p2p_basic_incoming.rs)

Expand Down
20 changes: 19 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub enum ActionKind {
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
P2pCallbacksP2pChannelsStreamingRpcTimeout,
P2pCallbacksP2pDisconnection,
P2pCallbacksP2pPubsubValidateMessage,
P2pCallbacksRpcRespondBestTip,
P2pChannelsBestTipInit,
P2pChannelsBestTipPending,
Expand Down Expand Up @@ -404,7 +405,10 @@ pub enum ActionKind {
P2pNetworkPnetEffectfulSetupNonce,
P2pNetworkPubsubBroadcast,
P2pNetworkPubsubBroadcastSigned,
P2pNetworkPubsubBroadcastValidatedMessage,
P2pNetworkPubsubBroadcastValidationCallback,
P2pNetworkPubsubGraft,
P2pNetworkPubsubHandleIncomingMessage,
P2pNetworkPubsubIncomingData,
P2pNetworkPubsubIncomingMessage,
P2pNetworkPubsubIncomingMessageCleanup,
Expand All @@ -414,6 +418,8 @@ pub enum ActionKind {
P2pNetworkPubsubOutgoingMessageClear,
P2pNetworkPubsubOutgoingMessageError,
P2pNetworkPubsubPrune,
P2pNetworkPubsubPruneMessages,
P2pNetworkPubsubRejectMessage,
P2pNetworkPubsubSign,
P2pNetworkPubsubSignError,
P2pNetworkPubsubValidateIncomingMessages,
Expand Down Expand Up @@ -707,7 +713,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 597;
pub const COUNT: u16 = 603;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -809,6 +815,9 @@ impl ActionKindGet for P2pCallbacksAction {
}
Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection,
Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip,
Self::P2pPubsubValidateMessage { .. } => {
ActionKind::P2pCallbacksP2pPubsubValidateMessage
}
}
}
}
Expand Down Expand Up @@ -1953,6 +1962,15 @@ impl ActionKindGet for P2pNetworkPubsubAction {
Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear,
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
Self::BroadcastValidationCallback { .. } => {
ActionKind::P2pNetworkPubsubBroadcastValidationCallback
}
Self::BroadcastValidatedMessage { .. } => {
ActionKind::P2pNetworkPubsubBroadcastValidatedMessage
}
Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage,
Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages,
Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/consensus/consensus_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl ConsensusState {
/// Ideally we would differentiate between requested blocks and blocks
/// received from gossip, but this difference doesn't really exist
/// in the WebRTC transport, hence this heuristic.
fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
pub fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
let (has_greater_blobal_slot, diff_with_best_tip) = state
.transition_frontier
.best_tip()
Expand Down
1 change: 1 addition & 0 deletions node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ mod consensus_actions;
pub use consensus_actions::*;

mod consensus_reducer;
pub use consensus_reducer::allow_block_too_late;
6 changes: 5 additions & 1 deletion node/src/p2p/callbacks/p2p_callbacks_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use p2p::{
rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
PeerId,
P2pNetworkPubsubMessageCacheId, PeerId,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -46,6 +46,9 @@ pub enum P2pCallbacksAction {
RpcRespondBestTip {
peer_id: PeerId,
},
P2pPubsubValidateMessage {
message_id: P2pNetworkPubsubMessageCacheId,
},
}

impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
Expand All @@ -63,6 +66,7 @@ impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
P2pCallbacksAction::RpcRespondBestTip { .. } => {
state.transition_frontier.best_tip().is_some()
}
P2pCallbacksAction::P2pPubsubValidateMessage { .. } => true,
}
}
}
70 changes: 67 additions & 3 deletions node/src/p2p/callbacks/p2p_callbacks_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use ark_ff::fields::arithmetic::InvalidBigInt;
use mina_p2p_messages::v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash};
use openmina_core::{block::BlockWithHash, bug_condition, transaction::TransactionWithHash};
use mina_p2p_messages::{
gossip::GossipNetMessageV2,
v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash},
};
use openmina_core::{block::BlockWithHash, bug_condition, log, transaction::TransactionWithHash};
use p2p::{
channels::{
best_tip::P2pChannelsBestTipAction,
rpc::{BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
PeerId,
P2pNetworkPubsubAction, PeerId,
};
use redux::{ActionMeta, ActionWithMeta, Dispatcher};

use crate::{
consensus::allow_block_too_late,
p2p_ready,
snark_pool::candidate::SnarkPoolCandidateAction,
state::BlockPrevalidationError,
transaction_pool::candidate::TransactionPoolCandidateAction,
transition_frontier::sync::{
ledger::{
Expand Down Expand Up @@ -48,6 +53,7 @@ impl crate::State {
action: ActionWithMeta<&P2pCallbacksAction>,
) {
let (action, meta) = action.split();
let time = meta.time();
let (dispatcher, state) = state_context.into_dispatcher_and_state();

match action {
Expand Down Expand Up @@ -290,6 +296,58 @@ impl crate::State {
best_tip: best_tip.clone(),
});
}
P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => {
let Some(message_content) = state.p2p.ready().and_then(|p2p| {
p2p.network
.scheduler
.broadcast_state
.mcache
.get_message(message_id)
}) else {
return;
};

let pre_validation_result = match message_content {
GossipNetMessageV2::NewState(new_best_tip) => {
match BlockWithHash::try_new(new_best_tip.clone()) {
Ok(block) => {
let allow_block_too_late = allow_block_too_late(state, &block);
match state.prevalidate_block(&block, allow_block_too_late) {
Ok(()) => PreValidationResult::Continue,
Err(BlockPrevalidationError::ReceivedTooEarly { .. }) => {
PreValidationResult::Ignore
}
Err(_) => PreValidationResult::Reject,
}
}
Err(_) => {
log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block");
PreValidationResult::Reject
}
}
}
_ => {
// TODO: add pre validation for Snark pool and Transaction pool diffs
PreValidationResult::Continue
}
};

match pre_validation_result {
PreValidationResult::Continue => {
dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback {
message_id: *message_id,
});
}
PreValidationResult::Reject => {
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
message_id: p2p::BroadcastMessageId::MessageId {
message_id: *message_id,
},
});
}
PreValidationResult::Ignore => {}
}
}
}
}

Expand Down Expand Up @@ -574,3 +632,9 @@ impl crate::State {
}
}
}

enum PreValidationResult {
Continue,
Reject,
Ignore,
}
11 changes: 8 additions & 3 deletions node/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::time::Duration;

use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use p2p::P2pNetworkPubsubMessageCacheId;
use rand::prelude::*;
use std::sync::Arc;
use std::time::Duration;

use openmina_core::block::BlockWithHash;
use openmina_core::requests::RpcId;
Expand Down Expand Up @@ -628,6 +628,11 @@ impl P2p {
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
}
)),
on_p2p_pubsub_message_received: Some(redux::callback!(
on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
}
)),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use mina_p2p_messages::v2::LedgerHash;
use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId};
use p2p::PeerId;
use p2p::{P2pNetworkPubsubAction, PeerId};
use redux::ActionMeta;

use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest};
Expand Down Expand Up @@ -304,6 +304,10 @@ impl TransitionFrontierSyncAction {
};
let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
// TODO this should be handled by a callback
store.dispatch(P2pNetworkPubsubAction::RejectMessage {
message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() },
});
}
TransitionFrontierSyncAction::BlocksNextApplySuccess {
hash,
Expand All @@ -316,6 +320,11 @@ impl TransitionFrontierSyncAction {
if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
}

// TODO this should be handled by a callback
store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() },
});
}
TransitionFrontierSyncAction::BlocksSuccess => {}
// Bootstrap/Catchup is practically complete at this point.
Expand Down
8 changes: 4 additions & 4 deletions node/testing/src/scenarios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use self::p2p::basic_outgoing_connections::{
MakeMultipleOutgoingConnections, MakeOutgoingConnection,
};
use self::p2p::kademlia::KademliaBootstrap;
use self::p2p::pubsub::P2pReceiveBlock;
use self::p2p::pubsub::P2pReceiveMessage;
use self::p2p::signaling::P2pSignaling;
use self::record_replay::block_production::RecordReplayBlockProduction;
use self::record_replay::bootstrap::RecordReplayBootstrap;
Expand Down Expand Up @@ -83,7 +83,7 @@ pub enum Scenarios {
MultiNodeBasicConnectivityPeerDiscovery(MultiNodeBasicConnectivityPeerDiscovery),
SimulationSmall(SimulationSmall),
SimulationSmallForeverRealTime(SimulationSmallForeverRealTime),
P2pReceiveBlock(P2pReceiveBlock),
P2pReceiveMessage(P2pReceiveMessage),
P2pSignaling(P2pSignaling),
P2pConnectionDiscoveryRustNodeAsSeed(P2pConnectionDiscoveryRustNodeAsSeed),
MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock),
Expand Down Expand Up @@ -189,7 +189,7 @@ impl Scenarios {
}
Self::SimulationSmall(_) => SimulationSmall::DOCS,
Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS,
Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS,
Self::P2pReceiveMessage(_) => P2pReceiveMessage::DOCS,
Self::P2pSignaling(_) => P2pSignaling::DOCS,
Self::P2pConnectionDiscoveryRustNodeAsSeed(_) => {
P2pConnectionDiscoveryRustNodeAsSeed::DOCS
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Scenarios {
Self::MultiNodeBasicConnectivityPeerDiscovery(v) => v.run(runner).await,
Self::SimulationSmall(v) => v.run(runner).await,
Self::SimulationSmallForeverRealTime(v) => v.run(runner).await,
Self::P2pReceiveBlock(v) => v.run(runner).await,
Self::P2pReceiveMessage(v) => v.run(runner).await,
Self::P2pSignaling(v) => v.run(runner).await,
Self::P2pConnectionDiscoveryRustNodeAsSeed(v) => v.run(runner).await,
Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await,
Expand Down
35 changes: 12 additions & 23 deletions node/testing/src/scenarios/p2p/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
use std::time::Duration;

use node::ActionKind;

use crate::{
hosts,
node::RustNodeTestingConfig,
scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime},
};

/// Receive a block via meshsub
/// Receive a message via meshsub
/// 1. Create a normal node with default devnet config, with devnet peers as initial peers
/// 2. Wait for 2 minutes
/// 3. Create a node with discovery disabled and first node as only peer
/// 4. Wait for first node to broadcast block to second one
/// 4. Wait for first node to broadcast message to second one
#[derive(documented::Documented, Default, Clone, Copy)]
pub struct P2pReceiveBlock;
pub struct P2pReceiveMessage;

impl P2pReceiveBlock {
impl P2pReceiveMessage {
pub async fn run(self, mut runner: ClusterRunner<'_>) {
let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet());

let retransmitter_openmina_node = runner.add_rust_node(config);
let retransmitter_peer_id = runner
.node(retransmitter_openmina_node)
.unwrap()
.state()
.p2p
.my_id();

let _ = runner
.run(
Expand All @@ -47,22 +43,15 @@ impl P2pReceiveBlock {
RunCfg::default()
.timeout(Duration::from_secs(60 * 30))
.advance_time(RunCfgAdvanceTime::Real)
.action_handler(move |node, state, _, _| {
let Some(p2p) = state.p2p.ready() else {
return false;
};

.action_handler(move |node, _state, _, action| {
node == receiver_openmina_node
&& p2p
.network
.scheduler
.broadcast_state
.incoming_block
.as_ref()
.map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id))
&& matches!(
action.action().kind(),
ActionKind::P2pNetworkPubsubBroadcastValidationCallback
)
}),
)
.await
.expect("Failed to receive block");
.expect("Test failed");
}
}
4 changes: 2 additions & 2 deletions node/testing/tests/p2p_pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveBlock;
use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveMessage;

mod common;

scenario_test!(pubsub_receive_block, P2pReceiveBlock, P2pReceiveBlock);
scenario_test!(pubsub_receive_block, P2pReceiveMessage, P2pReceiveMessage);
2 changes: 2 additions & 0 deletions p2p/src/disconnection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ pub enum P2pDisconnectionReason {
Timeout,
#[error("rpc protocol not supported")]
Unsupported,
#[error("invalid pubsub message")]
InvalidMessage,
}
Loading
Loading