diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index f83c554d333..e775b5cc1d1 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,8 @@ ## 0.50.0 + +- Implement gossipsub 1.3 extensions control message. + See [PR 6119](https://github.com/libp2p/rust-libp2p/pull/6119) + - Remove peer penalty for duplicate messages. See [PR 6112](https://github.com/libp2p/rust-libp2p/pull/6112) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 911b453f477..d2aab7d2a32 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] [features] wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"] metrics = ["prometheus-client"] +test-extension = [] [dependencies] async-channel = "2.3.1" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9d7d6356211..6cbd92047a8 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -68,8 +68,8 @@ use crate::{ topic::{Hasher, Topic, TopicHash}, transform::{DataTransform, IdentityTransform}, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId, - PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, + MessageId, PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction, }, FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError, @@ -1521,6 +1521,33 @@ where tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } + fn handle_extensions(&mut self, peer_id: &PeerId, extensions: Extensions) { + let Some(peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!( + peer=%peer_id, + "Extensions by unknown peer" + ); + return; + }; + + if peer.extensions.is_some() { + tracing::debug!( + peer=%peer_id, + "Peer had already sent us extensions message" + ); + return; + } + + peer.extensions = Some(extensions); + + #[cfg(feature = "test-extension")] + if extensions.test_extension.unwrap_or(false) + && matches!(peer.kind, PeerKind::Gossipsubv1_3) + { + self.send_message(*peer_id, RpcOut::TestExtension); + } + } + /// Removes the specified peer from the mesh, returning true if it was present. fn remove_peer_from_mesh( &mut self, @@ -2898,7 +2925,9 @@ where RpcOut::Graft(_) | RpcOut::Prune(_) | RpcOut::Subscribe(_) - | RpcOut::Unsubscribe(_) => { + | RpcOut::Unsubscribe(_) + | RpcOut::Extensions(_) + | RpcOut::TestExtension => { unreachable!("Channel for highpriority control messages is unbounded and should always be open.") } } @@ -3132,6 +3161,7 @@ where sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3159,6 +3189,7 @@ where sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3351,6 +3382,11 @@ where } } } + ControlAction::Extensions(extensions) => { + if let Some(extensions) = extensions { + self.handle_extensions(&propagation_source, extensions); + } + } } } if !ihave_msgs.is_empty() { @@ -3362,6 +3398,10 @@ where if !prune_msgs.is_empty() { self.handle_prune(&propagation_source, prune_msgs); } + + if let Some(_extension) = rpc.test_extension { + tracing::debug!("Received Test Extension"); + } } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 85aead47911..5cb98bf2856 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -33,8 +33,9 @@ use crate::{ config::{ConfigBuilder, TopicMeshConfig}, protocol::GossipsubCodec, rpc::Receiver, + rpc_proto::proto, subscription_filter::WhitelistSubscriptionFilter, - types::RpcIn, + types::{ControlAction, Extensions, RpcIn, RpcOut}, IdentTopic as Topic, }; @@ -248,6 +249,7 @@ where topics: Default::default(), sender, dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -418,6 +420,7 @@ fn proto_to_message(rpc: &proto::RPC) -> RpcIn { }) .collect(), control_msgs, + test_extension: None, } } @@ -644,6 +647,7 @@ fn test_join() { topics: Default::default(), sender, dont_send: LinkedHashMap::new(), + extensions: None, }, ); receivers.insert(random_peer, receiver); @@ -1041,6 +1045,7 @@ fn test_get_random_peers() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); } @@ -1249,6 +1254,7 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() { control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![msg_id.clone()], })], + test_extension: None, }; gs.on_connection_handler_event( peers[1], @@ -3141,6 +3147,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message1], subscriptions: vec![subscription.clone()], control_msgs: vec![control_action], + test_extension: None, }, invalid_messages: Vec::new(), }, @@ -3167,6 +3174,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message3], subscriptions: vec![subscription], control_msgs: vec![control_action], + test_extension: None, }, invalid_messages: Vec::new(), }, @@ -3777,6 +3785,7 @@ fn test_scoring_p4_invalid_signature() { messages: vec![], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![(m, ValidationError::InvalidSignature)], }, @@ -5536,6 +5545,7 @@ fn parses_idontwant() { control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![message_id.clone()], })], + test_extension: None, }; gs.on_connection_handler_event( peers[1], @@ -5595,6 +5605,7 @@ fn test_all_queues_full() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5631,6 +5642,7 @@ fn test_slow_peer_returns_failed_publish() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); let peer_id = PeerId::random(); @@ -5644,6 +5656,7 @@ fn test_slow_peer_returns_failed_publish() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5705,6 +5718,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5722,6 +5736,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5819,6 +5834,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5836,6 +5852,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -5913,6 +5930,7 @@ fn test_slow_peer_returns_failed_forward() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); peers.push(slow_peer_id); @@ -5930,6 +5948,7 @@ fn test_slow_peer_returns_failed_forward() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -6012,6 +6031,7 @@ fn test_slow_peer_is_downscored_on_publish() { topics: topics.clone(), sender: Sender::new(2), dont_send: LinkedHashMap::new(), + extensions: None, }, ); gs.as_peer_score_mut().add_peer(slow_peer_id); @@ -6026,6 +6046,7 @@ fn test_slow_peer_is_downscored_on_publish() { topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), + extensions: None, }, ); @@ -6623,6 +6644,7 @@ fn test_validation_error_message_size_too_large_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![], }, @@ -6667,6 +6689,7 @@ fn test_validation_error_message_size_too_large_topic_specific() { }], subscriptions: vec![], control: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); @@ -6727,6 +6750,7 @@ fn test_validation_message_size_within_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, }, invalid_messages: vec![], }, @@ -6771,6 +6795,7 @@ fn test_validation_message_size_within_topic_specific() { }], subscriptions: vec![], control: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); @@ -6787,3 +6812,85 @@ fn test_validation_message_size_within_topic_specific() { _ => panic!("Unexpected event"), } } + +#[test] +fn test_extensions_message_creation() { + let extensions_rpc = RpcOut::Extensions(Extensions { + test_extension: Some(true), + }); + let proto_rpc: proto::RPC = extensions_rpc.into(); + + assert!(proto_rpc.control.is_some()); + let control = proto_rpc.control.unwrap(); + assert!(control.extensions.is_some()); + let test_extension = control.extensions.unwrap().testExtension.unwrap(); + assert!(test_extension); + assert!(control.ihave.is_empty()); + assert!(control.iwant.is_empty()); + assert!(control.graft.is_empty()); + assert!(control.prune.is_empty()); + assert!(control.idontwant.is_empty()); +} + +#[test] +fn test_handle_extensions_message() { + let mut gs: Behaviour = Behaviour::new( + MessageAuthenticity::Anonymous, + ConfigBuilder::default() + .validation_mode(ValidationMode::None) + .build() + .unwrap(), + ) + .unwrap(); + + let peer_id = PeerId::random(); + let sender = Sender::new(gs.config.connection_handler_queue_len()); + + // Add peer without extensions + gs.connected_peers.insert( + peer_id, + PeerDetails { + kind: PeerKind::Gossipsubv1_3, + connections: vec![ConnectionId::new_unchecked(0)], + outbound: false, + topics: BTreeSet::new(), + sender, + dont_send: LinkedHashMap::new(), + extensions: None, + }, + ); + + // Simulate receiving extensions message + let extensions = Extensions { + test_extension: Some(false), + }; + gs.handle_extensions(&peer_id, extensions); + + // Verify extensions were stored + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + assert!(peer_details.extensions.is_some()); + + // Simulate receiving duplicate extensions message from another peer + let duplicate_rpc = RpcIn { + messages: vec![], + subscriptions: vec![], + control_msgs: vec![ControlAction::Extensions(Some(Extensions { + test_extension: Some(true), + }))], + test_extension: None, + }; + + gs.on_connection_handler_event( + peer_id, + ConnectionId::new_unchecked(0), + HandlerEvent::Message { + rpc: duplicate_rpc, + invalid_messages: vec![], + }, + ); + + // Extensions should still be present (not cleared or changed) + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + let test_extension = peer_details.extensions.unwrap().test_extension.unwrap(); + assert!(!test_extension); +} diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 24ac80d2755..a62ac15b6d9 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -19,6 +19,7 @@ pub struct RPC { pub subscriptions: Vec, pub publish: Vec, pub control: Option, + pub testExtension: Option, } impl<'a> MessageRead<'a> for RPC { @@ -29,6 +30,7 @@ impl<'a> MessageRead<'a> for RPC { Ok(10) => msg.subscriptions.push(r.read_message::(bytes)?), Ok(18) => msg.publish.push(r.read_message::(bytes)?), Ok(26) => msg.control = Some(r.read_message::(bytes)?), + Ok(51939474) => msg.testExtension = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -43,12 +45,14 @@ impl MessageWrite for RPC { + self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.publish.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.control.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { for s in &self.subscriptions { w.write_with_tag(10, |w| w.write_message(s))?; } for s in &self.publish { w.write_with_tag(18, |w| w.write_message(s))?; } if let Some(ref s) = self.control { w.write_with_tag(26, |w| w.write_message(s))?; } + if let Some(ref s) = self.testExtension { w.write_with_tag(51939474, |w| w.write_message(s))?; } Ok(()) } } @@ -155,6 +159,7 @@ pub struct ControlMessage { pub graft: Vec, pub prune: Vec, pub idontwant: Vec, + pub extensions: Option, } impl<'a> MessageRead<'a> for ControlMessage { @@ -167,6 +172,7 @@ impl<'a> MessageRead<'a> for ControlMessage { Ok(26) => msg.graft.push(r.read_message::(bytes)?), Ok(34) => msg.prune.push(r.read_message::(bytes)?), Ok(42) => msg.idontwant.push(r.read_message::(bytes)?), + Ok(50) => msg.extensions = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -183,6 +189,7 @@ impl MessageWrite for ControlMessage { + self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.extensions.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { @@ -191,6 +198,7 @@ impl MessageWrite for ControlMessage { for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; } for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; } for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; } + if let Some(ref s) = self.extensions { w.write_with_tag(50, |w| w.write_message(s))?; } Ok(()) } } @@ -367,6 +375,51 @@ impl MessageWrite for ControlIDontWant { } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct ControlExtensions { + pub testExtension: Option, +} + +impl<'a> MessageRead<'a> for ControlExtensions { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(51939472) => msg.testExtension = Some(r.read_bool(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for ControlExtensions { + fn get_size(&self) -> usize { + 0 + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_varint(*(m) as u64)) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.testExtension { w.write_with_tag(51939472, |w| w.write_bool(*s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct TestExtension { } + +impl<'a> MessageRead<'a> for TestExtension { + fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result { + r.read_to_end(); + Ok(Self::default()) + } +} + +impl MessageWrite for TestExtension { } + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] pub struct PeerInfo { diff --git a/protocols/gossipsub/src/generated/rpc.proto b/protocols/gossipsub/src/generated/rpc.proto index fe4d3bc9366..4a95257b0d8 100644 --- a/protocols/gossipsub/src/generated/rpc.proto +++ b/protocols/gossipsub/src/generated/rpc.proto @@ -12,6 +12,12 @@ message RPC { } optional ControlMessage control = 3; + // Canonical Extensions should register their messages here. + + // Experimental Extensions should register their messages here. They + // must use field numbers larger than 0x200000 to be encoded with at least 4 + // bytes + optional TestExtension testExtension = 6492434; } message Message { @@ -29,6 +35,7 @@ message ControlMessage { repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; repeated ControlIDontWant idontwant = 5; + optional ControlExtensions extensions = 6; } message ControlIHave { @@ -54,6 +61,17 @@ message ControlIDontWant { repeated bytes message_ids = 1; } +message ControlExtensions { + // Initially empty. Future extensions will be added here along with a + // reference to their specification. + + // Experimental extensions must use field numbers larger than 0x200000 to be + // encoded with at least 4 bytes + optional bool testExtension = 6492434; +} + +message TestExtension {} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index a2d05d8a3ff..7be6446f026 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -39,7 +39,7 @@ use crate::{ protocol::{GossipsubCodec, ProtocolConfig}, rpc::Receiver, rpc_proto::proto, - types::{PeerKind, RawMessage, RpcIn, RpcOut}, + types::{Extensions, PeerKind, RawMessage, RpcIn, RpcOut}, ValidationError, }; @@ -211,7 +211,23 @@ impl EnabledHandler { self.outbound_substream.is_none(), "Established an outbound substream with one already available" ); - self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); + + // For gossipsub 1.3 peers, immediately send Extensions message. + if matches!(peer_kind, PeerKind::Gossipsubv1_3) { + let test_extension = if cfg!(feature = "test-extension") { + Some(true) + } else { + None + }; + + let extensions_rpc = RpcOut::Extensions(Extensions { test_extension }); + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + substream, + extensions_rpc.into_protobuf(), + )); + } else { + self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); + } } fn poll( diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 821c11d2132..bbe79f743e2 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -35,14 +35,19 @@ use crate::{ rpc_proto::proto, topic::TopicHash, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, - RawMessage, RpcIn, Subscription, SubscriptionAction, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, + Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, TestExtension, }, ValidationError, }; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_3_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.3.0"), + kind: PeerKind::Gossipsubv1_3, +}; + pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.2.0"), kind: PeerKind::Gossipsubv1_2, @@ -79,6 +84,7 @@ impl Default for ProtocolConfig { Self { validation_mode: ValidationMode::Strict, protocol_ids: vec![ + GOSSIPSUB_1_3_0_PROTOCOL, GOSSIPSUB_1_2_0_PROTOCOL, GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL, @@ -556,11 +562,16 @@ impl Decoder for GossipsubCodec { }) .collect(); + let extension_msg = rpc_control.extensions.map(|extensions| Extensions { + test_extension: extensions.testExtension, + }); + control_msgs.extend(ihave_msgs); control_msgs.extend(iwant_msgs); control_msgs.extend(graft_msgs); control_msgs.extend(prune_msgs); control_msgs.extend(idontwant_msgs); + control_msgs.push(ControlAction::Extensions(extension_msg)); } Ok(Some(HandlerEvent::Message { @@ -579,6 +590,7 @@ impl Decoder for GossipsubCodec { }) .collect(), control_msgs, + test_extension: rpc.testExtension.map(|_extension| TestExtension {}), }, invalid_messages, })) diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index 943df31f599..3b20e70afa9 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -87,11 +87,14 @@ impl Sender { RpcOut::Publish { .. } | RpcOut::Graft(_) | RpcOut::Prune(_) + | RpcOut::Extensions(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => &self.priority_sender, - RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => { - &self.non_priority_sender - } + RpcOut::Forward { .. } + | RpcOut::IHave(_) + | RpcOut::IWant(_) + | RpcOut::IDontWant(_) + | RpcOut::TestExtension => &self.non_priority_sender, }; sender.try_send(rpc).map_err(|err| err.into_inner()) } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 8f8a4f38a88..59fe4cfe407 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -105,6 +105,8 @@ impl std::fmt::Debug for MessageId { pub(crate) struct PeerDetails { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, + /// The Extensions supported by the peer if any. + pub(crate) extensions: Option, /// If the peer is an outbound connection. pub(crate) outbound: bool, /// Its current connections. @@ -124,6 +126,8 @@ pub(crate) struct PeerDetails { derive(prometheus_client::encoding::EncodeLabelValue) )] pub enum PeerKind { + /// A gossipsub 1.3 peer. + Gossipsubv1_3, /// A gossipsub 1.2 peer. Gossipsubv1_2, /// A gossipsub 1.1 peer. @@ -271,6 +275,8 @@ pub enum ControlAction { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// The Node has sent us its supported extensions. + Extensions(Option), } /// Node broadcasts known messages per topic - IHave control message. @@ -314,10 +320,19 @@ pub struct IDontWant { pub(crate) message_ids: Vec, } +/// The node has sent us the supported Gossipsub Extensions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Extensions { + pub(crate) test_extension: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TestExtension {} + /// A Gossipsub RPC message sent. #[derive(Debug)] pub enum RpcOut { - /// Publish a Gossipsub message on network.`timeout` limits the duration the message + /// PublishV a Gossipsub message on network.`timeout` limits the duration the message /// can wait to be sent before it is abandoned. Publish { message: RawMessage, timeout: Delay }, /// Forward a Gossipsub message on network. `timeout` limits the duration the message @@ -338,6 +353,10 @@ pub enum RpcOut { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// Send a Extensions control message. + Extensions(Extensions), + /// Send a test extension message. + TestExtension, } impl RpcOut { @@ -359,6 +378,7 @@ impl From for proto::RPC { subscriptions: Vec::new(), publish: vec![message.into()], control: None, + testExtension: None, }, RpcOut::Forward { message, @@ -367,6 +387,7 @@ impl From for proto::RPC { publish: vec![message.into()], subscriptions: Vec::new(), control: None, + testExtension: None, }, RpcOut::Subscribe(topic) => proto::RPC { publish: Vec::new(), @@ -375,6 +396,7 @@ impl From for proto::RPC { topic_id: Some(topic.into_string()), }], control: None, + testExtension: None, }, RpcOut::Unsubscribe(topic) => proto::RPC { publish: Vec::new(), @@ -383,6 +405,7 @@ impl From for proto::RPC { topic_id: Some(topic.into_string()), }], control: None, + testExtension: None, }, RpcOut::IHave(IHave { topic_hash, @@ -399,7 +422,9 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { publish: Vec::new(), @@ -412,7 +437,9 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { publish: Vec::new(), @@ -425,7 +452,9 @@ impl From for proto::RPC { }], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, }, RpcOut::Prune(Prune { topic_hash, @@ -452,7 +481,9 @@ impl From for proto::RPC { backoff, }], idontwant: vec![], + extensions: None, }), + testExtension: None, } } RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { @@ -466,7 +497,30 @@ impl From for proto::RPC { idontwant: vec![proto::ControlIDontWant { message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }], + extensions: None, }), + testExtension: None, + }, + RpcOut::Extensions(Extensions { test_extension }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: Some(proto::ControlExtensions { + testExtension: test_extension, + }), + }), + testExtension: None, + }, + RpcOut::TestExtension => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: Some(proto::TestExtension {}), }, } } @@ -481,6 +535,8 @@ pub struct RpcIn { pub subscriptions: Vec, /// List of Gossipsub control messages. pub control_msgs: Vec, + /// Gossipsub test extension. + pub test_extension: Option, } impl fmt::Debug for RpcIn { @@ -507,6 +563,7 @@ impl PeerKind { Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", Self::Gossipsubv1_2 => "Gossipsub v1.2", + Self::Gossipsubv1_3 => "Gossipsub v1.3", } } }