From f7ee15b411ba7993fb65947d4b04a56ac42ce9fc Mon Sep 17 00:00:00 2001 From: Mimir Date: Mon, 16 Dec 2024 10:05:35 +0100 Subject: [PATCH 1/5] Added block validation before broadcast --- node/src/action_kind.rs | 16 +- node/src/consensus/consensus_reducer.rs | 2 +- node/src/consensus/mod.rs | 1 + .../p2p/callbacks/p2p_callbacks_actions.rs | 7 + .../p2p/callbacks/p2p_callbacks_reducer.rs | 53 +++- node/src/state.rs | 7 + .../sync/transition_frontier_sync_effects.rs | 5 +- p2p/src/disconnection/mod.rs | 2 + p2p/src/network/pubsub/mod.rs | 10 +- .../pubsub/p2p_network_pubsub_actions.rs | 75 ++++- .../pubsub/p2p_network_pubsub_reducer.rs | 274 +++++++++++++----- .../pubsub/p2p_network_pubsub_state.rs | 9 + p2p/src/p2p_reducer.rs | 4 +- p2p/src/p2p_state.rs | 10 +- 14 files changed, 373 insertions(+), 102 deletions(-) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 347ea5078e..2b1b567be0 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -206,6 +206,7 @@ pub enum ActionKind { P2pCallbacksP2pChannelsStreamingRpcResponseReceived, P2pCallbacksP2pChannelsStreamingRpcTimeout, P2pCallbacksP2pDisconnection, + P2pCallbacksP2pPubsubValidateMessage, P2pCallbacksRpcRespondBestTip, P2pChannelsBestTipInit, P2pChannelsBestTipPending, @@ -403,7 +404,9 @@ pub enum ActionKind { P2pNetworkPnetEffectfulOutgoingData, P2pNetworkPnetEffectfulSetupNonce, P2pNetworkPubsubBroadcast, + P2pNetworkPubsubBroadcastAcceptedBlock, P2pNetworkPubsubBroadcastSigned, + P2pNetworkPubsubBroadcastValidationCallback, P2pNetworkPubsubGraft, P2pNetworkPubsubIncomingData, P2pNetworkPubsubIncomingMessage, @@ -414,6 +417,7 @@ pub enum ActionKind { P2pNetworkPubsubOutgoingMessageClear, P2pNetworkPubsubOutgoingMessageError, P2pNetworkPubsubPrune, + P2pNetworkPubsubPruneMessages, P2pNetworkPubsubSign, P2pNetworkPubsubSignError, P2pNetworkPubsubValidateIncomingMessages, @@ -707,7 +711,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 597; + pub const COUNT: u16 = 601; } impl std::fmt::Display for ActionKind { @@ -809,6 +813,9 @@ impl ActionKindGet for P2pCallbacksAction { } Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection, Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip, + Self::P2pPubsubValidateMessage { .. } => { + ActionKind::P2pCallbacksP2pPubsubValidateMessage + } } } } @@ -1953,6 +1960,13 @@ impl ActionKindGet for P2pNetworkPubsubAction { Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear, Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError, Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData, + Self::BroadcastValidationCallback { .. } => { + ActionKind::P2pNetworkPubsubBroadcastValidationCallback + } + Self::BroadcastAcceptedBlock { .. } => { + ActionKind::P2pNetworkPubsubBroadcastAcceptedBlock + } + Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages, } } } diff --git a/node/src/consensus/consensus_reducer.rs b/node/src/consensus/consensus_reducer.rs index 0f7c977c1e..fcbc760c79 100644 --- a/node/src/consensus/consensus_reducer.rs +++ b/node/src/consensus/consensus_reducer.rs @@ -345,7 +345,7 @@ impl ConsensusState { /// Ideally we would differentiate between requested blocks and blocks /// received from gossip, but this difference doesn't really exist /// in the WebRTC transport, hence this heuristic. -fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool { +pub fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool { let (has_greater_blobal_slot, diff_with_best_tip) = state .transition_frontier .best_tip() diff --git a/node/src/consensus/mod.rs b/node/src/consensus/mod.rs index da66985907..445b2af309 100644 --- a/node/src/consensus/mod.rs +++ b/node/src/consensus/mod.rs @@ -5,3 +5,4 @@ mod consensus_actions; pub use consensus_actions::*; mod consensus_reducer; +pub use consensus_reducer::allow_block_too_late; diff --git a/node/src/p2p/callbacks/p2p_callbacks_actions.rs b/node/src/p2p/callbacks/p2p_callbacks_actions.rs index 92078b9fac..56b1d9bca5 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_actions.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_actions.rs @@ -1,3 +1,4 @@ +use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::ActionEvent; use p2p::{ channels::{ @@ -46,6 +47,11 @@ pub enum P2pCallbacksAction { RpcRespondBestTip { peer_id: PeerId, }, + P2pPubsubValidateMessage { + message_content: Option, + message: p2p::network::pubsub::pb::Message, + peer_id: PeerId, + }, } impl redux::EnablingCondition for P2pCallbacksAction { @@ -63,6 +69,7 @@ impl redux::EnablingCondition for P2pCallbacksAction { P2pCallbacksAction::RpcRespondBestTip { .. } => { state.transition_frontier.best_tip().is_some() } + P2pCallbacksAction::P2pPubsubValidateMessage { .. } => true, } } } diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index 0eb4826b90..b81e74f250 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -1,6 +1,9 @@ use ark_ff::fields::arithmetic::InvalidBigInt; -use mina_p2p_messages::v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash}; -use openmina_core::{block::BlockWithHash, bug_condition, transaction::TransactionWithHash}; +use mina_p2p_messages::{ + gossip::GossipNetMessageV2, + v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash}, +}; +use openmina_core::{block::BlockWithHash, bug_condition, log, transaction::TransactionWithHash}; use p2p::{ channels::{ best_tip::P2pChannelsBestTipAction, @@ -8,13 +11,15 @@ use p2p::{ streaming_rpc::P2pStreamingRpcResponseFull, }, disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, - PeerId, + P2pNetworkPubsubAction, PeerId, ValidationResult, }; use redux::{ActionMeta, ActionWithMeta, Dispatcher}; use crate::{ + consensus::allow_block_too_late, p2p_ready, snark_pool::candidate::SnarkPoolCandidateAction, + state::BlockPrevalidationError, transaction_pool::candidate::TransactionPoolCandidateAction, transition_frontier::sync::{ ledger::{ @@ -48,6 +53,7 @@ impl crate::State { action: ActionWithMeta<&P2pCallbacksAction>, ) { let (action, meta) = action.split(); + let time = meta.time(); let (dispatcher, state) = state_context.into_dispatcher_and_state(); match action { @@ -290,6 +296,47 @@ impl crate::State { best_tip: best_tip.clone(), }); } + P2pCallbacksAction::P2pPubsubValidateMessage { + message, + message_content, + peer_id, + } => { + let result = if let Some(message_content) = message_content { + match message_content { + GossipNetMessageV2::NewState(new_best_tip) => { + match BlockWithHash::try_new(new_best_tip.clone()) { + Ok(block) => { + let allow_block_too_late = allow_block_too_late(state, &block); + match state.prevalidate_block(&block, allow_block_too_late) { + Ok(()) => ValidationResult::Valid, + Err(BlockPrevalidationError::ReceivedTooLate { + .. + }) => ValidationResult::Ignore, + Err(_) => ValidationResult::Reject, + } + } + Err(_) => { + log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); + return; + } + } + } + _ => { + // TODO: add validation for Snark pool and Transaction pool diffs + ValidationResult::Valid + } + } + } else { + ValidationResult::Valid + }; + + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { + message: message.clone(), + message_content: message_content.clone(), + peer_id: *peer_id, + result, + }); + } } } diff --git a/node/src/state.rs b/node/src/state.rs index 293301906f..5d9fb912ca 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -1,9 +1,11 @@ use std::sync::Arc; use std::time::Duration; +use mina_p2p_messages::gossip::GossipNetMessageV2; use mina_p2p_messages::v2; use openmina_core::constants::PROTOCOL_VERSION; use openmina_core::transaction::{TransactionInfo, TransactionWithHash}; +use p2p::network; use rand::prelude::*; use openmina_core::block::BlockWithHash; @@ -628,6 +630,11 @@ impl P2p { P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id } } )), + on_p2p_pubsub_message_received: Some(redux::callback!( + on_p2p_pubsub_message_received((message: network::pubsub::pb::Message, message_content: Option, peer_id: PeerId)) -> crate::Action{ + P2pCallbacksAction::P2pPubsubValidateMessage { message_content, message, peer_id } + } + )), } } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index ac92f9569c..07da7c580f 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -1,7 +1,7 @@ use mina_p2p_messages::v2::LedgerHash; use openmina_core::block::{AppliedBlock, ArcBlockWithHash}; use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId}; -use p2p::PeerId; +use p2p::{P2pNetworkPubsubAction, PeerId}; use redux::ActionMeta; use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest}; @@ -76,6 +76,9 @@ impl TransitionFrontierSyncAction { if let Some(callback) = on_success { store.dispatch_callback(callback.clone(), ()); } + + let hash = best_tip.hash.clone(); + store.dispatch(P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash }); } // TODO(tizoc): this action is never called with the current implementation, // either remove it or figure out how to recover it as a reaction to diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 29debe3d77..8db9a23e46 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -35,6 +35,8 @@ pub enum P2pDisconnectionReason { TransitionFrontierSyncLedgerSnarkedNumAccountsRejected, #[error("failed to verify snark pool diff")] SnarkPoolVerifyError, + #[error("failed to verify block")] + BlockVerifyError, #[error("duplicate connection")] DuplicateConnection, #[error("timeout")] diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index 9f9bdbc94e..d52354f1e0 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -1,4 +1,4 @@ -mod pb { +pub mod pb { include!(concat!(env!("OUT_DIR"), "/gossipsub.rs")); } @@ -18,3 +18,11 @@ const TOPIC: &str = "coda/consensus-messages/0.0.1"; pub mod pubsub_effectful; pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub enum ValidationResult { + Valid, + Reject, + Ignore, +} diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index ee12863b77..ca5529a949 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -1,6 +1,6 @@ -use super::pb; +use super::{pb, ValidationResult}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId}; -use mina_p2p_messages::gossip::GossipNetMessageV2; +use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; use openmina_core::ActionEvent; use serde::{Deserialize, Serialize}; @@ -61,19 +61,29 @@ pub enum P2pNetworkPubsubAction { }, /// Clean up temporary states after processing an incoming message. - IncomingMessageCleanup { peer_id: PeerId }, + IncomingMessageCleanup { + peer_id: PeerId, + }, /// Add a peer to the mesh network for a specific topic. - Graft { peer_id: PeerId, topic_id: String }, + Graft { + peer_id: PeerId, + topic_id: String, + }, /// Remove a peer from the mesh network for a specific topic. - Prune { peer_id: PeerId, topic_id: String }, + Prune { + peer_id: PeerId, + topic_id: String, + }, /// Initiate the broadcasting of a message to all subscribed peers. /// /// **Fields:** /// - `message`: The gossip network message to broadcast. - Broadcast { message: GossipNetMessageV2 }, + Broadcast { + message: GossipNetMessageV2, + }, /// Prepare a message for signing before broadcasting. /// @@ -91,32 +101,60 @@ pub enum P2pNetworkPubsubAction { /// An error occured during the signing process. #[action_event(level = warn, fields(display(author), display(topic)))] - SignError { author: PeerId, topic: String }, + SignError { + author: PeerId, + topic: String, + }, /// Finalize the broadcasting of a signed message by attaching the signature. /// /// **Fields:** /// - `signature`: The cryptographic signature of the message. - BroadcastSigned { signature: Data }, + BroadcastSigned { + signature: Data, + }, /// Prepare an outgoing message to send to a specific peer. - OutgoingMessage { peer_id: PeerId }, + OutgoingMessage { + peer_id: PeerId, + }, /// Clear the outgoing message state for a specific peer after sending. - OutgoingMessageClear { peer_id: PeerId }, + OutgoingMessageClear { + peer_id: PeerId, + }, /// An error occured during the sending of an outgoing message. /// /// **Fields:** /// - `msg`: The protobuf message that failed to send. #[action_event(level = warn, fields(display(peer_id), debug(msg)))] - OutgoingMessageError { msg: pb::Rpc, peer_id: PeerId }, + OutgoingMessageError { + msg: pb::Rpc, + peer_id: PeerId, + }, /// Send encoded data over an outgoing stream to a specific peer. /// /// **Fields:** /// - `data`: The encoded data to be sent. - OutgoingData { data: Data, peer_id: PeerId }, + OutgoingData { + data: Data, + peer_id: PeerId, + }, + + BroadcastValidationCallback { + message: pb::Message, + message_content: Option, + peer_id: PeerId, + result: ValidationResult, + }, + + BroadcastAcceptedBlock { + hash: v2::StateHash, + }, + /// Delete expired messages from state + PruneMessages {}, } impl From for crate::P2pAction { @@ -127,14 +165,19 @@ impl From for crate::P2pAction { impl redux::EnablingCondition for P2pNetworkPubsubAction { fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool { + let pubsub = &state.network.scheduler.broadcast_state; match self { - P2pNetworkPubsubAction::OutgoingMessage { peer_id } => state - .network - .scheduler - .broadcast_state + P2pNetworkPubsubAction::OutgoingMessage { peer_id } => pubsub .clients .get(peer_id) .map_or(false, |s| !s.message_is_empty()), + P2pNetworkPubsubAction::Prune { peer_id, topic_id } => pubsub + .topics + .get(topic_id) + .map_or(false, |topics| topics.contains_key(peer_id)), + P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash } => { + pubsub.block_messages.contains_key(hash) + } _ => true, } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 7782ad9965..dde387dafc 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -1,18 +1,24 @@ -use std::collections::btree_map::Entry; +use std::{collections::btree_map::Entry, time::Duration}; use binprot::BinProtRead; -use mina_p2p_messages::{gossip, v2}; +use mina_p2p_messages::{ + gossip::{self, GossipNetMessageV2}, + v2::NetworkPoolSnarkPoolDiffVersionedStableV2, +}; use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate}; use redux::{Dispatcher, Timestamp}; use crate::{ channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction}, + disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, peer::P2pPeerAction, - Data, P2pConfig, P2pNetworkYamuxAction, PeerId, + Data, P2pConfig, P2pNetworkYamuxAction, P2pState, PeerId, }; use super::{ - p2p_network_pubsub_state::P2pNetworkPubsubClientMeshAddingState, + p2p_network_pubsub_state::{ + P2pNetworkPubsubBlockMessage, P2pNetworkPubsubClientMeshAddingState, + }, pb::{self, Message}, P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction, P2pNetworkPubsubState, TOPIC, @@ -29,6 +35,7 @@ impl P2pNetworkPubsubState { { let pubsub_state = state_context.get_substate_mut()?; let (action, meta) = action.split(); + let time = meta.time(); match action { P2pNetworkPubsubAction::NewStream { @@ -175,20 +182,19 @@ impl P2pNetworkPubsubState { } => { // Check result later to ensure we always dispatch the cleanup action let reduce_incoming_result = - pubsub_state.reduce_incoming_message(peer_id, message, seen_limit); + pubsub_state.reduce_incoming_message(&message, seen_limit); let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); + let p2p_state: &P2pState = global_state.substate()?; + let state: &Self = global_state.substate()?; dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id }); - reduce_incoming_result?; - - let state: &Self = global_state.substate()?; - let config: &P2pConfig = global_state.substate()?; + let message_content = reduce_incoming_result?; for (topic_id, map) in &state.topics { let mesh_size = map.values().filter(|s| s.on_mesh()).count(); - let could_accept = mesh_size < config.meshsub.outbound_degree_high; + let could_accept = mesh_size < p2p_state.config.meshsub.outbound_degree_high; if !could_accept { if let Some(topic_state) = map.get(&peer_id) { @@ -200,30 +206,31 @@ impl P2pNetworkPubsubState { } } - if let Err(error) = Self::broadcast(dispatcher, global_state) { - bug_condition!( - "Failure when trying to broadcast incoming pubsub message: {error}" - ); + let message_content = match message_content { + ReduceIncomingDataResult::AlreadySeen => { + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { + message, + message_content: None, + peer_id, + result: super::ValidationResult::Ignore, + }); + return Ok(()); + } + ReduceIncomingDataResult::None => None, + ReduceIncomingDataResult::Some(message) => Some(message), }; - if let Some((_, block)) = state.incoming_block.as_ref() { - let best_tip = BlockWithHash::try_new(block.clone())?; - dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); - } - for (transaction, nonce) in &state.incoming_transactions { - dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { - peer_id, - transaction: Box::new(transaction.clone()), - nonce: *nonce, - }); - } - for (snark, nonce) in &state.incoming_snarks { - dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { + if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() { + dispatcher.push_callback(callback, (message, message_content, peer_id)); + } else { + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { + message, + message_content, peer_id, - snark: Box::new(snark.clone()), - nonce: *nonce, + result: super::ValidationResult::Valid, }); } + Ok(()) } P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => { @@ -440,6 +447,126 @@ impl P2pNetworkPubsubState { } Ok(()) } + P2pNetworkPubsubAction::BroadcastValidationCallback { + message, + message_content, + peer_id, + result, + } => { + let message_id = pubsub_state.mcache.put(message.clone()); + + match result { + super::ValidationResult::Valid => match &message_content { + Some(GossipNetMessageV2::NewState(block)) => { + let hash = block.try_hash()?; + pubsub_state.block_messages.entry(hash).or_insert_with(|| { + P2pNetworkPubsubBlockMessage { + peer_id, + message_id, + expiration_time: time + Duration::from_secs(600), + } + }); + } + _ => pubsub_state + .reduce_incoming_validated_message(message_id, peer_id, message), + }, + super::ValidationResult::Reject => {} + super::ValidationResult::Ignore => {} + } + + let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); + + match result { + super::ValidationResult::Valid => { + match message_content { + Some(GossipNetMessageV2::NewState(block)) => { + let best_tip = BlockWithHash::try_new(block.clone())?; + dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); + return Ok(()); + } + Some(GossipNetMessageV2::TransactionPoolDiff { message, nonce }) => { + let nonce = nonce.as_u32(); + for transaction in message.0 { + dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { + peer_id, + transaction: Box::new(transaction), + nonce, + }); + } + } + Some(GossipNetMessageV2::SnarkPoolDiff { + message: + NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), + nonce, + }) => { + dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { + peer_id, + snark: Box::new(work.1.into()), + nonce: nonce.as_u32(), + }); + } + _ => {} + } + + Self::broadcast(dispatcher, global_state) + } + super::ValidationResult::Reject => { + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::BlockVerifyError, + }); + Ok(()) + } + super::ValidationResult::Ignore => Ok(()), + } + } + P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash } => { + let Some(message) = dbg!(pubsub_state.block_messages.remove(&hash)) else { + bug_condition!("Block message not found for: {}", hash); + return Ok(()); + }; + + let P2pNetworkPubsubBlockMessage { + message_id, + peer_id, + .. + } = message; + + let Some(message_id) = message_id else { + return Ok(()); + }; + dbg!(); + let Some(message) = pubsub_state.mcache.map.get(&message_id) else { + return Ok(()); + }; + + dbg!(); + let message = message.clone(); + pubsub_state.reduce_incoming_validated_message(Some(message_id), peer_id, message); + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + Self::broadcast(dispatcher, state) + } + P2pNetworkPubsubAction::PruneMessages {} => { + let blocks = pubsub_state + .block_messages + .iter() + .filter_map(|(hash, message)| { + if message.expiration_time <= time { + Some(hash.to_owned()) + } else { + None + } + }) + .collect::>(); + + for block_hash in blocks { + pubsub_state.block_messages.remove(&block_hash); + } + + Ok(()) + } } } @@ -468,58 +595,14 @@ impl P2pNetworkPubsubState { Ok(()) } - #[inline(never)] - fn reduce_incoming_message( + fn reduce_incoming_validated_message( &mut self, + message_id: Option>, peer_id: PeerId, message: Message, - seen_limit: usize, - ) -> Result<(), String> { + ) { let topic = self.topics.entry(message.topic.clone()).or_default(); - if let Some(signature) = &message.signature { - // skip recently seen message - if !self.seen.contains(signature) { - self.seen.push_back(signature.clone()); - // keep only last `n` to avoid memory leak - if self.seen.len() > seen_limit { - self.seen.pop_front(); - } - } else { - return Ok(()); - } - } - - if let Some(data) = &message.data { - if data.len() > 8 { - let mut slice = &data[8..]; - match gossip::GossipNetMessageV2::binprot_read(&mut slice) { - Ok(gossip::GossipNetMessageV2::NewState(block)) => { - self.incoming_block = Some((peer_id, block)); - } - Ok(gossip::GossipNetMessageV2::TransactionPoolDiff { message, nonce }) => { - let nonce = nonce.as_u32(); - let txs = message.0.into_iter().map(|tx| (tx, nonce)); - self.incoming_transactions.extend(txs); - } - Ok(gossip::GossipNetMessageV2::SnarkPoolDiff { message, nonce }) => { - if let v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work) = - message - { - self.incoming_snarks.push((work.1.into(), nonce.as_u32())); - } - } - Err(err) => { - return Err(err.to_string()); - } - } - } - } - - let message_id = self.mcache.put(message.clone()); - - // TODO: this should only happen after the contents have been validated. - // The only validation that has happened so far is that the message can be parsed. self.clients .iter_mut() .filter(|(c, _)| { @@ -540,8 +623,39 @@ impl P2pNetworkPubsubState { }) } }); + } - Ok(()) + #[inline(never)] + fn reduce_incoming_message( + &mut self, + message: &Message, + seen_limit: usize, + ) -> Result { + if let Some(signature) = &message.signature { + // skip recently seen message + if !self.seen.contains(signature) { + self.seen.push_back(signature.clone()); + // keep only last `n` to avoid memory leak + if self.seen.len() > seen_limit { + self.seen.pop_front(); + } + } else { + return Ok(ReduceIncomingDataResult::AlreadySeen); + } + } + + let message_content = match &message.data { + Some(data) if data.len() > 8 => { + let mut slice = &data[8..]; + ReduceIncomingDataResult::Some( + gossip::GossipNetMessageV2::binprot_read(&mut slice) + .map_err(|e| e.to_string())?, + ) + } + _ => ReduceIncomingDataResult::None, + }; + + Ok(message_content) } fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec, data: &'a [u8]) -> &'a [u8] { @@ -702,3 +816,9 @@ impl P2pNetworkPubsubState { Ok(()) } } + +enum ReduceIncomingDataResult { + AlreadySeen, + None, + Some(GossipNetMessageV2), +} diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index 8ece03753e..1e3e1df591 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -58,6 +58,15 @@ pub struct P2pNetworkPubsubState { /// `iwant` requests, tracking the number of times peers have expressed interest in specific messages. pub iwant: VecDeque, + + pub block_messages: BTreeMap, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct P2pNetworkPubsubBlockMessage { + pub message_id: Option>, + pub expiration_time: Timestamp, + pub peer_id: PeerId, } #[derive(Default, Serialize, Deserialize, Debug, Clone)] diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index e98125bad5..3cb17bc77a 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -9,7 +9,8 @@ use crate::{ }, disconnection::{P2pDisconnectedState, P2pDisconnectionAction}, P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction, - P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, P2pPeerState, P2pState, PeerId, + P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, + P2pPeerState, P2pState, PeerId, }; use openmina_core::{bug_condition, Substate}; use redux::{ActionMeta, ActionWithMeta, Dispatcher, Timestamp}; @@ -92,6 +93,7 @@ impl P2pState { state.p2p_pnet_timeouts(dispatcher, time)?; state.p2p_select_timeouts(dispatcher, time)?; state.p2p_rpc_heartbeats(dispatcher, time)?; + dispatcher.push(P2pNetworkPubsubAction::PruneMessages {}); } state.rpc_timeouts(dispatcher, time)?; diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index 664b7682a7..bad3310811 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -30,13 +30,14 @@ use crate::{ }, is_time_passed, network::{ + self, identify::{P2pNetworkIdentify, P2pNetworkIdentifyState}, P2pNetworkState, }, Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubState, P2pNetworkSchedulerState, P2pTimeouts, PeerId, }; -use mina_p2p_messages::v2; +use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct P2pState { @@ -562,6 +563,13 @@ pub struct P2pCallbacks { /// Callback for [`P2pChannelsStreamingRpcAction::ResponseReceived`] pub on_p2p_channels_streaming_rpc_response_received: OptionalCallback<(PeerId, P2pRpcId, Option)>, + + /// Callback for received pubsub message + pub on_p2p_pubsub_message_received: OptionalCallback<( + network::pubsub::pb::Message, + Option, + PeerId, + )>, } impl_substate_access!(P2pState, P2pNetworkState, network); From d2b78db0c17c81623d9fa6aa4e92095dad9e7fe9 Mon Sep 17 00:00:00 2001 From: Mimir Date: Tue, 14 Jan 2025 13:28:11 +0100 Subject: [PATCH 2/5] Removed optional message in pubsub --- .../p2p/callbacks/p2p_callbacks_actions.rs | 2 +- .../p2p/callbacks/p2p_callbacks_reducer.rs | 40 +++++++------- node/src/state.rs | 2 +- node/testing/src/scenarios/p2p/pubsub.rs | 31 ++++------- .../pubsub/p2p_network_pubsub_actions.rs | 2 +- .../pubsub/p2p_network_pubsub_reducer.rs | 54 +++++++------------ .../pubsub/p2p_network_pubsub_state.rs | 15 ++---- p2p/src/p2p_state.rs | 7 +-- 8 files changed, 55 insertions(+), 98 deletions(-) diff --git a/node/src/p2p/callbacks/p2p_callbacks_actions.rs b/node/src/p2p/callbacks/p2p_callbacks_actions.rs index 56b1d9bca5..0e69aa3640 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_actions.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_actions.rs @@ -48,7 +48,7 @@ pub enum P2pCallbacksAction { peer_id: PeerId, }, P2pPubsubValidateMessage { - message_content: Option, + message_content: GossipNetMessageV2, message: p2p::network::pubsub::pb::Message, peer_id: PeerId, }, diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index b81e74f250..6e9df9579c 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -301,33 +301,29 @@ impl crate::State { message_content, peer_id, } => { - let result = if let Some(message_content) = message_content { - match message_content { - GossipNetMessageV2::NewState(new_best_tip) => { - match BlockWithHash::try_new(new_best_tip.clone()) { - Ok(block) => { - let allow_block_too_late = allow_block_too_late(state, &block); - match state.prevalidate_block(&block, allow_block_too_late) { - Ok(()) => ValidationResult::Valid, - Err(BlockPrevalidationError::ReceivedTooLate { - .. - }) => ValidationResult::Ignore, - Err(_) => ValidationResult::Reject, + let result = match message_content { + GossipNetMessageV2::NewState(new_best_tip) => { + match BlockWithHash::try_new(new_best_tip.clone()) { + Ok(block) => { + let allow_block_too_late = allow_block_too_late(state, &block); + match state.prevalidate_block(&block, allow_block_too_late) { + Ok(()) => ValidationResult::Valid, + Err(BlockPrevalidationError::ReceivedTooLate { .. }) => { + ValidationResult::Ignore } - } - Err(_) => { - log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); - return; + Err(_) => ValidationResult::Reject, } } + Err(_) => { + log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); + return; + } } - _ => { - // TODO: add validation for Snark pool and Transaction pool diffs - ValidationResult::Valid - } } - } else { - ValidationResult::Valid + _ => { + // TODO: add pre validation for Snark pool and Transaction pool diffs + ValidationResult::Valid + } }; dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { diff --git a/node/src/state.rs b/node/src/state.rs index 5d9fb912ca..26247f9ee4 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -631,7 +631,7 @@ impl P2p { } )), on_p2p_pubsub_message_received: Some(redux::callback!( - on_p2p_pubsub_message_received((message: network::pubsub::pb::Message, message_content: Option, peer_id: PeerId)) -> crate::Action{ + on_p2p_pubsub_message_received((message: network::pubsub::pb::Message, message_content: GossipNetMessageV2, peer_id: PeerId)) -> crate::Action{ P2pCallbacksAction::P2pPubsubValidateMessage { message_content, message, peer_id } } )), diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index afd9635267..d976c7d4d6 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -1,16 +1,18 @@ use std::time::Duration; +use node::ActionKind; + use crate::{ hosts, node::RustNodeTestingConfig, scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime}, }; -/// Receive a block via meshsub +/// Receive a message via meshsub /// 1. Create a normal node with default devnet config, with devnet peers as initial peers /// 2. Wait for 2 minutes /// 3. Create a node with discovery disabled and first node as only peer -/// 4. Wait for first node to broadcast block to second one +/// 4. Wait for first node to broadcast message to second one #[derive(documented::Documented, Default, Clone, Copy)] pub struct P2pReceiveBlock; @@ -19,12 +21,6 @@ impl P2pReceiveBlock { let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet()); let retransmitter_openmina_node = runner.add_rust_node(config); - let retransmitter_peer_id = runner - .node(retransmitter_openmina_node) - .unwrap() - .state() - .p2p - .my_id(); let _ = runner .run( @@ -47,22 +43,15 @@ impl P2pReceiveBlock { RunCfg::default() .timeout(Duration::from_secs(60 * 30)) .advance_time(RunCfgAdvanceTime::Real) - .action_handler(move |node, state, _, _| { - let Some(p2p) = state.p2p.ready() else { - return false; - }; - + .action_handler(move |node, _state, _, action| { node == receiver_openmina_node - && p2p - .network - .scheduler - .broadcast_state - .incoming_block - .as_ref() - .map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id)) + && matches!( + action.action().kind(), + ActionKind::P2pNetworkPubsubBroadcastValidationCallback + ) }), ) .await - .expect("Failed to receive block"); + .expect("Test failed"); } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index ca5529a949..d8084273c5 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -145,7 +145,7 @@ pub enum P2pNetworkPubsubAction { BroadcastValidationCallback { message: pb::Message, - message_content: Option, + message_content: GossipNetMessageV2, peer_id: PeerId, result: ValidationResult, }, diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index dde387dafc..605662f993 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -206,18 +206,8 @@ impl P2pNetworkPubsubState { } } - let message_content = match message_content { - ReduceIncomingDataResult::AlreadySeen => { - dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { - message, - message_content: None, - peer_id, - result: super::ValidationResult::Ignore, - }); - return Ok(()); - } - ReduceIncomingDataResult::None => None, - ReduceIncomingDataResult::Some(message) => Some(message), + let Some(message_content) = message_content else { + return Ok(()); }; if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() { @@ -457,7 +447,7 @@ impl P2pNetworkPubsubState { match result { super::ValidationResult::Valid => match &message_content { - Some(GossipNetMessageV2::NewState(block)) => { + GossipNetMessageV2::NewState(block) => { let hash = block.try_hash()?; pubsub_state.block_messages.entry(hash).or_insert_with(|| { P2pNetworkPubsubBlockMessage { @@ -479,12 +469,12 @@ impl P2pNetworkPubsubState { match result { super::ValidationResult::Valid => { match message_content { - Some(GossipNetMessageV2::NewState(block)) => { + GossipNetMessageV2::NewState(block) => { let best_tip = BlockWithHash::try_new(block.clone())?; dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); return Ok(()); } - Some(GossipNetMessageV2::TransactionPoolDiff { message, nonce }) => { + GossipNetMessageV2::TransactionPoolDiff { message, nonce } => { let nonce = nonce.as_u32(); for transaction in message.0 { dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { @@ -494,11 +484,11 @@ impl P2pNetworkPubsubState { }); } } - Some(GossipNetMessageV2::SnarkPoolDiff { + GossipNetMessageV2::SnarkPoolDiff { message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), nonce, - }) => { + } => { dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark: Box::new(work.1.into()), @@ -511,6 +501,7 @@ impl P2pNetworkPubsubState { Self::broadcast(dispatcher, global_state) } super::ValidationResult::Reject => { + // TODO: add error variants for transactions and snarks dispatcher.push(P2pDisconnectionAction::Init { peer_id, reason: P2pDisconnectionReason::BlockVerifyError, @@ -521,7 +512,7 @@ impl P2pNetworkPubsubState { } } P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash } => { - let Some(message) = dbg!(pubsub_state.block_messages.remove(&hash)) else { + let Some(message) = pubsub_state.block_messages.remove(&hash) else { bug_condition!("Block message not found for: {}", hash); return Ok(()); }; @@ -535,12 +526,11 @@ impl P2pNetworkPubsubState { let Some(message_id) = message_id else { return Ok(()); }; - dbg!(); + let Some(message) = pubsub_state.mcache.map.get(&message_id) else { return Ok(()); }; - dbg!(); let message = message.clone(); pubsub_state.reduce_incoming_validated_message(Some(message_id), peer_id, message); @@ -630,7 +620,7 @@ impl P2pNetworkPubsubState { &mut self, message: &Message, seen_limit: usize, - ) -> Result { + ) -> Result, String> { if let Some(signature) = &message.signature { // skip recently seen message if !self.seen.contains(signature) { @@ -640,22 +630,20 @@ impl P2pNetworkPubsubState { self.seen.pop_front(); } } else { - return Ok(ReduceIncomingDataResult::AlreadySeen); + return Ok(None); } } - let message_content = match &message.data { + match &message.data { Some(data) if data.len() > 8 => { let mut slice = &data[8..]; - ReduceIncomingDataResult::Some( + Ok(Some( gossip::GossipNetMessageV2::binprot_read(&mut slice) - .map_err(|e| e.to_string())?, - ) + .map_err(|e| format!("Invalid `GossipNetMessageV2` message, error: {e}"))?, + )) } - _ => ReduceIncomingDataResult::None, - }; - - Ok(message_content) + _ => Err("Invalid message".to_owned()), + } } fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec, data: &'a [u8]) -> &'a [u8] { @@ -816,9 +804,3 @@ impl P2pNetworkPubsubState { Ok(()) } } - -enum ReduceIncomingDataResult { - AlreadySeen, - None, - Some(GossipNetMessageV2), -} diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index 1e3e1df591..7d08111a5b 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -1,17 +1,14 @@ use super::pb; use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId}; +use openmina_core::{snark::Snark, transaction::Transaction}; +use redux::Timestamp; +use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, - sync::Arc, time::Duration, }; -use mina_p2p_messages::v2; -use openmina_core::{snark::Snark, transaction::Transaction}; -use redux::Timestamp; -use serde::{Deserialize, Serialize}; - pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5); /// State of the P2P Network PubSub system. @@ -44,9 +41,6 @@ pub struct P2pNetworkPubsubState { /// For quick access and reducing redundant data transmission across peers. pub mcache: P2pNetworkPubsubMessageCache, - /// Incoming block from a peer, if any. - pub incoming_block: Option<(PeerId, Arc)>, - /// Incoming transactions from peers along with their nonces. pub incoming_transactions: Vec<(Transaction, u32)>, @@ -59,6 +53,7 @@ pub struct P2pNetworkPubsubState { /// `iwant` requests, tracking the number of times peers have expressed interest in specific messages. pub iwant: VecDeque, + /// Block messages currently being processed pub block_messages: BTreeMap, } @@ -132,8 +127,6 @@ impl P2pNetworkPubsubState { self.incoming_transactions.shrink_to(0x20); self.incoming_snarks.shrink_to(0x20); - - self.incoming_block = None; } } diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index bad3310811..d25d81a5d9 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -565,11 +565,8 @@ pub struct P2pCallbacks { OptionalCallback<(PeerId, P2pRpcId, Option)>, /// Callback for received pubsub message - pub on_p2p_pubsub_message_received: OptionalCallback<( - network::pubsub::pb::Message, - Option, - PeerId, - )>, + pub on_p2p_pubsub_message_received: + OptionalCallback<(network::pubsub::pb::Message, GossipNetMessageV2, PeerId)>, } impl_substate_access!(P2pState, P2pNetworkState, network); From da15fe1d7e3c58bbdc724f8685c4e7ea50760a5e Mon Sep 17 00:00:00 2001 From: Mimir Date: Tue, 14 Jan 2025 17:51:20 +0100 Subject: [PATCH 3/5] Review fixes --- docs/testing/README.md | 4 +- node/src/action_kind.rs | 12 +- .../p2p/callbacks/p2p_callbacks_actions.rs | 5 +- .../p2p/callbacks/p2p_callbacks_reducer.rs | 59 ++-- node/src/state.rs | 11 +- .../sync/transition_frontier_sync_effects.rs | 12 +- node/testing/src/scenarios/mod.rs | 8 +- node/testing/src/scenarios/p2p/pubsub.rs | 4 +- node/testing/tests/p2p_pubsub.rs | 4 +- p2p/src/disconnection/mod.rs | 4 +- p2p/src/network/pubsub/mod.rs | 14 +- .../pubsub/p2p_network_pubsub_actions.rs | 22 +- .../pubsub/p2p_network_pubsub_reducer.rs | 262 +++++++++++------- .../pubsub/p2p_network_pubsub_state.rs | 150 ++++++++-- p2p/src/p2p_state.rs | 6 +- 15 files changed, 391 insertions(+), 186 deletions(-) diff --git a/docs/testing/README.md b/docs/testing/README.md index 3fe8f6d31b..f1af329259 100644 --- a/docs/testing/README.md +++ b/docs/testing/README.md @@ -103,8 +103,8 @@ Test that node discovers peers another rust node and is able to bootstrap Tests related to pubsub layer. -* `P2pReceiveBlock` -Test that node receives block over meshsub from node +* `P2pReceiveMessage` +Test that node receives message over meshsub from node ### [P2P Incoming](../../node/testing/tests/p2p_basic_incoming.rs) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 2b1b567be0..5b5fc2f71e 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -404,10 +404,11 @@ pub enum ActionKind { P2pNetworkPnetEffectfulOutgoingData, P2pNetworkPnetEffectfulSetupNonce, P2pNetworkPubsubBroadcast, - P2pNetworkPubsubBroadcastAcceptedBlock, P2pNetworkPubsubBroadcastSigned, + P2pNetworkPubsubBroadcastValidatedMessage, P2pNetworkPubsubBroadcastValidationCallback, P2pNetworkPubsubGraft, + P2pNetworkPubsubHandleIncomingMessage, P2pNetworkPubsubIncomingData, P2pNetworkPubsubIncomingMessage, P2pNetworkPubsubIncomingMessageCleanup, @@ -418,6 +419,7 @@ pub enum ActionKind { P2pNetworkPubsubOutgoingMessageError, P2pNetworkPubsubPrune, P2pNetworkPubsubPruneMessages, + P2pNetworkPubsubRejectMessage, P2pNetworkPubsubSign, P2pNetworkPubsubSignError, P2pNetworkPubsubValidateIncomingMessages, @@ -711,7 +713,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 601; + pub const COUNT: u16 = 603; } impl std::fmt::Display for ActionKind { @@ -1963,10 +1965,12 @@ impl ActionKindGet for P2pNetworkPubsubAction { Self::BroadcastValidationCallback { .. } => { ActionKind::P2pNetworkPubsubBroadcastValidationCallback } - Self::BroadcastAcceptedBlock { .. } => { - ActionKind::P2pNetworkPubsubBroadcastAcceptedBlock + Self::BroadcastValidatedMessage { .. } => { + ActionKind::P2pNetworkPubsubBroadcastValidatedMessage } + Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage, Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages, + Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage, } } } diff --git a/node/src/p2p/callbacks/p2p_callbacks_actions.rs b/node/src/p2p/callbacks/p2p_callbacks_actions.rs index 0e69aa3640..0a437eb7c4 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_actions.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_actions.rs @@ -1,4 +1,3 @@ -use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::ActionEvent; use p2p::{ channels::{ @@ -48,9 +47,7 @@ pub enum P2pCallbacksAction { peer_id: PeerId, }, P2pPubsubValidateMessage { - message_content: GossipNetMessageV2, - message: p2p::network::pubsub::pb::Message, - peer_id: PeerId, + message_id: Vec, }, } diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index 6e9df9579c..26e6907ecd 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -11,7 +11,7 @@ use p2p::{ streaming_rpc::P2pStreamingRpcResponseFull, }, disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, - P2pNetworkPubsubAction, PeerId, ValidationResult, + P2pNetworkPubsubAction, PeerId, }; use redux::{ActionMeta, ActionWithMeta, Dispatcher}; @@ -296,42 +296,57 @@ impl crate::State { best_tip: best_tip.clone(), }); } - P2pCallbacksAction::P2pPubsubValidateMessage { - message, - message_content, - peer_id, - } => { - let result = match message_content { + P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => { + let Some(message_content) = state.p2p.ready().and_then(|p2p| { + p2p.network + .scheduler + .broadcast_state + .mcache + .get_message(message_id) + }) else { + return; + }; + + let pre_validation_result = match message_content { GossipNetMessageV2::NewState(new_best_tip) => { match BlockWithHash::try_new(new_best_tip.clone()) { Ok(block) => { let allow_block_too_late = allow_block_too_late(state, &block); match state.prevalidate_block(&block, allow_block_too_late) { - Ok(()) => ValidationResult::Valid, - Err(BlockPrevalidationError::ReceivedTooLate { .. }) => { - ValidationResult::Ignore + Ok(()) => PreValidationResult::Continue, + Err(BlockPrevalidationError::ReceivedTooEarly { .. }) => { + PreValidationResult::Ignore } - Err(_) => ValidationResult::Reject, + Err(_) => PreValidationResult::Reject, } } Err(_) => { log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); - return; + PreValidationResult::Reject } } } _ => { // TODO: add pre validation for Snark pool and Transaction pool diffs - ValidationResult::Valid + PreValidationResult::Continue } }; - dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { - message: message.clone(), - message_content: message_content.clone(), - peer_id: *peer_id, - result, - }); + match pre_validation_result { + PreValidationResult::Continue => { + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { + message_id: message_id.clone(), + }); + } + PreValidationResult::Reject => { + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: p2p::BroadcastMessageId::MessageId { + message_id: message_id.clone(), + }, + }); + } + PreValidationResult::Ignore => {} + } } } } @@ -617,3 +632,9 @@ impl crate::State { } } } + +enum PreValidationResult { + Continue, + Reject, + Ignore, +} diff --git a/node/src/state.rs b/node/src/state.rs index 26247f9ee4..d99a7ca853 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -1,12 +1,9 @@ -use std::sync::Arc; -use std::time::Duration; - -use mina_p2p_messages::gossip::GossipNetMessageV2; use mina_p2p_messages::v2; use openmina_core::constants::PROTOCOL_VERSION; use openmina_core::transaction::{TransactionInfo, TransactionWithHash}; -use p2p::network; use rand::prelude::*; +use std::sync::Arc; +use std::time::Duration; use openmina_core::block::BlockWithHash; use openmina_core::requests::RpcId; @@ -631,8 +628,8 @@ impl P2p { } )), on_p2p_pubsub_message_received: Some(redux::callback!( - on_p2p_pubsub_message_received((message: network::pubsub::pb::Message, message_content: GossipNetMessageV2, peer_id: PeerId)) -> crate::Action{ - P2pCallbacksAction::P2pPubsubValidateMessage { message_content, message, peer_id } + on_p2p_pubsub_message_received((message_id: Vec)) -> crate::Action{ + P2pCallbacksAction::P2pPubsubValidateMessage { message_id } } )), } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index 07da7c580f..aa0bd8b0d4 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -76,9 +76,6 @@ impl TransitionFrontierSyncAction { if let Some(callback) = on_success { store.dispatch_callback(callback.clone(), ()); } - - let hash = best_tip.hash.clone(); - store.dispatch(P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash }); } // TODO(tizoc): this action is never called with the current implementation, // either remove it or figure out how to recover it as a reaction to @@ -307,6 +304,10 @@ impl TransitionFrontierSyncAction { }; let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone()); store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error }); + // TODO this should be handled by a callback + store.dispatch(P2pNetworkPubsubAction::RejectMessage { + message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }, + }); } TransitionFrontierSyncAction::BlocksNextApplySuccess { hash, @@ -319,6 +320,11 @@ impl TransitionFrontierSyncAction { if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) { store.dispatch(TransitionFrontierSyncAction::BlocksSuccess); } + + // TODO this should be handled by a callback + store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }, + }); } TransitionFrontierSyncAction::BlocksSuccess => {} // Bootstrap/Catchup is practically complete at this point. diff --git a/node/testing/src/scenarios/mod.rs b/node/testing/src/scenarios/mod.rs index 9bacc6dbe1..06ba500a8f 100644 --- a/node/testing/src/scenarios/mod.rs +++ b/node/testing/src/scenarios/mod.rs @@ -51,7 +51,7 @@ use self::p2p::basic_outgoing_connections::{ MakeMultipleOutgoingConnections, MakeOutgoingConnection, }; use self::p2p::kademlia::KademliaBootstrap; -use self::p2p::pubsub::P2pReceiveBlock; +use self::p2p::pubsub::P2pReceiveMessage; use self::p2p::signaling::P2pSignaling; use self::record_replay::block_production::RecordReplayBlockProduction; use self::record_replay::bootstrap::RecordReplayBootstrap; @@ -83,7 +83,7 @@ pub enum Scenarios { MultiNodeBasicConnectivityPeerDiscovery(MultiNodeBasicConnectivityPeerDiscovery), SimulationSmall(SimulationSmall), SimulationSmallForeverRealTime(SimulationSmallForeverRealTime), - P2pReceiveBlock(P2pReceiveBlock), + P2pReceiveMessage(P2pReceiveMessage), P2pSignaling(P2pSignaling), P2pConnectionDiscoveryRustNodeAsSeed(P2pConnectionDiscoveryRustNodeAsSeed), MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock), @@ -189,7 +189,7 @@ impl Scenarios { } Self::SimulationSmall(_) => SimulationSmall::DOCS, Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS, - Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS, + Self::P2pReceiveMessage(_) => P2pReceiveMessage::DOCS, Self::P2pSignaling(_) => P2pSignaling::DOCS, Self::P2pConnectionDiscoveryRustNodeAsSeed(_) => { P2pConnectionDiscoveryRustNodeAsSeed::DOCS @@ -260,7 +260,7 @@ impl Scenarios { Self::MultiNodeBasicConnectivityPeerDiscovery(v) => v.run(runner).await, Self::SimulationSmall(v) => v.run(runner).await, Self::SimulationSmallForeverRealTime(v) => v.run(runner).await, - Self::P2pReceiveBlock(v) => v.run(runner).await, + Self::P2pReceiveMessage(v) => v.run(runner).await, Self::P2pSignaling(v) => v.run(runner).await, Self::P2pConnectionDiscoveryRustNodeAsSeed(v) => v.run(runner).await, Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await, diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index d976c7d4d6..fa5ccfa6c0 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -14,9 +14,9 @@ use crate::{ /// 3. Create a node with discovery disabled and first node as only peer /// 4. Wait for first node to broadcast message to second one #[derive(documented::Documented, Default, Clone, Copy)] -pub struct P2pReceiveBlock; +pub struct P2pReceiveMessage; -impl P2pReceiveBlock { +impl P2pReceiveMessage { pub async fn run(self, mut runner: ClusterRunner<'_>) { let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet()); diff --git a/node/testing/tests/p2p_pubsub.rs b/node/testing/tests/p2p_pubsub.rs index a870b346bc..025853d646 100644 --- a/node/testing/tests/p2p_pubsub.rs +++ b/node/testing/tests/p2p_pubsub.rs @@ -1,5 +1,5 @@ -use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveBlock; +use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveMessage; mod common; -scenario_test!(pubsub_receive_block, P2pReceiveBlock, P2pReceiveBlock); +scenario_test!(pubsub_receive_block, P2pReceiveMessage, P2pReceiveMessage); diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 8db9a23e46..a2fbc649c5 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -35,12 +35,12 @@ pub enum P2pDisconnectionReason { TransitionFrontierSyncLedgerSnarkedNumAccountsRejected, #[error("failed to verify snark pool diff")] SnarkPoolVerifyError, - #[error("failed to verify block")] - BlockVerifyError, #[error("duplicate connection")] DuplicateConnection, #[error("timeout")] Timeout, #[error("rpc protocol not supported")] Unsupported, + #[error("invalid pubsub message")] + InvalidMessage, } diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index d52354f1e0..26f1126729 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -18,11 +18,13 @@ const TOPIC: &str = "coda/consensus-messages/0.0.1"; pub mod pubsub_effectful; pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction; -use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Deserialize, Serialize)] -pub enum ValidationResult { - Valid, - Reject, - Ignore, +#[derive(serde::Serialize, serde:: Deserialize, Debug, Clone)] +pub enum BroadcastMessageId { + BlockHash { + hash: mina_p2p_messages::v2::StateHash, + }, + MessageId { + message_id: Vec, + }, } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index d8084273c5..8c604623c9 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -1,6 +1,6 @@ -use super::{pb, ValidationResult}; +use super::{pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId}; -use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; +use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::ActionEvent; use serde::{Deserialize, Serialize}; @@ -144,17 +144,22 @@ pub enum P2pNetworkPubsubAction { }, BroadcastValidationCallback { + message_id: Vec, + }, + + BroadcastValidatedMessage { + message_id: BroadcastMessageId, + }, + HandleIncomingMessage { message: pb::Message, message_content: GossipNetMessageV2, peer_id: PeerId, - result: ValidationResult, - }, - - BroadcastAcceptedBlock { - hash: v2::StateHash, }, /// Delete expired messages from state PruneMessages {}, + RejectMessage { + message_id: BroadcastMessageId, + }, } impl From for crate::P2pAction { @@ -175,9 +180,6 @@ impl redux::EnablingCondition for P2pNetworkPubsubAction { .topics .get(topic_id) .map_or(false, |topics| topics.contains_key(peer_id)), - P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash } => { - pubsub.block_messages.contains_key(hash) - } _ => true, } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 605662f993..479113a12f 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -17,7 +17,8 @@ use crate::{ use super::{ p2p_network_pubsub_state::{ - P2pNetworkPubsubBlockMessage, P2pNetworkPubsubClientMeshAddingState, + source_from_message, P2pNetworkPubsubClientMeshAddingState, + P2pNetworkPubsubMessageCacheMessage, }, pb::{self, Message}, P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction, @@ -180,6 +181,16 @@ impl P2pNetworkPubsubState { message, seen_limit, } => { + // Check that if we can extract source from message, this is pre check + if source_from_message(&message).is_err() { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::InvalidMessage, + }); + return Ok(()); + } + // Check result later to ensure we always dispatch the cleanup action let reduce_incoming_result = pubsub_state.reduce_incoming_message(&message, seen_limit); @@ -206,21 +217,41 @@ impl P2pNetworkPubsubState { } } + // This happens if message was already seen let Some(message_content) = message_content else { return Ok(()); }; + dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage { + message, + message_content, + peer_id, + }); + + Ok(()) + } + P2pNetworkPubsubAction::HandleIncomingMessage { + message, + message_content, + peer_id, + } => { + let Ok(message_id) = + pubsub_state + .mcache + .put(message, message_content, peer_id, time) + else { + bug_condition!("Unable to add message to `mcache`"); + return Ok(()); + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let p2p_state: &P2pState = state.substate()?; if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() { - dispatcher.push_callback(callback, (message, message_content, peer_id)); + dispatcher.push_callback(callback, message_id); } else { - dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { - message, - message_content, - peer_id, - result: super::ValidationResult::Valid, - }); + dispatcher + .push(P2pNetworkPubsubAction::BroadcastValidationCallback { message_id }); } - Ok(()) } P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => { @@ -437,123 +468,137 @@ impl P2pNetworkPubsubState { } Ok(()) } - P2pNetworkPubsubAction::BroadcastValidationCallback { - message, - message_content, - peer_id, - result, - } => { - let message_id = pubsub_state.mcache.put(message.clone()); + P2pNetworkPubsubAction::BroadcastValidationCallback { message_id } => { + let Some(message) = pubsub_state.mcache.map.remove(&message_id) else { + return Ok(()); + }; - match result { - super::ValidationResult::Valid => match &message_content { - GossipNetMessageV2::NewState(block) => { - let hash = block.try_hash()?; - pubsub_state.block_messages.entry(hash).or_insert_with(|| { - P2pNetworkPubsubBlockMessage { - peer_id, - message_id, - expiration_time: time + Duration::from_secs(600), - } - }); + let P2pNetworkPubsubMessageCacheMessage::Initial { + message, + content, + time, + peer_id, + } = message + else { + bug_condition!("`P2pNetworkPubsubAction::BroadcastValidationCallback` called on invalid state"); + return Ok(()); + }; + + let new_message_state = match &content { + GossipNetMessageV2::NewState(block) => { + let block_hash = block.try_hash()?; + P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { + block_hash, + message, + peer_id, + time, } - _ => pubsub_state - .reduce_incoming_validated_message(message_id, peer_id, message), + } + _ => P2pNetworkPubsubMessageCacheMessage::PreValidated { + message, + peer_id, + time, }, - super::ValidationResult::Reject => {} - super::ValidationResult::Ignore => {} - } + }; + pubsub_state + .mcache + .map + .insert(message_id.clone(), new_message_state); - let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); + let dispatcher = state_context.into_dispatcher(); - match result { - super::ValidationResult::Valid => { - match message_content { - GossipNetMessageV2::NewState(block) => { - let best_tip = BlockWithHash::try_new(block.clone())?; - dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); - return Ok(()); - } - GossipNetMessageV2::TransactionPoolDiff { message, nonce } => { - let nonce = nonce.as_u32(); - for transaction in message.0 { - dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { - peer_id, - transaction: Box::new(transaction), - nonce, - }); - } - } - GossipNetMessageV2::SnarkPoolDiff { - message: - NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), + match content { + GossipNetMessageV2::NewState(block) => { + let best_tip = BlockWithHash::try_new(block.clone())?; + dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); + return Ok(()); + } + GossipNetMessageV2::TransactionPoolDiff { message, nonce } => { + let nonce = nonce.as_u32(); + for transaction in message.0 { + dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { + peer_id, + transaction: Box::new(transaction), nonce, - } => { - dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { - peer_id, - snark: Box::new(work.1.into()), - nonce: nonce.as_u32(), - }); - } - _ => {} + }); } - - Self::broadcast(dispatcher, global_state) } - super::ValidationResult::Reject => { - // TODO: add error variants for transactions and snarks - dispatcher.push(P2pDisconnectionAction::Init { + GossipNetMessageV2::SnarkPoolDiff { + message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), + nonce, + } => { + dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { peer_id, - reason: P2pDisconnectionReason::BlockVerifyError, + snark: Box::new(work.1.into()), + nonce: nonce.as_u32(), }); - Ok(()) } - super::ValidationResult::Ignore => Ok(()), + _ => {} } + + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: super::BroadcastMessageId::MessageId { message_id }, + }); + Ok(()) } - P2pNetworkPubsubAction::BroadcastAcceptedBlock { hash } => { - let Some(message) = pubsub_state.block_messages.remove(&hash) else { - bug_condition!("Block message not found for: {}", hash); + P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => { + let Some((message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(message_id) + else { return Ok(()); }; - let P2pNetworkPubsubBlockMessage { - message_id, + let peer_id = message.peer_id(); + let raw_message = message.message().clone(); + *message = P2pNetworkPubsubMessageCacheMessage::Validated { + message: raw_message.clone(), peer_id, - .. - } = message; - - let Some(message_id) = message_id else { - return Ok(()); + time: message.time(), }; - - let Some(message) = pubsub_state.mcache.map.get(&message_id) else { - return Ok(()); - }; - - let message = message.clone(); - pubsub_state.reduce_incoming_validated_message(Some(message_id), peer_id, message); + pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message); let (dispatcher, state) = state_context.into_dispatcher_and_state(); Self::broadcast(dispatcher, state) } P2pNetworkPubsubAction::PruneMessages {} => { - let blocks = pubsub_state - .block_messages + let messages = pubsub_state + .mcache + .map .iter() - .filter_map(|(hash, message)| { - if message.expiration_time <= time { - Some(hash.to_owned()) + .filter_map(|(message_id, message)| { + if message.time() + Duration::from_secs(300) > time { + Some(message_id.to_owned()) } else { None } }) - .collect::>(); + .collect::>>(); - for block_hash in blocks { - pubsub_state.block_messages.remove(&block_hash); + for message_id in messages { + pubsub_state.mcache.remove_message(message_id); } + Ok(()) + } + P2pNetworkPubsubAction::RejectMessage { message_id } => { + let Some((_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(message_id) + else { + return Ok(()); + }; + + let peer_id = message.peer_id(); + *message = P2pNetworkPubsubMessageCacheMessage::Rejected { + message: message.message().clone(), + peer_id, + time: message.time(), + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::InvalidMessage, + }); Ok(()) } @@ -587,9 +632,9 @@ impl P2pNetworkPubsubState { fn reduce_incoming_validated_message( &mut self, - message_id: Option>, + message_id: Vec, peer_id: PeerId, - message: Message, + message: &Message, ) { let topic = self.topics.entry(message.topic.clone()).or_default(); @@ -604,17 +649,34 @@ impl P2pNetworkPubsubState { return; }; if topic_state.on_mesh() { - state.publish(&message) + state.publish(message) } else { let ctr = state.message.control.get_or_insert_with(Default::default); ctr.ihave.push(pb::ControlIHave { topic_id: Some(message.topic.clone()), - message_ids: message_id.clone().into_iter().collect(), + message_ids: vec![message_id.clone()], }) } }); } + /// Processes an incoming message by checking for duplicates and deserializing its contents. + /// + /// This function performs two main operations: + /// 1. Deduplication: Tracks recently seen messages using their signatures to avoid processing duplicates + /// 2. Deserialization: Converts valid message data into a `GossipNetMessageV2` structure + /// + /// # Arguments + /// + /// * `message` - The incoming message to process + /// * `seen_limit` - Maximum number of message signatures to keep in the deduplication cache + /// + /// # Returns + /// + /// * `Ok(Some(GossipNetMessageV2))` - Successfully processed and deserialized message + /// * `Ok(None)` - Message was a duplicate (already seen) + /// * `Err(String)` - Error during processing (invalid message format or deserialization failure) + /// #[inline(never)] fn reduce_incoming_message( &mut self, @@ -748,7 +810,7 @@ impl P2pNetworkPubsubState { for msg_id in &iwant.message_ids { if let Some(msg) = self.mcache.map.get(msg_id) { if let Some(client) = self.clients.get_mut(peer_id) { - client.publish(msg); + client.publish(msg.message()); } } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index 7d08111a5b..fde61c75f9 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -1,6 +1,8 @@ -use super::pb; +use super::{pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId}; +use libp2p_identity::ParseError; +use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::{snark::Snark, transaction::Transaction}; use redux::Timestamp; use serde::{Deserialize, Serialize}; @@ -52,14 +54,11 @@ pub struct P2pNetworkPubsubState { /// `iwant` requests, tracking the number of times peers have expressed interest in specific messages. pub iwant: VecDeque, - - /// Block messages currently being processed - pub block_messages: BTreeMap, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubBlockMessage { - pub message_id: Option>, + pub message_id: Vec, pub expiration_time: Timestamp, pub peer_id: PeerId, } @@ -174,7 +173,7 @@ pub struct P2pNetworkPubsubClientState { impl P2pNetworkPubsubClientState { pub fn publish(&mut self, message: &pb::Message) { - let Some(id) = compute_message_id(message) else { + let Ok(id) = compute_message_id(message) else { self.message.publish.push(message.clone()); return; }; @@ -209,38 +208,155 @@ pub struct P2pNetworkPubsubRecentlyPublishCache { // TODO: store blocks, snarks and txs separately #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubMessageCache { - pub map: BTreeMap, pb::Message>, + pub map: BTreeMap, P2pNetworkPubsubMessageCacheMessage>, pub queue: VecDeque>, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum P2pNetworkPubsubMessageCacheMessage { + Initial { + message: pb::Message, + content: GossipNetMessageV2, + peer_id: PeerId, + time: Timestamp, + }, + PreValidatedBlockMessage { + block_hash: mina_p2p_messages::v2::StateHash, + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, + // This is temporary handling for transactions and snark pool + PreValidated { + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, + Rejected { + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, + Validated { + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, +} + +impl P2pNetworkPubsubMessageCacheMessage { + pub fn message(&self) -> &pb::Message { + match self { + Self::Initial { message, .. } => message, + Self::PreValidated { message, .. } => message, + Self::PreValidatedBlockMessage { message, .. } => message, + Self::Rejected { message, .. } => message, + Self::Validated { message, .. } => message, + } + } + pub fn time(&self) -> Timestamp { + *match self { + Self::Initial { time, .. } => time, + Self::PreValidated { time, .. } => time, + Self::PreValidatedBlockMessage { time, .. } => time, + Self::Rejected { time, .. } => time, + Self::Validated { time, .. } => time, + } + } + pub fn peer_id(&self) -> PeerId { + *match self { + Self::Initial { peer_id, .. } => peer_id, + Self::PreValidated { peer_id, .. } => peer_id, + Self::PreValidatedBlockMessage { peer_id, .. } => peer_id, + Self::Rejected { peer_id, .. } => peer_id, + Self::Validated { peer_id, .. } => peer_id, + } + } +} + impl P2pNetworkPubsubMessageCache { const CAPACITY: usize = 100; - pub fn put(&mut self, message: pb::Message) -> Option> { + pub fn put( + &mut self, + message: pb::Message, + content: GossipNetMessageV2, + peer_id: PeerId, + time: Timestamp, + ) -> Result, ParseError> { let id = compute_message_id(&message)?; - self.map.insert(id.clone(), message); + self.map.insert( + id.clone(), + P2pNetworkPubsubMessageCacheMessage::Initial { + message, + content, + time, + peer_id, + }, + ); + self.queue.push_back(id.clone()); if self.queue.len() > Self::CAPACITY { if let Some(id) = self.queue.pop_front() { self.map.remove(&id); } } - Some(id) + Ok(id) + } + + pub fn get_message(&self, id: &Vec) -> Option<&GossipNetMessageV2> { + let message = self.map.get(id)?; + match message { + P2pNetworkPubsubMessageCacheMessage::Initial { content, .. } => Some(content), + _ => None, + } + } + + pub fn get_message_id_and_message( + &mut self, + message_id: BroadcastMessageId, + ) -> Option<(Vec, &mut P2pNetworkPubsubMessageCacheMessage)> { + match message_id { + super::BroadcastMessageId::BlockHash { hash } => { + self.map + .iter_mut() + .find_map(|(message_id, message)| match message { + P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { + block_hash, + .. + } if *block_hash == hash => Some((message_id.clone(), message)), + _ => None, + }) + } + super::BroadcastMessageId::MessageId { message_id } => self + .map + .get_mut(&message_id) + .map(|content| (message_id, content)), + } + } + + pub fn remove_message(&mut self, message_id: Vec) { + let _ = self.map.remove(&message_id); + if let Some(position) = self.queue.iter().position(|id| id == &message_id) { + self.queue.remove(position); + } } } -// TODO: what if wasm32? -// How to test it? -pub fn compute_message_id(message: &pb::Message) -> Option> { +pub fn source_from_message(message: &pb::Message) -> Result { let source_bytes = message .from .as_ref() .map(AsRef::as_ref) .unwrap_or(&[0, 1, 0][..]); - let mut source_string = libp2p_identity::PeerId::from_bytes(source_bytes) - .ok()? - .to_base58(); + libp2p_identity::PeerId::from_bytes(source_bytes) +} + +// TODO: what if wasm32? +// How to test it? +pub fn compute_message_id(message: &pb::Message) -> Result, ParseError> { + let mut source_string = source_from_message(message)?.to_base58(); let sequence_number = message .seqno @@ -249,7 +365,7 @@ pub fn compute_message_id(message: &pb::Message) -> Option> { .map(u64::from_be_bytes) .unwrap_or_default(); source_string.push_str(&sequence_number.to_string()); - Some(source_string.into_bytes()) + Ok(source_string.into_bytes()) } #[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index d25d81a5d9..e083e11cbd 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -30,14 +30,13 @@ use crate::{ }, is_time_passed, network::{ - self, identify::{P2pNetworkIdentify, P2pNetworkIdentifyState}, P2pNetworkState, }, Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubState, P2pNetworkSchedulerState, P2pTimeouts, PeerId, }; -use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; +use mina_p2p_messages::v2; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct P2pState { @@ -565,8 +564,7 @@ pub struct P2pCallbacks { OptionalCallback<(PeerId, P2pRpcId, Option)>, /// Callback for received pubsub message - pub on_p2p_pubsub_message_received: - OptionalCallback<(network::pubsub::pb::Message, GossipNetMessageV2, PeerId)>, + pub on_p2p_pubsub_message_received: OptionalCallback>, } impl_substate_access!(P2pState, P2pNetworkState, network); From 6d3285cd30e0612d8e5205f9419674ebd399662c Mon Sep 17 00:00:00 2001 From: Mimir Date: Wed, 15 Jan 2025 16:18:32 +0100 Subject: [PATCH 4/5] Changed type of message id --- .../p2p/callbacks/p2p_callbacks_actions.rs | 4 +- .../p2p/callbacks/p2p_callbacks_reducer.rs | 4 +- node/src/state.rs | 3 +- p2p/src/network/pubsub/mod.rs | 5 +- .../pubsub/p2p_network_pubsub_actions.rs | 4 +- .../pubsub/p2p_network_pubsub_reducer.rs | 12 +- .../pubsub/p2p_network_pubsub_state.rs | 109 +++++++++++------- p2p/src/p2p_state.rs | 6 +- 8 files changed, 89 insertions(+), 58 deletions(-) diff --git a/node/src/p2p/callbacks/p2p_callbacks_actions.rs b/node/src/p2p/callbacks/p2p_callbacks_actions.rs index 0a437eb7c4..46fa16c04a 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_actions.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_actions.rs @@ -4,7 +4,7 @@ use p2p::{ rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse}, streaming_rpc::P2pStreamingRpcResponseFull, }, - PeerId, + P2pNetworkPubsubMessageCacheId, PeerId, }; use serde::{Deserialize, Serialize}; @@ -47,7 +47,7 @@ pub enum P2pCallbacksAction { peer_id: PeerId, }, P2pPubsubValidateMessage { - message_id: Vec, + message_id: P2pNetworkPubsubMessageCacheId, }, } diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index 26e6907ecd..184bd1984f 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -335,13 +335,13 @@ impl crate::State { match pre_validation_result { PreValidationResult::Continue => { dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { - message_id: message_id.clone(), + message_id: *message_id, }); } PreValidationResult::Reject => { dispatcher.push(P2pNetworkPubsubAction::RejectMessage { message_id: p2p::BroadcastMessageId::MessageId { - message_id: message_id.clone(), + message_id: *message_id, }, }); } diff --git a/node/src/state.rs b/node/src/state.rs index d99a7ca853..ad063de2da 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -1,6 +1,7 @@ use mina_p2p_messages::v2; use openmina_core::constants::PROTOCOL_VERSION; use openmina_core::transaction::{TransactionInfo, TransactionWithHash}; +use p2p::P2pNetworkPubsubMessageCacheId; use rand::prelude::*; use std::sync::Arc; use std::time::Duration; @@ -628,7 +629,7 @@ impl P2p { } )), on_p2p_pubsub_message_received: Some(redux::callback!( - on_p2p_pubsub_message_received((message_id: Vec)) -> crate::Action{ + on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{ P2pCallbacksAction::P2pPubsubValidateMessage { message_id } } )), diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index 26f1126729..f93cab0c71 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -7,7 +7,8 @@ pub use self::p2p_network_pubsub_actions::P2pNetworkPubsubAction; mod p2p_network_pubsub_state; pub use self::p2p_network_pubsub_state::{ - P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState, + P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubMessageCacheId, + P2pNetworkPubsubState, }; #[cfg(feature = "p2p-libp2p")] @@ -25,6 +26,6 @@ pub enum BroadcastMessageId { hash: mina_p2p_messages::v2::StateHash, }, MessageId { - message_id: Vec, + message_id: P2pNetworkPubsubMessageCacheId, }, } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index 8c604623c9..d738e10913 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -1,4 +1,4 @@ -use super::{pb, BroadcastMessageId}; +use super::{p2p_network_pubsub_state::P2pNetworkPubsubMessageCacheId, pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId}; use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::ActionEvent; @@ -144,7 +144,7 @@ pub enum P2pNetworkPubsubAction { }, BroadcastValidationCallback { - message_id: Vec, + message_id: P2pNetworkPubsubMessageCacheId, }, BroadcastValidatedMessage { diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 479113a12f..3322380813 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -22,7 +22,7 @@ use super::{ }, pb::{self, Message}, P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction, - P2pNetworkPubsubState, TOPIC, + P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC, }; impl P2pNetworkPubsubState { @@ -503,7 +503,7 @@ impl P2pNetworkPubsubState { pubsub_state .mcache .map - .insert(message_id.clone(), new_message_state); + .insert(message_id, new_message_state); let dispatcher = state_context.into_dispatcher(); @@ -573,7 +573,7 @@ impl P2pNetworkPubsubState { None } }) - .collect::>>(); + .collect::>(); for message_id in messages { pubsub_state.mcache.remove_message(message_id); @@ -632,7 +632,7 @@ impl P2pNetworkPubsubState { fn reduce_incoming_validated_message( &mut self, - message_id: Vec, + message_id: P2pNetworkPubsubMessageCacheId, peer_id: PeerId, message: &Message, ) { @@ -654,7 +654,7 @@ impl P2pNetworkPubsubState { let ctr = state.message.control.get_or_insert_with(Default::default); ctr.ihave.push(pb::ControlIHave { topic_id: Some(message.topic.clone()), - message_ids: vec![message_id.clone()], + message_ids: vec![message_id.to_raw_bytes()], }) } }); @@ -808,7 +808,7 @@ impl P2pNetworkPubsubState { // Respond to iwant requests by publishing available messages from the cache. for iwant in iwant_requests { for msg_id in &iwant.message_ids { - if let Some(msg) = self.mcache.map.get(msg_id) { + if let Some(msg) = self.mcache.get_message_from_raw_message_id(msg_id) { if let Some(client) = self.clients.get_mut(peer_id) { client.publish(msg.message()); } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index fde61c75f9..8311cf26d7 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -56,13 +56,6 @@ pub struct P2pNetworkPubsubState { pub iwant: VecDeque, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct P2pNetworkPubsubBlockMessage { - pub message_id: Vec, - pub expiration_time: Timestamp, - pub peer_id: PeerId, -} - #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubIwantRequestCount { pub message_id: Vec, @@ -74,15 +67,19 @@ impl P2pNetworkPubsubState { self.clients.remove(peer_id); } - pub fn filter_iwant_message_ids(&mut self, message_id: &Vec, timestamp: Timestamp) -> bool { - if self.mcache.map.contains_key(message_id) { + pub fn filter_iwant_message_ids(&mut self, message_id: &[u8], timestamp: Timestamp) -> bool { + if self + .mcache + .get_message_from_raw_message_id(message_id) + .is_some() + { return false; } let message_count = self .iwant .iter_mut() - .find(|message| &message.message_id == message_id); + .find(|message| message.message_id == message_id); match message_count { Some(message) => { @@ -106,7 +103,7 @@ impl P2pNetworkPubsubState { } None => { let message_count = P2pNetworkPubsubIwantRequestCount { - message_id: message_id.to_owned(), + message_id: message_id.to_vec(), count: vec![timestamp], }; @@ -173,11 +170,11 @@ pub struct P2pNetworkPubsubClientState { impl P2pNetworkPubsubClientState { pub fn publish(&mut self, message: &pb::Message) { - let Ok(id) = compute_message_id(message) else { + let Ok(id) = P2pNetworkPubsubMessageCacheId::compute_message_id(message) else { self.message.publish.push(message.clone()); return; }; - if self.cache.map.insert(id.clone()) { + if self.cache.map.insert(id) { self.message.publish.push(message.clone()); } self.cache.queue.push_back(id); @@ -201,15 +198,15 @@ impl P2pNetworkPubsubClientState { #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubRecentlyPublishCache { - pub map: BTreeSet>, - pub queue: VecDeque>, + pub map: BTreeSet, + pub queue: VecDeque, } // TODO: store blocks, snarks and txs separately #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubMessageCache { - pub map: BTreeMap, P2pNetworkPubsubMessageCacheMessage>, - pub queue: VecDeque>, + pub map: BTreeMap, + pub queue: VecDeque, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -244,6 +241,37 @@ pub enum P2pNetworkPubsubMessageCacheMessage { }, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] +pub struct P2pNetworkPubsubMessageCacheId { + pub source: libp2p_identity::PeerId, + pub seqno: u64, +} + +impl P2pNetworkPubsubMessageCacheId { + // TODO: what if wasm32? + // How to test it? + pub fn compute_message_id( + message: &pb::Message, + ) -> Result { + let source = source_from_message(message)?; + + let seqno = message + .seqno + .as_ref() + .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + + Ok(P2pNetworkPubsubMessageCacheId { source, seqno }) + } + + pub fn to_raw_bytes(&self) -> Vec { + let mut message_id = self.source.to_base58(); + message_id.push_str(&self.seqno.to_string()); + message_id.into_bytes() + } +} + impl P2pNetworkPubsubMessageCacheMessage { pub fn message(&self) -> &pb::Message { match self { @@ -283,10 +311,10 @@ impl P2pNetworkPubsubMessageCache { content: GossipNetMessageV2, peer_id: PeerId, time: Timestamp, - ) -> Result, ParseError> { - let id = compute_message_id(&message)?; + ) -> Result { + let id = P2pNetworkPubsubMessageCacheId::compute_message_id(&message)?; self.map.insert( - id.clone(), + id, P2pNetworkPubsubMessageCacheMessage::Initial { message, content, @@ -295,7 +323,7 @@ impl P2pNetworkPubsubMessageCache { }, ); - self.queue.push_back(id.clone()); + self.queue.push_back(id); if self.queue.len() > Self::CAPACITY { if let Some(id) = self.queue.pop_front() { self.map.remove(&id); @@ -304,7 +332,7 @@ impl P2pNetworkPubsubMessageCache { Ok(id) } - pub fn get_message(&self, id: &Vec) -> Option<&GossipNetMessageV2> { + pub fn get_message(&self, id: &P2pNetworkPubsubMessageCacheId) -> Option<&GossipNetMessageV2> { let message = self.map.get(id)?; match message { P2pNetworkPubsubMessageCacheMessage::Initial { content, .. } => Some(content), @@ -315,7 +343,10 @@ impl P2pNetworkPubsubMessageCache { pub fn get_message_id_and_message( &mut self, message_id: BroadcastMessageId, - ) -> Option<(Vec, &mut P2pNetworkPubsubMessageCacheMessage)> { + ) -> Option<( + P2pNetworkPubsubMessageCacheId, + &mut P2pNetworkPubsubMessageCacheMessage, + )> { match message_id { super::BroadcastMessageId::BlockHash { hash } => { self.map @@ -324,7 +355,7 @@ impl P2pNetworkPubsubMessageCache { P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. - } if *block_hash == hash => Some((message_id.clone(), message)), + } if *block_hash == hash => Some((*message_id, message)), _ => None, }) } @@ -335,12 +366,25 @@ impl P2pNetworkPubsubMessageCache { } } - pub fn remove_message(&mut self, message_id: Vec) { + pub fn remove_message(&mut self, message_id: P2pNetworkPubsubMessageCacheId) { let _ = self.map.remove(&message_id); if let Some(position) = self.queue.iter().position(|id| id == &message_id) { self.queue.remove(position); } } + + pub fn get_message_from_raw_message_id( + &self, + message_id: &[u8], + ) -> Option<&P2pNetworkPubsubMessageCacheMessage> { + self.map.iter().find_map(|(key, value)| { + if key.to_raw_bytes() == message_id { + Some(value) + } else { + None + } + }) + } } pub fn source_from_message(message: &pb::Message) -> Result { @@ -353,21 +397,6 @@ pub fn source_from_message(message: &pb::Message) -> Result Result, ParseError> { - let mut source_string = source_from_message(message)?.to_base58(); - - let sequence_number = message - .seqno - .as_ref() - .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) - .map(u64::from_be_bytes) - .unwrap_or_default(); - source_string.push_str(&sequence_number.to_string()); - Ok(source_string.into_bytes()) -} - #[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct P2pNetworkPubsubClientTopicState { pub mesh: P2pNetworkPubsubClientMeshAddingState, diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index e083e11cbd..1d6f386f5b 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -33,8 +33,8 @@ use crate::{ identify::{P2pNetworkIdentify, P2pNetworkIdentifyState}, P2pNetworkState, }, - Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubState, - P2pNetworkSchedulerState, P2pTimeouts, PeerId, + Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubMessageCacheId, + P2pNetworkPubsubState, P2pNetworkSchedulerState, P2pTimeouts, PeerId, }; use mina_p2p_messages::v2; @@ -564,7 +564,7 @@ pub struct P2pCallbacks { OptionalCallback<(PeerId, P2pRpcId, Option)>, /// Callback for received pubsub message - pub on_p2p_pubsub_message_received: OptionalCallback>, + pub on_p2p_pubsub_message_received: OptionalCallback, } impl_substate_access!(P2pState, P2pNetworkState, network); From ee2186ed038c19718ae761d433ec3864c14a7ef7 Mon Sep 17 00:00:00 2001 From: Mimir Date: Wed, 15 Jan 2025 17:58:21 +0100 Subject: [PATCH 5/5] Review fixes --- node/src/action_kind.rs | 16 ++- .../p2p/callbacks/p2p_callbacks_reducer.rs | 44 ++++-- .../sync/transition_frontier_sync_effects.rs | 4 +- node/testing/src/scenarios/p2p/pubsub.rs | 2 +- .../pubsub/p2p_network_pubsub_actions.rs | 29 +++- .../pubsub/p2p_network_pubsub_reducer.rs | 132 +++++++++++------- .../pubsub/p2p_network_pubsub_state.rs | 40 +++--- 7 files changed, 174 insertions(+), 93 deletions(-) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 5b5fc2f71e..ef5ba28fa5 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -406,9 +406,9 @@ pub enum ActionKind { P2pNetworkPubsubBroadcast, P2pNetworkPubsubBroadcastSigned, P2pNetworkPubsubBroadcastValidatedMessage, - P2pNetworkPubsubBroadcastValidationCallback, P2pNetworkPubsubGraft, P2pNetworkPubsubHandleIncomingMessage, + P2pNetworkPubsubIgnoreMessage, P2pNetworkPubsubIncomingData, P2pNetworkPubsubIncomingMessage, P2pNetworkPubsubIncomingMessageCleanup, @@ -422,6 +422,7 @@ pub enum ActionKind { P2pNetworkPubsubRejectMessage, P2pNetworkPubsubSign, P2pNetworkPubsubSignError, + P2pNetworkPubsubValidateIncomingMessage, P2pNetworkPubsubValidateIncomingMessages, P2pNetworkPubsubEffectfulSign, P2pNetworkPubsubEffectfulValidateIncomingMessages, @@ -713,7 +714,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 603; + pub const COUNT: u16 = 604; } impl std::fmt::Display for ActionKind { @@ -1962,15 +1963,16 @@ impl ActionKindGet for P2pNetworkPubsubAction { Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear, Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError, Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData, - Self::BroadcastValidationCallback { .. } => { - ActionKind::P2pNetworkPubsubBroadcastValidationCallback + Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage, + Self::ValidateIncomingMessage { .. } => { + ActionKind::P2pNetworkPubsubValidateIncomingMessage } + Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages, + Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage, + Self::IgnoreMessage { .. } => ActionKind::P2pNetworkPubsubIgnoreMessage, Self::BroadcastValidatedMessage { .. } => { ActionKind::P2pNetworkPubsubBroadcastValidatedMessage } - Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage, - Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages, - Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage, } } } diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index 184bd1984f..ce92ae69a6 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -304,6 +304,7 @@ impl crate::State { .mcache .get_message(message_id) }) else { + bug_condition!("Failed to find message for id: {:?}", message_id); return; }; @@ -314,15 +315,27 @@ impl crate::State { let allow_block_too_late = allow_block_too_late(state, &block); match state.prevalidate_block(&block, allow_block_too_late) { Ok(()) => PreValidationResult::Continue, - Err(BlockPrevalidationError::ReceivedTooEarly { .. }) => { - PreValidationResult::Ignore + Err(error) + if matches!( + error, + BlockPrevalidationError::ReceivedTooEarly { .. } + ) => + { + PreValidationResult::Ignore { + reason: format!( + "Block prevalidation failed: {:?}", + error + ), + } } - Err(_) => PreValidationResult::Reject, + Err(error) => PreValidationResult::Reject { + reason: format!("Block prevalidation failed: {:?}", error), + }, } } Err(_) => { log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); - PreValidationResult::Reject + PreValidationResult::Reject{reason: "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block".to_owned()} } } } @@ -334,18 +347,27 @@ impl crate::State { match pre_validation_result { PreValidationResult::Continue => { - dispatcher.push(P2pNetworkPubsubAction::BroadcastValidationCallback { + dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { message_id: *message_id, }); } - PreValidationResult::Reject => { + PreValidationResult::Reject { reason } => { dispatcher.push(P2pNetworkPubsubAction::RejectMessage { - message_id: p2p::BroadcastMessageId::MessageId { + message_id: Some(p2p::BroadcastMessageId::MessageId { message_id: *message_id, - }, + }), + peer_id: None, + reason, + }); + } + PreValidationResult::Ignore { reason } => { + dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage { + message_id: Some(p2p::BroadcastMessageId::MessageId { + message_id: *message_id, + }), + reason, }); } - PreValidationResult::Ignore => {} } } } @@ -635,6 +657,6 @@ impl crate::State { enum PreValidationResult { Continue, - Reject, - Ignore, + Reject { reason: String }, + Ignore { reason: String }, } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index aa0bd8b0d4..54e26eca88 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -306,7 +306,9 @@ impl TransitionFrontierSyncAction { store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error }); // TODO this should be handled by a callback store.dispatch(P2pNetworkPubsubAction::RejectMessage { - message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }, + message_id: Some(p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }), + peer_id: None, + reason: "Failed to apply block".to_owned(), }); } TransitionFrontierSyncAction::BlocksNextApplySuccess { diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index fa5ccfa6c0..e6fbf6ad8b 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -47,7 +47,7 @@ impl P2pReceiveMessage { node == receiver_openmina_node && matches!( action.action().kind(), - ActionKind::P2pNetworkPubsubBroadcastValidationCallback + ActionKind::P2pNetworkPubsubValidateIncomingMessage ) }), ) diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index d738e10913..e75f06bfb9 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -143,21 +143,31 @@ pub enum P2pNetworkPubsubAction { peer_id: PeerId, }, - BroadcastValidationCallback { - message_id: P2pNetworkPubsubMessageCacheId, - }, - - BroadcastValidatedMessage { - message_id: BroadcastMessageId, - }, HandleIncomingMessage { message: pb::Message, message_content: GossipNetMessageV2, peer_id: PeerId, }, + + ValidateIncomingMessage { + message_id: P2pNetworkPubsubMessageCacheId, + }, + /// Delete expired messages from state PruneMessages {}, + RejectMessage { + message_id: Option, + peer_id: Option, + reason: String, + }, + IgnoreMessage { + message_id: Option, + reason: String, + }, + + // After message is fully validated, broadcast it to other peers + BroadcastValidatedMessage { message_id: BroadcastMessageId, }, } @@ -180,6 +190,11 @@ impl redux::EnablingCondition for P2pNetworkPubsubAction { .topics .get(topic_id) .map_or(false, |topics| topics.contains_key(peer_id)), + P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } + | P2pNetworkPubsubAction::RejectMessage { + message_id: Some(message_id), + .. + } => pubsub.mcache.contains_broadcast_id(message_id), _ => true, } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 3322380813..5509a1e551 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -25,6 +25,8 @@ use super::{ P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC, }; +const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300); + impl P2pNetworkPubsubState { pub fn reducer( mut state_context: Substate, @@ -184,9 +186,10 @@ impl P2pNetworkPubsubState { // Check that if we can extract source from message, this is pre check if source_from_message(&message).is_err() { let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pDisconnectionAction::Init { - peer_id, - reason: P2pDisconnectionReason::InvalidMessage, + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: None, + peer_id: Some(peer_id), + reason: "Invalid originator in message".to_owned(), }); return Ok(()); } @@ -218,14 +221,18 @@ impl P2pNetworkPubsubState { } // This happens if message was already seen - let Some(message_content) = message_content else { - return Ok(()); + if let Some(message_content) = message_content { + dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage { + message, + message_content, + peer_id, + }); + } else { + dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage { + message_id: None, + reason: "Message already seen".to_owned(), + }); }; - dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage { - message, - message_content, - peer_id, - }); Ok(()) } @@ -249,8 +256,7 @@ impl P2pNetworkPubsubState { if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() { dispatcher.push_callback(callback, message_id); } else { - dispatcher - .push(P2pNetworkPubsubAction::BroadcastValidationCallback { message_id }); + dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { message_id }); } Ok(()) } @@ -468,19 +474,22 @@ impl P2pNetworkPubsubState { } Ok(()) } - P2pNetworkPubsubAction::BroadcastValidationCallback { message_id } => { + P2pNetworkPubsubAction::ValidateIncomingMessage { message_id } => { let Some(message) = pubsub_state.mcache.map.remove(&message_id) else { + bug_condition!("Message with id: {:?} not found", message_id); return Ok(()); }; - let P2pNetworkPubsubMessageCacheMessage::Initial { + let P2pNetworkPubsubMessageCacheMessage::Init { message, content, time, peer_id, } = message else { - bug_condition!("`P2pNetworkPubsubAction::BroadcastValidationCallback` called on invalid state"); + bug_condition!( + "`P2pNetworkPubsubAction::ValidateIncomingMessage` called on invalid state" + ); return Ok(()); }; @@ -542,20 +551,33 @@ impl P2pNetworkPubsubState { Ok(()) } P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => { - let Some((message_id, message)) = - pubsub_state.mcache.get_message_id_and_message(message_id) + let Some((mcache_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) else { + bug_condition!("Message with id: {:?} not found", message_id); return Ok(()); }; - - let peer_id = message.peer_id(); let raw_message = message.message().clone(); + let peer_id = message.peer_id(); + + pubsub_state.reduce_incoming_validated_message( + mcache_message_id, + peer_id, + &raw_message, + ); + + let Some((_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) + else { + bug_condition!("Message with id: {:?} not found", message_id); + return Ok(()); + }; + *message = P2pNetworkPubsubMessageCacheMessage::Validated { - message: raw_message.clone(), + message: raw_message, peer_id, time: message.time(), }; - pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message); let (dispatcher, state) = state_context.into_dispatcher_and_state(); @@ -567,7 +589,7 @@ impl P2pNetworkPubsubState { .map .iter() .filter_map(|(message_id, message)| { - if message.time() + Duration::from_secs(300) > time { + if message.time() + MAX_MESSAGE_KEEP_DURATION > time { Some(message_id.to_owned()) } else { None @@ -580,28 +602,39 @@ impl P2pNetworkPubsubState { } Ok(()) } - P2pNetworkPubsubAction::RejectMessage { message_id } => { - let Some((_message_id, message)) = - pubsub_state.mcache.get_message_id_and_message(message_id) - else { - return Ok(()); - }; + P2pNetworkPubsubAction::RejectMessage { + message_id, + peer_id, + .. + } => { + let mut peer_id = peer_id; + if let Some(message_id) = message_id { + let Some((_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) + else { + bug_condition!("Message not found for id: {:?}", message_id); + return Ok(()); + }; - let peer_id = message.peer_id(); - *message = P2pNetworkPubsubMessageCacheMessage::Rejected { - message: message.message().clone(), - peer_id, - time: message.time(), - }; + if peer_id.is_none() { + peer_id = Some(message.peer_id()); + } + + pubsub_state.mcache.remove_message(_message_id); + } let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pDisconnectionAction::Init { - peer_id, - reason: P2pDisconnectionReason::InvalidMessage, - }); + + if let Some(peer_id) = peer_id { + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::InvalidMessage, + }); + } Ok(()) } + P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()), } } @@ -683,17 +716,20 @@ impl P2pNetworkPubsubState { message: &Message, seen_limit: usize, ) -> Result, String> { - if let Some(signature) = &message.signature { - // skip recently seen message - if !self.seen.contains(signature) { - self.seen.push_back(signature.clone()); - // keep only last `n` to avoid memory leak - if self.seen.len() > seen_limit { - self.seen.pop_front(); - } - } else { - return Ok(None); + let Some(signature) = &message.signature else { + bug_condition!("Validation failed: missing signature"); + return Ok(None); + }; + + // skip recently seen message + if !self.seen.contains(signature) { + self.seen.push_back(signature.clone()); + // keep only last `n` to avoid memory leak + if self.seen.len() > seen_limit { + self.seen.pop_front(); } + } else { + return Ok(None); } match &message.data { diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index 8311cf26d7..80f687172b 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -211,7 +211,7 @@ pub struct P2pNetworkPubsubMessageCache { #[derive(Serialize, Deserialize, Debug, Clone)] pub enum P2pNetworkPubsubMessageCacheMessage { - Initial { + Init { message: pb::Message, content: GossipNetMessageV2, peer_id: PeerId, @@ -229,11 +229,6 @@ pub enum P2pNetworkPubsubMessageCacheMessage { peer_id: PeerId, time: Timestamp, }, - Rejected { - message: pb::Message, - peer_id: PeerId, - time: Timestamp, - }, Validated { message: pb::Message, peer_id: PeerId, @@ -275,28 +270,25 @@ impl P2pNetworkPubsubMessageCacheId { impl P2pNetworkPubsubMessageCacheMessage { pub fn message(&self) -> &pb::Message { match self { - Self::Initial { message, .. } => message, + Self::Init { message, .. } => message, Self::PreValidated { message, .. } => message, Self::PreValidatedBlockMessage { message, .. } => message, - Self::Rejected { message, .. } => message, Self::Validated { message, .. } => message, } } pub fn time(&self) -> Timestamp { *match self { - Self::Initial { time, .. } => time, + Self::Init { time, .. } => time, Self::PreValidated { time, .. } => time, Self::PreValidatedBlockMessage { time, .. } => time, - Self::Rejected { time, .. } => time, Self::Validated { time, .. } => time, } } pub fn peer_id(&self) -> PeerId { *match self { - Self::Initial { peer_id, .. } => peer_id, + Self::Init { peer_id, .. } => peer_id, Self::PreValidated { peer_id, .. } => peer_id, Self::PreValidatedBlockMessage { peer_id, .. } => peer_id, - Self::Rejected { peer_id, .. } => peer_id, Self::Validated { peer_id, .. } => peer_id, } } @@ -315,7 +307,7 @@ impl P2pNetworkPubsubMessageCache { let id = P2pNetworkPubsubMessageCacheId::compute_message_id(&message)?; self.map.insert( id, - P2pNetworkPubsubMessageCacheMessage::Initial { + P2pNetworkPubsubMessageCacheMessage::Init { message, content, time, @@ -335,14 +327,26 @@ impl P2pNetworkPubsubMessageCache { pub fn get_message(&self, id: &P2pNetworkPubsubMessageCacheId) -> Option<&GossipNetMessageV2> { let message = self.map.get(id)?; match message { - P2pNetworkPubsubMessageCacheMessage::Initial { content, .. } => Some(content), + P2pNetworkPubsubMessageCacheMessage::Init { content, .. } => Some(content), _ => None, } } + pub fn contains_broadcast_id(&self, message_id: &BroadcastMessageId) -> bool { + match message_id { + super::BroadcastMessageId::BlockHash { hash } => self + .map + .values() + .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. } if block_hash == hash)), + super::BroadcastMessageId::MessageId { message_id } => { + self.map.contains_key(message_id) + } + } + } + pub fn get_message_id_and_message( &mut self, - message_id: BroadcastMessageId, + message_id: &BroadcastMessageId, ) -> Option<( P2pNetworkPubsubMessageCacheId, &mut P2pNetworkPubsubMessageCacheMessage, @@ -355,14 +359,14 @@ impl P2pNetworkPubsubMessageCache { P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. - } if *block_hash == hash => Some((*message_id, message)), + } if block_hash == hash => Some((*message_id, message)), _ => None, }) } super::BroadcastMessageId::MessageId { message_id } => self .map - .get_mut(&message_id) - .map(|content| (message_id, content)), + .get_mut(message_id) + .map(|content| (*message_id, content)), } }