Skip to content
Open
4 changes: 4 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.21.1

- Automatically configure HOP protocol advertisement based on external addresses, with the ability to override this
functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable
protocol advertisement.
See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154).
Comment on lines +3 to +6
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a breaking change. Previously, the HOP advertisement was always enabled, whereas now it depends on external addresses being present. The behavior in the end is probably the same (reservations only work if external addresses are present), but the error on the client side will be a different one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a bit torn on if this would be considered breaking or not since if the relay message doesnt contain the addresses (which in our case, external addresses), it would error on the client side when attempting to establish a reservation, though in this case it would give a different error message compared to previously where if the relay didnt have an external address since it would not be advertising its protocol without it being enabled in some manner. I guess in a sense, it would be though since it would be a different error than what is expecting (probably should check and see the errors myself too)

- reduce allocations by replacing `get_or_insert` with `get_or_insert_with`
See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136)

Expand Down
104 changes: 101 additions & 3 deletions protocols/relay/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{
collections::{hash_map, HashMap, HashSet, VecDeque},
num::NonZeroU32,
ops::Add,
task::{Context, Poll},
task::{Context, Poll, Waker},
time::Duration,
};

Expand Down Expand Up @@ -260,6 +260,21 @@ pub struct Behaviour {
queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,

external_addresses: ExternalAddresses,

status: Status,

auto_status_change: bool,

waker: Option<Waker>,
}

#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Status {
/// Enables advertisement of the STOP protocol
Enable,

/// Disables advertisement of the STOP protocol
Disable,
}

impl Behaviour {
Expand All @@ -271,6 +286,78 @@ impl Behaviour {
circuits: Default::default(),
queued_actions: Default::default(),
external_addresses: Default::default(),
status: Status::Disable,
auto_status_change: true,
Comment on lines +292 to +293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please document this default behavior somewhere?

waker: None,
}
}

pub fn set_status(&mut self, status: Option<Status>) {
match status {
Some(status) => {
self.status = status;
self.auto_status_change = false;
self.reconfigure_relay_status();
}
None => {
self.auto_status_change = true;
self.determine_relay_status_from_external_address();
}
}
if let Some(waker) = self.waker.take() {
waker.wake();
}
}

fn reconfigure_relay_status(&mut self) {
if self.reservations.is_empty() {
return;
}

for (peer_id, connections) in self.reservations.iter() {
self.queued_actions
.extend(connections.iter().map(|id| ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*id),
event: Either::Left(handler::In::SetStatus {
status: self.status,
}),
}));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to inform all handlers? I think otherwise existing connection handlers without an active reservation would still have the outdated status.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we should. Good catch. Forgot this behaviour only tracks active reservations instead of every connection. Will update this shortly.

}

fn determine_relay_status_from_external_address(&mut self) {
let old = self.status;

self.status = match (self.external_addresses.as_slice(), self.status) {
([], Status::Enable) => {
tracing::debug!("disabling protocol advertisment because we no longer have any confirmed external addresses");
Status::Disable
}
([], Status::Disable) => {
// Previously disabled because of no external addresses.
Status::Disable
}
(confirmed_external_addresses, Status::Disable) => {
debug_assert!(
!confirmed_external_addresses.is_empty(),
"Previous match arm handled empty list"
);
tracing::debug!("advertising protcol because we are now externally reachable");
Status::Enable
}
(confirmed_external_addresses, Status::Enable) => {
debug_assert!(
!confirmed_external_addresses.is_empty(),
"Previous match arm handled empty list"
);

Status::Enable
}
};

if self.status != old {
self.reconfigure_relay_status();
}
}

Expand Down Expand Up @@ -337,6 +424,7 @@ impl NetworkBehaviour for Behaviour {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
},
self.status,
)))
}

Expand Down Expand Up @@ -364,11 +452,16 @@ impl NetworkBehaviour for Behaviour {
role_override,
port_use,
},
self.status,
)))
}

fn on_swarm_event(&mut self, event: FromSwarm) {
self.external_addresses.on_swarm_event(&event);
let changed = self.external_addresses.on_swarm_event(&event);

if self.auto_status_change && changed {
self.determine_relay_status_from_external_address();
}

if let FromSwarm::ConnectionClosed(connection_closed) = event {
self.on_connection_closed(connection_closed)
Expand Down Expand Up @@ -718,11 +811,16 @@ impl NetworkBehaviour for Behaviour {
}

#[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(to_swarm) = self.queued_actions.pop_front() {
return Poll::Ready(to_swarm);
}

self.waker = Some(cx.waker().clone());

Poll::Pending
}
}
Expand Down
31 changes: 25 additions & 6 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
};
use futures_timer::Delay;
use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
ConnectedPoint, Multiaddr,
};
use libp2p_identity::PeerId;
use libp2p_swarm::{
handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
Expand All @@ -43,7 +46,7 @@ use libp2p_swarm::{
use web_time::Instant;

use crate::{
behaviour::CircuitId,
behaviour::{self, CircuitId},
copy_future::CopyFuture,
proto,
protocol::{inbound_hop, outbound_stop},
Expand Down Expand Up @@ -87,6 +90,9 @@ pub enum In {
dst_stream: Stream,
dst_pending_data: Bytes,
},
SetStatus {
status: behaviour::Status,
},
}

impl fmt::Debug for In {
Expand Down Expand Up @@ -137,6 +143,10 @@ impl fmt::Debug for In {
.field("circuit_id", circuit_id)
.field("dst_peer_id", dst_peer_id)
.finish(),
In::SetStatus { status } => f
.debug_struct("In::SetStatus")
.field("status", status)
.finish(),
}
}
}
Expand Down Expand Up @@ -385,10 +395,12 @@ pub struct Handler {
CircuitId,
Result<outbound_stop::Circuit, outbound_stop::Error>,
>,

status: behaviour::Status,
}

impl Handler {
pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
pub fn new(config: Config, endpoint: ConnectedPoint, status: behaviour::Status) -> Handler {
Handler {
inbound_workers: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
Expand All @@ -409,6 +421,7 @@ impl Handler {
active_reservation: Default::default(),
pending_connect_requests: Default::default(),
active_connect_requests: Default::default(),
status,
}
}

Expand Down Expand Up @@ -496,13 +509,18 @@ type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
impl ConnectionHandler for Handler {
type FromBehaviour = In;
type ToBehaviour = Event;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
type InboundOpenInfo = ();
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
match self.status {
behaviour::Status::Enable => {
SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(HOP_PROTOCOL_NAME)), ())
}
behaviour::Status::Disable => SubstreamProtocol::new(Either::Right(DeniedUpgrade), ()),
}
}

fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
Expand Down Expand Up @@ -594,6 +612,7 @@ impl ConnectionHandler for Handler {
.boxed(),
);
}
In::SetStatus { status } => self.status = status,
}
}

Expand Down Expand Up @@ -890,7 +909,7 @@ impl ConnectionHandler for Handler {
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol: stream,
protocol: futures::future::Either::Left(stream),
..
}) => {
self.on_fully_negotiated_inbound(stream);
Expand Down
4 changes: 3 additions & 1 deletion protocols/relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ mod proto {
};
}

pub use behaviour::{rate_limiter::RateLimiter, Behaviour, CircuitId, Config, Event, StatusCode};
pub use behaviour::{
rate_limiter::RateLimiter, Behaviour, CircuitId, Config, Event, Status, StatusCode,
};
pub use protocol::{HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME};

/// Types related to the relay protocol inbound.
Expand Down
Loading