From a4eaf4356387d55bf8beff72d49fbef20040a46f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 4 Jul 2025 22:20:30 +0100 Subject: [PATCH 1/3] feat(gossipsub): implement gossipsub 1.3 --- protocols/gossipsub/CHANGELOG.md | 4 + protocols/gossipsub/src/behaviour.rs | 56 +++++++++--- protocols/gossipsub/src/behaviour/tests.rs | 90 ++++++++++++++++++- .../gossipsub/src/generated/gossipsub/pb.rs | 17 ++++ protocols/gossipsub/src/generated/rpc.proto | 12 +++ protocols/gossipsub/src/protocol.rs | 13 ++- protocols/gossipsub/src/rpc.rs | 1 + protocols/gossipsub/src/types.rs | 38 +++++++- 8 files changed, 215 insertions(+), 16 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index f83c554d333..e775b5cc1d1 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,8 @@ ## 0.50.0 + +- Implement gossipsub 1.3 extensions control message. + See [PR 6119](https://github.com/libp2p/rust-libp2p/pull/6119) + - Remove peer penalty for duplicate messages. See [PR 6112](https://github.com/libp2p/rust-libp2p/pull/6112) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9d7d6356211..6e88f9de5d3 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -68,8 +68,8 @@ use crate::{ topic::{Hasher, Topic, TopicHash}, transform::{DataTransform, IdentityTransform}, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId, - PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, + MessageId, PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction, }, FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError, @@ -1521,6 +1521,26 @@ where tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } + fn handle_extensions(&mut self, peer_id: &PeerId, extensions: Extensions) { + let Some(peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!( + peer=%peer_id, + "Extensions by unknown peer" + ); + return; + }; + + if peer.extensions.is_some() { + tracing::debug!( + peer=%peer_id, + "Peer had already sent us extensions message" + ); + return; + } + + peer.extensions = Some(extensions); + } + /// Removes the specified peer from the mesh, returning true if it was present. fn remove_peer_from_mesh( &mut self, @@ -2898,7 +2918,8 @@ where RpcOut::Graft(_) | RpcOut::Prune(_) | RpcOut::Subscribe(_) - | RpcOut::Unsubscribe(_) => { + | RpcOut::Unsubscribe(_) + | RpcOut::Extensions(_) => { unreachable!("Channel for highpriority control messages is unbounded and should always be open.") } } @@ -3132,14 +3153,18 @@ where sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); + let receiver = connected_peer.sender.new_receiver(); + + if connected_peer.connections.len() <= 1 { + // If this is the first connection send extensions message. + self.send_message(peer_id, RpcOut::Extensions(Extensions {})); + } - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.sender.new_receiver(), - )) + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn handle_established_outbound_connection( @@ -3159,14 +3184,18 @@ where sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); + let receiver = connected_peer.sender.new_receiver(); + + if connected_peer.connections.len() <= 1 { + // If this is the first connection send extensions message. + self.send_message(peer_id, RpcOut::Extensions(Extensions {})); + } - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.sender.new_receiver(), - )) + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn on_connection_handler_event( @@ -3351,6 +3380,11 @@ where } } } + ControlAction::Extensions(extensions) => { + if let Some(extensions) = extensions { + self.handle_extensions(&propagation_source, extensions); + } + } } } if !ihave_msgs.is_empty() { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 85aead47911..607da408ab3 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -33,8 +33,9 @@ use crate::{ config::{ConfigBuilder, TopicMeshConfig}, protocol::GossipsubCodec, rpc::Receiver, + rpc_proto::proto, subscription_filter::WhitelistSubscriptionFilter, - types::RpcIn, + types::{ControlAction, Extensions, RpcIn, RpcOut}, IdentTopic as Topic, }; @@ -248,6 +249,7 @@ where topics: Default::default(), sender, dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -644,6 +646,7 @@ fn test_join() { topics: Default::default(), sender, dont_send: LinkedHashMap::new(), + extensions: None, }, ); receivers.insert(random_peer, receiver); @@ -1041,6 +1044,7 @@ fn test_get_random_peers() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); } @@ -5595,6 +5599,7 @@ fn test_all_queues_full() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5631,6 +5636,7 @@ fn test_slow_peer_returns_failed_publish() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); let peer_id = PeerId::random(); @@ -5644,6 +5650,7 @@ fn test_slow_peer_returns_failed_publish() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5705,6 +5712,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5722,6 +5730,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5819,6 +5828,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5836,6 +5846,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5913,6 +5924,7 @@ fn test_slow_peer_returns_failed_forward() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5930,6 +5942,7 @@ fn test_slow_peer_returns_failed_forward() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -6012,6 +6025,7 @@ fn test_slow_peer_is_downscored_on_publish() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); gs.as_peer_score_mut().add_peer(slow_peer_id); @@ -6026,6 +6040,7 @@ fn test_slow_peer_is_downscored_on_publish() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -6787,3 +6802,76 @@ fn test_validation_message_size_within_topic_specific() { _ => panic!("Unexpected event"), } } + +#[test] +fn test_extensions_message_creation() { + let extensions_rpc = RpcOut::Extensions(Extensions {}); + let proto_rpc: proto::RPC = extensions_rpc.into(); + + assert!(proto_rpc.control.is_some()); + let control = proto_rpc.control.unwrap(); + assert!(control.extensions.is_some()); + assert!(control.ihave.is_empty()); + assert!(control.iwant.is_empty()); + assert!(control.graft.is_empty()); + assert!(control.prune.is_empty()); + assert!(control.idontwant.is_empty()); +} + +#[test] +fn test_handle_extensions_message() { + let mut gs: Behaviour = Behaviour::new( + MessageAuthenticity::Anonymous, + ConfigBuilder::default() + .validation_mode(ValidationMode::None) + .build() + .unwrap(), + ) + .unwrap(); + + let peer_id = PeerId::random(); + let sender = Sender::new(gs.config.connection_handler_queue_len()); + + // Add peer without extensions + gs.connected_peers.insert( + peer_id, + PeerDetails { + kind: PeerKind::Gossipsubv1_3, + connections: vec![ConnectionId::new_unchecked(0)], + outbound: false, + topics: BTreeSet::new(), + sender, + dont_send: LinkedHashMap::new(), + extensions: None, + }, + ); + + // Simulate receiving extensions message + let extensions = Extensions {}; + gs.handle_extensions(&peer_id, extensions); + + // Verify extensions were stored + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + assert!(peer_details.extensions.is_some()); + + // Simulate receiving duplicate extensions message from another peer + // TODO: when more extensions are added, we should test that they are not overridden. + let duplicate_rpc = RpcIn { + messages: vec![], + subscriptions: vec![], + control_msgs: vec![ControlAction::Extensions(None)], + }; + + gs.on_connection_handler_event( + peer_id, + ConnectionId::new_unchecked(0), + HandlerEvent::Message { + rpc: duplicate_rpc, + invalid_messages: vec![], + }, + ); + + // Extensions should still be present (not cleared or changed) + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + assert!(peer_details.extensions.is_some()); +} diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 24ac80d2755..9a3ddb2e2fb 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -155,6 +155,7 @@ pub struct ControlMessage { pub graft: Vec, pub prune: Vec, pub idontwant: Vec, + pub extensions: Option, } impl<'a> MessageRead<'a> for ControlMessage { @@ -167,6 +168,7 @@ impl<'a> MessageRead<'a> for ControlMessage { Ok(26) => msg.graft.push(r.read_message::(bytes)?), Ok(34) => msg.prune.push(r.read_message::(bytes)?), Ok(42) => msg.idontwant.push(r.read_message::(bytes)?), + Ok(50) => msg.extensions = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -183,6 +185,7 @@ impl MessageWrite for ControlMessage { + self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.extensions.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { @@ -191,6 +194,7 @@ impl MessageWrite for ControlMessage { for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; } for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; } for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; } + if let Some(ref s) = self.extensions { w.write_with_tag(50, |w| w.write_message(s))?; } Ok(()) } } @@ -367,6 +371,19 @@ impl MessageWrite for ControlIDontWant { } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct ControlExtensions { } + +impl<'a> MessageRead<'a> for ControlExtensions { + fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result { + r.read_to_end(); + Ok(Self::default()) + } +} + +impl MessageWrite for ControlExtensions { } + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] pub struct PeerInfo { diff --git a/protocols/gossipsub/src/generated/rpc.proto b/protocols/gossipsub/src/generated/rpc.proto index fe4d3bc9366..4f50bc77aaa 100644 --- a/protocols/gossipsub/src/generated/rpc.proto +++ b/protocols/gossipsub/src/generated/rpc.proto @@ -12,6 +12,12 @@ message RPC { } optional ControlMessage control = 3; + + // Canonical Extensions should register their messages here. + + // Experimental Extensions should register their messages here. They + // must use field numbers larger than 0x200000 to be encoded with at least 4 + // bytes } message Message { @@ -29,6 +35,7 @@ message ControlMessage { repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; repeated ControlIDontWant idontwant = 5; + optional ControlExtensions extensions = 6; } message ControlIHave { @@ -54,6 +61,11 @@ message ControlIDontWant { repeated bytes message_ids = 1; } +message ControlExtensions { + // Initially empty. Future extensions will be added here along with a + // reference to their specification. +} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 821c11d2132..dff5c4ffe75 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -35,14 +35,19 @@ use crate::{ rpc_proto::proto, topic::TopicHash, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, - RawMessage, RpcIn, Subscription, SubscriptionAction, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, + Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, }, ValidationError, }; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_3_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.3.0"), + kind: PeerKind::Gossipsubv1_2, +}; + pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.2.0"), kind: PeerKind::Gossipsubv1_2, @@ -79,6 +84,7 @@ impl Default for ProtocolConfig { Self { validation_mode: ValidationMode::Strict, protocol_ids: vec![ + GOSSIPSUB_1_3_0_PROTOCOL, GOSSIPSUB_1_2_0_PROTOCOL, GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL, @@ -556,11 +562,14 @@ impl Decoder for GossipsubCodec { }) .collect(); + let extension_msg = rpc_control.extensions.map(|_extension| Extensions {}); + control_msgs.extend(ihave_msgs); control_msgs.extend(iwant_msgs); control_msgs.extend(graft_msgs); control_msgs.extend(prune_msgs); control_msgs.extend(idontwant_msgs); + control_msgs.push(ControlAction::Extensions(extension_msg)); } Ok(Some(HandlerEvent::Message { diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index 943df31f599..ca4b25dfd23 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -87,6 +87,7 @@ impl Sender { RpcOut::Publish { .. } | RpcOut::Graft(_) | RpcOut::Prune(_) + | RpcOut::Extensions(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => &self.priority_sender, RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => { diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 8f8a4f38a88..30edfe28c57 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -30,7 +30,11 @@ use quick_protobuf::MessageWrite; use serde::{Deserialize, Serialize}; use web_time::Instant; -use crate::{rpc::Sender, rpc_proto::proto, TopicHash}; +use crate::{ + rpc::Sender, + rpc_proto::proto::{self}, + TopicHash, +}; /// Messages that have expired while attempting to be sent to a peer. #[derive(Clone, Debug, Default)] @@ -105,6 +109,8 @@ impl std::fmt::Debug for MessageId { pub(crate) struct PeerDetails { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, + /// The Extensions supported by the peer if any. + pub(crate) extensions: Option, /// If the peer is an outbound connection. pub(crate) outbound: bool, /// Its current connections. @@ -124,6 +130,8 @@ pub(crate) struct PeerDetails { derive(prometheus_client::encoding::EncodeLabelValue) )] pub enum PeerKind { + /// A gossipsub 1.3 peer. + Gossipsubv1_3, /// A gossipsub 1.2 peer. Gossipsubv1_2, /// A gossipsub 1.1 peer. @@ -271,6 +279,8 @@ pub enum ControlAction { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// The Node has sent us its supported extensions. + Extensions(Option), } /// Node broadcasts known messages per topic - IHave control message. @@ -314,10 +324,14 @@ pub struct IDontWant { pub(crate) message_ids: Vec, } +/// The node has sent us the supported Gossipsub Extensions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Extensions {} + /// A Gossipsub RPC message sent. #[derive(Debug)] pub enum RpcOut { - /// Publish a Gossipsub message on network.`timeout` limits the duration the message + /// PublishV a Gossipsub message on network.`timeout` limits the duration the message /// can wait to be sent before it is abandoned. Publish { message: RawMessage, timeout: Delay }, /// Forward a Gossipsub message on network. `timeout` limits the duration the message @@ -338,6 +352,8 @@ pub enum RpcOut { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// Send a Extensions control message. + Extensions(Extensions), } impl RpcOut { @@ -399,6 +415,7 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { @@ -412,6 +429,7 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { @@ -425,6 +443,7 @@ impl From for proto::RPC { }], prune: vec![], idontwant: vec![], + extensions: None, }), }, RpcOut::Prune(Prune { @@ -452,6 +471,7 @@ impl From for proto::RPC { backoff, }], idontwant: vec![], + extensions: None, }), } } @@ -466,6 +486,19 @@ impl From for proto::RPC { idontwant: vec![proto::ControlIDontWant { message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }], + extensions: None, + }), + }, + RpcOut::Extensions(Extensions {}) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: Some(proto::ControlExtensions {}), }), }, } @@ -507,6 +540,7 @@ impl PeerKind { Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", Self::Gossipsubv1_2 => "Gossipsub v1.2", + Self::Gossipsubv1_3 => "Gossipsub v1.3", } } } From 4a10beb470ea6f1b11bf4e4860138e5ea0140392 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 7 Aug 2025 23:11:47 +0100 Subject: [PATCH 2/3] implement test extesion --- protocols/gossipsub/src/behaviour.rs | 27 +++++++++++-- protocols/gossipsub/src/behaviour/tests.rs | 29 +++++++++++--- .../gossipsub/src/generated/gossipsub/pb.rs | 40 ++++++++++++++++++- protocols/gossipsub/src/generated/rpc.proto | 8 +++- protocols/gossipsub/src/protocol.rs | 7 +++- protocols/gossipsub/src/rpc.rs | 8 ++-- protocols/gossipsub/src/types.rs | 39 ++++++++++++++---- 7 files changed, 134 insertions(+), 24 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 6e88f9de5d3..efe4983d1e0 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1539,6 +1539,12 @@ where } peer.extensions = Some(extensions); + + if extensions.test_extension.unwrap_or(false) + && matches!(peer.kind, PeerKind::Gossipsubv1_3) + { + self.send_message(*peer_id, RpcOut::TestExtension); + } } /// Removes the specified peer from the mesh, returning true if it was present. @@ -2919,7 +2925,8 @@ where | RpcOut::Prune(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) - | RpcOut::Extensions(_) => { + | RpcOut::Extensions(_) + | RpcOut::TestExtension => { unreachable!("Channel for highpriority control messages is unbounded and should always be open.") } } @@ -3161,7 +3168,12 @@ where if connected_peer.connections.len() <= 1 { // If this is the first connection send extensions message. - self.send_message(peer_id, RpcOut::Extensions(Extensions {})); + self.send_message( + peer_id, + RpcOut::Extensions(Extensions { + test_extension: Some(true), + }), + ); } Ok(Handler::new(self.config.protocol_config(), receiver)) @@ -3192,7 +3204,12 @@ where if connected_peer.connections.len() <= 1 { // If this is the first connection send extensions message. - self.send_message(peer_id, RpcOut::Extensions(Extensions {})); + self.send_message( + peer_id, + RpcOut::Extensions(Extensions { + test_extension: Some(true), + }), + ); } Ok(Handler::new(self.config.protocol_config(), receiver)) @@ -3396,6 +3413,10 @@ where if !prune_msgs.is_empty() { self.handle_prune(&propagation_source, prune_msgs); } + + if let Some(_extension) = rpc.test_extension { + tracing::debug!("Received Test Extension"); + } } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 607da408ab3..5cb98bf2856 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -420,6 +420,7 @@ fn proto_to_message(rpc: &proto::RPC) -> RpcIn { }) .collect(), control_msgs, + test_extension: None, } } @@ -1253,6 +1254,7 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() { control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![msg_id.clone()], })], + test_extension: None, }; gs.on_connection_handler_event( peers[1], @@ -3145,6 +3147,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message1], subscriptions: vec![subscription.clone()], control_msgs: vec![control_action], + test_extension: None, }, invalid_messages: Vec::new(), }, @@ -3171,6 +3174,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message3], subscriptions: vec![subscription], control_msgs: vec![control_action], + test_extension: None, }, invalid_messages: Vec::new(), }, @@ -3781,6 +3785,7 @@ fn test_scoring_p4_invalid_signature() { messages: vec![], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![(m, ValidationError::InvalidSignature)], }, @@ -5540,6 +5545,7 @@ fn parses_idontwant() { control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![message_id.clone()], })], + test_extension: None, }; gs.on_connection_handler_event( peers[1], @@ -6638,6 +6644,7 @@ fn test_validation_error_message_size_too_large_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![], }, @@ -6682,6 +6689,7 @@ fn test_validation_error_message_size_too_large_topic_specific() { }], subscriptions: vec![], control: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); @@ -6742,6 +6750,7 @@ fn test_validation_message_size_within_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![], }, @@ -6786,6 +6795,7 @@ fn test_validation_message_size_within_topic_specific() { }], subscriptions: vec![], control: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); @@ -6805,12 +6815,16 @@ fn test_validation_message_size_within_topic_specific() { #[test] fn test_extensions_message_creation() { - let extensions_rpc = RpcOut::Extensions(Extensions {}); + let extensions_rpc = RpcOut::Extensions(Extensions { + test_extension: Some(true), + }); let proto_rpc: proto::RPC = extensions_rpc.into(); assert!(proto_rpc.control.is_some()); let control = proto_rpc.control.unwrap(); assert!(control.extensions.is_some()); + let test_extension = control.extensions.unwrap().testExtension.unwrap(); + assert!(test_extension); assert!(control.ihave.is_empty()); assert!(control.iwant.is_empty()); assert!(control.graft.is_empty()); @@ -6847,7 +6861,9 @@ fn test_handle_extensions_message() { ); // Simulate receiving extensions message - let extensions = Extensions {}; + let extensions = Extensions { + test_extension: Some(false), + }; gs.handle_extensions(&peer_id, extensions); // Verify extensions were stored @@ -6855,11 +6871,13 @@ fn test_handle_extensions_message() { assert!(peer_details.extensions.is_some()); // Simulate receiving duplicate extensions message from another peer - // TODO: when more extensions are added, we should test that they are not overridden. let duplicate_rpc = RpcIn { messages: vec![], subscriptions: vec![], - control_msgs: vec![ControlAction::Extensions(None)], + control_msgs: vec![ControlAction::Extensions(Some(Extensions { + test_extension: Some(true), + }))], + test_extension: None, }; gs.on_connection_handler_event( @@ -6873,5 +6891,6 @@ fn test_handle_extensions_message() { // Extensions should still be present (not cleared or changed) let peer_details = gs.connected_peers.get(&peer_id).unwrap(); - assert!(peer_details.extensions.is_some()); + let test_extension = peer_details.extensions.unwrap().test_extension.unwrap(); + assert!(!test_extension); } diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 9a3ddb2e2fb..a62ac15b6d9 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -19,6 +19,7 @@ pub struct RPC { pub subscriptions: Vec, pub publish: Vec, pub control: Option, + pub testExtension: Option, } impl<'a> MessageRead<'a> for RPC { @@ -29,6 +30,7 @@ impl<'a> MessageRead<'a> for RPC { Ok(10) => msg.subscriptions.push(r.read_message::(bytes)?), Ok(18) => msg.publish.push(r.read_message::(bytes)?), Ok(26) => msg.control = Some(r.read_message::(bytes)?), + Ok(51939474) => msg.testExtension = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -43,12 +45,14 @@ impl MessageWrite for RPC { + self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.publish.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.control.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { for s in &self.subscriptions { w.write_with_tag(10, |w| w.write_message(s))?; } for s in &self.publish { w.write_with_tag(18, |w| w.write_message(s))?; } if let Some(ref s) = self.control { w.write_with_tag(26, |w| w.write_message(s))?; } + if let Some(ref s) = self.testExtension { w.write_with_tag(51939474, |w| w.write_message(s))?; } Ok(()) } } @@ -373,16 +377,48 @@ impl MessageWrite for ControlIDontWant { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] -pub struct ControlExtensions { } +pub struct ControlExtensions { + pub testExtension: Option, +} impl<'a> MessageRead<'a> for ControlExtensions { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(51939472) => msg.testExtension = Some(r.read_bool(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for ControlExtensions { + fn get_size(&self) -> usize { + 0 + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_varint(*(m) as u64)) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.testExtension { w.write_with_tag(51939472, |w| w.write_bool(*s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct TestExtension { } + +impl<'a> MessageRead<'a> for TestExtension { fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result { r.read_to_end(); Ok(Self::default()) } } -impl MessageWrite for ControlExtensions { } +impl MessageWrite for TestExtension { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] diff --git a/protocols/gossipsub/src/generated/rpc.proto b/protocols/gossipsub/src/generated/rpc.proto index 4f50bc77aaa..4a95257b0d8 100644 --- a/protocols/gossipsub/src/generated/rpc.proto +++ b/protocols/gossipsub/src/generated/rpc.proto @@ -12,12 +12,12 @@ message RPC { } optional ControlMessage control = 3; - // Canonical Extensions should register their messages here. // Experimental Extensions should register their messages here. They // must use field numbers larger than 0x200000 to be encoded with at least 4 // bytes + optional TestExtension testExtension = 6492434; } message Message { @@ -64,8 +64,14 @@ message ControlIDontWant { message ControlExtensions { // Initially empty. Future extensions will be added here along with a // reference to their specification. + + // Experimental extensions must use field numbers larger than 0x200000 to be + // encoded with at least 4 bytes + optional bool testExtension = 6492434; } +message TestExtension {} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index dff5c4ffe75..e91bbbb1837 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -36,7 +36,7 @@ use crate::{ topic::TopicHash, types::{ ControlAction, Extensions, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, - Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, + Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, TestExtension, }, ValidationError, }; @@ -562,7 +562,9 @@ impl Decoder for GossipsubCodec { }) .collect(); - let extension_msg = rpc_control.extensions.map(|_extension| Extensions {}); + let extension_msg = rpc_control.extensions.map(|extensions| Extensions { + test_extension: extensions.testExtension, + }); control_msgs.extend(ihave_msgs); control_msgs.extend(iwant_msgs); @@ -588,6 +590,7 @@ impl Decoder for GossipsubCodec { }) .collect(), control_msgs, + test_extension: rpc.testExtension.map(|_extension| TestExtension {}), }, invalid_messages, })) diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index ca4b25dfd23..3b20e70afa9 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -90,9 +90,11 @@ impl Sender { | RpcOut::Extensions(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => &self.priority_sender, - RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => { - &self.non_priority_sender - } + RpcOut::Forward { .. } + | RpcOut::IHave(_) + | RpcOut::IWant(_) + | RpcOut::IDontWant(_) + | RpcOut::TestExtension => &self.non_priority_sender, }; sender.try_send(rpc).map_err(|err| err.into_inner()) } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 30edfe28c57..59fe4cfe407 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -30,11 +30,7 @@ use quick_protobuf::MessageWrite; use serde::{Deserialize, Serialize}; use web_time::Instant; -use crate::{ - rpc::Sender, - rpc_proto::proto::{self}, - TopicHash, -}; +use crate::{rpc::Sender, rpc_proto::proto, TopicHash}; /// Messages that have expired while attempting to be sent to a peer. #[derive(Clone, Debug, Default)] @@ -326,7 +322,12 @@ pub struct IDontWant { /// The node has sent us the supported Gossipsub Extensions. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct Extensions {} +pub struct Extensions { + pub(crate) test_extension: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TestExtension {} /// A Gossipsub RPC message sent. #[derive(Debug)] @@ -354,6 +355,8 @@ pub enum RpcOut { IDontWant(IDontWant), /// Send a Extensions control message. Extensions(Extensions), + /// Send a test extension message. + TestExtension, } impl RpcOut { @@ -375,6 +378,7 @@ impl From for proto::RPC { subscriptions: Vec::new(), publish: vec![message.into()], control: None, + testExtension: None, }, RpcOut::Forward { message, @@ -383,6 +387,7 @@ impl From for proto::RPC { publish: vec![message.into()], subscriptions: Vec::new(), control: None, + testExtension: None, }, RpcOut::Subscribe(topic) => proto::RPC { publish: Vec::new(), @@ -391,6 +396,7 @@ impl From for proto::RPC { topic_id: Some(topic.into_string()), }], control: None, + testExtension: None, }, RpcOut::Unsubscribe(topic) => proto::RPC { publish: Vec::new(), @@ -399,6 +405,7 @@ impl From for proto::RPC { topic_id: Some(topic.into_string()), }], control: None, + testExtension: None, }, RpcOut::IHave(IHave { topic_hash, @@ -417,6 +424,7 @@ impl From for proto::RPC { idontwant: vec![], extensions: None, }), + testExtension: None, }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { publish: Vec::new(), @@ -431,6 +439,7 @@ impl From for proto::RPC { idontwant: vec![], extensions: None, }), + testExtension: None, }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { publish: Vec::new(), @@ -445,6 +454,7 @@ impl From for proto::RPC { idontwant: vec![], extensions: None, }), + testExtension: None, }, RpcOut::Prune(Prune { topic_hash, @@ -473,6 +483,7 @@ impl From for proto::RPC { idontwant: vec![], extensions: None, }), + testExtension: None, } } RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { @@ -488,8 +499,9 @@ impl From for proto::RPC { }], extensions: None, }), + testExtension: None, }, - RpcOut::Extensions(Extensions {}) => proto::RPC { + RpcOut::Extensions(Extensions { test_extension }) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -498,8 +510,17 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], - extensions: Some(proto::ControlExtensions {}), + extensions: Some(proto::ControlExtensions { + testExtension: test_extension, + }), }), + testExtension: None, + }, + RpcOut::TestExtension => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: Some(proto::TestExtension {}), }, } } @@ -514,6 +535,8 @@ pub struct RpcIn { pub subscriptions: Vec, /// List of Gossipsub control messages. pub control_msgs: Vec, + /// Gossipsub test extension. + pub test_extension: Option, } impl fmt::Debug for RpcIn { From 9cb66d80ee4bcfb93be53decf510a64bd93d1e91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 11 Aug 2025 14:17:39 +0100 Subject: [PATCH 3/3] address age's review --- protocols/gossipsub/Cargo.toml | 1 + protocols/gossipsub/src/behaviour.rs | 33 ++++++++-------------------- protocols/gossipsub/src/handler.rs | 20 +++++++++++++++-- protocols/gossipsub/src/protocol.rs | 2 +- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 911b453f477..d2aab7d2a32 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] [features] wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"] metrics = ["prometheus-client"] +test-extension = [] [dependencies] async-channel = "2.3.1" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index efe4983d1e0..6cbd92047a8 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1540,6 +1540,7 @@ where peer.extensions = Some(extensions); + #[cfg(feature = "test-extension")] if extensions.test_extension.unwrap_or(false) && matches!(peer.kind, PeerKind::Gossipsubv1_3) { @@ -3164,19 +3165,11 @@ where }); // Add the new connection connected_peer.connections.push(connection_id); - let receiver = connected_peer.sender.new_receiver(); - if connected_peer.connections.len() <= 1 { - // If this is the first connection send extensions message. - self.send_message( - peer_id, - RpcOut::Extensions(Extensions { - test_extension: Some(true), - }), - ); - } - - Ok(Handler::new(self.config.protocol_config(), receiver)) + Ok(Handler::new( + self.config.protocol_config(), + connected_peer.sender.new_receiver(), + )) } fn handle_established_outbound_connection( @@ -3200,19 +3193,11 @@ where }); // Add the new connection connected_peer.connections.push(connection_id); - let receiver = connected_peer.sender.new_receiver(); - - if connected_peer.connections.len() <= 1 { - // If this is the first connection send extensions message. - self.send_message( - peer_id, - RpcOut::Extensions(Extensions { - test_extension: Some(true), - }), - ); - } - Ok(Handler::new(self.config.protocol_config(), receiver)) + Ok(Handler::new( + self.config.protocol_config(), + connected_peer.sender.new_receiver(), + )) } fn on_connection_handler_event( diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index a2d05d8a3ff..7be6446f026 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -39,7 +39,7 @@ use crate::{ protocol::{GossipsubCodec, ProtocolConfig}, rpc::Receiver, rpc_proto::proto, - types::{PeerKind, RawMessage, RpcIn, RpcOut}, + types::{Extensions, PeerKind, RawMessage, RpcIn, RpcOut}, ValidationError, }; @@ -211,7 +211,23 @@ impl EnabledHandler { self.outbound_substream.is_none(), "Established an outbound substream with one already available" ); - self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); + + // For gossipsub 1.3 peers, immediately send Extensions message. + if matches!(peer_kind, PeerKind::Gossipsubv1_3) { + let test_extension = if cfg!(feature = "test-extension") { + Some(true) + } else { + None + }; + + let extensions_rpc = RpcOut::Extensions(Extensions { test_extension }); + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + substream, + extensions_rpc.into_protobuf(), + )); + } else { + self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); + } } fn poll( diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index e91bbbb1837..bbe79f743e2 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -45,7 +45,7 @@ pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; pub(crate) const GOSSIPSUB_1_3_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.3.0"), - kind: PeerKind::Gossipsubv1_2, + kind: PeerKind::Gossipsubv1_3, }; pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {