Skip to content

Commit 49acf2e

Browse files
swarm/src/protocols_handler: Impl ProtocolsHandler on either::Either (#2192)
Implement ProtocolsHandler on either::Either representing either of two ProtocolsHandler implementations. Co-authored-by: Thomas Eizinger <[email protected]>
1 parent c58f697 commit 49acf2e

File tree

3 files changed

+312
-2
lines changed

3 files changed

+312
-2
lines changed

swarm/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
- Require `ProtocolsHandler::{InEvent,OutEvent,Error}` to implement `Debug` (see
1414
[PR 2183]).
1515

16-
[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150/
16+
- Implement `ProtocolsHandler` on `either::Either`representing either of two
17+
`ProtocolsHandler` implementations (see [PR 2192]).
18+
19+
[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150
1720
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182
1821
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
22+
[PR 2192]: https://github.com/libp2p/rust-libp2p/pull/2192
1923

2024
# 0.30.0 [2021-07-12]
2125

swarm/src/protocols_handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@
3838
//! > the network as a whole, see the `NetworkBehaviour` trait.
3939
4040
mod dummy;
41+
pub mod either;
4142
mod map_in;
4243
mod map_out;
4344
pub mod multi;
4445
mod node_handler;
4546
mod one_shot;
4647
mod select;
4748

48-
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend};
49+
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
4950

5051
use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId};
5152
use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
// Copyright 2021 Protocol Labs.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use crate::protocols_handler::{
22+
IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent,
23+
ProtocolsHandlerUpgrErr, SubstreamProtocol,
24+
};
25+
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
26+
use either::Either;
27+
use libp2p_core::either::{EitherError, EitherOutput};
28+
use libp2p_core::upgrade::{EitherUpgrade, UpgradeError};
29+
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
30+
use std::task::{Context, Poll};
31+
32+
pub enum IntoEitherHandler<L, R> {
33+
Left(L),
34+
Right(R),
35+
}
36+
37+
/// Implementation of a [`IntoProtocolsHandler`] that represents either of two [`IntoProtocolsHandler`]
38+
/// implementations.
39+
impl<L, R> IntoProtocolsHandler for IntoEitherHandler<L, R>
40+
where
41+
L: IntoProtocolsHandler,
42+
R: IntoProtocolsHandler,
43+
{
44+
type Handler = Either<L::Handler, R::Handler>;
45+
46+
fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler {
47+
match self {
48+
IntoEitherHandler::Left(into_handler) => Either::Left(into_handler.into_handler(p, c)),
49+
IntoEitherHandler::Right(into_handler) => {
50+
Either::Right(into_handler.into_handler(p, c))
51+
}
52+
}
53+
}
54+
55+
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
56+
match self {
57+
IntoEitherHandler::Left(into_handler) => {
58+
EitherUpgrade::A(SendWrapper(into_handler.inbound_protocol()))
59+
}
60+
IntoEitherHandler::Right(into_handler) => {
61+
EitherUpgrade::B(SendWrapper(into_handler.inbound_protocol()))
62+
}
63+
}
64+
}
65+
}
66+
67+
/// Implementation of a [`ProtocolsHandler`] that represents either of two [`ProtocolsHandler`]
68+
/// implementations.
69+
impl<L, R> ProtocolsHandler for Either<L, R>
70+
where
71+
L: ProtocolsHandler,
72+
R: ProtocolsHandler,
73+
{
74+
type InEvent = Either<L::InEvent, R::InEvent>;
75+
type OutEvent = Either<L::OutEvent, R::OutEvent>;
76+
type Error = Either<L::Error, R::Error>;
77+
type InboundProtocol =
78+
EitherUpgrade<SendWrapper<L::InboundProtocol>, SendWrapper<R::InboundProtocol>>;
79+
type OutboundProtocol =
80+
EitherUpgrade<SendWrapper<L::OutboundProtocol>, SendWrapper<R::OutboundProtocol>>;
81+
type InboundOpenInfo = Either<L::InboundOpenInfo, R::InboundOpenInfo>;
82+
type OutboundOpenInfo = Either<L::OutboundOpenInfo, R::OutboundOpenInfo>;
83+
84+
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
85+
match self {
86+
Either::Left(a) => a
87+
.listen_protocol()
88+
.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u)))
89+
.map_info(Either::Left),
90+
Either::Right(b) => b
91+
.listen_protocol()
92+
.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u)))
93+
.map_info(Either::Right),
94+
}
95+
}
96+
97+
fn inject_fully_negotiated_outbound(
98+
&mut self,
99+
output: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
100+
info: Self::OutboundOpenInfo,
101+
) {
102+
match (self, output, info) {
103+
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => {
104+
handler.inject_fully_negotiated_outbound(output, info)
105+
}
106+
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => {
107+
handler.inject_fully_negotiated_outbound(output, info)
108+
}
109+
_ => unreachable!(),
110+
}
111+
}
112+
113+
fn inject_fully_negotiated_inbound(
114+
&mut self,
115+
output: <Self::InboundProtocol as InboundUpgradeSend>::Output,
116+
info: Self::InboundOpenInfo,
117+
) {
118+
match (self, output, info) {
119+
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => {
120+
handler.inject_fully_negotiated_inbound(output, info)
121+
}
122+
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => {
123+
handler.inject_fully_negotiated_inbound(output, info)
124+
}
125+
_ => unreachable!(),
126+
}
127+
}
128+
129+
fn inject_event(&mut self, event: Self::InEvent) {
130+
match (self, event) {
131+
(Either::Left(handler), Either::Left(event)) => handler.inject_event(event),
132+
(Either::Right(handler), Either::Right(event)) => handler.inject_event(event),
133+
_ => unreachable!(),
134+
}
135+
}
136+
137+
fn inject_address_change(&mut self, addr: &Multiaddr) {
138+
match self {
139+
Either::Left(handler) => handler.inject_address_change(addr),
140+
Either::Right(handler) => handler.inject_address_change(addr),
141+
}
142+
}
143+
144+
fn inject_dial_upgrade_error(
145+
&mut self,
146+
info: Self::OutboundOpenInfo,
147+
error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
148+
) {
149+
match error {
150+
ProtocolsHandlerUpgrErr::Timer => match (self, info) {
151+
(Either::Left(handler), Either::Left(info)) => {
152+
handler.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer);
153+
}
154+
(Either::Right(handler), Either::Right(info)) => {
155+
handler.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer);
156+
}
157+
_ => unreachable!(),
158+
},
159+
ProtocolsHandlerUpgrErr::Timeout => match (self, info) {
160+
(Either::Left(handler), Either::Left(info)) => {
161+
handler.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout);
162+
}
163+
(Either::Right(handler), Either::Right(info)) => {
164+
handler.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout);
165+
}
166+
_ => unreachable!(),
167+
},
168+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) {
169+
(Either::Left(handler), Either::Left(info)) => {
170+
handler.inject_dial_upgrade_error(
171+
info,
172+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
173+
);
174+
}
175+
(Either::Right(handler), Either::Right(info)) => {
176+
handler.inject_dial_upgrade_error(
177+
info,
178+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
179+
);
180+
}
181+
_ => unreachable!(),
182+
},
183+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
184+
match (self, info) {
185+
(Either::Left(handler), Either::Left(info)) => {
186+
handler.inject_dial_upgrade_error(
187+
info,
188+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
189+
);
190+
}
191+
_ => unreachable!(),
192+
}
193+
}
194+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
195+
match (self, info) {
196+
(Either::Right(handler), Either::Right(info)) => {
197+
handler.inject_dial_upgrade_error(
198+
info,
199+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
200+
);
201+
}
202+
_ => unreachable!(),
203+
}
204+
}
205+
}
206+
}
207+
208+
fn inject_listen_upgrade_error(
209+
&mut self,
210+
info: Self::InboundOpenInfo,
211+
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
212+
) {
213+
match error {
214+
ProtocolsHandlerUpgrErr::Timer => match (self, info) {
215+
(Either::Left(handler), Either::Left(info)) => {
216+
handler.inject_listen_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer);
217+
}
218+
(Either::Right(handler), Either::Right(info)) => {
219+
handler.inject_listen_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer);
220+
}
221+
_ => unreachable!(),
222+
},
223+
ProtocolsHandlerUpgrErr::Timeout => match (self, info) {
224+
(Either::Left(handler), Either::Left(info)) => {
225+
handler.inject_listen_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout);
226+
}
227+
(Either::Right(handler), Either::Right(info)) => {
228+
handler.inject_listen_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout);
229+
}
230+
_ => unreachable!(),
231+
},
232+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) {
233+
(Either::Left(handler), Either::Left(info)) => {
234+
handler.inject_listen_upgrade_error(
235+
info,
236+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
237+
);
238+
}
239+
(Either::Right(handler), Either::Right(info)) => {
240+
handler.inject_listen_upgrade_error(
241+
info,
242+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
243+
);
244+
}
245+
_ => unreachable!(),
246+
},
247+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
248+
match (self, info) {
249+
(Either::Left(handler), Either::Left(info)) => {
250+
handler.inject_listen_upgrade_error(
251+
info,
252+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
253+
);
254+
}
255+
_ => unreachable!(),
256+
}
257+
}
258+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
259+
match (self, info) {
260+
(Either::Right(handler), Either::Right(info)) => {
261+
handler.inject_listen_upgrade_error(
262+
info,
263+
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
264+
);
265+
}
266+
_ => unreachable!(),
267+
}
268+
}
269+
}
270+
}
271+
272+
fn connection_keep_alive(&self) -> KeepAlive {
273+
match self {
274+
Either::Left(handler) => handler.connection_keep_alive(),
275+
Either::Right(handler) => handler.connection_keep_alive(),
276+
}
277+
}
278+
279+
fn poll(
280+
&mut self,
281+
cx: &mut Context<'_>,
282+
) -> Poll<
283+
ProtocolsHandlerEvent<
284+
Self::OutboundProtocol,
285+
Self::OutboundOpenInfo,
286+
Self::OutEvent,
287+
Self::Error,
288+
>,
289+
> {
290+
let event = match self {
291+
Either::Left(handler) => futures::ready!(handler.poll(cx))
292+
.map_custom(Either::Left)
293+
.map_close(Either::Left)
294+
.map_protocol(|p| EitherUpgrade::A(SendWrapper(p)))
295+
.map_outbound_open_info(Either::Left),
296+
Either::Right(handler) => futures::ready!(handler.poll(cx))
297+
.map_custom(Either::Right)
298+
.map_close(Either::Right)
299+
.map_protocol(|p| EitherUpgrade::B(SendWrapper(p)))
300+
.map_outbound_open_info(Either::Right),
301+
};
302+
303+
Poll::Ready(event)
304+
}
305+
}

0 commit comments

Comments
 (0)