Skip to content

Commit 16c8ae6

Browse files
authored
Merge pull request #953 from openmina/fix/pubsub-tweaks
fix(p2p): Pubsub message handling tweaks
2 parents 5baabe6 + 4de4e35 commit 16c8ae6

File tree

3 files changed

+77
-53
lines changed

3 files changed

+77
-53
lines changed

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ pub enum ActionKind {
404404
P2pNetworkPubsubGraft,
405405
P2pNetworkPubsubIncomingData,
406406
P2pNetworkPubsubIncomingMessage,
407+
P2pNetworkPubsubIncomingMessageCleanup,
407408
P2pNetworkPubsubNewStream,
408409
P2pNetworkPubsubOutgoingData,
409410
P2pNetworkPubsubOutgoingMessage,
@@ -687,7 +688,7 @@ pub enum ActionKind {
687688
}
688689

689690
impl ActionKind {
690-
pub const COUNT: u16 = 578;
691+
pub const COUNT: u16 = 579;
691692
}
692693

693694
impl std::fmt::Display for ActionKind {
@@ -1892,6 +1893,9 @@ impl ActionKindGet for P2pNetworkPubsubAction {
18921893
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
18931894
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
18941895
Self::IncomingMessage { .. } => ActionKind::P2pNetworkPubsubIncomingMessage,
1896+
Self::IncomingMessageCleanup { .. } => {
1897+
ActionKind::P2pNetworkPubsubIncomingMessageCleanup
1898+
}
18951899
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
18961900
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
18971901
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub enum P2pNetworkPubsubAction {
2525
message: pb::Message,
2626
seen_limit: usize,
2727
},
28+
IncomingMessageCleanup {
29+
peer_id: PeerId,
30+
},
2831
Graft {
2932
peer_id: PeerId,
3033
topic_id: String,

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 69 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -160,51 +160,79 @@ impl P2pNetworkPubsubState {
160160
message,
161161
seen_limit,
162162
} => {
163-
pubsub_state.reduce_incoming_message(peer_id, message, seen_limit)?;
163+
// Check result later to ensure we always dispatch the cleanup action
164+
let reduce_incoming_result =
165+
pubsub_state.reduce_incoming_message(peer_id, message, seen_limit);
164166

165167
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
168+
169+
dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id });
170+
171+
reduce_incoming_result?;
172+
166173
let state: &Self = global_state.substate()?;
167174
let config: &P2pConfig = global_state.substate()?;
168175

169-
let incoming_block = state.incoming_block.as_ref().cloned();
170-
let incoming_transactions = state.incoming_transactions.clone();
171-
let incoming_snarks = state.incoming_snarks.clone();
172-
let topics = state.topics.clone();
173-
174-
for (topic_id, map) in topics {
176+
for (topic_id, map) in &state.topics {
175177
let mesh_size = map.values().filter(|s| s.on_mesh()).count();
176178
let could_accept = mesh_size < config.meshsub.outbound_degree_high;
177179

178180
if !could_accept {
179181
if let Some(topic_state) = map.get(&peer_id) {
180182
if topic_state.on_mesh() {
183+
let topic_id = topic_id.clone();
181184
dispatcher.push(P2pNetworkPubsubAction::Prune { peer_id, topic_id })
182185
}
183186
}
184187
}
185188
}
186189

187-
broadcast(dispatcher, global_state)?;
188-
if let Some((_, block)) = incoming_block {
189-
let best_tip = BlockWithHash::try_new(block)?;
190+
if let Err(error) = broadcast(dispatcher, global_state) {
191+
bug_condition!(
192+
"Failure when trying to broadcast incoming pubsub message: {error}"
193+
);
194+
};
195+
196+
if let Some((_, block)) = state.incoming_block.as_ref() {
197+
let best_tip = BlockWithHash::try_new(block.clone())?;
190198
dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
191199
}
192-
for (transaction, nonce) in incoming_transactions {
200+
for (transaction, nonce) in &state.incoming_transactions {
193201
dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived {
194202
peer_id,
195-
transaction: Box::new(transaction),
196-
nonce,
203+
transaction: Box::new(transaction.clone()),
204+
nonce: *nonce,
197205
});
198206
}
199-
for (snark, nonce) in incoming_snarks {
207+
for (snark, nonce) in &state.incoming_snarks {
200208
dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived {
201209
peer_id,
202-
snark: Box::new(snark),
203-
nonce,
210+
snark: Box::new(snark.clone()),
211+
nonce: *nonce,
204212
});
205213
}
206214
Ok(())
207215
}
216+
P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => {
217+
pubsub_state.incoming_transactions.clear();
218+
pubsub_state.incoming_snarks.clear();
219+
220+
pubsub_state.incoming_transactions.shrink_to(0x20);
221+
pubsub_state.incoming_snarks.shrink_to(0x20);
222+
223+
pubsub_state.incoming_block = None;
224+
225+
let Some(client_state) = pubsub_state.clients.get_mut(&peer_id) else {
226+
bug_condition!(
227+
"State not found for action P2pNetworkPubsubAction::IncomingMessageCleanup"
228+
);
229+
return Ok(());
230+
};
231+
client_state.incoming_messages.clear();
232+
client_state.incoming_messages.shrink_to(0x20);
233+
234+
Ok(())
235+
}
208236
// we want to add peer to our mesh
209237
P2pNetworkPubsubAction::Graft { peer_id, topic_id } => {
210238
let Some(state) = pubsub_state
@@ -440,21 +468,6 @@ impl P2pNetworkPubsubState {
440468
message: Message,
441469
seen_limit: usize,
442470
) -> Result<(), String> {
443-
self.incoming_transactions.clear();
444-
self.incoming_snarks.clear();
445-
446-
self.incoming_transactions.shrink_to(0x20);
447-
self.incoming_snarks.shrink_to(0x20);
448-
449-
let Some(state) = self.clients.get_mut(&peer_id) else {
450-
bug_condition!("State not found for action P2pNetworkPubsubAction::IncomingMessage");
451-
return Ok(());
452-
};
453-
state.incoming_messages.clear();
454-
state.incoming_messages.shrink_to(0x20);
455-
456-
let message_id = self.mcache.put(message.clone());
457-
458471
let topic = self.topics.entry(message.topic.clone()).or_default();
459472

460473
if let Some(signature) = &message.signature {
@@ -470,27 +483,6 @@ impl P2pNetworkPubsubState {
470483
}
471484
}
472485

473-
self.clients
474-
.iter_mut()
475-
.filter(|(c, _)| {
476-
// don't send back to who sent this
477-
*c != &peer_id
478-
})
479-
.for_each(|(c, state)| {
480-
let Some(topic_state) = topic.get(c) else {
481-
return;
482-
};
483-
if topic_state.on_mesh() {
484-
state.publish(&message)
485-
} else {
486-
let ctr = state.message.control.get_or_insert_with(Default::default);
487-
ctr.ihave.push(pb::ControlIHave {
488-
topic_id: Some(message.topic.clone()),
489-
message_ids: message_id.clone().into_iter().collect(),
490-
})
491-
}
492-
});
493-
494486
if let Some(data) = &message.data {
495487
if data.len() > 8 {
496488
let mut slice = &data[8..];
@@ -517,6 +509,31 @@ impl P2pNetworkPubsubState {
517509
}
518510
}
519511

512+
let message_id = self.mcache.put(message.clone());
513+
514+
// TODO: this should only happen after the contents have been validated.
515+
// The only validation that has happened so far is that the message can be parsed.
516+
self.clients
517+
.iter_mut()
518+
.filter(|(c, _)| {
519+
// don't send back to who sent this
520+
*c != &peer_id
521+
})
522+
.for_each(|(c, state)| {
523+
let Some(topic_state) = topic.get(c) else {
524+
return;
525+
};
526+
if topic_state.on_mesh() {
527+
state.publish(&message)
528+
} else {
529+
let ctr = state.message.control.get_or_insert_with(Default::default);
530+
ctr.ihave.push(pb::ControlIHave {
531+
topic_id: Some(message.topic.clone()),
532+
message_ids: message_id.clone().into_iter().collect(),
533+
})
534+
}
535+
});
536+
520537
Ok(())
521538
}
522539

0 commit comments

Comments
 (0)