Skip to content

Commit 5d740a8

Browse files
authored
refactor(core): blanket implementation of connection upgrade
Introduces blanket implementation of `{In,Out}boundConnectionUpgrade` and uses it in transport upgrade infrastructure. Resolves: #4307. Pull-Request: #4316.
1 parent c52a2fc commit 5d740a8

File tree

3 files changed

+98
-38
lines changed

3 files changed

+98
-38
lines changed

core/src/transport/upgrade.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use crate::{
3030
TransportError, TransportEvent,
3131
},
3232
upgrade::{
33-
self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade,
34-
OutboundUpgradeApply, UpgradeError,
33+
self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
34+
OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
3535
},
3636
Negotiated,
3737
};
@@ -101,8 +101,8 @@ where
101101
T: Transport<Output = C>,
102102
C: AsyncRead + AsyncWrite + Unpin,
103103
D: AsyncRead + AsyncWrite + Unpin,
104-
U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
105-
U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
104+
U: InboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
105+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
106106
E: Error + 'static,
107107
{
108108
let version = self.version;
@@ -123,7 +123,7 @@ where
123123
pub struct Authenticate<C, U>
124124
where
125125
C: AsyncRead + AsyncWrite + Unpin,
126-
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
126+
U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
127127
{
128128
#[pin]
129129
inner: EitherUpgrade<C, U>,
@@ -132,11 +132,11 @@ where
132132
impl<C, U> Future for Authenticate<C, U>
133133
where
134134
C: AsyncRead + AsyncWrite + Unpin,
135-
U: InboundUpgrade<Negotiated<C>>
136-
+ OutboundUpgrade<
135+
U: InboundConnectionUpgrade<Negotiated<C>>
136+
+ OutboundConnectionUpgrade<
137137
Negotiated<C>,
138-
Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
139-
Error = <U as InboundUpgrade<Negotiated<C>>>::Error,
138+
Output = <U as InboundConnectionUpgrade<Negotiated<C>>>::Output,
139+
Error = <U as InboundConnectionUpgrade<Negotiated<C>>>::Error,
140140
>,
141141
{
142142
type Output = <EitherUpgrade<C, U> as Future>::Output;
@@ -155,7 +155,7 @@ where
155155
pub struct Multiplex<C, U>
156156
where
157157
C: AsyncRead + AsyncWrite + Unpin,
158-
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
158+
U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
159159
{
160160
peer_id: Option<PeerId>,
161161
#[pin]
@@ -165,8 +165,8 @@ where
165165
impl<C, U, M, E> Future for Multiplex<C, U>
166166
where
167167
C: AsyncRead + AsyncWrite + Unpin,
168-
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
169-
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>,
168+
U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
169+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
170170
{
171171
type Output = Result<(PeerId, M), UpgradeError<E>>;
172172

@@ -208,8 +208,8 @@ where
208208
T: Transport<Output = (PeerId, C)>,
209209
C: AsyncRead + AsyncWrite + Unpin,
210210
D: AsyncRead + AsyncWrite + Unpin,
211-
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
212-
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
211+
U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
212+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
213213
E: Error + 'static,
214214
{
215215
Authenticated(Builder::new(
@@ -236,8 +236,8 @@ where
236236
T: Transport<Output = (PeerId, C)>,
237237
C: AsyncRead + AsyncWrite + Unpin,
238238
M: StreamMuxer,
239-
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
240-
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
239+
U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
240+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
241241
E: Error + 'static,
242242
{
243243
let version = self.0.version;
@@ -269,8 +269,8 @@ where
269269
T: Transport<Output = (PeerId, C)>,
270270
C: AsyncRead + AsyncWrite + Unpin,
271271
M: StreamMuxer,
272-
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
273-
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
272+
U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
273+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
274274
E: Error + 'static,
275275
F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone,
276276
{
@@ -395,8 +395,8 @@ where
395395
T: Transport<Output = (PeerId, C)>,
396396
T::Error: 'static,
397397
C: AsyncRead + AsyncWrite + Unpin,
398-
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
399-
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
398+
U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
399+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
400400
E: Error + 'static,
401401
{
402402
type Output = (PeerId, D);
@@ -502,7 +502,7 @@ where
502502
/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
503503
pub struct DialUpgradeFuture<F, U, C>
504504
where
505-
U: OutboundUpgrade<Negotiated<C>>,
505+
U: OutboundConnectionUpgrade<Negotiated<C>>,
506506
C: AsyncRead + AsyncWrite + Unpin,
507507
{
508508
future: Pin<Box<F>>,
@@ -513,7 +513,7 @@ impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
513513
where
514514
F: TryFuture<Ok = (PeerId, C)>,
515515
C: AsyncRead + AsyncWrite + Unpin,
516-
U: OutboundUpgrade<Negotiated<C>, Output = D>,
516+
U: OutboundConnectionUpgrade<Negotiated<C>, Output = D>,
517517
U::Error: Error,
518518
{
519519
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
@@ -553,7 +553,7 @@ where
553553

554554
impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
555555
where
556-
U: OutboundUpgrade<Negotiated<C>>,
556+
U: OutboundConnectionUpgrade<Negotiated<C>>,
557557
C: AsyncRead + AsyncWrite + Unpin,
558558
{
559559
}
@@ -562,7 +562,7 @@ where
562562
pub struct ListenerUpgradeFuture<F, U, C>
563563
where
564564
C: AsyncRead + AsyncWrite + Unpin,
565-
U: InboundUpgrade<Negotiated<C>>,
565+
U: InboundConnectionUpgrade<Negotiated<C>>,
566566
{
567567
future: Pin<Box<F>>,
568568
upgrade: future::Either<Option<U>, (PeerId, InboundUpgradeApply<C, U>)>,
@@ -572,7 +572,7 @@ impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
572572
where
573573
F: TryFuture<Ok = (PeerId, C)>,
574574
C: AsyncRead + AsyncWrite + Unpin,
575-
U: InboundUpgrade<Negotiated<C>, Output = D>,
575+
U: InboundConnectionUpgrade<Negotiated<C>, Output = D>,
576576
U::Error: Error,
577577
{
578578
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
@@ -613,6 +613,6 @@ where
613613
impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
614614
where
615615
C: AsyncRead + AsyncWrite + Unpin,
616-
U: InboundUpgrade<Negotiated<C>>,
616+
U: InboundConnectionUpgrade<Negotiated<C>>,
617617
{
618618
}

core/src/upgrade.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,63 @@ pub trait OutboundUpgrade<C>: UpgradeInfo {
125125
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
126126
fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future;
127127
}
128+
129+
/// Possible upgrade on an inbound connection
130+
pub trait InboundConnectionUpgrade<T>: UpgradeInfo {
131+
/// Output after the upgrade has been successfully negotiated and the handshake performed.
132+
type Output;
133+
/// Possible error during the handshake.
134+
type Error;
135+
/// Future that performs the handshake with the remote.
136+
type Future: Future<Output = Result<Self::Output, Self::Error>>;
137+
138+
/// After we have determined that the remote supports one of the protocols we support, this
139+
/// method is called to start the handshake.
140+
///
141+
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
142+
fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future;
143+
}
144+
145+
/// Possible upgrade on an outbound connection
146+
pub trait OutboundConnectionUpgrade<T>: UpgradeInfo {
147+
/// Output after the upgrade has been successfully negotiated and the handshake performed.
148+
type Output;
149+
/// Possible error during the handshake.
150+
type Error;
151+
/// Future that performs the handshake with the remote.
152+
type Future: Future<Output = Result<Self::Output, Self::Error>>;
153+
154+
/// After we have determined that the remote supports one of the protocols we support, this
155+
/// method is called to start the handshake.
156+
///
157+
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
158+
fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future;
159+
}
160+
161+
// Blanket implementation for InboundConnectionUpgrade based on InboundUpgrade for backwards compatibility
162+
impl<U, T> InboundConnectionUpgrade<T> for U
163+
where
164+
U: InboundUpgrade<T>,
165+
{
166+
type Output = <U as InboundUpgrade<T>>::Output;
167+
type Error = <U as InboundUpgrade<T>>::Error;
168+
type Future = <U as InboundUpgrade<T>>::Future;
169+
170+
fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future {
171+
self.upgrade_inbound(socket, info)
172+
}
173+
}
174+
175+
// Blanket implementation for OutboundConnectionUpgrade based on OutboundUpgrade for backwards compatibility
176+
impl<U, T> OutboundConnectionUpgrade<T> for U
177+
where
178+
U: OutboundUpgrade<T>,
179+
{
180+
type Output = <U as OutboundUpgrade<T>>::Output;
181+
type Error = <U as OutboundUpgrade<T>>::Error;
182+
type Future = <U as OutboundUpgrade<T>>::Future;
183+
184+
fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future {
185+
self.upgrade_outbound(socket, info)
186+
}
187+
}

core/src/upgrade/apply.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
21+
use crate::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeError};
2222
use crate::{connection::ConnectedPoint, Negotiated};
2323
use futures::{future::Either, prelude::*};
2424
use log::debug;
@@ -37,7 +37,7 @@ pub(crate) fn apply<C, U>(
3737
) -> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
3838
where
3939
C: AsyncRead + AsyncWrite + Unpin,
40-
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
40+
U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
4141
{
4242
match cp {
4343
ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => {
@@ -51,7 +51,7 @@ where
5151
pub(crate) fn apply_inbound<C, U>(conn: C, up: U) -> InboundUpgradeApply<C, U>
5252
where
5353
C: AsyncRead + AsyncWrite + Unpin,
54-
U: InboundUpgrade<Negotiated<C>>,
54+
U: InboundConnectionUpgrade<Negotiated<C>>,
5555
{
5656
InboundUpgradeApply {
5757
inner: InboundUpgradeApplyState::Init {
@@ -65,7 +65,7 @@ where
6565
pub(crate) fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
6666
where
6767
C: AsyncRead + AsyncWrite + Unpin,
68-
U: OutboundUpgrade<Negotiated<C>>,
68+
U: OutboundConnectionUpgrade<Negotiated<C>>,
6969
{
7070
OutboundUpgradeApply {
7171
inner: OutboundUpgradeApplyState::Init {
@@ -79,7 +79,7 @@ where
7979
pub struct InboundUpgradeApply<C, U>
8080
where
8181
C: AsyncRead + AsyncWrite + Unpin,
82-
U: InboundUpgrade<Negotiated<C>>,
82+
U: InboundConnectionUpgrade<Negotiated<C>>,
8383
{
8484
inner: InboundUpgradeApplyState<C, U>,
8585
}
@@ -88,7 +88,7 @@ where
8888
enum InboundUpgradeApplyState<C, U>
8989
where
9090
C: AsyncRead + AsyncWrite + Unpin,
91-
U: InboundUpgrade<Negotiated<C>>,
91+
U: InboundConnectionUpgrade<Negotiated<C>>,
9292
{
9393
Init {
9494
future: ListenerSelectFuture<C, U::Info>,
@@ -104,14 +104,14 @@ where
104104
impl<C, U> Unpin for InboundUpgradeApply<C, U>
105105
where
106106
C: AsyncRead + AsyncWrite + Unpin,
107-
U: InboundUpgrade<Negotiated<C>>,
107+
U: InboundConnectionUpgrade<Negotiated<C>>,
108108
{
109109
}
110110

111111
impl<C, U> Future for InboundUpgradeApply<C, U>
112112
where
113113
C: AsyncRead + AsyncWrite + Unpin,
114-
U: InboundUpgrade<Negotiated<C>>,
114+
U: InboundConnectionUpgrade<Negotiated<C>>,
115115
{
116116
type Output = Result<U::Output, UpgradeError<U::Error>>;
117117

@@ -162,15 +162,15 @@ where
162162
pub struct OutboundUpgradeApply<C, U>
163163
where
164164
C: AsyncRead + AsyncWrite + Unpin,
165-
U: OutboundUpgrade<Negotiated<C>>,
165+
U: OutboundConnectionUpgrade<Negotiated<C>>,
166166
{
167167
inner: OutboundUpgradeApplyState<C, U>,
168168
}
169169

170170
enum OutboundUpgradeApplyState<C, U>
171171
where
172172
C: AsyncRead + AsyncWrite + Unpin,
173-
U: OutboundUpgrade<Negotiated<C>>,
173+
U: OutboundConnectionUpgrade<Negotiated<C>>,
174174
{
175175
Init {
176176
future: DialerSelectFuture<C, <U::InfoIter as IntoIterator>::IntoIter>,
@@ -186,14 +186,14 @@ where
186186
impl<C, U> Unpin for OutboundUpgradeApply<C, U>
187187
where
188188
C: AsyncRead + AsyncWrite + Unpin,
189-
U: OutboundUpgrade<Negotiated<C>>,
189+
U: OutboundConnectionUpgrade<Negotiated<C>>,
190190
{
191191
}
192192

193193
impl<C, U> Future for OutboundUpgradeApply<C, U>
194194
where
195195
C: AsyncRead + AsyncWrite + Unpin,
196-
U: OutboundUpgrade<Negotiated<C>>,
196+
U: OutboundConnectionUpgrade<Negotiated<C>>,
197197
{
198198
type Output = Result<U::Output, UpgradeError<U::Error>>;
199199

0 commit comments

Comments
 (0)