Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub enum ActionKind {
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
P2pCallbacksP2pChannelsStreamingRpcTimeout,
P2pCallbacksP2pDisconnection,
P2pCallbacksP2pPubsubValidateMessage,
P2pCallbacksRpcRespondBestTip,
P2pChannelsBestTipInit,
P2pChannelsBestTipPending,
Expand Down Expand Up @@ -403,7 +404,9 @@ pub enum ActionKind {
P2pNetworkPnetEffectfulOutgoingData,
P2pNetworkPnetEffectfulSetupNonce,
P2pNetworkPubsubBroadcast,
P2pNetworkPubsubBroadcastAcceptedBlock,
P2pNetworkPubsubBroadcastSigned,
P2pNetworkPubsubBroadcastValidationCallback,
P2pNetworkPubsubGraft,
P2pNetworkPubsubIncomingData,
P2pNetworkPubsubIncomingMessage,
Expand All @@ -414,6 +417,7 @@ pub enum ActionKind {
P2pNetworkPubsubOutgoingMessageClear,
P2pNetworkPubsubOutgoingMessageError,
P2pNetworkPubsubPrune,
P2pNetworkPubsubPruneMessages,
P2pNetworkPubsubSign,
P2pNetworkPubsubSignError,
P2pNetworkPubsubValidateIncomingMessages,
Expand Down Expand Up @@ -707,7 +711,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 597;
pub const COUNT: u16 = 601;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -809,6 +813,9 @@ impl ActionKindGet for P2pCallbacksAction {
}
Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection,
Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip,
Self::P2pPubsubValidateMessage { .. } => {
ActionKind::P2pCallbacksP2pPubsubValidateMessage
}
}
}
}
Expand Down Expand Up @@ -1953,6 +1960,13 @@ impl ActionKindGet for P2pNetworkPubsubAction {
Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear,
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
Self::BroadcastValidationCallback { .. } => {
ActionKind::P2pNetworkPubsubBroadcastValidationCallback
}
Self::BroadcastAcceptedBlock { .. } => {
ActionKind::P2pNetworkPubsubBroadcastAcceptedBlock
}
Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/consensus/consensus_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl ConsensusState {
/// Ideally we would differentiate between requested blocks and blocks
/// received from gossip, but this difference doesn't really exist
/// in the WebRTC transport, hence this heuristic.
fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
pub fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool {
let (has_greater_blobal_slot, diff_with_best_tip) = state
.transition_frontier
.best_tip()
Expand Down
1 change: 1 addition & 0 deletions node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ mod consensus_actions;
pub use consensus_actions::*;

mod consensus_reducer;
pub use consensus_reducer::allow_block_too_late;
7 changes: 7 additions & 0 deletions node/src/p2p/callbacks/p2p_callbacks_actions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use mina_p2p_messages::gossip::GossipNetMessageV2;
use openmina_core::ActionEvent;
use p2p::{
channels::{
Expand Down Expand Up @@ -46,6 +47,11 @@ pub enum P2pCallbacksAction {
RpcRespondBestTip {
peer_id: PeerId,
},
P2pPubsubValidateMessage {
message_content: GossipNetMessageV2,
message: p2p::network::pubsub::pb::Message,
peer_id: PeerId,
},
}

impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
Expand All @@ -63,6 +69,7 @@ impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
P2pCallbacksAction::RpcRespondBestTip { .. } => {
state.transition_frontier.best_tip().is_some()
}
P2pCallbacksAction::P2pPubsubValidateMessage { .. } => true,
}
}
}
49 changes: 46 additions & 3 deletions node/src/p2p/callbacks/p2p_callbacks_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use ark_ff::fields::arithmetic::InvalidBigInt;
use mina_p2p_messages::v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash};
use openmina_core::{block::BlockWithHash, bug_condition, transaction::TransactionWithHash};
use mina_p2p_messages::{
gossip::GossipNetMessageV2,
v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash},
};
use openmina_core::{block::BlockWithHash, bug_condition, log, transaction::TransactionWithHash};
use p2p::{
channels::{
best_tip::P2pChannelsBestTipAction,
rpc::{BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
PeerId,
P2pNetworkPubsubAction, PeerId, ValidationResult,
};
use redux::{ActionMeta, ActionWithMeta, Dispatcher};

use crate::{
consensus::allow_block_too_late,
p2p_ready,
snark_pool::candidate::SnarkPoolCandidateAction,
state::BlockPrevalidationError,
transaction_pool::candidate::TransactionPoolCandidateAction,
transition_frontier::sync::{
ledger::{
Expand Down Expand Up @@ -48,6 +53,7 @@ impl crate::State {
action: ActionWithMeta<&P2pCallbacksAction>,
) {
let (action, meta) = action.split();
let time = meta.time();
let (dispatcher, state) = state_context.into_dispatcher_and_state();

match action {
Expand Down Expand Up @@ -290,6 +296,43 @@ impl crate::State {
best_tip: best_tip.clone(),
});
}
P2pCallbacksAction::P2pPubsubValidateMessage {
message,
message_content,
peer_id,
} => {
let result = match message_content {
GossipNetMessageV2::NewState(new_best_tip) => {
match BlockWithHash::try_new(new_best_tip.clone()) {
Ok(block) => {
let allow_block_too_late = allow_block_too_late(state, &block);
match state.prevalidate_block(&block, allow_block_too_late) {
Ok(()) => ValidationResult::Valid,
Err(BlockPrevalidationError::ReceivedTooLate { .. }) => {
ValidationResult::Ignore
}
Err(_) => ValidationResult::Reject,
}
}
Err(_) => {
log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block");
return;
}
}
}
_ => {
// TODO: add pre validation for Snark pool and Transaction pool diffs
ValidationResult::Valid
}
};

dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback {
message: message.clone(),
message_content: message_content.clone(),
peer_id: *peer_id,
result,
});
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions node/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::time::Duration;

use mina_p2p_messages::gossip::GossipNetMessageV2;
use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use p2p::network;
use rand::prelude::*;

use openmina_core::block::BlockWithHash;
Expand Down Expand Up @@ -628,6 +630,11 @@ impl P2p {
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
}
)),
on_p2p_pubsub_message_received: Some(redux::callback!(
on_p2p_pubsub_message_received((message: network::pubsub::pb::Message, message_content: GossipNetMessageV2, peer_id: PeerId)) -> crate::Action{
P2pCallbacksAction::P2pPubsubValidateMessage { message_content, message, peer_id }
}
)),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use mina_p2p_messages::v2::LedgerHash;
use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId};
use p2p::PeerId;
use p2p::{P2pNetworkPubsubAction, PeerId};
use redux::ActionMeta;

use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest};
Expand Down Expand Up @@ -76,6 +76,9 @@ impl TransitionFrontierSyncAction {
if let Some(callback) = on_success {
store.dispatch_callback(callback.clone(), ());
}

let hash = best_tip.hash.clone();
store.dispatch(P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash });
}
// TODO(tizoc): this action is never called with the current implementation,
// either remove it or figure out how to recover it as a reaction to
Expand Down
31 changes: 10 additions & 21 deletions node/testing/src/scenarios/p2p/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::time::Duration;

use node::ActionKind;

use crate::{
hosts,
node::RustNodeTestingConfig,
scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime},
};

/// Receive a block via meshsub
/// Receive a message via meshsub
/// 1. Create a normal node with default devnet config, with devnet peers as initial peers
/// 2. Wait for 2 minutes
/// 3. Create a node with discovery disabled and first node as only peer
/// 4. Wait for first node to broadcast block to second one
/// 4. Wait for first node to broadcast message to second one
#[derive(documented::Documented, Default, Clone, Copy)]
pub struct P2pReceiveBlock;

Expand All @@ -19,12 +21,6 @@ impl P2pReceiveBlock {
let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet());

let retransmitter_openmina_node = runner.add_rust_node(config);
let retransmitter_peer_id = runner
.node(retransmitter_openmina_node)
.unwrap()
.state()
.p2p
.my_id();

let _ = runner
.run(
Expand All @@ -47,22 +43,15 @@ impl P2pReceiveBlock {
RunCfg::default()
.timeout(Duration::from_secs(60 * 30))
.advance_time(RunCfgAdvanceTime::Real)
.action_handler(move |node, state, _, _| {
let Some(p2p) = state.p2p.ready() else {
return false;
};

.action_handler(move |node, _state, _, action| {
node == receiver_openmina_node
&& p2p
.network
.scheduler
.broadcast_state
.incoming_block
.as_ref()
.map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id))
&& matches!(
action.action().kind(),
ActionKind::P2pNetworkPubsubBroadcastValidationCallback
)
}),
)
.await
.expect("Failed to receive block");
.expect("Test failed");
}
}
2 changes: 2 additions & 0 deletions p2p/src/disconnection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub enum P2pDisconnectionReason {
TransitionFrontierSyncLedgerSnarkedNumAccountsRejected,
#[error("failed to verify snark pool diff")]
SnarkPoolVerifyError,
#[error("failed to verify block")]
BlockVerifyError,
#[error("duplicate connection")]
DuplicateConnection,
#[error("timeout")]
Expand Down
10 changes: 9 additions & 1 deletion p2p/src/network/pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod pb {
pub mod pb {
include!(concat!(env!("OUT_DIR"), "/gossipsub.rs"));
}

Expand All @@ -18,3 +18,11 @@ const TOPIC: &str = "coda/consensus-messages/0.0.1";

pub mod pubsub_effectful;
pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub enum ValidationResult {
Valid,
Reject,
Ignore,
}
Loading
Loading