Skip to content

Commit 0c02a8a

Browse files
authored
swarm/src/protocols_handler: Use FuturesUnordered in NodeHandlerWrapper (#1775)
> Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications. This reduces the required amount of work needed to poll large numbers of futures. https://docs.rs/futures/0.3.5/futures/stream/struct.FuturesUnordered.html Instead of iterating each inbound and outbound upgrade looking for one to make progress, use a `FuturesUnordered` for both pending inbound and pending outbound upgrades. As a result only those upgrades are polled that are ready to progress.
1 parent b74285a commit 0c02a8a

File tree

5 files changed

+86
-70
lines changed

5 files changed

+86
-70
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
- Update `libp2p-core`, `libp2p-floodsub`, `libp2p-gossipsub`, `libp2p-mplex`,
2929
`libp2p-noise`, `libp2p-plaintext`, `libp2p-pnet`, `libp2p-request-response`,
30-
`libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`.
30+
`libp2p-swarm`, `libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`.
3131

3232
# Version 0.28.1 [2020-09-10]
3333

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
7474
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
7575
libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true }
7676
libp2p-request-response = { version = "0.4.0", path = "protocols/request-response", optional = true }
77-
libp2p-swarm = { version = "0.22.0", path = "swarm" }
77+
libp2p-swarm = { version = "0.22.1", path = "swarm" }
7878
libp2p-uds = { version = "0.22.0", path = "transports/uds", optional = true }
7979
libp2p-wasm-ext = { version = "0.22.0", path = "transports/wasm-ext", optional = true }
8080
libp2p-yamux = { version = "0.25.0", path = "muxers/yamux", optional = true }

swarm/CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
# 0.22.1 [unreleased]
2+
3+
- Instead of iterating each inbound and outbound substream upgrade looking for
4+
one to make progress, use a `FuturesUnordered` for both pending inbound and
5+
pending outbound upgrades. As a result only those upgrades are polled that are
6+
ready to progress.
7+
8+
Implementors of `InboundUpgrade` and `OutboundUpgrade` need to ensure to wake
9+
up the underlying task once they are ready to make progress as they won't be
10+
polled otherwise.
11+
12+
[PR 1775](https://github.com/libp2p/rust-libp2p/pull/1775)
13+
114
# 0.22.0 [2020-09-09]
215

316
- Bump `libp2p-core` dependency.

swarm/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "libp2p-swarm"
33
edition = "2018"
44
description = "The libp2p swarm"
5-
version = "0.22.0"
5+
version = "0.22.1"
66
authors = ["Parity Technologies <[email protected]>"]
77
license = "MIT"
88
repository = "https://github.com/libp2p/rust-libp2p"

swarm/src/protocols_handler/node_handler.rs

Lines changed: 70 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::protocols_handler::{
2828
};
2929

3030
use futures::prelude::*;
31+
use futures::stream::FuturesUnordered;
3132
use libp2p_core::{
3233
Multiaddr,
3334
PeerId,
@@ -41,7 +42,7 @@ use libp2p_core::{
4142
SubstreamEndpoint,
4243
},
4344
muxing::StreamMuxerBox,
44-
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply}
45+
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}
4546
};
4647
use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration};
4748
use wasm_timer::{Delay, Instant};
@@ -76,8 +77,8 @@ where
7677
fn into_handler(self, connected: &Connected<TConnInfo>) -> Self::Handler {
7778
NodeHandlerWrapper {
7879
handler: self.handler.into_handler(connected.peer_id(), &connected.endpoint),
79-
negotiating_in: Vec::new(),
80-
negotiating_out: Vec::new(),
80+
negotiating_in: Default::default(),
81+
negotiating_out: Default::default(),
8182
queued_dial_upgrades: Vec::new(),
8283
unique_dial_upgrade_id: 0,
8384
shutdown: Shutdown::None,
@@ -95,18 +96,15 @@ where
9596
/// The underlying handler.
9697
handler: TProtoHandler,
9798
/// Futures that upgrade incoming substreams.
98-
negotiating_in: Vec<(
99+
negotiating_in: FuturesUnordered<SubstreamUpgrade<
99100
TProtoHandler::InboundOpenInfo,
100101
InboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::InboundProtocol>>,
101-
Delay
102-
)>,
103-
/// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata
104-
/// to pass back once successfully opened.
105-
negotiating_out: Vec<(
102+
>>,
103+
/// Futures that upgrade outgoing substreams.
104+
negotiating_out: FuturesUnordered<SubstreamUpgrade<
106105
TProtoHandler::OutboundOpenInfo,
107106
OutboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::OutboundProtocol>>,
108-
Delay,
109-
)>,
107+
>>,
110108
/// For each outbound substream request, how to upgrade it. The first element of the tuple
111109
/// is the unique identifier (see `unique_dial_upgrade_id`).
112110
queued_dial_upgrades: Vec<(u64, (upgrade::Version, SendWrapper<TProtoHandler::OutboundProtocol>))>,
@@ -116,6 +114,48 @@ where
116114
shutdown: Shutdown,
117115
}
118116

117+
struct SubstreamUpgrade<UserData, Upgrade> {
118+
user_data: Option<UserData>,
119+
timeout: Delay,
120+
upgrade: Upgrade,
121+
}
122+
123+
impl<UserData, Upgrade> Unpin for SubstreamUpgrade<UserData, Upgrade> {}
124+
125+
impl<UserData, Upgrade, UpgradeOutput, TUpgradeError> Future for SubstreamUpgrade<UserData, Upgrade>
126+
where
127+
Upgrade: Future<Output = Result<UpgradeOutput, UpgradeError<TUpgradeError>>> + Unpin,
128+
{
129+
type Output = (UserData, Result<UpgradeOutput, ProtocolsHandlerUpgrErr<TUpgradeError>>);
130+
131+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
132+
match self.timeout.poll_unpin(cx) {
133+
Poll::Ready(Ok(_)) => return Poll::Ready((
134+
self.user_data.take().expect("Future not to be polled again once ready."),
135+
Err(ProtocolsHandlerUpgrErr::Timeout)),
136+
),
137+
Poll::Ready(Err(_)) => return Poll::Ready((
138+
self.user_data.take().expect("Future not to be polled again once ready."),
139+
Err(ProtocolsHandlerUpgrErr::Timer)),
140+
),
141+
Poll::Pending => {},
142+
}
143+
144+
match self.upgrade.poll_unpin(cx) {
145+
Poll::Ready(Ok(upgrade)) => Poll::Ready((
146+
self.user_data.take().expect("Future not to be polled again once ready."),
147+
Ok(upgrade),
148+
)),
149+
Poll::Ready(Err(err)) => Poll::Ready((
150+
self.user_data.take().expect("Future not to be polled again once ready."),
151+
Err(ProtocolsHandlerUpgrErr::Upgrade(err)),
152+
)),
153+
Poll::Pending => Poll::Pending,
154+
}
155+
}
156+
}
157+
158+
119159
/// The options for a planned connection & handler shutdown.
120160
///
121161
/// A shutdown is planned anew based on the the return value of
@@ -195,10 +235,14 @@ where
195235
SubstreamEndpoint::Listener => {
196236
let protocol = self.handler.listen_protocol();
197237
let timeout = protocol.timeout().clone();
198-
let (_, upgrade, info) = protocol.into_upgrade();
238+
let (_, upgrade, user_data) = protocol.into_upgrade();
199239
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
200240
let timeout = Delay::new(timeout);
201-
self.negotiating_in.push((info, upgrade, timeout));
241+
self.negotiating_in.push(SubstreamUpgrade {
242+
user_data: Some(user_data),
243+
timeout,
244+
upgrade,
245+
});
202246
}
203247
SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => {
204248
let pos = match self
@@ -216,7 +260,11 @@ where
216260
let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos);
217261
let upgrade = upgrade::apply_outbound(substream, upgrade, version);
218262
let timeout = Delay::new(timeout);
219-
self.negotiating_out.push((user_data, upgrade, timeout));
263+
self.negotiating_out.push(SubstreamUpgrade {
264+
user_data: Some(user_data),
265+
timeout,
266+
upgrade,
267+
});
220268
}
221269
}
222270
}
@@ -232,62 +280,17 @@ where
232280
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<
233281
Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>
234282
> {
235-
// Continue negotiation of newly-opened substreams on the listening side.
236-
// We remove each element from `negotiating_in` one by one and add them back if not ready.
237-
for n in (0..self.negotiating_in.len()).rev() {
238-
let (info, mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n);
239-
match Future::poll(Pin::new(&mut timeout), cx) {
240-
Poll::Ready(Ok(_)) => {
241-
let err = ProtocolsHandlerUpgrErr::Timeout;
242-
self.handler.inject_listen_upgrade_error(info, err);
243-
continue
244-
}
245-
Poll::Ready(Err(_)) => {
246-
let err = ProtocolsHandlerUpgrErr::Timer;
247-
self.handler.inject_listen_upgrade_error(info, err);
248-
continue;
249-
}
250-
Poll::Pending => {},
251-
}
252-
match Future::poll(Pin::new(&mut in_progress), cx) {
253-
Poll::Ready(Ok(upgrade)) =>
254-
self.handler.inject_fully_negotiated_inbound(upgrade, info),
255-
Poll::Pending => self.negotiating_in.push((info, in_progress, timeout)),
256-
Poll::Ready(Err(err)) => {
257-
let err = ProtocolsHandlerUpgrErr::Upgrade(err);
258-
self.handler.inject_listen_upgrade_error(info, err);
259-
}
283+
while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
284+
match res {
285+
Ok(upgrade) => self.handler.inject_fully_negotiated_inbound(upgrade, user_data),
286+
Err(err) => self.handler.inject_listen_upgrade_error(user_data, err),
260287
}
261288
}
262289

263-
// Continue negotiation of newly-opened substreams.
264-
// We remove each element from `negotiating_out` one by one and add them back if not ready.
265-
for n in (0..self.negotiating_out.len()).rev() {
266-
let (upgr_info, mut in_progress, mut timeout) = self.negotiating_out.swap_remove(n);
267-
match Future::poll(Pin::new(&mut timeout), cx) {
268-
Poll::Ready(Ok(_)) => {
269-
let err = ProtocolsHandlerUpgrErr::Timeout;
270-
self.handler.inject_dial_upgrade_error(upgr_info, err);
271-
continue;
272-
},
273-
Poll::Ready(Err(_)) => {
274-
let err = ProtocolsHandlerUpgrErr::Timer;
275-
self.handler.inject_dial_upgrade_error(upgr_info, err);
276-
continue;
277-
},
278-
Poll::Pending => {},
279-
}
280-
match Future::poll(Pin::new(&mut in_progress), cx) {
281-
Poll::Ready(Ok(upgrade)) => {
282-
self.handler.inject_fully_negotiated_outbound(upgrade, upgr_info);
283-
}
284-
Poll::Pending => {
285-
self.negotiating_out.push((upgr_info, in_progress, timeout));
286-
}
287-
Poll::Ready(Err(err)) => {
288-
let err = ProtocolsHandlerUpgrErr::Upgrade(err);
289-
self.handler.inject_dial_upgrade_error(upgr_info, err);
290-
}
290+
while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) {
291+
match res {
292+
Ok(upgrade) => self.handler.inject_fully_negotiated_outbound(upgrade, user_data),
293+
Err(err) => self.handler.inject_dial_upgrade_error(user_data, err),
291294
}
292295
}
293296

0 commit comments

Comments
 (0)