diff --git a/docs/testing/README.md b/docs/testing/README.md index 3fe8f6d31b..f1af329259 100644 --- a/docs/testing/README.md +++ b/docs/testing/README.md @@ -103,8 +103,8 @@ Test that node discovers peers another rust node and is able to bootstrap Tests related to pubsub layer. -* `P2pReceiveBlock` -Test that node receives block over meshsub from node +* `P2pReceiveMessage` +Test that node receives message over meshsub from node ### [P2P Incoming](../../node/testing/tests/p2p_basic_incoming.rs) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 347ea5078e..ef5ba28fa5 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -206,6 +206,7 @@ pub enum ActionKind { P2pCallbacksP2pChannelsStreamingRpcResponseReceived, P2pCallbacksP2pChannelsStreamingRpcTimeout, P2pCallbacksP2pDisconnection, + P2pCallbacksP2pPubsubValidateMessage, P2pCallbacksRpcRespondBestTip, P2pChannelsBestTipInit, P2pChannelsBestTipPending, @@ -404,7 +405,10 @@ pub enum ActionKind { P2pNetworkPnetEffectfulSetupNonce, P2pNetworkPubsubBroadcast, P2pNetworkPubsubBroadcastSigned, + P2pNetworkPubsubBroadcastValidatedMessage, P2pNetworkPubsubGraft, + P2pNetworkPubsubHandleIncomingMessage, + P2pNetworkPubsubIgnoreMessage, P2pNetworkPubsubIncomingData, P2pNetworkPubsubIncomingMessage, P2pNetworkPubsubIncomingMessageCleanup, @@ -414,8 +418,11 @@ pub enum ActionKind { P2pNetworkPubsubOutgoingMessageClear, P2pNetworkPubsubOutgoingMessageError, P2pNetworkPubsubPrune, + P2pNetworkPubsubPruneMessages, + P2pNetworkPubsubRejectMessage, P2pNetworkPubsubSign, P2pNetworkPubsubSignError, + P2pNetworkPubsubValidateIncomingMessage, P2pNetworkPubsubValidateIncomingMessages, P2pNetworkPubsubEffectfulSign, P2pNetworkPubsubEffectfulValidateIncomingMessages, @@ -707,7 +714,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 597; + pub const COUNT: u16 = 604; } impl std::fmt::Display for ActionKind { @@ -809,6 +816,9 @@ impl ActionKindGet for P2pCallbacksAction { } Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection, Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip, + Self::P2pPubsubValidateMessage { .. } => { + ActionKind::P2pCallbacksP2pPubsubValidateMessage + } } } } @@ -1953,6 +1963,16 @@ impl ActionKindGet for P2pNetworkPubsubAction { Self::OutgoingMessageClear { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageClear, Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError, Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData, + Self::HandleIncomingMessage { .. } => ActionKind::P2pNetworkPubsubHandleIncomingMessage, + Self::ValidateIncomingMessage { .. } => { + ActionKind::P2pNetworkPubsubValidateIncomingMessage + } + Self::PruneMessages { .. } => ActionKind::P2pNetworkPubsubPruneMessages, + Self::RejectMessage { .. } => ActionKind::P2pNetworkPubsubRejectMessage, + Self::IgnoreMessage { .. } => ActionKind::P2pNetworkPubsubIgnoreMessage, + Self::BroadcastValidatedMessage { .. } => { + ActionKind::P2pNetworkPubsubBroadcastValidatedMessage + } } } } diff --git a/node/src/consensus/consensus_reducer.rs b/node/src/consensus/consensus_reducer.rs index 0f7c977c1e..fcbc760c79 100644 --- a/node/src/consensus/consensus_reducer.rs +++ b/node/src/consensus/consensus_reducer.rs @@ -345,7 +345,7 @@ impl ConsensusState { /// Ideally we would differentiate between requested blocks and blocks /// received from gossip, but this difference doesn't really exist /// in the WebRTC transport, hence this heuristic. -fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool { +pub fn allow_block_too_late(state: &crate::State, block: &ArcBlockWithHash) -> bool { let (has_greater_blobal_slot, diff_with_best_tip) = state .transition_frontier .best_tip() diff --git a/node/src/consensus/mod.rs b/node/src/consensus/mod.rs index da66985907..445b2af309 100644 --- a/node/src/consensus/mod.rs +++ b/node/src/consensus/mod.rs @@ -5,3 +5,4 @@ mod consensus_actions; pub use consensus_actions::*; mod consensus_reducer; +pub use consensus_reducer::allow_block_too_late; diff --git a/node/src/p2p/callbacks/p2p_callbacks_actions.rs b/node/src/p2p/callbacks/p2p_callbacks_actions.rs index 92078b9fac..46fa16c04a 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_actions.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_actions.rs @@ -4,7 +4,7 @@ use p2p::{ rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse}, streaming_rpc::P2pStreamingRpcResponseFull, }, - PeerId, + P2pNetworkPubsubMessageCacheId, PeerId, }; use serde::{Deserialize, Serialize}; @@ -46,6 +46,9 @@ pub enum P2pCallbacksAction { RpcRespondBestTip { peer_id: PeerId, }, + P2pPubsubValidateMessage { + message_id: P2pNetworkPubsubMessageCacheId, + }, } impl redux::EnablingCondition for P2pCallbacksAction { @@ -63,6 +66,7 @@ impl redux::EnablingCondition for P2pCallbacksAction { P2pCallbacksAction::RpcRespondBestTip { .. } => { state.transition_frontier.best_tip().is_some() } + P2pCallbacksAction::P2pPubsubValidateMessage { .. } => true, } } } diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index 0eb4826b90..ce92ae69a6 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -1,6 +1,9 @@ use ark_ff::fields::arithmetic::InvalidBigInt; -use mina_p2p_messages::v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash}; -use openmina_core::{block::BlockWithHash, bug_condition, transaction::TransactionWithHash}; +use mina_p2p_messages::{ + gossip::GossipNetMessageV2, + v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash}, +}; +use openmina_core::{block::BlockWithHash, bug_condition, log, transaction::TransactionWithHash}; use p2p::{ channels::{ best_tip::P2pChannelsBestTipAction, @@ -8,13 +11,15 @@ use p2p::{ streaming_rpc::P2pStreamingRpcResponseFull, }, disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, - PeerId, + P2pNetworkPubsubAction, PeerId, }; use redux::{ActionMeta, ActionWithMeta, Dispatcher}; use crate::{ + consensus::allow_block_too_late, p2p_ready, snark_pool::candidate::SnarkPoolCandidateAction, + state::BlockPrevalidationError, transaction_pool::candidate::TransactionPoolCandidateAction, transition_frontier::sync::{ ledger::{ @@ -48,6 +53,7 @@ impl crate::State { action: ActionWithMeta<&P2pCallbacksAction>, ) { let (action, meta) = action.split(); + let time = meta.time(); let (dispatcher, state) = state_context.into_dispatcher_and_state(); match action { @@ -290,6 +296,80 @@ impl crate::State { best_tip: best_tip.clone(), }); } + P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => { + let Some(message_content) = state.p2p.ready().and_then(|p2p| { + p2p.network + .scheduler + .broadcast_state + .mcache + .get_message(message_id) + }) else { + bug_condition!("Failed to find message for id: {:?}", message_id); + return; + }; + + let pre_validation_result = match message_content { + GossipNetMessageV2::NewState(new_best_tip) => { + match BlockWithHash::try_new(new_best_tip.clone()) { + Ok(block) => { + let allow_block_too_late = allow_block_too_late(state, &block); + match state.prevalidate_block(&block, allow_block_too_late) { + Ok(()) => PreValidationResult::Continue, + Err(error) + if matches!( + error, + BlockPrevalidationError::ReceivedTooEarly { .. } + ) => + { + PreValidationResult::Ignore { + reason: format!( + "Block prevalidation failed: {:?}", + error + ), + } + } + Err(error) => PreValidationResult::Reject { + reason: format!("Block prevalidation failed: {:?}", error), + }, + } + } + Err(_) => { + log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block"); + PreValidationResult::Reject{reason: "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block".to_owned()} + } + } + } + _ => { + // TODO: add pre validation for Snark pool and Transaction pool diffs + PreValidationResult::Continue + } + }; + + match pre_validation_result { + PreValidationResult::Continue => { + dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { + message_id: *message_id, + }); + } + PreValidationResult::Reject { reason } => { + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: Some(p2p::BroadcastMessageId::MessageId { + message_id: *message_id, + }), + peer_id: None, + reason, + }); + } + PreValidationResult::Ignore { reason } => { + dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage { + message_id: Some(p2p::BroadcastMessageId::MessageId { + message_id: *message_id, + }), + reason, + }); + } + } + } } } @@ -574,3 +654,9 @@ impl crate::State { } } } + +enum PreValidationResult { + Continue, + Reject { reason: String }, + Ignore { reason: String }, +} diff --git a/node/src/state.rs b/node/src/state.rs index 293301906f..ad063de2da 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -1,10 +1,10 @@ -use std::sync::Arc; -use std::time::Duration; - use mina_p2p_messages::v2; use openmina_core::constants::PROTOCOL_VERSION; use openmina_core::transaction::{TransactionInfo, TransactionWithHash}; +use p2p::P2pNetworkPubsubMessageCacheId; use rand::prelude::*; +use std::sync::Arc; +use std::time::Duration; use openmina_core::block::BlockWithHash; use openmina_core::requests::RpcId; @@ -628,6 +628,11 @@ impl P2p { P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id } } )), + on_p2p_pubsub_message_received: Some(redux::callback!( + on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{ + P2pCallbacksAction::P2pPubsubValidateMessage { message_id } + } + )), } } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index ac92f9569c..54e26eca88 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -1,7 +1,7 @@ use mina_p2p_messages::v2::LedgerHash; use openmina_core::block::{AppliedBlock, ArcBlockWithHash}; use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId}; -use p2p::PeerId; +use p2p::{P2pNetworkPubsubAction, PeerId}; use redux::ActionMeta; use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest}; @@ -304,6 +304,12 @@ impl TransitionFrontierSyncAction { }; let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone()); store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error }); + // TODO this should be handled by a callback + store.dispatch(P2pNetworkPubsubAction::RejectMessage { + message_id: Some(p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }), + peer_id: None, + reason: "Failed to apply block".to_owned(), + }); } TransitionFrontierSyncAction::BlocksNextApplySuccess { hash, @@ -316,6 +322,11 @@ impl TransitionFrontierSyncAction { if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) { store.dispatch(TransitionFrontierSyncAction::BlocksSuccess); } + + // TODO this should be handled by a callback + store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }, + }); } TransitionFrontierSyncAction::BlocksSuccess => {} // Bootstrap/Catchup is practically complete at this point. diff --git a/node/testing/src/scenarios/mod.rs b/node/testing/src/scenarios/mod.rs index 9bacc6dbe1..06ba500a8f 100644 --- a/node/testing/src/scenarios/mod.rs +++ b/node/testing/src/scenarios/mod.rs @@ -51,7 +51,7 @@ use self::p2p::basic_outgoing_connections::{ MakeMultipleOutgoingConnections, MakeOutgoingConnection, }; use self::p2p::kademlia::KademliaBootstrap; -use self::p2p::pubsub::P2pReceiveBlock; +use self::p2p::pubsub::P2pReceiveMessage; use self::p2p::signaling::P2pSignaling; use self::record_replay::block_production::RecordReplayBlockProduction; use self::record_replay::bootstrap::RecordReplayBootstrap; @@ -83,7 +83,7 @@ pub enum Scenarios { MultiNodeBasicConnectivityPeerDiscovery(MultiNodeBasicConnectivityPeerDiscovery), SimulationSmall(SimulationSmall), SimulationSmallForeverRealTime(SimulationSmallForeverRealTime), - P2pReceiveBlock(P2pReceiveBlock), + P2pReceiveMessage(P2pReceiveMessage), P2pSignaling(P2pSignaling), P2pConnectionDiscoveryRustNodeAsSeed(P2pConnectionDiscoveryRustNodeAsSeed), MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock), @@ -189,7 +189,7 @@ impl Scenarios { } Self::SimulationSmall(_) => SimulationSmall::DOCS, Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS, - Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS, + Self::P2pReceiveMessage(_) => P2pReceiveMessage::DOCS, Self::P2pSignaling(_) => P2pSignaling::DOCS, Self::P2pConnectionDiscoveryRustNodeAsSeed(_) => { P2pConnectionDiscoveryRustNodeAsSeed::DOCS @@ -260,7 +260,7 @@ impl Scenarios { Self::MultiNodeBasicConnectivityPeerDiscovery(v) => v.run(runner).await, Self::SimulationSmall(v) => v.run(runner).await, Self::SimulationSmallForeverRealTime(v) => v.run(runner).await, - Self::P2pReceiveBlock(v) => v.run(runner).await, + Self::P2pReceiveMessage(v) => v.run(runner).await, Self::P2pSignaling(v) => v.run(runner).await, Self::P2pConnectionDiscoveryRustNodeAsSeed(v) => v.run(runner).await, Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await, diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index afd9635267..e6fbf6ad8b 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -1,30 +1,26 @@ use std::time::Duration; +use node::ActionKind; + use crate::{ hosts, node::RustNodeTestingConfig, scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime}, }; -/// Receive a block via meshsub +/// Receive a message via meshsub /// 1. Create a normal node with default devnet config, with devnet peers as initial peers /// 2. Wait for 2 minutes /// 3. Create a node with discovery disabled and first node as only peer -/// 4. Wait for first node to broadcast block to second one +/// 4. Wait for first node to broadcast message to second one #[derive(documented::Documented, Default, Clone, Copy)] -pub struct P2pReceiveBlock; +pub struct P2pReceiveMessage; -impl P2pReceiveBlock { +impl P2pReceiveMessage { pub async fn run(self, mut runner: ClusterRunner<'_>) { let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet()); let retransmitter_openmina_node = runner.add_rust_node(config); - let retransmitter_peer_id = runner - .node(retransmitter_openmina_node) - .unwrap() - .state() - .p2p - .my_id(); let _ = runner .run( @@ -47,22 +43,15 @@ impl P2pReceiveBlock { RunCfg::default() .timeout(Duration::from_secs(60 * 30)) .advance_time(RunCfgAdvanceTime::Real) - .action_handler(move |node, state, _, _| { - let Some(p2p) = state.p2p.ready() else { - return false; - }; - + .action_handler(move |node, _state, _, action| { node == receiver_openmina_node - && p2p - .network - .scheduler - .broadcast_state - .incoming_block - .as_ref() - .map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id)) + && matches!( + action.action().kind(), + ActionKind::P2pNetworkPubsubValidateIncomingMessage + ) }), ) .await - .expect("Failed to receive block"); + .expect("Test failed"); } } diff --git a/node/testing/tests/p2p_pubsub.rs b/node/testing/tests/p2p_pubsub.rs index a870b346bc..025853d646 100644 --- a/node/testing/tests/p2p_pubsub.rs +++ b/node/testing/tests/p2p_pubsub.rs @@ -1,5 +1,5 @@ -use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveBlock; +use openmina_node_testing::scenarios::p2p::pubsub::P2pReceiveMessage; mod common; -scenario_test!(pubsub_receive_block, P2pReceiveBlock, P2pReceiveBlock); +scenario_test!(pubsub_receive_block, P2pReceiveMessage, P2pReceiveMessage); diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 29debe3d77..a2fbc649c5 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -41,4 +41,6 @@ pub enum P2pDisconnectionReason { Timeout, #[error("rpc protocol not supported")] Unsupported, + #[error("invalid pubsub message")] + InvalidMessage, } diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index 9f9bdbc94e..f93cab0c71 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -1,4 +1,4 @@ -mod pb { +pub mod pb { include!(concat!(env!("OUT_DIR"), "/gossipsub.rs")); } @@ -7,7 +7,8 @@ pub use self::p2p_network_pubsub_actions::P2pNetworkPubsubAction; mod p2p_network_pubsub_state; pub use self::p2p_network_pubsub_state::{ - P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState, + P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubMessageCacheId, + P2pNetworkPubsubState, }; #[cfg(feature = "p2p-libp2p")] @@ -18,3 +19,13 @@ const TOPIC: &str = "coda/consensus-messages/0.0.1"; pub mod pubsub_effectful; pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction; + +#[derive(serde::Serialize, serde:: Deserialize, Debug, Clone)] +pub enum BroadcastMessageId { + BlockHash { + hash: mina_p2p_messages::v2::StateHash, + }, + MessageId { + message_id: P2pNetworkPubsubMessageCacheId, + }, +} diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index ee12863b77..e75f06bfb9 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -1,4 +1,4 @@ -use super::pb; +use super::{p2p_network_pubsub_state::P2pNetworkPubsubMessageCacheId, pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId}; use mina_p2p_messages::gossip::GossipNetMessageV2; use openmina_core::ActionEvent; @@ -61,19 +61,29 @@ pub enum P2pNetworkPubsubAction { }, /// Clean up temporary states after processing an incoming message. - IncomingMessageCleanup { peer_id: PeerId }, + IncomingMessageCleanup { + peer_id: PeerId, + }, /// Add a peer to the mesh network for a specific topic. - Graft { peer_id: PeerId, topic_id: String }, + Graft { + peer_id: PeerId, + topic_id: String, + }, /// Remove a peer from the mesh network for a specific topic. - Prune { peer_id: PeerId, topic_id: String }, + Prune { + peer_id: PeerId, + topic_id: String, + }, /// Initiate the broadcasting of a message to all subscribed peers. /// /// **Fields:** /// - `message`: The gossip network message to broadcast. - Broadcast { message: GossipNetMessageV2 }, + Broadcast { + message: GossipNetMessageV2, + }, /// Prepare a message for signing before broadcasting. /// @@ -91,32 +101,75 @@ pub enum P2pNetworkPubsubAction { /// An error occured during the signing process. #[action_event(level = warn, fields(display(author), display(topic)))] - SignError { author: PeerId, topic: String }, + SignError { + author: PeerId, + topic: String, + }, /// Finalize the broadcasting of a signed message by attaching the signature. /// /// **Fields:** /// - `signature`: The cryptographic signature of the message. - BroadcastSigned { signature: Data }, + BroadcastSigned { + signature: Data, + }, /// Prepare an outgoing message to send to a specific peer. - OutgoingMessage { peer_id: PeerId }, + OutgoingMessage { + peer_id: PeerId, + }, /// Clear the outgoing message state for a specific peer after sending. - OutgoingMessageClear { peer_id: PeerId }, + OutgoingMessageClear { + peer_id: PeerId, + }, /// An error occured during the sending of an outgoing message. /// /// **Fields:** /// - `msg`: The protobuf message that failed to send. #[action_event(level = warn, fields(display(peer_id), debug(msg)))] - OutgoingMessageError { msg: pb::Rpc, peer_id: PeerId }, + OutgoingMessageError { + msg: pb::Rpc, + peer_id: PeerId, + }, /// Send encoded data over an outgoing stream to a specific peer. /// /// **Fields:** /// - `data`: The encoded data to be sent. - OutgoingData { data: Data, peer_id: PeerId }, + OutgoingData { + data: Data, + peer_id: PeerId, + }, + + HandleIncomingMessage { + message: pb::Message, + message_content: GossipNetMessageV2, + peer_id: PeerId, + }, + + ValidateIncomingMessage { + message_id: P2pNetworkPubsubMessageCacheId, + }, + + /// Delete expired messages from state + PruneMessages {}, + + RejectMessage { + message_id: Option, + peer_id: Option, + reason: String, + }, + IgnoreMessage { + message_id: Option, + reason: String, + }, + + // After message is fully validated, broadcast it to other peers + BroadcastValidatedMessage { + message_id: BroadcastMessageId, + }, } impl From for crate::P2pAction { @@ -127,14 +180,21 @@ impl From for crate::P2pAction { impl redux::EnablingCondition for P2pNetworkPubsubAction { fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool { + let pubsub = &state.network.scheduler.broadcast_state; match self { - P2pNetworkPubsubAction::OutgoingMessage { peer_id } => state - .network - .scheduler - .broadcast_state + P2pNetworkPubsubAction::OutgoingMessage { peer_id } => pubsub .clients .get(peer_id) .map_or(false, |s| !s.message_is_empty()), + P2pNetworkPubsubAction::Prune { peer_id, topic_id } => pubsub + .topics + .get(topic_id) + .map_or(false, |topics| topics.contains_key(peer_id)), + P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } + | P2pNetworkPubsubAction::RejectMessage { + message_id: Some(message_id), + .. + } => pubsub.mcache.contains_broadcast_id(message_id), _ => true, } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 7782ad9965..5509a1e551 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -1,23 +1,32 @@ -use std::collections::btree_map::Entry; +use std::{collections::btree_map::Entry, time::Duration}; use binprot::BinProtRead; -use mina_p2p_messages::{gossip, v2}; +use mina_p2p_messages::{ + gossip::{self, GossipNetMessageV2}, + v2::NetworkPoolSnarkPoolDiffVersionedStableV2, +}; use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate}; use redux::{Dispatcher, Timestamp}; use crate::{ channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction}, + disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, peer::P2pPeerAction, - Data, P2pConfig, P2pNetworkYamuxAction, PeerId, + Data, P2pConfig, P2pNetworkYamuxAction, P2pState, PeerId, }; use super::{ - p2p_network_pubsub_state::P2pNetworkPubsubClientMeshAddingState, + p2p_network_pubsub_state::{ + source_from_message, P2pNetworkPubsubClientMeshAddingState, + P2pNetworkPubsubMessageCacheMessage, + }, pb::{self, Message}, P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction, - P2pNetworkPubsubState, TOPIC, + P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC, }; +const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300); + impl P2pNetworkPubsubState { pub fn reducer( mut state_context: Substate, @@ -29,6 +38,7 @@ impl P2pNetworkPubsubState { { let pubsub_state = state_context.get_substate_mut()?; let (action, meta) = action.split(); + let time = meta.time(); match action { P2pNetworkPubsubAction::NewStream { @@ -173,22 +183,32 @@ impl P2pNetworkPubsubState { message, seen_limit, } => { + // Check that if we can extract source from message, this is pre check + if source_from_message(&message).is_err() { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: None, + peer_id: Some(peer_id), + reason: "Invalid originator in message".to_owned(), + }); + return Ok(()); + } + // Check result later to ensure we always dispatch the cleanup action let reduce_incoming_result = - pubsub_state.reduce_incoming_message(peer_id, message, seen_limit); + pubsub_state.reduce_incoming_message(&message, seen_limit); let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); + let p2p_state: &P2pState = global_state.substate()?; + let state: &Self = global_state.substate()?; dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id }); - reduce_incoming_result?; - - let state: &Self = global_state.substate()?; - let config: &P2pConfig = global_state.substate()?; + let message_content = reduce_incoming_result?; for (topic_id, map) in &state.topics { let mesh_size = map.values().filter(|s| s.on_mesh()).count(); - let could_accept = mesh_size < config.meshsub.outbound_degree_high; + let could_accept = mesh_size < p2p_state.config.meshsub.outbound_degree_high; if !could_accept { if let Some(topic_state) = map.get(&peer_id) { @@ -200,29 +220,43 @@ impl P2pNetworkPubsubState { } } - if let Err(error) = Self::broadcast(dispatcher, global_state) { - bug_condition!( - "Failure when trying to broadcast incoming pubsub message: {error}" - ); - }; - - if let Some((_, block)) = state.incoming_block.as_ref() { - let best_tip = BlockWithHash::try_new(block.clone())?; - dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); - } - for (transaction, nonce) in &state.incoming_transactions { - dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { + // This happens if message was already seen + if let Some(message_content) = message_content { + dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage { + message, + message_content, peer_id, - transaction: Box::new(transaction.clone()), - nonce: *nonce, }); - } - for (snark, nonce) in &state.incoming_snarks { - dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { - peer_id, - snark: Box::new(snark.clone()), - nonce: *nonce, + } else { + dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage { + message_id: None, + reason: "Message already seen".to_owned(), }); + }; + + Ok(()) + } + P2pNetworkPubsubAction::HandleIncomingMessage { + message, + message_content, + peer_id, + } => { + let Ok(message_id) = + pubsub_state + .mcache + .put(message, message_content, peer_id, time) + else { + bug_condition!("Unable to add message to `mcache`"); + return Ok(()); + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let p2p_state: &P2pState = state.substate()?; + + if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() { + dispatcher.push_callback(callback, message_id); + } else { + dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { message_id }); } Ok(()) } @@ -440,6 +474,167 @@ impl P2pNetworkPubsubState { } Ok(()) } + P2pNetworkPubsubAction::ValidateIncomingMessage { message_id } => { + let Some(message) = pubsub_state.mcache.map.remove(&message_id) else { + bug_condition!("Message with id: {:?} not found", message_id); + return Ok(()); + }; + + let P2pNetworkPubsubMessageCacheMessage::Init { + message, + content, + time, + peer_id, + } = message + else { + bug_condition!( + "`P2pNetworkPubsubAction::ValidateIncomingMessage` called on invalid state" + ); + return Ok(()); + }; + + let new_message_state = match &content { + GossipNetMessageV2::NewState(block) => { + let block_hash = block.try_hash()?; + P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { + block_hash, + message, + peer_id, + time, + } + } + _ => P2pNetworkPubsubMessageCacheMessage::PreValidated { + message, + peer_id, + time, + }, + }; + pubsub_state + .mcache + .map + .insert(message_id, new_message_state); + + let dispatcher = state_context.into_dispatcher(); + + match content { + GossipNetMessageV2::NewState(block) => { + let best_tip = BlockWithHash::try_new(block.clone())?; + dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); + return Ok(()); + } + GossipNetMessageV2::TransactionPoolDiff { message, nonce } => { + let nonce = nonce.as_u32(); + for transaction in message.0 { + dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { + peer_id, + transaction: Box::new(transaction), + nonce, + }); + } + } + GossipNetMessageV2::SnarkPoolDiff { + message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), + nonce, + } => { + dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { + peer_id, + snark: Box::new(work.1.into()), + nonce: nonce.as_u32(), + }); + } + _ => {} + } + + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: super::BroadcastMessageId::MessageId { message_id }, + }); + Ok(()) + } + P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => { + let Some((mcache_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) + else { + bug_condition!("Message with id: {:?} not found", message_id); + return Ok(()); + }; + let raw_message = message.message().clone(); + let peer_id = message.peer_id(); + + pubsub_state.reduce_incoming_validated_message( + mcache_message_id, + peer_id, + &raw_message, + ); + + let Some((_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) + else { + bug_condition!("Message with id: {:?} not found", message_id); + return Ok(()); + }; + + *message = P2pNetworkPubsubMessageCacheMessage::Validated { + message: raw_message, + peer_id, + time: message.time(), + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + Self::broadcast(dispatcher, state) + } + P2pNetworkPubsubAction::PruneMessages {} => { + let messages = pubsub_state + .mcache + .map + .iter() + .filter_map(|(message_id, message)| { + if message.time() + MAX_MESSAGE_KEEP_DURATION > time { + Some(message_id.to_owned()) + } else { + None + } + }) + .collect::>(); + + for message_id in messages { + pubsub_state.mcache.remove_message(message_id); + } + Ok(()) + } + P2pNetworkPubsubAction::RejectMessage { + message_id, + peer_id, + .. + } => { + let mut peer_id = peer_id; + if let Some(message_id) = message_id { + let Some((_message_id, message)) = + pubsub_state.mcache.get_message_id_and_message(&message_id) + else { + bug_condition!("Message not found for id: {:?}", message_id); + return Ok(()); + }; + + if peer_id.is_none() { + peer_id = Some(message.peer_id()); + } + + pubsub_state.mcache.remove_message(_message_id); + } + + let dispatcher = state_context.into_dispatcher(); + + if let Some(peer_id) = peer_id { + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::InvalidMessage, + }); + } + + Ok(()) + } + P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()), } } @@ -468,58 +663,14 @@ impl P2pNetworkPubsubState { Ok(()) } - #[inline(never)] - fn reduce_incoming_message( + fn reduce_incoming_validated_message( &mut self, + message_id: P2pNetworkPubsubMessageCacheId, peer_id: PeerId, - message: Message, - seen_limit: usize, - ) -> Result<(), String> { + message: &Message, + ) { let topic = self.topics.entry(message.topic.clone()).or_default(); - if let Some(signature) = &message.signature { - // skip recently seen message - if !self.seen.contains(signature) { - self.seen.push_back(signature.clone()); - // keep only last `n` to avoid memory leak - if self.seen.len() > seen_limit { - self.seen.pop_front(); - } - } else { - return Ok(()); - } - } - - if let Some(data) = &message.data { - if data.len() > 8 { - let mut slice = &data[8..]; - match gossip::GossipNetMessageV2::binprot_read(&mut slice) { - Ok(gossip::GossipNetMessageV2::NewState(block)) => { - self.incoming_block = Some((peer_id, block)); - } - Ok(gossip::GossipNetMessageV2::TransactionPoolDiff { message, nonce }) => { - let nonce = nonce.as_u32(); - let txs = message.0.into_iter().map(|tx| (tx, nonce)); - self.incoming_transactions.extend(txs); - } - Ok(gossip::GossipNetMessageV2::SnarkPoolDiff { message, nonce }) => { - if let v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work) = - message - { - self.incoming_snarks.push((work.1.into(), nonce.as_u32())); - } - } - Err(err) => { - return Err(err.to_string()); - } - } - } - } - - let message_id = self.mcache.put(message.clone()); - - // TODO: this should only happen after the contents have been validated. - // The only validation that has happened so far is that the message can be parsed. self.clients .iter_mut() .filter(|(c, _)| { @@ -531,17 +682,66 @@ impl P2pNetworkPubsubState { return; }; if topic_state.on_mesh() { - state.publish(&message) + state.publish(message) } else { let ctr = state.message.control.get_or_insert_with(Default::default); ctr.ihave.push(pb::ControlIHave { topic_id: Some(message.topic.clone()), - message_ids: message_id.clone().into_iter().collect(), + message_ids: vec![message_id.to_raw_bytes()], }) } }); + } - Ok(()) + /// Processes an incoming message by checking for duplicates and deserializing its contents. + /// + /// This function performs two main operations: + /// 1. Deduplication: Tracks recently seen messages using their signatures to avoid processing duplicates + /// 2. Deserialization: Converts valid message data into a `GossipNetMessageV2` structure + /// + /// # Arguments + /// + /// * `message` - The incoming message to process + /// * `seen_limit` - Maximum number of message signatures to keep in the deduplication cache + /// + /// # Returns + /// + /// * `Ok(Some(GossipNetMessageV2))` - Successfully processed and deserialized message + /// * `Ok(None)` - Message was a duplicate (already seen) + /// * `Err(String)` - Error during processing (invalid message format or deserialization failure) + /// + #[inline(never)] + fn reduce_incoming_message( + &mut self, + message: &Message, + seen_limit: usize, + ) -> Result, String> { + let Some(signature) = &message.signature else { + bug_condition!("Validation failed: missing signature"); + return Ok(None); + }; + + // skip recently seen message + if !self.seen.contains(signature) { + self.seen.push_back(signature.clone()); + // keep only last `n` to avoid memory leak + if self.seen.len() > seen_limit { + self.seen.pop_front(); + } + } else { + return Ok(None); + } + + match &message.data { + Some(data) if data.len() > 8 => { + let mut slice = &data[8..]; + Ok(Some( + gossip::GossipNetMessageV2::binprot_read(&mut slice) + .map_err(|e| format!("Invalid `GossipNetMessageV2` message, error: {e}"))?, + )) + } + _ => Err("Invalid message".to_owned()), + } } fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec, data: &'a [u8]) -> &'a [u8] { @@ -644,9 +844,9 @@ impl P2pNetworkPubsubState { // Respond to iwant requests by publishing available messages from the cache. for iwant in iwant_requests { for msg_id in &iwant.message_ids { - if let Some(msg) = self.mcache.map.get(msg_id) { + if let Some(msg) = self.mcache.get_message_from_raw_message_id(msg_id) { if let Some(client) = self.clients.get_mut(peer_id) { - client.publish(msg); + client.publish(msg.message()); } } } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index 8ece03753e..80f687172b 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -1,17 +1,16 @@ -use super::pb; +use super::{pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId}; +use libp2p_identity::ParseError; +use mina_p2p_messages::gossip::GossipNetMessageV2; +use openmina_core::{snark::Snark, transaction::Transaction}; +use redux::Timestamp; +use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, - sync::Arc, time::Duration, }; -use mina_p2p_messages::v2; -use openmina_core::{snark::Snark, transaction::Transaction}; -use redux::Timestamp; -use serde::{Deserialize, Serialize}; - pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5); /// State of the P2P Network PubSub system. @@ -44,9 +43,6 @@ pub struct P2pNetworkPubsubState { /// For quick access and reducing redundant data transmission across peers. pub mcache: P2pNetworkPubsubMessageCache, - /// Incoming block from a peer, if any. - pub incoming_block: Option<(PeerId, Arc)>, - /// Incoming transactions from peers along with their nonces. pub incoming_transactions: Vec<(Transaction, u32)>, @@ -71,15 +67,19 @@ impl P2pNetworkPubsubState { self.clients.remove(peer_id); } - pub fn filter_iwant_message_ids(&mut self, message_id: &Vec, timestamp: Timestamp) -> bool { - if self.mcache.map.contains_key(message_id) { + pub fn filter_iwant_message_ids(&mut self, message_id: &[u8], timestamp: Timestamp) -> bool { + if self + .mcache + .get_message_from_raw_message_id(message_id) + .is_some() + { return false; } let message_count = self .iwant .iter_mut() - .find(|message| &message.message_id == message_id); + .find(|message| message.message_id == message_id); match message_count { Some(message) => { @@ -103,7 +103,7 @@ impl P2pNetworkPubsubState { } None => { let message_count = P2pNetworkPubsubIwantRequestCount { - message_id: message_id.to_owned(), + message_id: message_id.to_vec(), count: vec![timestamp], }; @@ -123,8 +123,6 @@ impl P2pNetworkPubsubState { self.incoming_transactions.shrink_to(0x20); self.incoming_snarks.shrink_to(0x20); - - self.incoming_block = None; } } @@ -172,11 +170,11 @@ pub struct P2pNetworkPubsubClientState { impl P2pNetworkPubsubClientState { pub fn publish(&mut self, message: &pb::Message) { - let Some(id) = compute_message_id(message) else { + let Ok(id) = P2pNetworkPubsubMessageCacheId::compute_message_id(message) else { self.message.publish.push(message.clone()); return; }; - if self.cache.map.insert(id.clone()) { + if self.cache.map.insert(id) { self.message.publish.push(message.clone()); } self.cache.queue.push_back(id); @@ -200,54 +198,207 @@ impl P2pNetworkPubsubClientState { #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubRecentlyPublishCache { - pub map: BTreeSet>, - pub queue: VecDeque>, + pub map: BTreeSet, + pub queue: VecDeque, } // TODO: store blocks, snarks and txs separately #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubMessageCache { - pub map: BTreeMap, pb::Message>, - pub queue: VecDeque>, + pub map: BTreeMap, + pub queue: VecDeque, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum P2pNetworkPubsubMessageCacheMessage { + Init { + message: pb::Message, + content: GossipNetMessageV2, + peer_id: PeerId, + time: Timestamp, + }, + PreValidatedBlockMessage { + block_hash: mina_p2p_messages::v2::StateHash, + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, + // This is temporary handling for transactions and snark pool + PreValidated { + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, + Validated { + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] +pub struct P2pNetworkPubsubMessageCacheId { + pub source: libp2p_identity::PeerId, + pub seqno: u64, +} + +impl P2pNetworkPubsubMessageCacheId { + // TODO: what if wasm32? + // How to test it? + pub fn compute_message_id( + message: &pb::Message, + ) -> Result { + let source = source_from_message(message)?; + + let seqno = message + .seqno + .as_ref() + .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + + Ok(P2pNetworkPubsubMessageCacheId { source, seqno }) + } + + pub fn to_raw_bytes(&self) -> Vec { + let mut message_id = self.source.to_base58(); + message_id.push_str(&self.seqno.to_string()); + message_id.into_bytes() + } +} + +impl P2pNetworkPubsubMessageCacheMessage { + pub fn message(&self) -> &pb::Message { + match self { + Self::Init { message, .. } => message, + Self::PreValidated { message, .. } => message, + Self::PreValidatedBlockMessage { message, .. } => message, + Self::Validated { message, .. } => message, + } + } + pub fn time(&self) -> Timestamp { + *match self { + Self::Init { time, .. } => time, + Self::PreValidated { time, .. } => time, + Self::PreValidatedBlockMessage { time, .. } => time, + Self::Validated { time, .. } => time, + } + } + pub fn peer_id(&self) -> PeerId { + *match self { + Self::Init { peer_id, .. } => peer_id, + Self::PreValidated { peer_id, .. } => peer_id, + Self::PreValidatedBlockMessage { peer_id, .. } => peer_id, + Self::Validated { peer_id, .. } => peer_id, + } + } } impl P2pNetworkPubsubMessageCache { const CAPACITY: usize = 100; - pub fn put(&mut self, message: pb::Message) -> Option> { - let id = compute_message_id(&message)?; - self.map.insert(id.clone(), message); - self.queue.push_back(id.clone()); + pub fn put( + &mut self, + message: pb::Message, + content: GossipNetMessageV2, + peer_id: PeerId, + time: Timestamp, + ) -> Result { + let id = P2pNetworkPubsubMessageCacheId::compute_message_id(&message)?; + self.map.insert( + id, + P2pNetworkPubsubMessageCacheMessage::Init { + message, + content, + time, + peer_id, + }, + ); + + self.queue.push_back(id); if self.queue.len() > Self::CAPACITY { if let Some(id) = self.queue.pop_front() { self.map.remove(&id); } } - Some(id) + Ok(id) + } + + pub fn get_message(&self, id: &P2pNetworkPubsubMessageCacheId) -> Option<&GossipNetMessageV2> { + let message = self.map.get(id)?; + match message { + P2pNetworkPubsubMessageCacheMessage::Init { content, .. } => Some(content), + _ => None, + } + } + + pub fn contains_broadcast_id(&self, message_id: &BroadcastMessageId) -> bool { + match message_id { + super::BroadcastMessageId::BlockHash { hash } => self + .map + .values() + .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. } if block_hash == hash)), + super::BroadcastMessageId::MessageId { message_id } => { + self.map.contains_key(message_id) + } + } + } + + pub fn get_message_id_and_message( + &mut self, + message_id: &BroadcastMessageId, + ) -> Option<( + P2pNetworkPubsubMessageCacheId, + &mut P2pNetworkPubsubMessageCacheMessage, + )> { + match message_id { + super::BroadcastMessageId::BlockHash { hash } => { + self.map + .iter_mut() + .find_map(|(message_id, message)| match message { + P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { + block_hash, + .. + } if block_hash == hash => Some((*message_id, message)), + _ => None, + }) + } + super::BroadcastMessageId::MessageId { message_id } => self + .map + .get_mut(message_id) + .map(|content| (*message_id, content)), + } + } + + pub fn remove_message(&mut self, message_id: P2pNetworkPubsubMessageCacheId) { + let _ = self.map.remove(&message_id); + if let Some(position) = self.queue.iter().position(|id| id == &message_id) { + self.queue.remove(position); + } + } + + pub fn get_message_from_raw_message_id( + &self, + message_id: &[u8], + ) -> Option<&P2pNetworkPubsubMessageCacheMessage> { + self.map.iter().find_map(|(key, value)| { + if key.to_raw_bytes() == message_id { + Some(value) + } else { + None + } + }) } } -// TODO: what if wasm32? -// How to test it? -pub fn compute_message_id(message: &pb::Message) -> Option> { +pub fn source_from_message(message: &pb::Message) -> Result { let source_bytes = message .from .as_ref() .map(AsRef::as_ref) .unwrap_or(&[0, 1, 0][..]); - let mut source_string = libp2p_identity::PeerId::from_bytes(source_bytes) - .ok()? - .to_base58(); - - let sequence_number = message - .seqno - .as_ref() - .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) - .map(u64::from_be_bytes) - .unwrap_or_default(); - source_string.push_str(&sequence_number.to_string()); - Some(source_string.into_bytes()) + libp2p_identity::PeerId::from_bytes(source_bytes) } #[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index e98125bad5..3cb17bc77a 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -9,7 +9,8 @@ use crate::{ }, disconnection::{P2pDisconnectedState, P2pDisconnectionAction}, P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction, - P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, P2pPeerState, P2pState, PeerId, + P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, + P2pPeerState, P2pState, PeerId, }; use openmina_core::{bug_condition, Substate}; use redux::{ActionMeta, ActionWithMeta, Dispatcher, Timestamp}; @@ -92,6 +93,7 @@ impl P2pState { state.p2p_pnet_timeouts(dispatcher, time)?; state.p2p_select_timeouts(dispatcher, time)?; state.p2p_rpc_heartbeats(dispatcher, time)?; + dispatcher.push(P2pNetworkPubsubAction::PruneMessages {}); } state.rpc_timeouts(dispatcher, time)?; diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index 664b7682a7..1d6f386f5b 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -33,8 +33,8 @@ use crate::{ identify::{P2pNetworkIdentify, P2pNetworkIdentifyState}, P2pNetworkState, }, - Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubState, - P2pNetworkSchedulerState, P2pTimeouts, PeerId, + Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubMessageCacheId, + P2pNetworkPubsubState, P2pNetworkSchedulerState, P2pTimeouts, PeerId, }; use mina_p2p_messages::v2; @@ -562,6 +562,9 @@ pub struct P2pCallbacks { /// Callback for [`P2pChannelsStreamingRpcAction::ResponseReceived`] pub on_p2p_channels_streaming_rpc_response_received: OptionalCallback<(PeerId, P2pRpcId, Option)>, + + /// Callback for received pubsub message + pub on_p2p_pubsub_message_received: OptionalCallback, } impl_substate_access!(P2pState, P2pNetworkState, network);