Skip to content

Commit 1d93f0b

Browse files
committed
feat(p2p/meshsub): handle ihave and iwant, impl message cache
1 parent 94d9bc4 commit 1d93f0b

File tree

2 files changed

+69
-23
lines changed

2 files changed

+69
-23
lines changed

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl P2pNetworkPubsubState {
9696
}
9797
}
9898
for message in v.publish {
99+
let message_id = self.mcache.put(message.clone());
99100
let topic = self.topics.entry(message.topic.clone()).or_default();
100101
if let Some(signature) = &message.signature {
101102
// skip recently seen message
@@ -129,7 +130,7 @@ impl P2pNetworkPubsubState {
129130
.get_or_insert_with(Default::default);
130131
ctr.ihave.push(pb::ControlIHave {
131132
topic_id: Some(message.topic.clone()),
132-
message_ids: vec![compute_message_id(&message)],
133+
message_ids: vec![message_id.clone()],
133134
})
134135
}
135136
});
@@ -184,7 +185,28 @@ impl P2pNetworkPubsubState {
184185
P2pNetworkPubsubClientMeshAddingState::TheyRefused;
185186
}
186187
}
187-
// TODO: handle iwant, ihave
188+
for iwant in &control.iwant {
189+
for msg_id in &iwant.message_ids {
190+
if let Some(msg) = self.mcache.map.get(msg_id) {
191+
if let Some(client) = self.clients.get_mut(peer_id) {
192+
client.message.publish.push(msg.clone());
193+
}
194+
}
195+
}
196+
}
197+
for ihave in &control.ihave {
198+
let message_ids = ihave
199+
.message_ids
200+
.iter()
201+
.filter(|msg_id| !self.mcache.map.contains_key(*msg_id))
202+
.cloned()
203+
.collect();
204+
if let Some(client) = self.clients.get_mut(peer_id) {
205+
let ctr =
206+
client.message.control.get_or_insert_with(Default::default);
207+
ctr.iwant.push(pb::ControlIWant { message_ids })
208+
}
209+
}
188210
}
189211
}
190212
Err(err) => {
@@ -257,24 +279,3 @@ impl P2pNetworkPubsubState {
257279
}
258280
}
259281
}
260-
261-
// TODO: what if wasm32?
262-
// How to test it?
263-
fn compute_message_id(message: &pb::Message) -> Vec<u8> {
264-
let source_bytes = message
265-
.from
266-
.as_ref()
267-
.map(AsRef::as_ref)
268-
.unwrap_or(&[0, 1, 0][..]);
269-
let mut source_string = libp2p_identity::PeerId::from_bytes(source_bytes)
270-
.expect("Valid peer id")
271-
.to_base58();
272-
let sequence_number = message
273-
.seqno
274-
.as_ref()
275-
.and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok())
276-
.map(u64::from_be_bytes)
277-
.unwrap_or_default();
278-
source_string.push_str(&sequence_number.to_string());
279-
source_string.into_bytes()
280-
}

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub struct P2pNetworkPubsubState {
1111
pub seq: u64,
1212
pub to_sign: VecDeque<pb::Message>,
1313
pub seen: VecDeque<Vec<u8>>,
14+
pub mcache: P2pNetworkPubsubMessageCache,
1415
pub incoming_block: Option<(PeerId, v2::MinaBlockBlockStableV2)>,
1516
pub incoming_transactions: Vec<(Transaction, u32)>,
1617
pub incoming_snarks: Vec<(Snark, u32)>,
@@ -32,6 +33,50 @@ pub struct P2pNetworkPubsubClientState {
3233
pub buffer: Vec<u8>,
3334
}
3435

36+
// TODO: store blocks, snarks and txs separately
37+
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
38+
pub struct P2pNetworkPubsubMessageCache {
39+
pub map: BTreeMap<Vec<u8>, pb::Message>,
40+
pub queue: VecDeque<Vec<u8>>,
41+
}
42+
43+
impl P2pNetworkPubsubMessageCache {
44+
const CAPACITY: usize = 100;
45+
46+
pub fn put(&mut self, message: pb::Message) -> Vec<u8> {
47+
let id = compute_message_id(&message);
48+
self.map.insert(id.clone(), message);
49+
self.queue.push_back(id.clone());
50+
if self.queue.len() > Self::CAPACITY {
51+
if let Some(id) = self.queue.pop_back() {
52+
self.map.remove(&id);
53+
}
54+
}
55+
id
56+
}
57+
}
58+
59+
// TODO: what if wasm32?
60+
// How to test it?
61+
pub fn compute_message_id(message: &pb::Message) -> Vec<u8> {
62+
let source_bytes = message
63+
.from
64+
.as_ref()
65+
.map(AsRef::as_ref)
66+
.unwrap_or(&[0, 1, 0][..]);
67+
let mut source_string = libp2p_identity::PeerId::from_bytes(source_bytes)
68+
.expect("Valid peer id")
69+
.to_base58();
70+
let sequence_number = message
71+
.seqno
72+
.as_ref()
73+
.and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok())
74+
.map(u64::from_be_bytes)
75+
.unwrap_or_default();
76+
source_string.push_str(&sequence_number.to_string());
77+
source_string.into_bytes()
78+
}
79+
3580
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3681
pub struct P2pNetworkPubsubClientTopicState {
3782
pub mesh: P2pNetworkPubsubClientMeshAddingState,

0 commit comments

Comments
 (0)