Skip to content

Commit a4eaf43

Browse files
committed
feat(gossipsub): implement gossipsub 1.3
1 parent e29dad6 commit a4eaf43

File tree

8 files changed

+215
-16
lines changed

8 files changed

+215
-16
lines changed

protocols/gossipsub/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
## 0.50.0
2+
3+
- Implement gossipsub 1.3 extensions control message.
4+
See [PR 6119](https://github.com/libp2p/rust-libp2p/pull/6119)
5+
26
- Remove peer penalty for duplicate messages.
37
See [PR 6112](https://github.com/libp2p/rust-libp2p/pull/6112)
48

protocols/gossipsub/src/behaviour.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ use crate::{
6868
topic::{Hasher, Topic, TopicHash},
6969
transform::{DataTransform, IdentityTransform},
7070
types::{
71-
ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72-
PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
71+
ControlAction, Extensions, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance,
72+
MessageId, PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
7373
SubscriptionAction,
7474
},
7575
FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
@@ -1521,6 +1521,26 @@ where
15211521
tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
15221522
}
15231523

1524+
fn handle_extensions(&mut self, peer_id: &PeerId, extensions: Extensions) {
1525+
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
1526+
tracing::error!(
1527+
peer=%peer_id,
1528+
"Extensions by unknown peer"
1529+
);
1530+
return;
1531+
};
1532+
1533+
if peer.extensions.is_some() {
1534+
tracing::debug!(
1535+
peer=%peer_id,
1536+
"Peer had already sent us extensions message"
1537+
);
1538+
return;
1539+
}
1540+
1541+
peer.extensions = Some(extensions);
1542+
}
1543+
15241544
/// Removes the specified peer from the mesh, returning true if it was present.
15251545
fn remove_peer_from_mesh(
15261546
&mut self,
@@ -2898,7 +2918,8 @@ where
28982918
RpcOut::Graft(_)
28992919
| RpcOut::Prune(_)
29002920
| RpcOut::Subscribe(_)
2901-
| RpcOut::Unsubscribe(_) => {
2921+
| RpcOut::Unsubscribe(_)
2922+
| RpcOut::Extensions(_) => {
29022923
unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
29032924
}
29042925
}
@@ -3132,14 +3153,18 @@ where
31323153
sender: Sender::new(self.config.connection_handler_queue_len()),
31333154
topics: Default::default(),
31343155
dont_send: LinkedHashMap::new(),
3156+
extensions: None,
31353157
});
31363158
// Add the new connection
31373159
connected_peer.connections.push(connection_id);
3160+
let receiver = connected_peer.sender.new_receiver();
3161+
3162+
if connected_peer.connections.len() <= 1 {
3163+
// If this is the first connection send extensions message.
3164+
self.send_message(peer_id, RpcOut::Extensions(Extensions {}));
3165+
}
31383166

3139-
Ok(Handler::new(
3140-
self.config.protocol_config(),
3141-
connected_peer.sender.new_receiver(),
3142-
))
3167+
Ok(Handler::new(self.config.protocol_config(), receiver))
31433168
}
31443169

31453170
fn handle_established_outbound_connection(
@@ -3159,14 +3184,18 @@ where
31593184
sender: Sender::new(self.config.connection_handler_queue_len()),
31603185
topics: Default::default(),
31613186
dont_send: LinkedHashMap::new(),
3187+
extensions: None,
31623188
});
31633189
// Add the new connection
31643190
connected_peer.connections.push(connection_id);
3191+
let receiver = connected_peer.sender.new_receiver();
3192+
3193+
if connected_peer.connections.len() <= 1 {
3194+
// If this is the first connection send extensions message.
3195+
self.send_message(peer_id, RpcOut::Extensions(Extensions {}));
3196+
}
31653197

3166-
Ok(Handler::new(
3167-
self.config.protocol_config(),
3168-
connected_peer.sender.new_receiver(),
3169-
))
3198+
Ok(Handler::new(self.config.protocol_config(), receiver))
31703199
}
31713200

31723201
fn on_connection_handler_event(
@@ -3351,6 +3380,11 @@ where
33513380
}
33523381
}
33533382
}
3383+
ControlAction::Extensions(extensions) => {
3384+
if let Some(extensions) = extensions {
3385+
self.handle_extensions(&propagation_source, extensions);
3386+
}
3387+
}
33543388
}
33553389
}
33563390
if !ihave_msgs.is_empty() {

protocols/gossipsub/src/behaviour/tests.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ use crate::{
3333
config::{ConfigBuilder, TopicMeshConfig},
3434
protocol::GossipsubCodec,
3535
rpc::Receiver,
36+
rpc_proto::proto,
3637
subscription_filter::WhitelistSubscriptionFilter,
37-
types::RpcIn,
38+
types::{ControlAction, Extensions, RpcIn, RpcOut},
3839
IdentTopic as Topic,
3940
};
4041

@@ -248,6 +249,7 @@ where
248249
topics: Default::default(),
249250
sender,
250251
dont_send: LinkedHashMap::new(),
252+
extensions: None,
251253
},
252254
);
253255

@@ -644,6 +646,7 @@ fn test_join() {
644646
topics: Default::default(),
645647
sender,
646648
dont_send: LinkedHashMap::new(),
649+
extensions: None,
647650
},
648651
);
649652
receivers.insert(random_peer, receiver);
@@ -1041,6 +1044,7 @@ fn test_get_random_peers() {
10411044
topics: topics.clone(),
10421045
sender: Sender::new(gs.config.connection_handler_queue_len()),
10431046
dont_send: LinkedHashMap::new(),
1047+
extensions: None,
10441048
},
10451049
);
10461050
}
@@ -5595,6 +5599,7 @@ fn test_all_queues_full() {
55955599
topics: topics.clone(),
55965600
sender: Sender::new(2),
55975601
dont_send: LinkedHashMap::new(),
5602+
extensions: None,
55985603
},
55995604
);
56005605

@@ -5631,6 +5636,7 @@ fn test_slow_peer_returns_failed_publish() {
56315636
topics: topics.clone(),
56325637
sender: Sender::new(2),
56335638
dont_send: LinkedHashMap::new(),
5639+
extensions: None,
56345640
},
56355641
);
56365642
let peer_id = PeerId::random();
@@ -5644,6 +5650,7 @@ fn test_slow_peer_returns_failed_publish() {
56445650
topics: topics.clone(),
56455651
sender: Sender::new(gs.config.connection_handler_queue_len()),
56465652
dont_send: LinkedHashMap::new(),
5653+
extensions: None,
56475654
},
56485655
);
56495656

@@ -5705,6 +5712,7 @@ fn test_slow_peer_returns_failed_ihave_handling() {
57055712
topics: topics.clone(),
57065713
sender: Sender::new(2),
57075714
dont_send: LinkedHashMap::new(),
5715+
extensions: None,
57085716
},
57095717
);
57105718
peers.push(slow_peer_id);
@@ -5722,6 +5730,7 @@ fn test_slow_peer_returns_failed_ihave_handling() {
57225730
topics: topics.clone(),
57235731
sender: Sender::new(gs.config.connection_handler_queue_len()),
57245732
dont_send: LinkedHashMap::new(),
5733+
extensions: None,
57255734
},
57265735
);
57275736

@@ -5819,6 +5828,7 @@ fn test_slow_peer_returns_failed_iwant_handling() {
58195828
topics: topics.clone(),
58205829
sender: Sender::new(2),
58215830
dont_send: LinkedHashMap::new(),
5831+
extensions: None,
58225832
},
58235833
);
58245834
peers.push(slow_peer_id);
@@ -5836,6 +5846,7 @@ fn test_slow_peer_returns_failed_iwant_handling() {
58365846
topics: topics.clone(),
58375847
sender: Sender::new(gs.config.connection_handler_queue_len()),
58385848
dont_send: LinkedHashMap::new(),
5849+
extensions: None,
58395850
},
58405851
);
58415852

@@ -5913,6 +5924,7 @@ fn test_slow_peer_returns_failed_forward() {
59135924
topics: topics.clone(),
59145925
sender: Sender::new(2),
59155926
dont_send: LinkedHashMap::new(),
5927+
extensions: None,
59165928
},
59175929
);
59185930
peers.push(slow_peer_id);
@@ -5930,6 +5942,7 @@ fn test_slow_peer_returns_failed_forward() {
59305942
topics: topics.clone(),
59315943
sender: Sender::new(gs.config.connection_handler_queue_len()),
59325944
dont_send: LinkedHashMap::new(),
5945+
extensions: None,
59335946
},
59345947
);
59355948

@@ -6012,6 +6025,7 @@ fn test_slow_peer_is_downscored_on_publish() {
60126025
topics: topics.clone(),
60136026
sender: Sender::new(2),
60146027
dont_send: LinkedHashMap::new(),
6028+
extensions: None,
60156029
},
60166030
);
60176031
gs.as_peer_score_mut().add_peer(slow_peer_id);
@@ -6026,6 +6040,7 @@ fn test_slow_peer_is_downscored_on_publish() {
60266040
topics: topics.clone(),
60276041
sender: Sender::new(gs.config.connection_handler_queue_len()),
60286042
dont_send: LinkedHashMap::new(),
6043+
extensions: None,
60296044
},
60306045
);
60316046

@@ -6787,3 +6802,76 @@ fn test_validation_message_size_within_topic_specific() {
67876802
_ => panic!("Unexpected event"),
67886803
}
67896804
}
6805+
6806+
#[test]
6807+
fn test_extensions_message_creation() {
6808+
let extensions_rpc = RpcOut::Extensions(Extensions {});
6809+
let proto_rpc: proto::RPC = extensions_rpc.into();
6810+
6811+
assert!(proto_rpc.control.is_some());
6812+
let control = proto_rpc.control.unwrap();
6813+
assert!(control.extensions.is_some());
6814+
assert!(control.ihave.is_empty());
6815+
assert!(control.iwant.is_empty());
6816+
assert!(control.graft.is_empty());
6817+
assert!(control.prune.is_empty());
6818+
assert!(control.idontwant.is_empty());
6819+
}
6820+
6821+
#[test]
6822+
fn test_handle_extensions_message() {
6823+
let mut gs: Behaviour = Behaviour::new(
6824+
MessageAuthenticity::Anonymous,
6825+
ConfigBuilder::default()
6826+
.validation_mode(ValidationMode::None)
6827+
.build()
6828+
.unwrap(),
6829+
)
6830+
.unwrap();
6831+
6832+
let peer_id = PeerId::random();
6833+
let sender = Sender::new(gs.config.connection_handler_queue_len());
6834+
6835+
// Add peer without extensions
6836+
gs.connected_peers.insert(
6837+
peer_id,
6838+
PeerDetails {
6839+
kind: PeerKind::Gossipsubv1_3,
6840+
connections: vec![ConnectionId::new_unchecked(0)],
6841+
outbound: false,
6842+
topics: BTreeSet::new(),
6843+
sender,
6844+
dont_send: LinkedHashMap::new(),
6845+
extensions: None,
6846+
},
6847+
);
6848+
6849+
// Simulate receiving extensions message
6850+
let extensions = Extensions {};
6851+
gs.handle_extensions(&peer_id, extensions);
6852+
6853+
// Verify extensions were stored
6854+
let peer_details = gs.connected_peers.get(&peer_id).unwrap();
6855+
assert!(peer_details.extensions.is_some());
6856+
6857+
// Simulate receiving duplicate extensions message from another peer
6858+
// TODO: when more extensions are added, we should test that they are not overridden.
6859+
let duplicate_rpc = RpcIn {
6860+
messages: vec![],
6861+
subscriptions: vec![],
6862+
control_msgs: vec![ControlAction::Extensions(None)],
6863+
};
6864+
6865+
gs.on_connection_handler_event(
6866+
peer_id,
6867+
ConnectionId::new_unchecked(0),
6868+
HandlerEvent::Message {
6869+
rpc: duplicate_rpc,
6870+
invalid_messages: vec![],
6871+
},
6872+
);
6873+
6874+
// Extensions should still be present (not cleared or changed)
6875+
let peer_details = gs.connected_peers.get(&peer_id).unwrap();
6876+
assert!(peer_details.extensions.is_some());
6877+
}

protocols/gossipsub/src/generated/gossipsub/pb.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ pub struct ControlMessage {
155155
pub graft: Vec<gossipsub::pb::ControlGraft>,
156156
pub prune: Vec<gossipsub::pb::ControlPrune>,
157157
pub idontwant: Vec<gossipsub::pb::ControlIDontWant>,
158+
pub extensions: Option<gossipsub::pb::ControlExtensions>,
158159
}
159160

160161
impl<'a> MessageRead<'a> for ControlMessage {
@@ -167,6 +168,7 @@ impl<'a> MessageRead<'a> for ControlMessage {
167168
Ok(26) => msg.graft.push(r.read_message::<gossipsub::pb::ControlGraft>(bytes)?),
168169
Ok(34) => msg.prune.push(r.read_message::<gossipsub::pb::ControlPrune>(bytes)?),
169170
Ok(42) => msg.idontwant.push(r.read_message::<gossipsub::pb::ControlIDontWant>(bytes)?),
171+
Ok(50) => msg.extensions = Some(r.read_message::<gossipsub::pb::ControlExtensions>(bytes)?),
170172
Ok(t) => { r.read_unknown(bytes, t)?; }
171173
Err(e) => return Err(e),
172174
}
@@ -183,6 +185,7 @@ impl MessageWrite for ControlMessage {
183185
+ self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
184186
+ self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
185187
+ self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
188+
+ self.extensions.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size()))
186189
}
187190

188191
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
@@ -191,6 +194,7 @@ impl MessageWrite for ControlMessage {
191194
for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; }
192195
for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; }
193196
for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; }
197+
if let Some(ref s) = self.extensions { w.write_with_tag(50, |w| w.write_message(s))?; }
194198
Ok(())
195199
}
196200
}
@@ -367,6 +371,19 @@ impl MessageWrite for ControlIDontWant {
367371
}
368372
}
369373

374+
#[allow(clippy::derive_partial_eq_without_eq)]
375+
#[derive(Debug, Default, PartialEq, Clone)]
376+
pub struct ControlExtensions { }
377+
378+
impl<'a> MessageRead<'a> for ControlExtensions {
379+
fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result<Self> {
380+
r.read_to_end();
381+
Ok(Self::default())
382+
}
383+
}
384+
385+
impl MessageWrite for ControlExtensions { }
386+
370387
#[allow(clippy::derive_partial_eq_without_eq)]
371388
#[derive(Debug, Default, PartialEq, Clone)]
372389
pub struct PeerInfo {

protocols/gossipsub/src/generated/rpc.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ message RPC {
1212
}
1313

1414
optional ControlMessage control = 3;
15+
16+
// Canonical Extensions should register their messages here.
17+
18+
// Experimental Extensions should register their messages here. They
19+
// must use field numbers larger than 0x200000 to be encoded with at least 4
20+
// bytes
1521
}
1622

1723
message Message {
@@ -29,6 +35,7 @@ message ControlMessage {
2935
repeated ControlGraft graft = 3;
3036
repeated ControlPrune prune = 4;
3137
repeated ControlIDontWant idontwant = 5;
38+
optional ControlExtensions extensions = 6;
3239
}
3340

3441
message ControlIHave {
@@ -54,6 +61,11 @@ message ControlIDontWant {
5461
repeated bytes message_ids = 1;
5562
}
5663

64+
message ControlExtensions {
65+
// Initially empty. Future extensions will be added here along with a
66+
// reference to their specification.
67+
}
68+
5769
message PeerInfo {
5870
optional bytes peer_id = 1;
5971
optional bytes signed_peer_record = 2;

0 commit comments

Comments
 (0)