Skip to content

Commit f7384f5

Browse files
committed
fix(p2p): Avoid duplicating incoming messages during validation
1 parent 56a7d0a commit f7384f5

File tree

5 files changed

+58
-14
lines changed

5 files changed

+58
-14
lines changed

node/src/action_kind.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,9 @@ pub enum ActionKind {
413413
P2pNetworkPubsubPrune,
414414
P2pNetworkPubsubSign,
415415
P2pNetworkPubsubSignError,
416-
P2pNetworkPubsubEffectfulIncomingData,
416+
P2pNetworkPubsubValidateIncomingMessages,
417417
P2pNetworkPubsubEffectfulSign,
418+
P2pNetworkPubsubEffectfulValidateIncomingMessages,
418419
P2pNetworkRpcHeartbeatSend,
419420
P2pNetworkRpcIncomingData,
420421
P2pNetworkRpcIncomingMessage,
@@ -688,7 +689,7 @@ pub enum ActionKind {
688689
}
689690

690691
impl ActionKind {
691-
pub const COUNT: u16 = 579;
692+
pub const COUNT: u16 = 580;
692693
}
693694

694695
impl std::fmt::Display for ActionKind {
@@ -1892,6 +1893,9 @@ impl ActionKindGet for P2pNetworkPubsubAction {
18921893
match self {
18931894
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
18941895
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
1896+
Self::ValidateIncomingMessages { .. } => {
1897+
ActionKind::P2pNetworkPubsubValidateIncomingMessages
1898+
}
18951899
Self::IncomingMessage { .. } => ActionKind::P2pNetworkPubsubIncomingMessage,
18961900
Self::IncomingMessageCleanup { .. } => {
18971901
ActionKind::P2pNetworkPubsubIncomingMessageCleanup
@@ -1994,7 +1998,9 @@ impl ActionKindGet for P2pNetworkPubsubEffectfulAction {
19941998
fn kind(&self) -> ActionKind {
19951999
match self {
19962000
Self::Sign { .. } => ActionKind::P2pNetworkPubsubEffectfulSign,
1997-
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubEffectfulIncomingData,
2001+
Self::ValidateIncomingMessages { .. } => {
2002+
ActionKind::P2pNetworkPubsubEffectfulValidateIncomingMessages
2003+
}
19982004
}
19992005
}
20002006
}

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,14 @@ pub enum P2pNetworkPubsubAction {
4242
seen_limit: usize,
4343
},
4444

45-
/// Handle a fully decoded message received from a peer.
45+
/// Validate a batch of decoded incoming messages.
46+
ValidateIncomingMessages {
47+
peer_id: PeerId,
48+
seen_limit: usize,
49+
addr: ConnectionAddr,
50+
},
51+
52+
/// Handle a fully decoded and validated message received from a peer.
4653
///
4754
/// **Fields:**
4855
/// - `message`: The decoded protobuf message.

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use redux::{Dispatcher, Timestamp};
88
use crate::{
99
channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
1010
peer::P2pPeerAction,
11-
Data, P2pConfig, P2pNetworkYamuxAction, P2pState, PeerId,
11+
Data, P2pConfig, P2pNetworkYamuxAction, PeerId,
1212
};
1313

1414
use super::{
@@ -135,18 +135,31 @@ impl P2pNetworkPubsubState {
135135
} => {
136136
pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
137137

138-
let (dispatcher, state) = state_context.into_dispatcher_and_state();
139-
let p2p_state: &P2pState = state.substate()?;
140-
let state = &p2p_state.network.scheduler.broadcast_state;
141-
let Some(state) = state.clients.get(&peer_id) else {
138+
let dispatcher = state_context.into_dispatcher();
139+
140+
dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessages {
141+
peer_id,
142+
seen_limit,
143+
addr,
144+
});
145+
146+
Ok(())
147+
}
148+
P2pNetworkPubsubAction::ValidateIncomingMessages {
149+
peer_id,
150+
seen_limit,
151+
addr,
152+
} => {
153+
let Some(state) = pubsub_state.clients.get_mut(&peer_id) else {
142154
// TODO: investigate, cannot reproduce this
143155
// bug_condition!("{:?} not found in state.clients", peer_id);
144156
return Ok(());
145157
};
158+
let messages = std::mem::take(&mut state.incoming_messages);
159+
160+
let dispatcher = state_context.into_dispatcher();
146161

147-
// TODO: try to reuse data instead of cloning all message
148-
let messages = state.incoming_messages.clone();
149-
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
162+
dispatcher.push(P2pNetworkPubsubEffectfulAction::ValidateIncomingMessages {
150163
peer_id,
151164
seen_limit,
152165
addr,

p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_actions.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,29 @@ use crate::{pubsub::pb::Message, ConnectionAddr, P2pState, PeerId};
22
use openmina_core::ActionEvent;
33
use serde::{Deserialize, Serialize};
44

5+
/// Eeffectful actions within the P2P Network PubSub system.
56
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
67
pub enum P2pNetworkPubsubEffectfulAction {
8+
/// Initiate the signing of a message before broadcasting.
9+
///
10+
/// **Fields:**
11+
/// - `author`: The identifier of the peer authoring the message.
12+
/// - `topic`: The topic under which the message is published.
13+
/// - `message`: The protobuf message to be signed.
714
Sign {
815
author: PeerId,
916
topic: String,
1017
message: Message,
1118
},
12-
IncomingData {
19+
20+
/// Validate a batch of incoming messages from a peer.
21+
///
22+
/// **Fields:**
23+
/// - `peer_id`: The identifier of the peer sending the messages.
24+
/// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
25+
/// - `addr`: The connection address of the peer.
26+
/// - `messages`: Decoded protobuf messages.
27+
ValidateIncomingMessages {
1328
peer_id: PeerId,
1429
seen_limit: usize,
1530
addr: ConnectionAddr,

p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl P2pNetworkPubsubEffectfulAction {
3030
store.dispatch(P2pNetworkPubsubAction::BroadcastSigned { signature });
3131
}
3232
}
33-
P2pNetworkPubsubEffectfulAction::IncomingData {
33+
P2pNetworkPubsubEffectfulAction::ValidateIncomingMessages {
3434
peer_id,
3535
seen_limit,
3636
addr,
@@ -73,6 +73,9 @@ impl P2pNetworkPubsubEffectfulAction {
7373
error: P2pNetworkConnectionError::PubSubError(error.to_string()),
7474
});
7575

76+
// TODO: should this error short-circuit the whole batch?
77+
// if yes, shouldn't every message be verified first before
78+
// dispatching `P2pNetworkPubsubAction::IncomingMessage`?
7679
return;
7780
}
7881
}

0 commit comments

Comments
 (0)