Skip to content

Commit 4a10beb

Browse files
committed
implement test extesion
1 parent a4eaf43 commit 4a10beb

File tree

7 files changed

+134
-24
lines changed

7 files changed

+134
-24
lines changed

protocols/gossipsub/src/behaviour.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,6 +1539,12 @@ where
15391539
}
15401540

15411541
peer.extensions = Some(extensions);
1542+
1543+
if extensions.test_extension.unwrap_or(false)
1544+
&& matches!(peer.kind, PeerKind::Gossipsubv1_3)
1545+
{
1546+
self.send_message(*peer_id, RpcOut::TestExtension);
1547+
}
15421548
}
15431549

15441550
/// Removes the specified peer from the mesh, returning true if it was present.
@@ -2919,7 +2925,8 @@ where
29192925
| RpcOut::Prune(_)
29202926
| RpcOut::Subscribe(_)
29212927
| RpcOut::Unsubscribe(_)
2922-
| RpcOut::Extensions(_) => {
2928+
| RpcOut::Extensions(_)
2929+
| RpcOut::TestExtension => {
29232930
unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
29242931
}
29252932
}
@@ -3161,7 +3168,12 @@ where
31613168

31623169
if connected_peer.connections.len() <= 1 {
31633170
// If this is the first connection send extensions message.
3164-
self.send_message(peer_id, RpcOut::Extensions(Extensions {}));
3171+
self.send_message(
3172+
peer_id,
3173+
RpcOut::Extensions(Extensions {
3174+
test_extension: Some(true),
3175+
}),
3176+
);
31653177
}
31663178

31673179
Ok(Handler::new(self.config.protocol_config(), receiver))
@@ -3192,7 +3204,12 @@ where
31923204

31933205
if connected_peer.connections.len() <= 1 {
31943206
// If this is the first connection send extensions message.
3195-
self.send_message(peer_id, RpcOut::Extensions(Extensions {}));
3207+
self.send_message(
3208+
peer_id,
3209+
RpcOut::Extensions(Extensions {
3210+
test_extension: Some(true),
3211+
}),
3212+
);
31963213
}
31973214

31983215
Ok(Handler::new(self.config.protocol_config(), receiver))
@@ -3396,6 +3413,10 @@ where
33963413
if !prune_msgs.is_empty() {
33973414
self.handle_prune(&propagation_source, prune_msgs);
33983415
}
3416+
3417+
if let Some(_extension) = rpc.test_extension {
3418+
tracing::debug!("Received Test Extension");
3419+
}
33993420
}
34003421
}
34013422
}

protocols/gossipsub/src/behaviour/tests.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ fn proto_to_message(rpc: &proto::RPC) -> RpcIn {
420420
})
421421
.collect(),
422422
control_msgs,
423+
test_extension: None,
423424
}
424425
}
425426

@@ -1253,6 +1254,7 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() {
12531254
control_msgs: vec![ControlAction::IDontWant(IDontWant {
12541255
message_ids: vec![msg_id.clone()],
12551256
})],
1257+
test_extension: None,
12561258
};
12571259
gs.on_connection_handler_event(
12581260
peers[1],
@@ -3145,6 +3147,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
31453147
messages: vec![raw_message1],
31463148
subscriptions: vec![subscription.clone()],
31473149
control_msgs: vec![control_action],
3150+
test_extension: None,
31483151
},
31493152
invalid_messages: Vec::new(),
31503153
},
@@ -3171,6 +3174,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
31713174
messages: vec![raw_message3],
31723175
subscriptions: vec![subscription],
31733176
control_msgs: vec![control_action],
3177+
test_extension: None,
31743178
},
31753179
invalid_messages: Vec::new(),
31763180
},
@@ -3781,6 +3785,7 @@ fn test_scoring_p4_invalid_signature() {
37813785
messages: vec![],
37823786
subscriptions: vec![],
37833787
control_msgs: vec![],
3788+
test_extension: None,
37843789
},
37853790
invalid_messages: vec![(m, ValidationError::InvalidSignature)],
37863791
},
@@ -5540,6 +5545,7 @@ fn parses_idontwant() {
55405545
control_msgs: vec![ControlAction::IDontWant(IDontWant {
55415546
message_ids: vec![message_id.clone()],
55425547
})],
5548+
test_extension: None,
55435549
};
55445550
gs.on_connection_handler_event(
55455551
peers[1],
@@ -6638,6 +6644,7 @@ fn test_validation_error_message_size_too_large_topic_specific() {
66386644
messages: vec![raw_message],
66396645
subscriptions: vec![],
66406646
control_msgs: vec![],
6647+
test_extension: None,
66416648
},
66426649
invalid_messages: vec![],
66436650
},
@@ -6682,6 +6689,7 @@ fn test_validation_error_message_size_too_large_topic_specific() {
66826689
}],
66836690
subscriptions: vec![],
66846691
control: None,
6692+
testExtension: None,
66856693
};
66866694
codec.encode(rpc, &mut buf).unwrap();
66876695

@@ -6742,6 +6750,7 @@ fn test_validation_message_size_within_topic_specific() {
67426750
messages: vec![raw_message],
67436751
subscriptions: vec![],
67446752
control_msgs: vec![],
6753+
test_extension: None,
67456754
},
67466755
invalid_messages: vec![],
67476756
},
@@ -6786,6 +6795,7 @@ fn test_validation_message_size_within_topic_specific() {
67866795
}],
67876796
subscriptions: vec![],
67886797
control: None,
6798+
testExtension: None,
67896799
};
67906800
codec.encode(rpc, &mut buf).unwrap();
67916801

@@ -6805,12 +6815,16 @@ fn test_validation_message_size_within_topic_specific() {
68056815

68066816
#[test]
68076817
fn test_extensions_message_creation() {
6808-
let extensions_rpc = RpcOut::Extensions(Extensions {});
6818+
let extensions_rpc = RpcOut::Extensions(Extensions {
6819+
test_extension: Some(true),
6820+
});
68096821
let proto_rpc: proto::RPC = extensions_rpc.into();
68106822

68116823
assert!(proto_rpc.control.is_some());
68126824
let control = proto_rpc.control.unwrap();
68136825
assert!(control.extensions.is_some());
6826+
let test_extension = control.extensions.unwrap().testExtension.unwrap();
6827+
assert!(test_extension);
68146828
assert!(control.ihave.is_empty());
68156829
assert!(control.iwant.is_empty());
68166830
assert!(control.graft.is_empty());
@@ -6847,19 +6861,23 @@ fn test_handle_extensions_message() {
68476861
);
68486862

68496863
// Simulate receiving extensions message
6850-
let extensions = Extensions {};
6864+
let extensions = Extensions {
6865+
test_extension: Some(false),
6866+
};
68516867
gs.handle_extensions(&peer_id, extensions);
68526868

68536869
// Verify extensions were stored
68546870
let peer_details = gs.connected_peers.get(&peer_id).unwrap();
68556871
assert!(peer_details.extensions.is_some());
68566872

68576873
// Simulate receiving duplicate extensions message from another peer
6858-
// TODO: when more extensions are added, we should test that they are not overridden.
68596874
let duplicate_rpc = RpcIn {
68606875
messages: vec![],
68616876
subscriptions: vec![],
6862-
control_msgs: vec![ControlAction::Extensions(None)],
6877+
control_msgs: vec![ControlAction::Extensions(Some(Extensions {
6878+
test_extension: Some(true),
6879+
}))],
6880+
test_extension: None,
68636881
};
68646882

68656883
gs.on_connection_handler_event(
@@ -6873,5 +6891,6 @@ fn test_handle_extensions_message() {
68736891

68746892
// Extensions should still be present (not cleared or changed)
68756893
let peer_details = gs.connected_peers.get(&peer_id).unwrap();
6876-
assert!(peer_details.extensions.is_some());
6894+
let test_extension = peer_details.extensions.unwrap().test_extension.unwrap();
6895+
assert!(!test_extension);
68776896
}

protocols/gossipsub/src/generated/gossipsub/pb.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct RPC {
1919
pub subscriptions: Vec<gossipsub::pb::mod_RPC::SubOpts>,
2020
pub publish: Vec<gossipsub::pb::Message>,
2121
pub control: Option<gossipsub::pb::ControlMessage>,
22+
pub testExtension: Option<gossipsub::pb::TestExtension>,
2223
}
2324

2425
impl<'a> MessageRead<'a> for RPC {
@@ -29,6 +30,7 @@ impl<'a> MessageRead<'a> for RPC {
2930
Ok(10) => msg.subscriptions.push(r.read_message::<gossipsub::pb::mod_RPC::SubOpts>(bytes)?),
3031
Ok(18) => msg.publish.push(r.read_message::<gossipsub::pb::Message>(bytes)?),
3132
Ok(26) => msg.control = Some(r.read_message::<gossipsub::pb::ControlMessage>(bytes)?),
33+
Ok(51939474) => msg.testExtension = Some(r.read_message::<gossipsub::pb::TestExtension>(bytes)?),
3234
Ok(t) => { r.read_unknown(bytes, t)?; }
3335
Err(e) => return Err(e),
3436
}
@@ -43,12 +45,14 @@ impl MessageWrite for RPC {
4345
+ self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
4446
+ self.publish.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
4547
+ self.control.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size()))
48+
+ self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_len((m).get_size()))
4649
}
4750

4851
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
4952
for s in &self.subscriptions { w.write_with_tag(10, |w| w.write_message(s))?; }
5053
for s in &self.publish { w.write_with_tag(18, |w| w.write_message(s))?; }
5154
if let Some(ref s) = self.control { w.write_with_tag(26, |w| w.write_message(s))?; }
55+
if let Some(ref s) = self.testExtension { w.write_with_tag(51939474, |w| w.write_message(s))?; }
5256
Ok(())
5357
}
5458
}
@@ -373,16 +377,48 @@ impl MessageWrite for ControlIDontWant {
373377

374378
#[allow(clippy::derive_partial_eq_without_eq)]
375379
#[derive(Debug, Default, PartialEq, Clone)]
376-
pub struct ControlExtensions { }
380+
pub struct ControlExtensions {
381+
pub testExtension: Option<bool>,
382+
}
377383

378384
impl<'a> MessageRead<'a> for ControlExtensions {
385+
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
386+
let mut msg = Self::default();
387+
while !r.is_eof() {
388+
match r.next_tag(bytes) {
389+
Ok(51939472) => msg.testExtension = Some(r.read_bool(bytes)?),
390+
Ok(t) => { r.read_unknown(bytes, t)?; }
391+
Err(e) => return Err(e),
392+
}
393+
}
394+
Ok(msg)
395+
}
396+
}
397+
398+
impl MessageWrite for ControlExtensions {
399+
fn get_size(&self) -> usize {
400+
0
401+
+ self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_varint(*(m) as u64))
402+
}
403+
404+
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
405+
if let Some(ref s) = self.testExtension { w.write_with_tag(51939472, |w| w.write_bool(*s))?; }
406+
Ok(())
407+
}
408+
}
409+
410+
#[allow(clippy::derive_partial_eq_without_eq)]
411+
#[derive(Debug, Default, PartialEq, Clone)]
412+
pub struct TestExtension { }
413+
414+
impl<'a> MessageRead<'a> for TestExtension {
379415
fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result<Self> {
380416
r.read_to_end();
381417
Ok(Self::default())
382418
}
383419
}
384420

385-
impl MessageWrite for ControlExtensions { }
421+
impl MessageWrite for TestExtension { }
386422

387423
#[allow(clippy::derive_partial_eq_without_eq)]
388424
#[derive(Debug, Default, PartialEq, Clone)]

protocols/gossipsub/src/generated/rpc.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ message RPC {
1212
}
1313

1414
optional ControlMessage control = 3;
15-
1615
// Canonical Extensions should register their messages here.
1716

1817
// Experimental Extensions should register their messages here. They
1918
// must use field numbers larger than 0x200000 to be encoded with at least 4
2019
// bytes
20+
optional TestExtension testExtension = 6492434;
2121
}
2222

2323
message Message {
@@ -64,8 +64,14 @@ message ControlIDontWant {
6464
message ControlExtensions {
6565
// Initially empty. Future extensions will be added here along with a
6666
// reference to their specification.
67+
68+
// Experimental extensions must use field numbers larger than 0x200000 to be
69+
// encoded with at least 4 bytes
70+
optional bool testExtension = 6492434;
6771
}
6872

73+
message TestExtension {}
74+
6975
message PeerInfo {
7076
optional bytes peer_id = 1;
7177
optional bytes signed_peer_record = 2;

protocols/gossipsub/src/protocol.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
topic::TopicHash,
3737
types::{
3838
ControlAction, Extensions, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind,
39-
Prune, RawMessage, RpcIn, Subscription, SubscriptionAction,
39+
Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, TestExtension,
4040
},
4141
ValidationError,
4242
};
@@ -562,7 +562,9 @@ impl Decoder for GossipsubCodec {
562562
})
563563
.collect();
564564

565-
let extension_msg = rpc_control.extensions.map(|_extension| Extensions {});
565+
let extension_msg = rpc_control.extensions.map(|extensions| Extensions {
566+
test_extension: extensions.testExtension,
567+
});
566568

567569
control_msgs.extend(ihave_msgs);
568570
control_msgs.extend(iwant_msgs);
@@ -588,6 +590,7 @@ impl Decoder for GossipsubCodec {
588590
})
589591
.collect(),
590592
control_msgs,
593+
test_extension: rpc.testExtension.map(|_extension| TestExtension {}),
591594
},
592595
invalid_messages,
593596
}))

protocols/gossipsub/src/rpc.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,11 @@ impl Sender {
9090
| RpcOut::Extensions(_)
9191
| RpcOut::Subscribe(_)
9292
| RpcOut::Unsubscribe(_) => &self.priority_sender,
93-
RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
94-
&self.non_priority_sender
95-
}
93+
RpcOut::Forward { .. }
94+
| RpcOut::IHave(_)
95+
| RpcOut::IWant(_)
96+
| RpcOut::IDontWant(_)
97+
| RpcOut::TestExtension => &self.non_priority_sender,
9698
};
9799
sender.try_send(rpc).map_err(|err| err.into_inner())
98100
}

0 commit comments

Comments
 (0)