Skip to content

Commit 62e44fd

Browse files
committed
fix(p2p): Cleanup pubsub data immediately after it has been processed
1 parent cecd765 commit 62e44fd

File tree

3 files changed

+31
-14
lines changed

3 files changed

+31
-14
lines changed

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ pub enum ActionKind {
404404
P2pNetworkPubsubGraft,
405405
P2pNetworkPubsubIncomingData,
406406
P2pNetworkPubsubIncomingMessage,
407+
P2pNetworkPubsubIncomingMessageCleanup,
407408
P2pNetworkPubsubNewStream,
408409
P2pNetworkPubsubOutgoingData,
409410
P2pNetworkPubsubOutgoingMessage,
@@ -687,7 +688,7 @@ pub enum ActionKind {
687688
}
688689

689690
impl ActionKind {
690-
pub const COUNT: u16 = 578;
691+
pub const COUNT: u16 = 579;
691692
}
692693

693694
impl std::fmt::Display for ActionKind {
@@ -1892,6 +1893,9 @@ impl ActionKindGet for P2pNetworkPubsubAction {
18921893
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
18931894
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
18941895
Self::IncomingMessage { .. } => ActionKind::P2pNetworkPubsubIncomingMessage,
1896+
Self::IncomingMessageCleanup { .. } => {
1897+
ActionKind::P2pNetworkPubsubIncomingMessageCleanup
1898+
}
18951899
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
18961900
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
18971901
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub enum P2pNetworkPubsubAction {
2525
message: pb::Message,
2626
seen_limit: usize,
2727
},
28+
IncomingMessageCleanup {
29+
peer_id: PeerId,
30+
},
2831
Graft {
2932
peer_id: PeerId,
3033
topic_id: String,

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ impl P2pNetworkPubsubState {
163163
pubsub_state.reduce_incoming_message(peer_id, message, seen_limit)?;
164164

165165
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
166+
167+
dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id });
168+
166169
let state: &Self = global_state.substate()?;
167170
let config: &P2pConfig = global_state.substate()?;
168171

@@ -206,6 +209,26 @@ impl P2pNetworkPubsubState {
206209
}
207210
Ok(())
208211
}
212+
P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => {
213+
pubsub_state.incoming_transactions.clear();
214+
pubsub_state.incoming_snarks.clear();
215+
216+
pubsub_state.incoming_transactions.shrink_to(0x20);
217+
pubsub_state.incoming_snarks.shrink_to(0x20);
218+
219+
pubsub_state.incoming_block = None;
220+
221+
let Some(client_state) = pubsub_state.clients.get_mut(&peer_id) else {
222+
bug_condition!(
223+
"State not found for action P2pNetworkPubsubAction::IncomingMessageCleanup"
224+
);
225+
return Ok(());
226+
};
227+
client_state.incoming_messages.clear();
228+
client_state.incoming_messages.shrink_to(0x20);
229+
230+
Ok(())
231+
}
209232
// we want to add peer to our mesh
210233
P2pNetworkPubsubAction::Graft { peer_id, topic_id } => {
211234
let Some(state) = pubsub_state
@@ -441,19 +464,6 @@ impl P2pNetworkPubsubState {
441464
message: Message,
442465
seen_limit: usize,
443466
) -> Result<(), String> {
444-
self.incoming_transactions.clear();
445-
self.incoming_snarks.clear();
446-
447-
self.incoming_transactions.shrink_to(0x20);
448-
self.incoming_snarks.shrink_to(0x20);
449-
450-
let Some(state) = self.clients.get_mut(&peer_id) else {
451-
bug_condition!("State not found for action P2pNetworkPubsubAction::IncomingMessage");
452-
return Ok(());
453-
};
454-
state.incoming_messages.clear();
455-
state.incoming_messages.shrink_to(0x20);
456-
457467
let topic = self.topics.entry(message.topic.clone()).or_default();
458468

459469
if let Some(signature) = &message.signature {

0 commit comments

Comments
 (0)