Skip to content

Commit 8f22167

Browse files
committed
Added limit for IWANT requests
1 parent 26354df commit 8f22167

File tree

2 files changed

+83
-13
lines changed

2 files changed

+83
-13
lines changed

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::btree_map::Entry, sync::Arc};
33
use binprot::BinProtRead;
44
use mina_p2p_messages::{gossip, v2};
55
use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate};
6-
use redux::Dispatcher;
6+
use redux::{Dispatcher, Timestamp};
77

88
use crate::{
99
channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
@@ -28,7 +28,7 @@ impl P2pNetworkPubsubState {
2828
Action: crate::P2pActionTrait<State>,
2929
{
3030
let pubsub_state = state_context.get_substate_mut()?;
31-
let (action, _meta) = action.split();
31+
let (action, meta) = action.split();
3232

3333
match action {
3434
P2pNetworkPubsubAction::NewStream {
@@ -125,7 +125,7 @@ impl P2pNetworkPubsubState {
125125
seen_limit,
126126
..
127127
} => {
128-
pubsub_state.reduce_incoming_data(&peer_id, data)?;
128+
pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
129129

130130
let dispatcher: &mut Dispatcher<Action, State> = state_context.into_dispatcher();
131131
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
@@ -447,7 +447,12 @@ impl P2pNetworkPubsubState {
447447
Ok(())
448448
}
449449

450-
fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: Data) -> Result<(), String> {
450+
fn reduce_incoming_data(
451+
&mut self,
452+
peer_id: &PeerId,
453+
data: Data,
454+
timestamp: Timestamp,
455+
) -> Result<(), String> {
451456
let Some(state) = self.clients.get_mut(peer_id) else {
452457
// TODO: investigate, cannot reproduce this
453458
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
@@ -529,14 +534,24 @@ impl P2pNetworkPubsubState {
529534
}
530535
}
531536

532-
for ihave in &control.ihave {
533-
let message_ids = ihave
534-
.message_ids
535-
.iter()
536-
.filter(|msg_id| !self.mcache.map.contains_key(*msg_id))
537-
.cloned()
538-
.collect();
539-
if let Some(client) = self.clients.get_mut(peer_id) {
537+
for ihave in control.ihave {
538+
if self.clients.contains_key(peer_id) {
539+
let message_ids = ihave
540+
.message_ids
541+
.into_iter()
542+
.filter(|msg_id| !self.mcache.map.contains_key(msg_id))
543+
.collect::<Vec<_>>();
544+
545+
let message_ids = message_ids
546+
.into_iter()
547+
.filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
548+
.collect::<Vec<_>>();
549+
550+
let Some(client) = self.clients.get_mut(peer_id) else {
551+
bug_condition!("State not found for {}", peer_id);
552+
return Ok(());
553+
};
554+
540555
let ctr = client.message.control.get_or_insert_with(Default::default);
541556
ctr.iwant.push(pb::ControlIWant { message_ids })
542557
}

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
use super::pb;
22
use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};
33

4-
use std::collections::{BTreeMap, VecDeque};
4+
use std::{
5+
collections::{BTreeMap, VecDeque},
6+
time::Duration,
7+
};
58

69
use mina_p2p_messages::v2;
710
use openmina_core::{snark::Snark, transaction::Transaction};
11+
use redux::Timestamp;
812
use serde::{Deserialize, Serialize};
913

14+
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
15+
1016
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
1117
pub struct P2pNetworkPubsubState {
1218
pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,
@@ -18,12 +24,61 @@ pub struct P2pNetworkPubsubState {
1824
pub incoming_transactions: Vec<(Transaction, u32)>,
1925
pub incoming_snarks: Vec<(Snark, u32)>,
2026
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
27+
pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
28+
}
29+
30+
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
31+
pub struct P2pNetworkPubsubIwantRequestCount {
32+
pub message_id: Vec<u8>,
33+
pub count: Vec<Timestamp>,
2134
}
2235

2336
impl P2pNetworkPubsubState {
2437
pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
2538
self.clients.remove(peer_id);
2639
}
40+
41+
pub fn filter_iwant_message_ids(&mut self, message_id: &Vec<u8>, timestamp: Timestamp) -> bool {
42+
let message_count = self
43+
.iwant
44+
.iter_mut()
45+
.find(|message| &message.message_id == message_id);
46+
47+
match message_count {
48+
Some(message) => {
49+
let message_counts = std::mem::take(&mut message.count);
50+
51+
message.count = message_counts
52+
.into_iter()
53+
.filter(|time| {
54+
timestamp
55+
.checked_sub(*time)
56+
.map_or(false, |duration| duration < IWANT_TIMEOUT_DURATION)
57+
})
58+
.collect();
59+
60+
if message.count.len() < 3 {
61+
message.count.push(timestamp);
62+
return true;
63+
}
64+
65+
false
66+
}
67+
None => {
68+
let message_count = P2pNetworkPubsubIwantRequestCount {
69+
message_id: message_id.to_owned(),
70+
count: vec![timestamp],
71+
};
72+
73+
self.iwant.push_back(message_count);
74+
if self.iwant.len() > 10 {
75+
self.iwant.pop_front();
76+
}
77+
78+
true
79+
}
80+
}
81+
}
2782
}
2883

2984
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)