Skip to content

Commit 676a630

Browse files
authored
protocols/identify: Allow at most one inbound identify push stream (#2694)
An identify push contains the whole identify information of a remote peer. Upgrading multiple inbound identify push streams is useless. Instead older streams are dropped in favor of newer streams.
1 parent fcc987e commit 676a630

File tree

4 files changed

+43
-11
lines changed

4 files changed

+43
-11
lines changed

protocols/identify/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 0.36.1 - unreleased
2+
3+
- Allow at most one inbound identify push stream.
4+
15
# 0.36.0
26

37
- Update to `libp2p-core` `v0.33.0`.

protocols/identify/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-identify"
33
edition = "2021"
44
rust-version = "1.56.1"
55
description = "Nodes identifcation protocol for libp2p"
6-
version = "0.36.0"
6+
version = "0.36.1"
77
authors = ["Parity Technologies <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"
@@ -22,6 +22,7 @@ prost-codec = { version = "0.1", path = "../../misc/prost-codec" }
2222
prost = "0.10"
2323
smallvec = "1.6.1"
2424
thiserror = "1.0"
25+
void = "1.0"
2526

2627
[dev-dependencies]
2728
async-std = "1.6.2"

protocols/identify/src/handler.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::protocol::{
2222
IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush,
2323
ReplySubstream, UpgradeError,
2424
};
25+
use futures::future::BoxFuture;
2526
use futures::prelude::*;
2627
use futures_timer::Delay;
2728
use libp2p_core::either::{EitherError, EitherOutput};
@@ -30,6 +31,7 @@ use libp2p_swarm::{
3031
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
3132
NegotiatedSubstream, SubstreamProtocol,
3233
};
34+
use log::warn;
3335
use smallvec::SmallVec;
3436
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
3537

@@ -39,6 +41,7 @@ use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
3941
/// at least one identification request to be answered by the remote before
4042
/// permitting the underlying connection to be closed.
4143
pub struct IdentifyHandler {
44+
inbound_identify_push: Option<BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>>,
4245
/// Pending events to yield.
4346
events: SmallVec<
4447
[ConnectionHandlerEvent<
@@ -80,6 +83,7 @@ impl IdentifyHandler {
8083
/// Creates a new `IdentifyHandler`.
8184
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
8285
IdentifyHandler {
86+
inbound_identify_push: Default::default(),
8387
events: SmallVec::new(),
8488
trigger_next_identify: Delay::new(initial_delay),
8589
keep_alive: KeepAlive::Yes,
@@ -113,9 +117,14 @@ impl ConnectionHandler for IdentifyHandler {
113117
EitherOutput::First(substream) => self.events.push(ConnectionHandlerEvent::Custom(
114118
IdentifyHandlerEvent::Identify(substream),
115119
)),
116-
EitherOutput::Second(info) => self.events.push(ConnectionHandlerEvent::Custom(
117-
IdentifyHandlerEvent::Identified(info),
118-
)),
120+
EitherOutput::Second(fut) => {
121+
if self.inbound_identify_push.replace(fut).is_some() {
122+
warn!(
123+
"New inbound identify push stream while still upgrading previous one. \
124+
Replacing previous with new.",
125+
);
126+
}
127+
}
119128
}
120129
}
121130

@@ -189,14 +198,30 @@ impl ConnectionHandler for IdentifyHandler {
189198

190199
// Poll the future that fires when we need to identify the node again.
191200
match Future::poll(Pin::new(&mut self.trigger_next_identify), cx) {
192-
Poll::Pending => Poll::Pending,
201+
Poll::Pending => {}
193202
Poll::Ready(()) => {
194203
self.trigger_next_identify.reset(self.interval);
195204
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
196205
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()),
197206
};
198-
Poll::Ready(ev)
207+
return Poll::Ready(ev);
208+
}
209+
}
210+
211+
if let Some(Poll::Ready(res)) = self
212+
.inbound_identify_push
213+
.as_mut()
214+
.map(|f| f.poll_unpin(cx))
215+
{
216+
self.inbound_identify_push.take();
217+
218+
if let Ok(info) = res {
219+
return Poll::Ready(ConnectionHandlerEvent::Custom(
220+
IdentifyHandlerEvent::Identified(info),
221+
));
199222
}
200223
}
224+
225+
Poll::Pending
201226
}
202227
}

protocols/identify/src/protocol.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
use crate::structs_proto;
2222
use asynchronous_codec::{FramedRead, FramedWrite};
23-
use futures::prelude::*;
23+
use futures::{future::BoxFuture, prelude::*};
2424
use libp2p_core::{
2525
identity, multiaddr,
2626
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
@@ -30,6 +30,7 @@ use log::trace;
3030
use std::convert::TryFrom;
3131
use std::{fmt, io, iter, pin::Pin};
3232
use thiserror::Error;
33+
use void::Void;
3334

3435
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
3536

@@ -143,12 +144,13 @@ impl<C> InboundUpgrade<C> for IdentifyPushProtocol<InboundPush>
143144
where
144145
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
145146
{
146-
type Output = IdentifyInfo;
147-
type Error = UpgradeError;
148-
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
147+
type Output = BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>;
148+
type Error = Void;
149+
type Future = future::Ready<Result<Self::Output, Self::Error>>;
149150

150151
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
151-
recv(socket).boxed()
152+
// Lazily upgrade stream, thus allowing upgrade to happen within identify's handler.
153+
future::ok(recv(socket).boxed())
152154
}
153155
}
154156

0 commit comments

Comments
 (0)