Skip to content

Commit 54c4d47

Browse files
committed
feat(p2p/meshsub): prune
1 parent bdb070d commit 54c4d47

File tree

6 files changed

+91
-34
lines changed

6 files changed

+91
-34
lines changed

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ pub enum ActionKind {
310310
P2pNetworkPubsubNewStream,
311311
P2pNetworkPubsubOutgoingData,
312312
P2pNetworkPubsubOutgoingMessage,
313+
P2pNetworkPubsubPrune,
313314
P2pNetworkPubsubSign,
314315
P2pNetworkRpcHeartbeatSend,
315316
P2pNetworkRpcIncomingData,
@@ -535,7 +536,7 @@ pub enum ActionKind {
535536
}
536537

537538
impl ActionKind {
538-
pub const COUNT: u16 = 443;
539+
pub const COUNT: u16 = 444;
539540
}
540541

541542
impl std::fmt::Display for ActionKind {
@@ -1458,6 +1459,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
14581459
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
14591460
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
14601461
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
1462+
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
14611463
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
14621464
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
14631465
Self::BroadcastSigned { .. } => ActionKind::P2pNetworkPubsubBroadcastSigned,

p2p/src/network/pubsub/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub use self::p2p_network_pubsub_actions::P2pNetworkPubsubAction;
77

88
mod p2p_network_pubsub_state;
99
pub use self::p2p_network_pubsub_state::{
10-
P2pNetworkPubsubClientMeshState, P2pNetworkPubsubClientState, P2pNetworkPubsubState,
10+
P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState,
1111
};
1212

1313
#[cfg(feature = "p2p-libp2p")]

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ pub enum P2pNetworkPubsubAction {
2424
peer_id: PeerId,
2525
topic_id: String,
2626
},
27+
Prune {
28+
peer_id: PeerId,
29+
topic_id: String,
30+
},
2731
Broadcast {
2832
message: Box<GossipNetMessageV2>,
2933
},

p2p/src/network/pubsub/p2p_network_pubsub_effects.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ impl P2pNetworkPubsubAction {
3939
};
4040
Some(P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id })
4141
};
42-
let graft = if state.clients.len() < config.meshsub.outbound_degree_desired {
42+
let Some(map) = state.topics.get(TOPIC) else {
43+
// must have this topic already
44+
return;
45+
};
46+
let mesh_size = map.values().filter(|s| s.on_mesh()).count();
47+
let graft = if mesh_size < config.meshsub.outbound_degree_desired {
4348
Some(P2pNetworkPubsubAction::Graft {
4449
peer_id,
4550
topic_id: TOPIC.to_owned(),
@@ -68,6 +73,27 @@ impl P2pNetworkPubsubAction {
6873

6974
store.dispatch(P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id });
7075
}
76+
P2pNetworkPubsubAction::Prune { peer_id, topic_id } => {
77+
let msg = pb::Rpc {
78+
subscriptions: vec![],
79+
publish: vec![],
80+
control: Some(pb::ControlMessage {
81+
ihave: vec![],
82+
iwant: vec![],
83+
graft: vec![],
84+
prune: vec![pb::ControlPrune {
85+
topic_id: Some(dbg!(topic_id.clone())),
86+
peers: vec![pb::PeerInfo {
87+
peer_id: None,
88+
signed_peer_record: None,
89+
}],
90+
backoff: None,
91+
}],
92+
}),
93+
};
94+
95+
store.dispatch(P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id });
96+
}
7197
P2pNetworkPubsubAction::Broadcast { message } => {
7298
let mut buffer = vec![0; 8];
7399
binprot::BinProtWrite::binprot_write(&message, &mut buffer).expect("msg");
@@ -95,15 +121,25 @@ impl P2pNetworkPubsubAction {
95121
let incoming_transactions = state.incoming_transactions.clone();
96122
let incoming_snarks = state.incoming_snarks.clone();
97123
let topics = state.topics.clone();
98-
let could_accept = state.clients.len() < config.meshsub.outbound_degree_high;
99124

125+
let mut prune_at_topics = vec![];
100126
for (topic_id, map) in topics {
101-
if let Some(mesh_state) = map.get(&peer_id) {
102-
let _ = (could_accept, topic_id, mesh_state);
103-
// TODO: prune
127+
let mesh_size = map.values().filter(|s| s.on_mesh()).count();
128+
let could_accept = mesh_size < config.meshsub.outbound_degree_high;
129+
130+
if !could_accept {
131+
if let Some(topic_state) = map.get(&peer_id) {
132+
if topic_state.on_mesh() {
133+
prune_at_topics.push(topic_id);
134+
}
135+
}
104136
}
105137
}
106138

139+
for topic_id in prune_at_topics {
140+
store.dispatch(Self::Prune { peer_id, topic_id });
141+
}
142+
107143
broadcast(store);
108144
if let Some((_, block)) = incoming_block {
109145
let best_tip = BlockWithHash::new(Arc::new(block));

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use std::collections::btree_map::Entry;
2+
13
use binprot::BinProtRead;
24
use mina_p2p_messages::{gossip, v2};
35

46
use super::{
57
p2p_network_pubsub_state::P2pNetworkPubsubClientMeshAddingState, pb, P2pNetworkPubsubAction,
6-
P2pNetworkPubsubClientMeshState, P2pNetworkPubsubClientState, P2pNetworkPubsubState,
8+
P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState,
79
};
810

911
impl P2pNetworkPubsubState {
@@ -16,20 +18,23 @@ impl P2pNetworkPubsubState {
1618
protocol,
1719
..
1820
} => {
19-
let state =
20-
self.clients
21-
.entry(*peer_id)
22-
.or_insert_with(|| P2pNetworkPubsubClientState {
23-
protocol: *protocol,
24-
addr: *addr,
25-
outgoing_stream_id: None,
26-
message: pb::Rpc {
27-
subscriptions: vec![],
28-
publish: vec![],
29-
control: None,
30-
},
31-
buffer: vec![],
32-
});
21+
let entry = self.clients.entry(*peer_id);
22+
// preserve it
23+
let outgoing_stream_id = match &entry {
24+
Entry::Occupied(v) => v.get().outgoing_stream_id,
25+
Entry::Vacant(_) => None,
26+
};
27+
let state = entry.or_insert_with(|| P2pNetworkPubsubClientState {
28+
protocol: *protocol,
29+
addr: *addr,
30+
outgoing_stream_id,
31+
message: pb::Rpc {
32+
subscriptions: vec![],
33+
publish: vec![],
34+
control: None,
35+
},
36+
buffer: vec![],
37+
});
3338
state.protocol = *protocol;
3439
state.addr = *addr;
3540
}
@@ -83,10 +88,9 @@ impl P2pNetworkPubsubState {
8388
let topic = self.topics.entry(topic_id).or_default();
8489

8590
if subscription.subscribe() {
86-
topic.insert(
87-
peer_id.clone(),
88-
P2pNetworkPubsubClientMeshState::default(),
89-
);
91+
if let Entry::Vacant(v) = topic.entry(peer_id.clone()) {
92+
v.insert(P2pNetworkPubsubClientTopicState::default());
93+
}
9094
} else {
9195
topic.remove(&peer_id);
9296
}
@@ -111,9 +115,10 @@ impl P2pNetworkPubsubState {
111115
.filter(|(c, _)| {
112116
// don't send back to who sent this
113117
**c != *peer_id
114-
&& topic
115-
.get(c)
116-
.map_or(false, P2pNetworkPubsubClientMeshState::on_mesh)
118+
&& topic.get(c).map_or(
119+
false,
120+
P2pNetworkPubsubClientTopicState::on_mesh,
121+
)
117122
})
118123
.for_each(|(_, state)| state.message.publish.push(message.clone()));
119124

@@ -155,7 +160,6 @@ impl P2pNetworkPubsubState {
155160
.and_then(|m| m.get_mut(&peer_id))
156161
{
157162
mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
158-
// TODO: prune if above the limit
159163
}
160164
}
161165
for prune in &control.prune {
@@ -192,6 +196,17 @@ impl P2pNetworkPubsubState {
192196

193197
state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
194198
}
199+
P2pNetworkPubsubAction::Prune { peer_id, topic_id } => {
200+
let Some(state) = self
201+
.topics
202+
.get_mut(topic_id)
203+
.and_then(|m| m.get_mut(peer_id))
204+
else {
205+
return;
206+
};
207+
208+
state.mesh = P2pNetworkPubsubClientMeshAddingState::WeRefused;
209+
}
195210
P2pNetworkPubsubAction::OutgoingMessage { peer_id, .. } => {
196211
if let Some(v) = self.clients.get_mut(peer_id) {
197212
v.message.subscriptions.clear();
@@ -227,7 +242,7 @@ impl P2pNetworkPubsubState {
227242
.filter(|(c, _)| {
228243
topic
229244
.get(c)
230-
.map_or(false, P2pNetworkPubsubClientMeshState::on_mesh)
245+
.map_or(false, P2pNetworkPubsubClientTopicState::on_mesh)
231246
})
232247
.for_each(|(_, state)| state.message.publish.push(message.clone()));
233248
}

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub struct P2pNetworkPubsubState {
1414
pub incoming_block: Option<(PeerId, v2::MinaBlockBlockStableV2)>,
1515
pub incoming_transactions: Vec<(Transaction, u32)>,
1616
pub incoming_snarks: Vec<(Snark, u32)>,
17-
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientMeshState>>,
17+
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
1818
}
1919

2020
impl P2pNetworkPubsubState {
@@ -33,7 +33,7 @@ pub struct P2pNetworkPubsubClientState {
3333
}
3434

3535
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
36-
pub struct P2pNetworkPubsubClientMeshState {
36+
pub struct P2pNetworkPubsubClientTopicState {
3737
pub mesh: P2pNetworkPubsubClientMeshAddingState,
3838
}
3939

@@ -46,7 +46,7 @@ pub enum P2pNetworkPubsubClientMeshAddingState {
4646
Added,
4747
}
4848

49-
impl P2pNetworkPubsubClientMeshState {
49+
impl P2pNetworkPubsubClientTopicState {
5050
pub fn on_mesh(&self) -> bool {
5151
matches!(&self.mesh, P2pNetworkPubsubClientMeshAddingState::Added)
5252
}

0 commit comments

Comments
 (0)