Skip to content

Commit 9e62588

Browse files
feat(swarm): deprecate NegotiatedSubstream in favor of Stream
This patch tackles two things at once that are fairly intertwined: 1. There is no such thing as a "substream" in libp2p, the spec and other implementations only talk about "streams". We fix this by deprecating `NegotiatedSubstream`. 2. Previously, `NegotiatedSubstream` was a type alias that pointed to a type from `multistream-select`, effectively leaking the version of `multistream-select` to all dependencies of `libp2p-swarm`. We fix this by introducing a `Stream` newtype. Resolves: #3759. Related: #3748. Pull-Request: #3912.
1 parent 234a0d2 commit 9e62588

File tree

24 files changed

+234
-169
lines changed

24 files changed

+234
-169
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libp2p/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub use self::swarm::Swarm;
177177
pub use self::transport_ext::TransportExt;
178178
pub use libp2p_identity as identity;
179179
pub use libp2p_identity::PeerId;
180-
pub use libp2p_swarm::StreamProtocol;
180+
pub use libp2p_swarm::{Stream, StreamProtocol};
181181

182182
/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
183183
///

protocols/dcutr/src/protocol/inbound.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::proto;
2222
use asynchronous_codec::Framed;
2323
use futures::{future::BoxFuture, prelude::*};
2424
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
25-
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
25+
use libp2p_swarm::{Stream, StreamProtocol};
2626
use std::convert::TryFrom;
2727
use std::iter;
2828
use thiserror::Error;
@@ -38,12 +38,12 @@ impl upgrade::UpgradeInfo for Upgrade {
3838
}
3939
}
4040

41-
impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
41+
impl upgrade::InboundUpgrade<Stream> for Upgrade {
4242
type Output = PendingConnect;
4343
type Error = UpgradeError;
4444
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
4545

46-
fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
46+
fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future {
4747
let mut substream = Framed::new(
4848
substream,
4949
quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),
@@ -92,7 +92,7 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
9292
}
9393

9494
pub struct PendingConnect {
95-
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HolePunch>>,
95+
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HolePunch>>,
9696
remote_obs_addrs: Vec<Multiaddr>,
9797
}
9898

protocols/dcutr/src/protocol/outbound.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use futures::{future::BoxFuture, prelude::*};
2424
use futures_timer::Delay;
2525
use instant::Instant;
2626
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
27-
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
27+
use libp2p_swarm::{Stream, StreamProtocol};
2828
use std::convert::TryFrom;
2929
use std::iter;
3030
use thiserror::Error;
@@ -48,12 +48,12 @@ impl Upgrade {
4848
}
4949
}
5050

51-
impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
51+
impl upgrade::OutboundUpgrade<Stream> for Upgrade {
5252
type Output = Connect;
5353
type Error = UpgradeError;
5454
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
5555

56-
fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
56+
fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future {
5757
let mut substream = Framed::new(
5858
substream,
5959
quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),

protocols/gossipsub/src/handler.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
3333
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
3434
SubstreamProtocol,
3535
};
36-
use libp2p_swarm::NegotiatedSubstream;
36+
use libp2p_swarm::Stream;
3737
use smallvec::SmallVec;
3838
use std::{
3939
pin::Pin,
@@ -143,21 +143,21 @@ pub enum DisabledHandler {
143143
/// State of the inbound substream, opened either by us or by the remote.
144144
enum InboundSubstreamState {
145145
/// Waiting for a message from the remote. The idle state for an inbound substream.
146-
WaitingInput(Framed<NegotiatedSubstream, GossipsubCodec>),
146+
WaitingInput(Framed<Stream, GossipsubCodec>),
147147
/// The substream is being closed.
148-
Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
148+
Closing(Framed<Stream, GossipsubCodec>),
149149
/// An error occurred during processing.
150150
Poisoned,
151151
}
152152

153153
/// State of the outbound substream, opened either by us or by the remote.
154154
enum OutboundSubstreamState {
155155
/// Waiting for the user to send a message. The idle state for an outbound substream.
156-
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
156+
WaitingOutput(Framed<Stream, GossipsubCodec>),
157157
/// Waiting to send a message to the remote.
158-
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, proto::RPC),
158+
PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
159159
/// Waiting to flush the substream so that the data arrives to the remote.
160-
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
160+
PendingFlush(Framed<Stream, GossipsubCodec>),
161161
/// An error occurred during processing.
162162
Poisoned,
163163
}
@@ -185,7 +185,7 @@ impl Handler {
185185
impl EnabledHandler {
186186
fn on_fully_negotiated_inbound(
187187
&mut self,
188-
(substream, peer_kind): (Framed<NegotiatedSubstream, GossipsubCodec>, PeerKind),
188+
(substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
189189
) {
190190
// update the known kind of peer
191191
if self.peer_kind.is_none() {

protocols/kad/src/handler.rs

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
3333
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError,
36+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamUpgradeError,
3737
SubstreamProtocol,
3838
};
3939
use log::trace;
@@ -116,20 +116,16 @@ pub struct KademliaHandlerConfig {
116116
/// State of an active outbound substream.
117117
enum OutboundSubstreamState<TUserData> {
118118
/// Waiting to send a message to the remote.
119-
PendingSend(
120-
KadOutStreamSink<NegotiatedSubstream>,
121-
KadRequestMsg,
122-
Option<TUserData>,
123-
),
119+
PendingSend(KadOutStreamSink<Stream>, KadRequestMsg, Option<TUserData>),
124120
/// Waiting to flush the substream so that the data arrives to the remote.
125-
PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
121+
PendingFlush(KadOutStreamSink<Stream>, Option<TUserData>),
126122
/// Waiting for an answer back from the remote.
127123
// TODO: add timeout
128-
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
124+
WaitingAnswer(KadOutStreamSink<Stream>, TUserData),
129125
/// An error happened on the substream and we should report the error to the user.
130126
ReportError(KademliaHandlerQueryErr, TUserData),
131127
/// The substream is being closed.
132-
Closing(KadOutStreamSink<NegotiatedSubstream>),
128+
Closing(KadOutStreamSink<Stream>),
133129
/// The substream is complete and will not perform any more work.
134130
Done,
135131
Poisoned,
@@ -142,24 +138,16 @@ enum InboundSubstreamState<TUserData> {
142138
/// Whether it is the first message to be awaited on this stream.
143139
first: bool,
144140
connection_id: UniqueConnecId,
145-
substream: KadInStreamSink<NegotiatedSubstream>,
141+
substream: KadInStreamSink<Stream>,
146142
},
147143
/// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response.
148-
WaitingBehaviour(
149-
UniqueConnecId,
150-
KadInStreamSink<NegotiatedSubstream>,
151-
Option<Waker>,
152-
),
144+
WaitingBehaviour(UniqueConnecId, KadInStreamSink<Stream>, Option<Waker>),
153145
/// Waiting to send an answer back to the remote.
154-
PendingSend(
155-
UniqueConnecId,
156-
KadInStreamSink<NegotiatedSubstream>,
157-
KadResponseMsg,
158-
),
146+
PendingSend(UniqueConnecId, KadInStreamSink<Stream>, KadResponseMsg),
159147
/// Waiting to flush an answer back to the remote.
160-
PendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
148+
PendingFlush(UniqueConnecId, KadInStreamSink<Stream>),
161149
/// The substream is being closed.
162-
Closing(KadInStreamSink<NegotiatedSubstream>),
150+
Closing(KadInStreamSink<Stream>),
163151
/// The substream was cancelled in favor of a new one.
164152
Cancelled,
165153

@@ -813,7 +801,7 @@ impl Default for KademliaHandlerConfig {
813801
}
814802
}
815803

816-
impl<TUserData> Stream for OutboundSubstreamState<TUserData>
804+
impl<TUserData> futures::Stream for OutboundSubstreamState<TUserData>
817805
where
818806
TUserData: Unpin,
819807
{
@@ -949,7 +937,7 @@ where
949937
}
950938
}
951939

952-
impl<TUserData> Stream for InboundSubstreamState<TUserData>
940+
impl<TUserData> futures::Stream for InboundSubstreamState<TUserData>
953941
where
954942
TUserData: Unpin,
955943
{

protocols/ping/src/handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use libp2p_swarm::handler::{
2727
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
2828
};
2929
use libp2p_swarm::{
30-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol,
30+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
3131
StreamUpgradeError, SubstreamProtocol,
3232
};
3333
use std::collections::VecDeque;
@@ -390,15 +390,15 @@ impl ConnectionHandler for Handler {
390390
}
391391
}
392392

393-
type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>;
394-
type PongFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>;
393+
type PingFuture = BoxFuture<'static, Result<(Stream, Duration), io::Error>>;
394+
type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
395395

396396
/// The current state w.r.t. outbound pings.
397397
enum OutboundState {
398398
/// A new substream is being negotiated for the ping protocol.
399399
OpenStream,
400400
/// The substream is idle, waiting to send the next ping.
401-
Idle(NegotiatedSubstream),
401+
Idle(Stream),
402402
/// A ping is being sent and the response awaited.
403403
Ping(PingFuture),
404404
}

protocols/relay/src/behaviour/handler.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ use libp2p_swarm::handler::{
3737
ListenUpgradeError,
3838
};
3939
use libp2p_swarm::{
40-
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, NegotiatedSubstream,
41-
StreamUpgradeError, SubstreamProtocol,
40+
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError,
41+
SubstreamProtocol,
4242
};
4343
use std::collections::VecDeque;
4444
use std::fmt;
@@ -77,7 +77,7 @@ pub enum In {
7777
dst_peer_id: PeerId,
7878
inbound_circuit_req: inbound_hop::CircuitReq,
7979
dst_handler_notifier: oneshot::Sender<()>,
80-
dst_stream: NegotiatedSubstream,
80+
dst_stream: Stream,
8181
dst_pending_data: Bytes,
8282
},
8383
}
@@ -193,7 +193,7 @@ pub enum Event {
193193
src_connection_id: ConnectionId,
194194
inbound_circuit_req: inbound_hop::CircuitReq,
195195
dst_handler_notifier: oneshot::Sender<()>,
196-
dst_stream: NegotiatedSubstream,
196+
dst_stream: Stream,
197197
dst_pending_data: Bytes,
198198
},
199199
/// Negotiating an outbound substream for an inbound circuit request failed.
@@ -914,10 +914,10 @@ pub struct OutboundOpenInfo {
914914

915915
pub(crate) struct CircuitParts {
916916
circuit_id: CircuitId,
917-
src_stream: NegotiatedSubstream,
917+
src_stream: Stream,
918918
src_pending_data: Bytes,
919919
dst_peer_id: PeerId,
920920
dst_handler_notifier: oneshot::Sender<()>,
921-
dst_stream: NegotiatedSubstream,
921+
dst_stream: Stream,
922922
dst_pending_data: Bytes,
923923
}

protocols/relay/src/priv_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ use libp2p_identity::PeerId;
3939
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
4040
use libp2p_swarm::dial_opts::DialOpts;
4141
use libp2p_swarm::{
42-
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NegotiatedSubstream,
43-
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandler, THandlerInEvent,
42+
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
43+
NotifyHandler, PollParameters, Stream, StreamUpgradeError, THandler, THandlerInEvent,
4444
THandlerOutEvent, ToSwarm,
4545
};
4646
use std::collections::{hash_map, HashMap, VecDeque};
@@ -391,7 +391,7 @@ enum ConnectionState {
391391
},
392392
Operational {
393393
read_buffer: Bytes,
394-
substream: NegotiatedSubstream,
394+
substream: Stream,
395395
/// "Drop notifier" pattern to signal to the transport that the connection has been dropped.
396396
///
397397
/// This is flagged as "dead-code" by the compiler because we never read from it here.
@@ -425,7 +425,7 @@ impl ConnectionState {
425425
}
426426

427427
pub(crate) fn new_outbound(
428-
substream: NegotiatedSubstream,
428+
substream: Stream,
429429
read_buffer: Bytes,
430430
drop_notifier: oneshot::Sender<void::Void>,
431431
) -> Self {

protocols/relay/src/protocol/inbound_hop.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use futures::{future::BoxFuture, prelude::*};
2626
use instant::{Duration, SystemTime};
2727
use libp2p_core::{upgrade, Multiaddr};
2828
use libp2p_identity::PeerId;
29-
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
29+
use libp2p_swarm::{Stream, StreamProtocol};
3030
use std::convert::TryInto;
3131
use std::iter;
3232
use thiserror::Error;
@@ -46,12 +46,12 @@ impl upgrade::UpgradeInfo for Upgrade {
4646
}
4747
}
4848

49-
impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
49+
impl upgrade::InboundUpgrade<Stream> for Upgrade {
5050
type Output = Req;
5151
type Error = UpgradeError;
5252
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
5353

54-
fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
54+
fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future {
5555
let mut substream = Framed::new(
5656
substream,
5757
quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE),
@@ -126,7 +126,7 @@ pub enum Req {
126126
}
127127

128128
pub struct ReservationReq {
129-
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HopMessage>>,
129+
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HopMessage>>,
130130
reservation_duration: Duration,
131131
max_circuit_duration: Duration,
132132
max_circuit_bytes: u64,
@@ -183,15 +183,15 @@ impl ReservationReq {
183183

184184
pub struct CircuitReq {
185185
dst: PeerId,
186-
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HopMessage>>,
186+
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HopMessage>>,
187187
}
188188

189189
impl CircuitReq {
190190
pub fn dst(&self) -> PeerId {
191191
self.dst
192192
}
193193

194-
pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
194+
pub async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> {
195195
let msg = proto::HopMessage {
196196
type_pb: proto::HopMessageType::STATUS,
197197
peer: None,

0 commit comments

Comments
 (0)