From eb22c19d2dae0ccca1fc5b9f07f25ca09c55a153 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 09:18:41 -0400 Subject: [PATCH 01/12] feat: implement autorelay behaviour --- protocols/relay/src/autorelay.rs | 433 +++++++++++++++++++++++ protocols/relay/src/autorelay/handler.rs | 106 ++++++ protocols/relay/src/lib.rs | 1 + protocols/relay/src/multiaddr_ext.rs | 38 ++ 4 files changed, 578 insertions(+) create mode 100644 protocols/relay/src/autorelay.rs create mode 100644 protocols/relay/src/autorelay/handler.rs diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs new file mode 100644 index 00000000000..27bc66f47f3 --- /dev/null +++ b/protocols/relay/src/autorelay.rs @@ -0,0 +1,433 @@ +use crate::autorelay::handler::Out; +use crate::multiaddr_ext::MultiaddrExt; +use either::Either; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::transport::{ListenerId, PortUse}; +use libp2p_core::Endpoint; +use libp2p_identity::PeerId; +use libp2p_swarm::derive_prelude::{ + AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, + DialFailure, ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, + NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p_swarm::{dummy, ExternalAddresses, ListenOpts, NewListenAddr}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::num::NonZeroU8; +use std::task::{Context, Poll, Waker}; + +mod handler; + +#[derive(Default, Debug)] +pub struct Behaviour { + config: Config, + external_addresses: ExternalAddresses, + events: VecDeque::ToSwarm, THandlerInEvent>>, + + connections: HashMap<(PeerId, ConnectionId), Connection>, + + pending_reservation: HashMap, + pending_target: HashSet<(PeerId, ConnectionId)>, + + waker: Option, +} + +#[derive(Debug)] +struct Connection { + address: Multiaddr, + relay_status: RelayStatus, +} + +impl Connection { + /// Mark relayed connection as not supported + pub fn disqualify_connection(&mut self) -> bool { + match self.address.is_relayed() { + true => { + self.relay_status = RelayStatus::NotSupported; + true + } + false => { + self.relay_status = RelayStatus::Pending; + false + } + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RelayStatus { + Supported { status: ReservationStatus }, + NotSupported, + Pending, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ReservationStatus { + Idle, + Pending { id: ListenerId }, + Active { id: ListenerId }, +} + +#[derive(Debug)] +pub struct Config { + max_reservations: NonZeroU8, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_reservations: NonZeroU8::new(2).unwrap(), + } + } +} + +impl Config { + pub fn set_max_reservations(mut self, max_reservations: u8) -> Self { + assert!(max_reservations > 0); + self.max_reservations = NonZeroU8::new(max_reservations).expect("greater than zero"); + self + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub enum Event {} + +impl Behaviour { + + pub fn new_with_config(config: Config) -> Self { + Self { + config, + ..Default::default() + } + } + + fn select_connection_for_reservation( + &mut self, + (peer_id, connection_id): (PeerId, ConnectionId), + ) -> bool { + if self.pending_target.contains(&(peer_id, connection_id)) { + return false; + } + + let info = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("connection is present"); + + let addr_with_peer_id = match info.address.clone().with_p2p(peer_id) { + Ok(addr) => addr, + Err(addr) => { + tracing::warn!(%addr, "address unexpectedly contains a different peer id than the connection"); + return false; + } + }; + + let relay_addr = addr_with_peer_id.with(Protocol::P2pCircuit); + + let opts = ListenOpts::new(relay_addr); + + let id = opts.listener_id(); + + info.relay_status = RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + }; + self.pending_reservation + .insert(id, (peer_id, connection_id)); + self.events.push_back(ToSwarm::ListenOn { opts }); + self.pending_target.insert((peer_id, connection_id)); + + true + } + + fn meet_reservation_target(&mut self) { + // check to determine if there is a public external address that could possibly let us know the node + // is reachable + if self + .external_addresses + .iter() + .any(|addr| addr.is_public() && !addr.is_relayed()) + { + return; + } + + let max_reservation = self.config.max_reservations.get() as usize; + + let peers_not_supported = self.connections.is_empty() + || self + .connections + .iter() + .all(|(_, connection)| connection.relay_status == RelayStatus::NotSupported); + + if peers_not_supported { + return; + } + + let relayed_targets = self + .connections + .iter() + .filter(|(_, info)| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + }) + .count(); + + if relayed_targets == max_reservation { + return; + } + + let pending_target_len = self.pending_target.len(); + + if pending_target_len >= max_reservation { + return; + } + + let possible_targets = self + .connections + .iter() + .filter(|(_, info)| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Idle + } + ) + }) + .map(|((peer_id, connection_id), _)| (*peer_id, *connection_id)) + .collect::>(); + + let targets_count = std::cmp::min(possible_targets.len(), max_reservation); + + if targets_count == 0 { + return; + } + + let remaining_targets_needed = targets_count + .checked_sub(self.pending_target.len()) + .unwrap_or_default(); + + if remaining_targets_needed == 0 { + return; + } + + for (peer_id, connection_id) in possible_targets + .iter() + .copied() + .take(remaining_targets_needed) + { + if !self.select_connection_for_reservation((peer_id, connection_id)) { + continue; + } + + if self.pending_target.len() == max_reservation { + break; + } + } + + debug_assert!(self.pending_target.len() <= max_reservation); + } + + fn active_reservations(&self) -> usize { + self.connections.values().filter(|info| matches!(info.relay_status, RelayStatus::Supported { status: ReservationStatus::Active { .. } })).count() + } + + // fn on_connection_established( + // &mut self, + // peer_id: PeerId, + // connection_id: ConnectionId, + // connection_id: ConnectionId, + // address: Multiaddr, + // ) { + // } + // + // fn on_connection_closed(&mut self, peer_id: PeerId, connection_id: ConnectionId) {} + // + // fn on_address_change( + // &mut self, + // peer_id: PeerId, + // connection_id: ConnectionId, + // old_addr: &Multiaddr, + // new_addr: &Multiaddr, + // ) { + // } + // + // fn on_new_listen_addr(&mut self, listener_id: ListenerId, addr: &Multiaddr) {} + // fn on_expired_listen_addr(&mut self, listener_id: ListenerId) {} + // fn on_listener_error(&mut self, listener_id: ListenerId, error: &std::io::Error) {} + // fn on_listener_closed(&mut self, listener_id: ListenerId) {} +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if local_addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { + if addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + let _change = self.external_addresses.on_swarm_event(&event); + + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + endpoint, + connection_id, + .. + }) => { + let remote_addr = endpoint.get_remote_address().clone(); + + let mut connection = Connection { + address: remote_addr, + relay_status: RelayStatus::Pending, + }; + + connection.disqualify_connection(); + + self.connections + .insert((peer_id, connection_id), connection); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + let connection = self + .connections + .remove(&(peer_id, connection_id)) + .expect("valid connection"); + + let id = match connection.relay_status { + RelayStatus::Supported { status: ReservationStatus::Active { id } } => id, + RelayStatus::Supported { status: ReservationStatus::Pending { id } } => id, + _ => return, + }; + + self.pending_reservation.remove(&id); + self.pending_target.remove(&(peer_id, connection_id)); + + let max_reservation = self.config.max_reservations.get() as usize; + + let active_reservations = self.active_reservations(); + + if active_reservations < max_reservation { + self.meet_reservation_target(); + } + } + FromSwarm::AddressChange(AddressChange { + peer_id, + connection_id, + old, + new, + }) => { + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); + + let old_addr = old.get_remote_address(); + let new_addr = new.get_remote_address(); + + debug_assert!(old_addr != new_addr); + + connection.address = new_addr.clone(); + } + FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { + // we only care about any new relayed address + if !addr.iter().any(|protocol| protocol == Protocol::P2pCircuit) { + return; + } + + let Some((peer_id, connection_id)) = self.pending_reservation.remove(&listener_id) + else { + return; + }; + + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); + + let RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + } = connection.relay_status + else { + return; + }; + + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Active { id }, + }; + } + FromSwarm::ExpiredListenAddr(_) => {} + FromSwarm::ListenerError(_) => {} + FromSwarm::ListenerClosed(_) => {} + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let Either::Left(event) = event; + + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + return; + }; + + match event { + Out::Supported => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + self.meet_reservation_target(); + } + Out::Unsupported => { + let _previous_status = connection.relay_status; + connection.relay_status = RelayStatus::NotSupported; + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + self.waker.replace(cx.waker().clone()); + Poll::Pending + } +} diff --git a/protocols/relay/src/autorelay/handler.rs b/protocols/relay/src/autorelay/handler.rs new file mode 100644 index 00000000000..4306e53b7bd --- /dev/null +++ b/protocols/relay/src/autorelay/handler.rs @@ -0,0 +1,106 @@ +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use libp2p_core::upgrade::DeniedUpgrade; + +use crate::HOP_PROTOCOL_NAME; +use libp2p_swarm::{ + handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, + SupportedProtocols, +}; + +#[derive(Default, Debug)] +pub struct Handler { + events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + >, + >, + + supported: bool, + + supported_protocol: SupportedProtocols, +} + +#[derive(Debug, Copy, Clone)] +pub enum Out { + Supported, + Unsupported, +} + +#[allow(deprecated)] +impl ConnectionHandler for Handler { + type FromBehaviour = (); + type ToBehaviour = Out; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> bool { + false + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::RemoteProtocolsChange(protocol) + | ConnectionEvent::LocalProtocolsChange(protocol) => { + let change = self.supported_protocol.on_protocols_change(protocol); + if change { + let valid = self + .supported_protocol + .iter() + .any(|proto| HOP_PROTOCOL_NAME.eq(proto)); + + match (valid, self.supported) { + (true, false) => { + self.supported = true; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Out::Supported)); + } + (false, true) => { + self.supported = false; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Out::Unsupported, + )); + } + (true, true) => {} + _ => {} + } + } + } + _ => {} + } + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + Poll::Pending + } +} diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 515fb40ef4b..6dbdc0e8673 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -23,6 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +pub mod autorelay; mod behaviour; mod copy_future; mod multiaddr_ext; diff --git a/protocols/relay/src/multiaddr_ext.rs b/protocols/relay/src/multiaddr_ext.rs index 7c06eb7eab0..44e9dc5ac08 100644 --- a/protocols/relay/src/multiaddr_ext.rs +++ b/protocols/relay/src/multiaddr_ext.rs @@ -2,10 +2,48 @@ use libp2p_core::{multiaddr::Protocol, Multiaddr}; pub(crate) trait MultiaddrExt { fn is_relayed(&self) -> bool; + + fn is_public(&self) -> bool; + + fn is_loopback(&self) -> bool; + + fn is_private(&self) -> bool; + + fn is_unspecified(&self) -> bool; } impl MultiaddrExt for Multiaddr { fn is_relayed(&self) -> bool { self.iter().any(|p| p == Protocol::P2pCircuit) } + + fn is_public(&self) -> bool { + !self.is_private() && !self.is_loopback() && !self.is_unspecified() + } + + fn is_loopback(&self) -> bool { + self.iter().any(|proto| match proto { + Protocol::Ip4(ip) => ip.is_loopback(), + Protocol::Ip6(ip) => ip.is_loopback(), + _ => false, + }) + } + + fn is_private(&self) -> bool { + self.iter().any(|proto| match proto { + Protocol::Ip4(ip) => ip.is_private(), + Protocol::Ip6(ip) => { + (ip.segments()[0] & 0xffc0) != 0xfe80 && (ip.segments()[0] & 0xfe00) != 0xfc00 + } + _ => false, + }) + } + + fn is_unspecified(&self) -> bool { + self.iter().any(|proto| match proto { + Protocol::Ip4(ip) => ip.is_unspecified(), + Protocol::Ip6(ip) => ip.is_unspecified(), + _ => false, + }) + } } From b1c1cdc468f3d03cc98900328de833349701047e Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 09:31:11 -0400 Subject: [PATCH 02/12] chore: update cargo.toml and add changelog entry --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/relay/CHANGELOG.md | 4 ++++ protocols/relay/Cargo.toml | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b088d0da44f..2d254c1a07c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2959,7 +2959,7 @@ dependencies = [ [[package]] name = "libp2p-relay" -version = "0.21.0" +version = "0.21.1" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 527d20c27e4..6d3a460cc58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ libp2p-ping = { version = "0.47.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.43.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.26.0", path = "transports/pnet" } libp2p-quic = { version = "0.13.0", path = "transports/quic" } -libp2p-relay = { version = "0.21.0", path = "protocols/relay" } +libp2p-relay = { version = "0.21.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.7", path = "misc/server" } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 0f17112a76e..822476eea25 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.21.1 +- Implements autorelay that would make a reservation as soon as a connection reports supporting HOP protocol. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/6067/files) + ## 0.21.0 diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 6124744cb0d..3871abbcf8a 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition.workspace = true rust-version = { workspace = true } description = "Communications relaying for libp2p" -version = "0.21.0" +version = "0.21.1" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From cb2fbab0f96ce9873b453cacf113a557d1c27a73 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 09:44:37 -0400 Subject: [PATCH 03/12] chore: rename function --- protocols/relay/src/autorelay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index 27bc66f47f3..fa67e5effc8 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -39,7 +39,7 @@ struct Connection { impl Connection { /// Mark relayed connection as not supported - pub fn disqualify_connection(&mut self) -> bool { + pub(crate) fn disqualify_connection_if_relayed(&mut self) -> bool { match self.address.is_relayed() { true => { self.relay_status = RelayStatus::NotSupported; @@ -310,7 +310,7 @@ impl NetworkBehaviour for Behaviour { relay_status: RelayStatus::Pending, }; - connection.disqualify_connection(); + connection.disqualify_connection_if_relayed(); self.connections .insert((peer_id, connection_id), connection); From 40a05829f3d97bb58e914c6ceee8ca9ce0f068e5 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 09:47:03 -0400 Subject: [PATCH 04/12] chore: remofe let/else condition --- protocols/relay/src/autorelay.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index fa67e5effc8..788e768a833 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -402,9 +402,7 @@ impl NetworkBehaviour for Behaviour { ) { let Either::Left(event) = event; - let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { - return; - }; + let connection = self.connections.get_mut(&(peer_id, connection_id)).expect("valid connection"); match event { Out::Supported => { From eb7a6c7250fe130e0b6c649928b559414a7eadf0 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 10:21:58 -0400 Subject: [PATCH 05/12] chore: remove redundant trait members --- protocols/relay/src/autorelay.rs | 14 ++++++++-- protocols/relay/src/multiaddr_ext.rs | 38 ---------------------------- 2 files changed, 12 insertions(+), 40 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index 788e768a833..f23dc61757c 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -145,7 +145,7 @@ impl Behaviour { if self .external_addresses .iter() - .any(|addr| addr.is_public() && !addr.is_relayed()) + .any(|addr| !addr.is_relayed()) { return; } @@ -294,7 +294,17 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - let _change = self.external_addresses.on_swarm_event(&event); + let change = self.external_addresses.on_swarm_event(&event); + + if change + && self + .external_addresses + .iter() + .any(|addr| !addr.is_relayed()) + { + self.remove_all_reservations(); + return; + } match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { diff --git a/protocols/relay/src/multiaddr_ext.rs b/protocols/relay/src/multiaddr_ext.rs index 44e9dc5ac08..7c06eb7eab0 100644 --- a/protocols/relay/src/multiaddr_ext.rs +++ b/protocols/relay/src/multiaddr_ext.rs @@ -2,48 +2,10 @@ use libp2p_core::{multiaddr::Protocol, Multiaddr}; pub(crate) trait MultiaddrExt { fn is_relayed(&self) -> bool; - - fn is_public(&self) -> bool; - - fn is_loopback(&self) -> bool; - - fn is_private(&self) -> bool; - - fn is_unspecified(&self) -> bool; } impl MultiaddrExt for Multiaddr { fn is_relayed(&self) -> bool { self.iter().any(|p| p == Protocol::P2pCircuit) } - - fn is_public(&self) -> bool { - !self.is_private() && !self.is_loopback() && !self.is_unspecified() - } - - fn is_loopback(&self) -> bool { - self.iter().any(|proto| match proto { - Protocol::Ip4(ip) => ip.is_loopback(), - Protocol::Ip6(ip) => ip.is_loopback(), - _ => false, - }) - } - - fn is_private(&self) -> bool { - self.iter().any(|proto| match proto { - Protocol::Ip4(ip) => ip.is_private(), - Protocol::Ip6(ip) => { - (ip.segments()[0] & 0xffc0) != 0xfe80 && (ip.segments()[0] & 0xfe00) != 0xfc00 - } - _ => false, - }) - } - - fn is_unspecified(&self) -> bool { - self.iter().any(|proto| match proto { - Protocol::Ip4(ip) => ip.is_unspecified(), - Protocol::Ip6(ip) => ip.is_unspecified(), - _ => false, - }) - } } From 60b69315bcee49544a131f742139952a542933cb Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 11:19:11 -0400 Subject: [PATCH 06/12] chore: remove unused imports --- protocols/relay/src/autorelay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index f23dc61757c..e926668e88e 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -7,8 +7,8 @@ use libp2p_core::Endpoint; use libp2p_identity::PeerId; use libp2p_swarm::derive_prelude::{ AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, - DialFailure, ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, - NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, NetworkBehaviour, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p_swarm::{dummy, ExternalAddresses, ListenOpts, NewListenAddr}; use std::collections::{HashMap, HashSet, VecDeque}; From 8ed974efeb0286e497aa50ea6139be423c4fd590 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 11:26:01 -0400 Subject: [PATCH 07/12] chore: mark status accordingly when listener errors, closes or expires. --- protocols/relay/src/autorelay.rs | 113 +++++++++++++++++++++++++++---- 1 file changed, 98 insertions(+), 15 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index e926668e88e..bcf038d2945 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -25,7 +25,7 @@ pub struct Behaviour { connections: HashMap<(PeerId, ConnectionId), Connection>, - pending_reservation: HashMap, + reservations: HashMap, pending_target: HashSet<(PeerId, ConnectionId)>, waker: Option, @@ -93,7 +93,6 @@ impl Config { pub enum Event {} impl Behaviour { - pub fn new_with_config(config: Config) -> Self { Self { config, @@ -103,7 +102,7 @@ impl Behaviour { fn select_connection_for_reservation( &mut self, - (peer_id, connection_id): (PeerId, ConnectionId), + peer_id: PeerId, connection_id: ConnectionId ) -> bool { if self.pending_target.contains(&(peer_id, connection_id)) { return false; @@ -131,14 +130,75 @@ impl Behaviour { info.relay_status = RelayStatus::Supported { status: ReservationStatus::Pending { id }, }; - self.pending_reservation - .insert(id, (peer_id, connection_id)); + self.reservations.insert(id, (peer_id, connection_id)); self.events.push_back(ToSwarm::ListenOn { opts }); self.pending_target.insert((peer_id, connection_id)); true } + fn remove_all_reservations(&mut self) { + let relay_listeners = self + .reservations + .iter() + .map(|(id, (peer_id, conn_id))| (*id, *peer_id, *conn_id)) + .collect::>(); + + for (listener_id, peer_id, connection_id) in relay_listeners { + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + continue; + }; + + assert!(matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { id } | ReservationStatus::Pending { id } + } if id == listener_id + )); + + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + + self.events + .push_back(ToSwarm::RemoveListener { id: listener_id }); + } + } + + fn disable_reservation(&mut self, id: ListenerId) { + let Some((peer_id, connection_id)) = self.reservations.remove(&id) else { + return; + }; + + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + return; + }; + + match connection.relay_status { + RelayStatus::Supported { + status: ReservationStatus::Active { .. }, + } => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + } + RelayStatus::Supported { + status: ReservationStatus::Pending { .. }, + } => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + } + RelayStatus::Pending => { + self.pending_target.remove(&(peer_id, connection_id)); + } + RelayStatus::Supported { + status: ReservationStatus::Idle, + } + | RelayStatus::NotSupported => {} + } + } + fn meet_reservation_target(&mut self) { // check to determine if there is a public external address that could possibly let us know the node // is reachable @@ -218,7 +278,7 @@ impl Behaviour { .copied() .take(remaining_targets_needed) { - if !self.select_connection_for_reservation((peer_id, connection_id)) { + if !self.select_connection_for_reservation(peer_id, connection_id) { continue; } @@ -231,7 +291,17 @@ impl Behaviour { } fn active_reservations(&self) -> usize { - self.connections.values().filter(|info| matches!(info.relay_status, RelayStatus::Supported { status: ReservationStatus::Active { .. } })).count() + self.connections + .values() + .filter(|info| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + }) + .count() } // fn on_connection_established( @@ -336,12 +406,16 @@ impl NetworkBehaviour for Behaviour { .expect("valid connection"); let id = match connection.relay_status { - RelayStatus::Supported { status: ReservationStatus::Active { id } } => id, - RelayStatus::Supported { status: ReservationStatus::Pending { id } } => id, + RelayStatus::Supported { + status: ReservationStatus::Active { id }, + } => id, + RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + } => id, _ => return, }; - self.pending_reservation.remove(&id); + self.reservations.remove(&id); self.pending_target.remove(&(peer_id, connection_id)); let max_reservation = self.config.max_reservations.get() as usize; @@ -376,7 +450,7 @@ impl NetworkBehaviour for Behaviour { return; } - let Some((peer_id, connection_id)) = self.pending_reservation.remove(&listener_id) + let Some((peer_id, connection_id)) = self.reservations.get(&listener_id).copied() else { return; }; @@ -397,9 +471,15 @@ impl NetworkBehaviour for Behaviour { status: ReservationStatus::Active { id }, }; } - FromSwarm::ExpiredListenAddr(_) => {} - FromSwarm::ListenerError(_) => {} - FromSwarm::ListenerClosed(_) => {} + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, .. }) => { + self.disable_reservation(listener_id); + } + FromSwarm::ListenerError(ListenerError { listener_id, .. }) => { + self.disable_reservation(listener_id); + } + FromSwarm::ListenerClosed(ListenerClosed { listener_id, .. }) => { + self.disable_reservation(listener_id); + } _ => {} } } @@ -412,7 +492,10 @@ impl NetworkBehaviour for Behaviour { ) { let Either::Left(event) = event; - let connection = self.connections.get_mut(&(peer_id, connection_id)).expect("valid connection"); + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); match event { Out::Supported => { From ed6481379b0e8d3f35ad885b587eff3d595d8f31 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 11:26:24 -0400 Subject: [PATCH 08/12] chore: fmt --- protocols/relay/src/autorelay.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index bcf038d2945..c6e54f51787 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -102,7 +102,8 @@ impl Behaviour { fn select_connection_for_reservation( &mut self, - peer_id: PeerId, connection_id: ConnectionId + peer_id: PeerId, + connection_id: ConnectionId, ) -> bool { if self.pending_target.contains(&(peer_id, connection_id)) { return false; From bc76458f37604276d778bd4ae1a0f5eee104e37f Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 11:28:17 -0400 Subject: [PATCH 09/12] chore: fmt --- protocols/relay/src/autorelay.rs | 37 ++++++++++++++---------- protocols/relay/src/autorelay/handler.rs | 4 +-- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index c6e54f51787..73714d85bd7 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -1,19 +1,26 @@ -use crate::autorelay::handler::Out; -use crate::multiaddr_ext::MultiaddrExt; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + num::NonZeroU8, + task::{Context, Poll, Waker}, +}; + use either::Either; -use libp2p_core::multiaddr::Protocol; -use libp2p_core::transport::{ListenerId, PortUse}; -use libp2p_core::Endpoint; +use libp2p_core::{ + multiaddr::Protocol, + transport::{ListenerId, PortUse}, + Endpoint, +}; use libp2p_identity::PeerId; -use libp2p_swarm::derive_prelude::{ - AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, - ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, NetworkBehaviour, - THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +use libp2p_swarm::{ + derive_prelude::{ + AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, + ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, NetworkBehaviour, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + dummy, ExternalAddresses, ListenOpts, NewListenAddr, }; -use libp2p_swarm::{dummy, ExternalAddresses, ListenOpts, NewListenAddr}; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::num::NonZeroU8; -use std::task::{Context, Poll, Waker}; + +use crate::{autorelay::handler::Out, multiaddr_ext::MultiaddrExt}; mod handler; @@ -201,8 +208,8 @@ impl Behaviour { } fn meet_reservation_target(&mut self) { - // check to determine if there is a public external address that could possibly let us know the node - // is reachable + // check to determine if there is a public external address that could possibly let us know + // the node is reachable if self .external_addresses .iter() diff --git a/protocols/relay/src/autorelay/handler.rs b/protocols/relay/src/autorelay/handler.rs index 4306e53b7bd..8f4c633a75d 100644 --- a/protocols/relay/src/autorelay/handler.rs +++ b/protocols/relay/src/autorelay/handler.rs @@ -4,13 +4,13 @@ use std::{ }; use libp2p_core::upgrade::DeniedUpgrade; - -use crate::HOP_PROTOCOL_NAME; use libp2p_swarm::{ handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, SupportedProtocols, }; +use crate::HOP_PROTOCOL_NAME; + #[derive(Default, Debug)] pub struct Handler { events: VecDeque< From bb02c3824b7d9fc055980bafe3fb306d47560919 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 2 Sep 2025 20:15:22 -0400 Subject: [PATCH 10/12] chore: remove pending target --- protocols/relay/src/autorelay.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index 73714d85bd7..4e44dfc3ffb 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -196,11 +196,10 @@ impl Behaviour { connection.relay_status = RelayStatus::Supported { status: ReservationStatus::Idle, }; - } - RelayStatus::Pending => { self.pending_target.remove(&(peer_id, connection_id)); } - RelayStatus::Supported { + RelayStatus::Pending + | RelayStatus::Supported { status: ReservationStatus::Idle, } | RelayStatus::NotSupported => {} From c0e50c0073d533198117788785907dbdadd2e02c Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 10 Sep 2025 16:58:02 -0400 Subject: [PATCH 11/12] chore: remove pending target and add functions to check relay status and get pending amount --- protocols/relay/src/autorelay.rs | 68 +++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs index 4e44dfc3ffb..f6027910130 100644 --- a/protocols/relay/src/autorelay.rs +++ b/protocols/relay/src/autorelay.rs @@ -33,7 +33,6 @@ pub struct Behaviour { connections: HashMap<(PeerId, ConnectionId), Connection>, reservations: HashMap, - pending_target: HashSet<(PeerId, ConnectionId)>, waker: Option, } @@ -58,6 +57,28 @@ impl Connection { } } } + + pub(crate) fn is_reservation_confirmed(&self) -> bool { + matches!( + self.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + } + + pub(crate) fn is_confirmation_pending(&self) -> bool { + self.relay_status == RelayStatus::Pending + } + + pub(crate) fn is_reservation_pending(&self) -> bool { + matches!( + self.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Pending { .. } + } + ) + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -107,20 +128,27 @@ impl Behaviour { } } + fn get_pending_reservations(&self) -> usize { + self.connections + .values() + .filter(|info| info.is_reservation_pending()) + .count() + } + fn select_connection_for_reservation( &mut self, peer_id: PeerId, connection_id: ConnectionId, ) -> bool { - if self.pending_target.contains(&(peer_id, connection_id)) { - return false; - } - let info = self .connections .get_mut(&(peer_id, connection_id)) .expect("connection is present"); + if !info.is_confirmation_pending() { + return false; + } + let addr_with_peer_id = match info.address.clone().with_p2p(peer_id) { Ok(addr) => addr, Err(addr) => { @@ -140,7 +168,6 @@ impl Behaviour { }; self.reservations.insert(id, (peer_id, connection_id)); self.events.push_back(ToSwarm::ListenOn { opts }); - self.pending_target.insert((peer_id, connection_id)); true } @@ -196,7 +223,6 @@ impl Behaviour { connection.relay_status = RelayStatus::Supported { status: ReservationStatus::Idle, }; - self.pending_target.remove(&(peer_id, connection_id)); } RelayStatus::Pending | RelayStatus::Supported { @@ -246,9 +272,9 @@ impl Behaviour { return; } - let pending_target_len = self.pending_target.len(); + let pending_target_len = self.get_pending_reservations(); - if pending_target_len >= max_reservation { + if pending_target_len == max_reservation { return; } @@ -256,12 +282,10 @@ impl Behaviour { .connections .iter() .filter(|(_, info)| { - matches!( - info.relay_status, - RelayStatus::Supported { - status: ReservationStatus::Idle + info.relay_status + == RelayStatus::Supported { + status: ReservationStatus::Idle, } - ) }) .map(|((peer_id, connection_id), _)| (*peer_id, *connection_id)) .collect::>(); @@ -273,7 +297,7 @@ impl Behaviour { } let remaining_targets_needed = targets_count - .checked_sub(self.pending_target.len()) + .checked_sub(pending_target_len) .unwrap_or_default(); if remaining_targets_needed == 0 { @@ -289,12 +313,12 @@ impl Behaviour { continue; } - if self.pending_target.len() == max_reservation { + if self.get_pending_reservations() == max_reservation { break; } } - debug_assert!(self.pending_target.len() <= max_reservation); + debug_assert!(self.get_pending_reservations() <= max_reservation); } fn active_reservations(&self) -> usize { @@ -423,7 +447,6 @@ impl NetworkBehaviour for Behaviour { }; self.reservations.remove(&id); - self.pending_target.remove(&(peer_id, connection_id)); let max_reservation = self.config.max_reservations.get() as usize; @@ -467,15 +490,12 @@ impl NetworkBehaviour for Behaviour { .get_mut(&(peer_id, connection_id)) .expect("valid connection"); - let RelayStatus::Supported { - status: ReservationStatus::Pending { id }, - } = connection.relay_status - else { + if !connection.is_reservation_confirmed() { return; - }; + } connection.relay_status = RelayStatus::Supported { - status: ReservationStatus::Active { id }, + status: ReservationStatus::Active { id: listener_id }, }; } FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, .. }) => { From 709a6f6267e5e01a0d092815c3c5ae953be847af Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 12 Sep 2025 05:51:01 -0400 Subject: [PATCH 12/12] Update protocols/relay/CHANGELOG.md --- protocols/relay/CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 782e1598efa..4bc2ec39b8d 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -5,7 +5,6 @@ - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136) - ## 0.21.0