Skip to content

Commit 414ce3e

Browse files
committed
Added limit for IWANT requests
1 parent 26354df commit 414ce3e

File tree

2 files changed

+82
-13
lines changed

2 files changed

+82
-13
lines changed

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::btree_map::Entry, sync::Arc};
33
use binprot::BinProtRead;
44
use mina_p2p_messages::{gossip, v2};
55
use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate};
6-
use redux::Dispatcher;
6+
use redux::{Dispatcher, Timestamp};
77

88
use crate::{
99
channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
@@ -28,7 +28,7 @@ impl P2pNetworkPubsubState {
2828
Action: crate::P2pActionTrait<State>,
2929
{
3030
let pubsub_state = state_context.get_substate_mut()?;
31-
let (action, _meta) = action.split();
31+
let (action, meta) = action.split();
3232

3333
match action {
3434
P2pNetworkPubsubAction::NewStream {
@@ -125,7 +125,7 @@ impl P2pNetworkPubsubState {
125125
seen_limit,
126126
..
127127
} => {
128-
pubsub_state.reduce_incoming_data(&peer_id, data)?;
128+
pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
129129

130130
let dispatcher: &mut Dispatcher<Action, State> = state_context.into_dispatcher();
131131
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
@@ -447,7 +447,12 @@ impl P2pNetworkPubsubState {
447447
Ok(())
448448
}
449449

450-
fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: Data) -> Result<(), String> {
450+
fn reduce_incoming_data(
451+
&mut self,
452+
peer_id: &PeerId,
453+
data: Data,
454+
timestamp: Timestamp,
455+
) -> Result<(), String> {
451456
let Some(state) = self.clients.get_mut(peer_id) else {
452457
// TODO: investigate, cannot reproduce this
453458
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
@@ -529,14 +534,19 @@ impl P2pNetworkPubsubState {
529534
}
530535
}
531536

532-
for ihave in &control.ihave {
533-
let message_ids = ihave
534-
.message_ids
535-
.iter()
536-
.filter(|msg_id| !self.mcache.map.contains_key(*msg_id))
537-
.cloned()
538-
.collect();
539-
if let Some(client) = self.clients.get_mut(peer_id) {
537+
for ihave in control.ihave {
538+
if self.clients.contains_key(peer_id) {
539+
let message_ids = ihave
540+
.message_ids
541+
.into_iter()
542+
.filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
543+
.collect::<Vec<_>>();
544+
545+
let Some(client) = self.clients.get_mut(peer_id) else {
546+
bug_condition!("State not found for {}", peer_id);
547+
return Ok(());
548+
};
549+
540550
let ctr = client.message.control.get_or_insert_with(Default::default);
541551
ctr.iwant.push(pb::ControlIWant { message_ids })
542552
}

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
use super::pb;
22
use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};
33

4-
use std::collections::{BTreeMap, VecDeque};
4+
use std::{
5+
collections::{BTreeMap, VecDeque},
6+
time::Duration,
7+
};
58

69
use mina_p2p_messages::v2;
710
use openmina_core::{snark::Snark, transaction::Transaction};
11+
use redux::Timestamp;
812
use serde::{Deserialize, Serialize};
913

14+
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
15+
1016
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
1117
pub struct P2pNetworkPubsubState {
1218
pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,
@@ -18,12 +24,65 @@ pub struct P2pNetworkPubsubState {
1824
pub incoming_transactions: Vec<(Transaction, u32)>,
1925
pub incoming_snarks: Vec<(Snark, u32)>,
2026
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
27+
pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
28+
}
29+
30+
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
31+
pub struct P2pNetworkPubsubIwantRequestCount {
32+
pub message_id: Vec<u8>,
33+
pub count: Vec<Timestamp>,
2134
}
2235

2336
impl P2pNetworkPubsubState {
2437
pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
2538
self.clients.remove(peer_id);
2639
}
40+
41+
pub fn filter_iwant_message_ids(&mut self, message_id: &Vec<u8>, timestamp: Timestamp) -> bool {
42+
if self.mcache.map.contains_key(message_id) {
43+
return false;
44+
}
45+
46+
let message_count = self
47+
.iwant
48+
.iter_mut()
49+
.find(|message| &message.message_id == message_id);
50+
51+
match message_count {
52+
Some(message) => {
53+
let message_counts = std::mem::take(&mut message.count);
54+
55+
message.count = message_counts
56+
.into_iter()
57+
.filter(|time| {
58+
timestamp
59+
.checked_sub(*time)
60+
.map_or(false, |duration| duration < IWANT_TIMEOUT_DURATION)
61+
})
62+
.collect();
63+
64+
if message.count.len() < 3 {
65+
message.count.push(timestamp);
66+
return true;
67+
}
68+
69+
false
70+
}
71+
None => {
72+
let message_count = P2pNetworkPubsubIwantRequestCount {
73+
message_id: message_id.to_owned(),
74+
count: vec![timestamp],
75+
};
76+
77+
self.iwant.push_back(message_count);
78+
if self.iwant.len() > 10 {
79+
self.iwant.pop_front();
80+
}
81+
82+
true
83+
}
84+
}
85+
}
2786
}
2887

2988
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)