Skip to content

Commit 81c424e

Browse files
feat(swarm): make stream uprade errors more ergonomic
The currently provided `ConnectionHandlerUpgrErr` is very hard to use. Not only does it have a long name, it also features 3 levels of nesting which results in a lot of boilerplate. Last but not least, it exposes `multistream-select` as a dependency to all protocols. We fix all of the above by renaming the type to `StreamUpgradeError` and flattening out its interface. Unrecoverable errors during protocol selection are hidden within the `Io` variant. Related: #3759. Pull-Request: #3882.
1 parent 0e36c7c commit 81c424e

File tree

23 files changed

+231
-297
lines changed

23 files changed

+231
-297
lines changed

examples/file-sharing/src/network.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use libp2p::{
1616
multiaddr::Protocol,
1717
noise,
1818
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
19-
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent},
19+
swarm::{NetworkBehaviour, StreamUpgradeError, Swarm, SwarmBuilder, SwarmEvent},
2020
tcp, yamux, PeerId, Transport,
2121
};
2222

@@ -216,7 +216,7 @@ impl EventLoop {
216216

217217
async fn handle_event(
218218
&mut self,
219-
event: SwarmEvent<ComposedEvent, Either<ConnectionHandlerUpgrErr<io::Error>, io::Error>>,
219+
event: SwarmEvent<ComposedEvent, Either<StreamUpgradeError<io::Error>, io::Error>>,
220220
) {
221221
match event {
222222
SwarmEvent::Behaviour(ComposedEvent::Kademlia(

protocols/dcutr/src/behaviour_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu
3030
use libp2p_swarm::dial_opts::{self, DialOpts};
3131
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
3232
use libp2p_swarm::{
33-
ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters,
33+
ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError,
3434
THandlerInEvent, ToSwarm,
3535
};
3636
use std::collections::{HashMap, HashSet, VecDeque};
@@ -65,7 +65,7 @@ pub enum Error {
6565
#[error("Failed to dial peer.")]
6666
Dial,
6767
#[error("Failed to establish substream: {0}.")]
68-
Handler(ConnectionHandlerUpgrErr<Void>),
68+
Handler(StreamUpgradeError<Void>),
6969
}
7070

7171
pub struct Behaviour {

protocols/dcutr/src/handler/direct.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
use libp2p_core::upgrade::DeniedUpgrade;
2424
use libp2p_swarm::handler::ConnectionEvent;
2525
use libp2p_swarm::{
26-
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
27-
SubstreamProtocol,
26+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
2827
};
2928
use std::task::{Context, Poll};
3029
use void::Void;
@@ -42,7 +41,7 @@ pub struct Handler {
4241
impl ConnectionHandler for Handler {
4342
type InEvent = void::Void;
4443
type OutEvent = Event;
45-
type Error = ConnectionHandlerUpgrErr<std::io::Error>;
44+
type Error = StreamUpgradeError<std::io::Error>;
4645
type InboundProtocol = DeniedUpgrade;
4746
type OutboundProtocol = DeniedUpgrade;
4847
type OutboundOpenInfo = Void;

protocols/dcutr/src/handler/relayed.rs

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ use futures::future;
2626
use futures::future::{BoxFuture, FutureExt};
2727
use instant::Instant;
2828
use libp2p_core::multiaddr::Multiaddr;
29-
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
29+
use libp2p_core::upgrade::DeniedUpgrade;
3030
use libp2p_core::ConnectedPoint;
3131
use libp2p_swarm::handler::{
3232
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3333
ListenUpgradeError,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
37-
SubstreamProtocol,
36+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
3837
};
3938
use std::collections::VecDeque;
4039
use std::fmt;
@@ -82,11 +81,11 @@ pub enum Event {
8281
remote_addr: Multiaddr,
8382
},
8483
InboundNegotiationFailed {
85-
error: ConnectionHandlerUpgrErr<void::Void>,
84+
error: StreamUpgradeError<void::Void>,
8685
},
8786
InboundConnectNegotiated(Vec<Multiaddr>),
8887
OutboundNegotiationFailed {
89-
error: ConnectionHandlerUpgrErr<void::Void>,
88+
error: StreamUpgradeError<void::Void>,
9089
},
9190
OutboundConnectNegotiated {
9291
remote_addrs: Vec<Multiaddr>,
@@ -127,7 +126,7 @@ pub struct Handler {
127126
endpoint: ConnectedPoint,
128127
/// A pending fatal error that results in the connection being closed.
129128
pending_error: Option<
130-
ConnectionHandlerUpgrErr<
129+
StreamUpgradeError<
131130
Either<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
132131
>,
133132
>,
@@ -212,12 +211,10 @@ impl Handler {
212211
<Self as ConnectionHandler>::InboundProtocol,
213212
>,
214213
) {
215-
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(
216-
match error {
217-
Either::Left(e) => Either::Left(e),
218-
Either::Right(v) => void::unreachable(v),
219-
},
220-
)));
214+
self.pending_error = Some(StreamUpgradeError::Apply(match error {
215+
Either::Left(e) => Either::Left(e),
216+
Either::Right(v) => void::unreachable(v),
217+
}));
221218
}
222219

223220
fn on_dial_upgrade_error(
@@ -230,29 +227,27 @@ impl Handler {
230227
self.keep_alive = KeepAlive::No;
231228

232229
match error {
233-
ConnectionHandlerUpgrErr::Timeout => {
230+
StreamUpgradeError::Timeout => {
234231
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
235232
Event::OutboundNegotiationFailed {
236-
error: ConnectionHandlerUpgrErr::Timeout,
233+
error: StreamUpgradeError::Timeout,
237234
},
238235
));
239236
}
240-
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
237+
StreamUpgradeError::NegotiationFailed => {
241238
// The remote merely doesn't support the DCUtR protocol.
242239
// This is no reason to close the connection, which may
243240
// successfully communicate with other protocols already.
244241
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
245242
Event::OutboundNegotiationFailed {
246-
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
247-
NegotiationError::Failed,
248-
)),
243+
error: StreamUpgradeError::NegotiationFailed,
249244
},
250245
));
251246
}
252247
_ => {
253248
// Anything else is considered a fatal error or misbehaviour of
254249
// the remote peer and results in closing the connection.
255-
self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(Either::Right)));
250+
self.pending_error = Some(error.map_upgrade_err(Either::Right));
256251
}
257252
}
258253
}
@@ -261,7 +256,7 @@ impl Handler {
261256
impl ConnectionHandler for Handler {
262257
type InEvent = Command;
263258
type OutEvent = Event;
264-
type Error = ConnectionHandlerUpgrErr<
259+
type Error = StreamUpgradeError<
265260
Either<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
266261
>;
267262
type InboundProtocol = Either<protocol::inbound::Upgrade, DeniedUpgrade>;
@@ -352,9 +347,9 @@ impl ConnectionHandler for Handler {
352347
));
353348
}
354349
Err(e) => {
355-
return Poll::Ready(ConnectionHandlerEvent::Close(
356-
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))),
357-
))
350+
return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply(
351+
Either::Left(e),
352+
)))
358353
}
359354
}
360355
}

protocols/gossipsub/src/handler.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ use futures::future::Either;
2727
use futures::prelude::*;
2828
use futures::StreamExt;
2929
use instant::Instant;
30-
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
30+
use libp2p_core::upgrade::DeniedUpgrade;
3131
use libp2p_swarm::handler::{
32-
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
33-
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
32+
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
33+
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
3434
SubstreamProtocol,
3535
};
3636
use libp2p_swarm::NegotiatedSubstream;
@@ -526,20 +526,17 @@ impl ConnectionHandler for Handler {
526526
handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
527527
}
528528
ConnectionEvent::DialUpgradeError(DialUpgradeError {
529-
error: ConnectionHandlerUpgrErr::Timeout,
529+
error: StreamUpgradeError::Timeout,
530530
..
531531
}) => {
532532
log::debug!("Dial upgrade error: Protocol negotiation timeout");
533533
}
534534
ConnectionEvent::DialUpgradeError(DialUpgradeError {
535-
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
535+
error: StreamUpgradeError::Apply(e),
536536
..
537537
}) => void::unreachable(e),
538538
ConnectionEvent::DialUpgradeError(DialUpgradeError {
539-
error:
540-
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
541-
NegotiationError::Failed,
542-
)),
539+
error: StreamUpgradeError::NegotiationFailed,
543540
..
544541
}) => {
545542
// The protocol is not supported
@@ -551,10 +548,7 @@ impl ConnectionHandler for Handler {
551548
});
552549
}
553550
ConnectionEvent::DialUpgradeError(DialUpgradeError {
554-
error:
555-
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
556-
NegotiationError::ProtocolError(e),
557-
)),
551+
error: StreamUpgradeError::Io(e),
558552
..
559553
}) => {
560554
log::debug!("Protocol negotiation failed: {e}")

protocols/identify/src/behaviour.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use libp2p_identity::PeerId;
2525
use libp2p_identity::PublicKey;
2626
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
2727
use libp2p_swarm::{
28-
AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, ExternalAddresses,
29-
ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol,
28+
AddressScore, ConnectionDenied, DialError, ExternalAddresses, ListenAddresses,
29+
NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, StreamUpgradeError,
3030
THandlerInEvent, ToSwarm,
3131
};
3232
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
@@ -492,7 +492,7 @@ pub enum Event {
492492
/// The peer with whom the error originated.
493493
peer_id: PeerId,
494494
/// The error that occurred.
495-
error: ConnectionHandlerUpgrErr<UpgradeError>,
495+
error: StreamUpgradeError<UpgradeError>,
496496
},
497497
}
498498

protocols/identify/src/handler.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ use libp2p_swarm::handler::{
3434
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3535
};
3636
use libp2p_swarm::{
37-
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
38-
NegotiatedSubstream, StreamProtocol, SubstreamProtocol,
37+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol,
38+
StreamUpgradeError, SubstreamProtocol,
3939
};
4040
use log::warn;
4141
use smallvec::SmallVec;
@@ -111,7 +111,7 @@ pub enum Event {
111111
/// We received a request for identification.
112112
Identify,
113113
/// Failed to identify the remote, or to reply to an identification request.
114-
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
114+
IdentificationError(StreamUpgradeError<UpgradeError>),
115115
}
116116

117117
impl Handler {
@@ -205,13 +205,7 @@ impl Handler {
205205
<Self as ConnectionHandler>::OutboundProtocol,
206206
>,
207207
) {
208-
use libp2p_core::upgrade::UpgradeError;
209-
210-
let err = err.map_upgrade_err(|e| match e {
211-
UpgradeError::Select(e) => UpgradeError::Select(e),
212-
UpgradeError::Apply(Either::Left(ioe)) => UpgradeError::Apply(ioe),
213-
UpgradeError::Apply(Either::Right(ioe)) => UpgradeError::Apply(ioe),
214-
});
208+
let err = err.map_upgrade_err(|e| e.into_inner());
215209
self.events
216210
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError(
217211
err,
@@ -317,9 +311,7 @@ impl ConnectionHandler for Handler {
317311
Event::Identification(peer_id),
318312
)),
319313
Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom(
320-
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
321-
libp2p_core::upgrade::UpgradeError::Apply(err),
322-
)),
314+
Event::IdentificationError(StreamUpgradeError::Apply(err)),
323315
)),
324316
Poll::Ready(None) | Poll::Pending => Poll::Pending,
325317
}

protocols/kad/src/handler_priv.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use libp2p_swarm::handler::{
3333
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
37-
NegotiatedSubstream, SubstreamProtocol,
36+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError,
37+
SubstreamProtocol,
3838
};
3939
use log::trace;
4040
use std::collections::VecDeque;
@@ -325,7 +325,7 @@ pub enum KademliaHandlerEvent<TUserData> {
325325
#[derive(Debug)]
326326
pub enum KademliaHandlerQueryErr {
327327
/// Error while trying to perform the query.
328-
Upgrade(ConnectionHandlerUpgrErr<io::Error>),
328+
Upgrade(StreamUpgradeError<io::Error>),
329329
/// Received an answer that doesn't correspond to the request.
330330
UnexpectedMessage,
331331
/// I/O error in the substream.
@@ -361,8 +361,8 @@ impl error::Error for KademliaHandlerQueryErr {
361361
}
362362
}
363363

364-
impl From<ConnectionHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
365-
fn from(err: ConnectionHandlerUpgrErr<io::Error>) -> Self {
364+
impl From<StreamUpgradeError<io::Error>> for KademliaHandlerQueryErr {
365+
fn from(err: StreamUpgradeError<io::Error>) -> Self {
366366
KademliaHandlerQueryErr::Upgrade(err)
367367
}
368368
}

protocols/perf/src/client/behaviour.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use std::{
2828
use libp2p_core::Multiaddr;
2929
use libp2p_identity::PeerId;
3030
use libp2p_swarm::{
31-
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr,
32-
ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent,
31+
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm,
32+
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent,
3333
THandlerOutEvent, ToSwarm,
3434
};
3535
use void::Void;
@@ -41,7 +41,7 @@ use super::{RunId, RunParams, RunStats};
4141
#[derive(Debug)]
4242
pub struct Event {
4343
pub id: RunId,
44-
pub result: Result<RunStats, ConnectionHandlerUpgrErr<Void>>,
44+
pub result: Result<RunStats, StreamUpgradeError<Void>>,
4545
}
4646

4747
#[derive(Default)]

protocols/perf/src/client/handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use libp2p_swarm::{
3131
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3232
ListenUpgradeError,
3333
},
34-
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, StreamProtocol,
34+
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
3535
SubstreamProtocol,
3636
};
3737
use void::Void;
@@ -47,7 +47,7 @@ pub struct Command {
4747
#[derive(Debug)]
4848
pub struct Event {
4949
pub(crate) id: RunId,
50-
pub(crate) result: Result<RunStats, ConnectionHandlerUpgrErr<Void>>,
50+
pub(crate) result: Result<RunStats, StreamUpgradeError<Void>>,
5151
}
5252

5353
pub struct Handler {

0 commit comments

Comments
 (0)