diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index fde8a2a6807..4bc2ec39b8d 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,4 +1,6 @@ ## 0.21.1 +- Implements autorelay that would make a reservation as soon as a connection reports supporting HOP protocol. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX) - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136) diff --git a/protocols/relay/src/autorelay.rs b/protocols/relay/src/autorelay.rs new file mode 100644 index 00000000000..f6027910130 --- /dev/null +++ b/protocols/relay/src/autorelay.rs @@ -0,0 +1,551 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + num::NonZeroU8, + task::{Context, Poll, Waker}, +}; + +use either::Either; +use libp2p_core::{ + multiaddr::Protocol, + transport::{ListenerId, PortUse}, + Endpoint, +}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + derive_prelude::{ + AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, + ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, NetworkBehaviour, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + dummy, ExternalAddresses, ListenOpts, NewListenAddr, +}; + +use crate::{autorelay::handler::Out, multiaddr_ext::MultiaddrExt}; + +mod handler; + +#[derive(Default, Debug)] +pub struct Behaviour { + config: Config, + external_addresses: ExternalAddresses, + events: VecDeque::ToSwarm, THandlerInEvent>>, + + connections: HashMap<(PeerId, ConnectionId), Connection>, + + reservations: HashMap, + + waker: Option, +} + +#[derive(Debug)] +struct Connection { + address: Multiaddr, + relay_status: RelayStatus, +} + +impl Connection { + /// Mark relayed connection as not supported + pub(crate) fn disqualify_connection_if_relayed(&mut self) -> bool { + match self.address.is_relayed() { + true => { + self.relay_status = RelayStatus::NotSupported; + true + } + false => { + self.relay_status = RelayStatus::Pending; + false + } + } + } + + pub(crate) fn is_reservation_confirmed(&self) -> bool { + matches!( + self.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + } + + pub(crate) fn is_confirmation_pending(&self) -> bool { + self.relay_status == RelayStatus::Pending + } + + pub(crate) fn is_reservation_pending(&self) -> bool { + matches!( + self.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Pending { .. } + } + ) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RelayStatus { + Supported { status: ReservationStatus }, + NotSupported, + Pending, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ReservationStatus { + Idle, + Pending { id: ListenerId }, + Active { id: ListenerId }, +} + +#[derive(Debug)] +pub struct Config { + max_reservations: NonZeroU8, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_reservations: NonZeroU8::new(2).unwrap(), + } + } +} + +impl Config { + pub fn set_max_reservations(mut self, max_reservations: u8) -> Self { + assert!(max_reservations > 0); + self.max_reservations = NonZeroU8::new(max_reservations).expect("greater than zero"); + self + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub enum Event {} + +impl Behaviour { + pub fn new_with_config(config: Config) -> Self { + Self { + config, + ..Default::default() + } + } + + fn get_pending_reservations(&self) -> usize { + self.connections + .values() + .filter(|info| info.is_reservation_pending()) + .count() + } + + fn select_connection_for_reservation( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + ) -> bool { + let info = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("connection is present"); + + if !info.is_confirmation_pending() { + return false; + } + + let addr_with_peer_id = match info.address.clone().with_p2p(peer_id) { + Ok(addr) => addr, + Err(addr) => { + tracing::warn!(%addr, "address unexpectedly contains a different peer id than the connection"); + return false; + } + }; + + let relay_addr = addr_with_peer_id.with(Protocol::P2pCircuit); + + let opts = ListenOpts::new(relay_addr); + + let id = opts.listener_id(); + + info.relay_status = RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + }; + self.reservations.insert(id, (peer_id, connection_id)); + self.events.push_back(ToSwarm::ListenOn { opts }); + + true + } + + fn remove_all_reservations(&mut self) { + let relay_listeners = self + .reservations + .iter() + .map(|(id, (peer_id, conn_id))| (*id, *peer_id, *conn_id)) + .collect::>(); + + for (listener_id, peer_id, connection_id) in relay_listeners { + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + continue; + }; + + assert!(matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { id } | ReservationStatus::Pending { id } + } if id == listener_id + )); + + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + + self.events + .push_back(ToSwarm::RemoveListener { id: listener_id }); + } + } + + fn disable_reservation(&mut self, id: ListenerId) { + let Some((peer_id, connection_id)) = self.reservations.remove(&id) else { + return; + }; + + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + return; + }; + + match connection.relay_status { + RelayStatus::Supported { + status: ReservationStatus::Active { .. }, + } => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + } + RelayStatus::Supported { + status: ReservationStatus::Pending { .. }, + } => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + } + RelayStatus::Pending + | RelayStatus::Supported { + status: ReservationStatus::Idle, + } + | RelayStatus::NotSupported => {} + } + } + + fn meet_reservation_target(&mut self) { + // check to determine if there is a public external address that could possibly let us know + // the node is reachable + if self + .external_addresses + .iter() + .any(|addr| !addr.is_relayed()) + { + return; + } + + let max_reservation = self.config.max_reservations.get() as usize; + + let peers_not_supported = self.connections.is_empty() + || self + .connections + .iter() + .all(|(_, connection)| connection.relay_status == RelayStatus::NotSupported); + + if peers_not_supported { + return; + } + + let relayed_targets = self + .connections + .iter() + .filter(|(_, info)| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + }) + .count(); + + if relayed_targets == max_reservation { + return; + } + + let pending_target_len = self.get_pending_reservations(); + + if pending_target_len == max_reservation { + return; + } + + let possible_targets = self + .connections + .iter() + .filter(|(_, info)| { + info.relay_status + == RelayStatus::Supported { + status: ReservationStatus::Idle, + } + }) + .map(|((peer_id, connection_id), _)| (*peer_id, *connection_id)) + .collect::>(); + + let targets_count = std::cmp::min(possible_targets.len(), max_reservation); + + if targets_count == 0 { + return; + } + + let remaining_targets_needed = targets_count + .checked_sub(pending_target_len) + .unwrap_or_default(); + + if remaining_targets_needed == 0 { + return; + } + + for (peer_id, connection_id) in possible_targets + .iter() + .copied() + .take(remaining_targets_needed) + { + if !self.select_connection_for_reservation(peer_id, connection_id) { + continue; + } + + if self.get_pending_reservations() == max_reservation { + break; + } + } + + debug_assert!(self.get_pending_reservations() <= max_reservation); + } + + fn active_reservations(&self) -> usize { + self.connections + .values() + .filter(|info| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + } + ) + }) + .count() + } + + // fn on_connection_established( + // &mut self, + // peer_id: PeerId, + // connection_id: ConnectionId, + // connection_id: ConnectionId, + // address: Multiaddr, + // ) { + // } + // + // fn on_connection_closed(&mut self, peer_id: PeerId, connection_id: ConnectionId) {} + // + // fn on_address_change( + // &mut self, + // peer_id: PeerId, + // connection_id: ConnectionId, + // old_addr: &Multiaddr, + // new_addr: &Multiaddr, + // ) { + // } + // + // fn on_new_listen_addr(&mut self, listener_id: ListenerId, addr: &Multiaddr) {} + // fn on_expired_listen_addr(&mut self, listener_id: ListenerId) {} + // fn on_listener_error(&mut self, listener_id: ListenerId, error: &std::io::Error) {} + // fn on_listener_closed(&mut self, listener_id: ListenerId) {} +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if local_addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { + if addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + let change = self.external_addresses.on_swarm_event(&event); + + if change + && self + .external_addresses + .iter() + .any(|addr| !addr.is_relayed()) + { + self.remove_all_reservations(); + return; + } + + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + endpoint, + connection_id, + .. + }) => { + let remote_addr = endpoint.get_remote_address().clone(); + + let mut connection = Connection { + address: remote_addr, + relay_status: RelayStatus::Pending, + }; + + connection.disqualify_connection_if_relayed(); + + self.connections + .insert((peer_id, connection_id), connection); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + let connection = self + .connections + .remove(&(peer_id, connection_id)) + .expect("valid connection"); + + let id = match connection.relay_status { + RelayStatus::Supported { + status: ReservationStatus::Active { id }, + } => id, + RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + } => id, + _ => return, + }; + + self.reservations.remove(&id); + + let max_reservation = self.config.max_reservations.get() as usize; + + let active_reservations = self.active_reservations(); + + if active_reservations < max_reservation { + self.meet_reservation_target(); + } + } + FromSwarm::AddressChange(AddressChange { + peer_id, + connection_id, + old, + new, + }) => { + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); + + let old_addr = old.get_remote_address(); + let new_addr = new.get_remote_address(); + + debug_assert!(old_addr != new_addr); + + connection.address = new_addr.clone(); + } + FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { + // we only care about any new relayed address + if !addr.iter().any(|protocol| protocol == Protocol::P2pCircuit) { + return; + } + + let Some((peer_id, connection_id)) = self.reservations.get(&listener_id).copied() + else { + return; + }; + + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); + + if !connection.is_reservation_confirmed() { + return; + } + + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Active { id: listener_id }, + }; + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, .. }) => { + self.disable_reservation(listener_id); + } + FromSwarm::ListenerError(ListenerError { listener_id, .. }) => { + self.disable_reservation(listener_id); + } + FromSwarm::ListenerClosed(ListenerClosed { listener_id, .. }) => { + self.disable_reservation(listener_id); + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let Either::Left(event) = event; + + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("valid connection"); + + match event { + Out::Supported => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + self.meet_reservation_target(); + } + Out::Unsupported => { + let _previous_status = connection.relay_status; + connection.relay_status = RelayStatus::NotSupported; + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + self.waker.replace(cx.waker().clone()); + Poll::Pending + } +} diff --git a/protocols/relay/src/autorelay/handler.rs b/protocols/relay/src/autorelay/handler.rs new file mode 100644 index 00000000000..8f4c633a75d --- /dev/null +++ b/protocols/relay/src/autorelay/handler.rs @@ -0,0 +1,106 @@ +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_swarm::{ + handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, + SupportedProtocols, +}; + +use crate::HOP_PROTOCOL_NAME; + +#[derive(Default, Debug)] +pub struct Handler { + events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + >, + >, + + supported: bool, + + supported_protocol: SupportedProtocols, +} + +#[derive(Debug, Copy, Clone)] +pub enum Out { + Supported, + Unsupported, +} + +#[allow(deprecated)] +impl ConnectionHandler for Handler { + type FromBehaviour = (); + type ToBehaviour = Out; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> bool { + false + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::RemoteProtocolsChange(protocol) + | ConnectionEvent::LocalProtocolsChange(protocol) => { + let change = self.supported_protocol.on_protocols_change(protocol); + if change { + let valid = self + .supported_protocol + .iter() + .any(|proto| HOP_PROTOCOL_NAME.eq(proto)); + + match (valid, self.supported) { + (true, false) => { + self.supported = true; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Out::Supported)); + } + (false, true) => { + self.supported = false; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Out::Unsupported, + )); + } + (true, true) => {} + _ => {} + } + } + } + _ => {} + } + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + Poll::Pending + } +} diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 515fb40ef4b..6dbdc0e8673 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -23,6 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +pub mod autorelay; mod behaviour; mod copy_future; mod multiaddr_ext;