Skip to content

Commit 8dc0188

Browse files
authored
swarm/src/connection: Test max_negotiating_inbound_streams (#2785)
Test that `HandlerWrapper` upholds the provided `max_negotiating_inbound_streams` limit.
1 parent 6a9fa3d commit 8dc0188

File tree

5 files changed

+279
-0
lines changed

5 files changed

+279
-0
lines changed

core/src/upgrade.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ mod error;
6464
mod from_fn;
6565
mod map;
6666
mod optional;
67+
mod pending;
6768
mod select;
6869
mod transfer;
6970

@@ -77,6 +78,7 @@ pub use self::{
7778
from_fn::{from_fn, FromFnUpgrade},
7879
map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr},
7980
optional::OptionalUpgrade,
81+
pending::PendingUpgrade,
8082
select::SelectUpgrade,
8183
transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint},
8284
};

core/src/upgrade/pending.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2022 Protocol Labs.
2+
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a
5+
// copy of this software and associated documentation files (the "Software"),
6+
// to deal in the Software without restriction, including without limitation
7+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8+
// and/or sell copies of the Software, and to permit persons to whom the
9+
// Software is furnished to do so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in
12+
// all copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20+
// DEALINGS IN THE SOFTWARE.
21+
22+
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
23+
use futures::future;
24+
use std::iter;
25+
use void::Void;
26+
27+
/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always
28+
/// returns a pending upgrade.
29+
#[derive(Debug, Copy, Clone)]
30+
pub struct PendingUpgrade<P> {
31+
protocol_name: P,
32+
}
33+
34+
impl<P> PendingUpgrade<P> {
35+
pub fn new(protocol_name: P) -> Self {
36+
Self { protocol_name }
37+
}
38+
}
39+
40+
impl<P> UpgradeInfo for PendingUpgrade<P>
41+
where
42+
P: ProtocolName + Clone,
43+
{
44+
type Info = P;
45+
type InfoIter = iter::Once<P>;
46+
47+
fn protocol_info(&self) -> Self::InfoIter {
48+
iter::once(self.protocol_name.clone())
49+
}
50+
}
51+
52+
impl<C, P> InboundUpgrade<C> for PendingUpgrade<P>
53+
where
54+
P: ProtocolName + Clone,
55+
{
56+
type Output = Void;
57+
type Error = Void;
58+
type Future = future::Pending<Result<Self::Output, Self::Error>>;
59+
60+
fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future {
61+
future::pending()
62+
}
63+
}
64+
65+
impl<C, P> OutboundUpgrade<C> for PendingUpgrade<P>
66+
where
67+
P: ProtocolName + Clone,
68+
{
69+
type Output = Void;
70+
type Error = Void;
71+
type Future = future::Pending<Result<Self::Output, Self::Error>>;
72+
73+
fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
74+
future::pending()
75+
}
76+
}

swarm/src/connection/handler_wrapper.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,3 +440,82 @@ pub enum Event<TOutboundOpenInfo, TCustom> {
440440
/// Other event.
441441
Custom(TCustom),
442442
}
443+
444+
#[cfg(test)]
445+
mod tests {
446+
use super::*;
447+
use crate::handler::PendingConnectionHandler;
448+
use quickcheck::*;
449+
use std::sync::Arc;
450+
451+
#[test]
452+
fn max_negotiating_inbound_streams() {
453+
fn prop(max_negotiating_inbound_streams: u8) {
454+
let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
455+
let mut wrapper = HandlerWrapper::new(
456+
PeerId::random(),
457+
ConnectedPoint::Listener {
458+
local_addr: Multiaddr::empty(),
459+
send_back_addr: Multiaddr::empty(),
460+
},
461+
PendingConnectionHandler::new("test".to_string()),
462+
None,
463+
max_negotiating_inbound_streams,
464+
);
465+
let alive_substreams_counter = Arc::new(());
466+
467+
for _ in 0..max_negotiating_inbound_streams {
468+
let substream =
469+
SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
470+
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
471+
}
472+
473+
assert_eq!(
474+
Arc::strong_count(&alive_substreams_counter),
475+
max_negotiating_inbound_streams + 1,
476+
"Expect none of the substreams up to the limit to be dropped."
477+
);
478+
479+
let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
480+
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
481+
482+
assert_eq!(
483+
Arc::strong_count(&alive_substreams_counter),
484+
max_negotiating_inbound_streams + 1,
485+
"Expect substream exceeding the limit to be dropped."
486+
);
487+
}
488+
489+
QuickCheck::new().quickcheck(prop as fn(_));
490+
}
491+
492+
struct PendingSubstream(Arc<()>);
493+
494+
impl AsyncRead for PendingSubstream {
495+
fn poll_read(
496+
self: Pin<&mut Self>,
497+
_cx: &mut Context<'_>,
498+
_buf: &mut [u8],
499+
) -> Poll<std::io::Result<usize>> {
500+
Poll::Pending
501+
}
502+
}
503+
504+
impl AsyncWrite for PendingSubstream {
505+
fn poll_write(
506+
self: Pin<&mut Self>,
507+
_cx: &mut Context<'_>,
508+
_buf: &[u8],
509+
) -> Poll<std::io::Result<usize>> {
510+
Poll::Pending
511+
}
512+
513+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
514+
Poll::Pending
515+
}
516+
517+
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
518+
Poll::Pending
519+
}
520+
}
521+
}

swarm/src/handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod map_in;
4444
mod map_out;
4545
pub mod multi;
4646
mod one_shot;
47+
mod pending;
4748
mod select;
4849

4950
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
@@ -56,6 +57,7 @@ pub use dummy::DummyConnectionHandler;
5657
pub use map_in::MapInEvent;
5758
pub use map_out::MapOutEvent;
5859
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
60+
pub use pending::PendingConnectionHandler;
5961
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};
6062

6163
/// A handler for a set of protocols used on a connection with a remote.

swarm/src/handler/pending.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2022 Protocol Labs.
2+
// Copyright 2018 Parity Technologies (UK) Ltd.
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a
5+
// copy of this software and associated documentation files (the "Software"),
6+
// to deal in the Software without restriction, including without limitation
7+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8+
// and/or sell copies of the Software, and to permit persons to whom the
9+
// Software is furnished to do so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in
12+
// all copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20+
// DEALINGS IN THE SOFTWARE.
21+
22+
use crate::handler::{
23+
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
24+
SubstreamProtocol,
25+
};
26+
use crate::NegotiatedSubstream;
27+
use libp2p_core::{
28+
upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade},
29+
Multiaddr,
30+
};
31+
use std::task::{Context, Poll};
32+
use void::Void;
33+
34+
/// Implementation of [`ConnectionHandler`] that returns a pending upgrade.
35+
#[derive(Clone, Debug)]
36+
pub struct PendingConnectionHandler {
37+
protocol_name: String,
38+
}
39+
40+
impl PendingConnectionHandler {
41+
pub fn new(protocol_name: String) -> Self {
42+
PendingConnectionHandler { protocol_name }
43+
}
44+
}
45+
46+
impl ConnectionHandler for PendingConnectionHandler {
47+
type InEvent = Void;
48+
type OutEvent = Void;
49+
type Error = Void;
50+
type InboundProtocol = PendingUpgrade<String>;
51+
type OutboundProtocol = PendingUpgrade<String>;
52+
type OutboundOpenInfo = Void;
53+
type InboundOpenInfo = ();
54+
55+
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
56+
SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ())
57+
}
58+
59+
fn inject_fully_negotiated_inbound(
60+
&mut self,
61+
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
62+
_: Self::InboundOpenInfo,
63+
) {
64+
void::unreachable(protocol)
65+
}
66+
67+
fn inject_fully_negotiated_outbound(
68+
&mut self,
69+
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
70+
_info: Self::OutboundOpenInfo,
71+
) {
72+
void::unreachable(protocol);
73+
#[allow(unreachable_code)]
74+
{
75+
void::unreachable(_info);
76+
}
77+
}
78+
79+
fn inject_event(&mut self, v: Self::InEvent) {
80+
void::unreachable(v)
81+
}
82+
83+
fn inject_address_change(&mut self, _: &Multiaddr) {}
84+
85+
fn inject_dial_upgrade_error(
86+
&mut self,
87+
_: Self::OutboundOpenInfo,
88+
_: ConnectionHandlerUpgrErr<
89+
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
90+
>,
91+
) {
92+
}
93+
94+
fn inject_listen_upgrade_error(
95+
&mut self,
96+
_: Self::InboundOpenInfo,
97+
_: ConnectionHandlerUpgrErr<
98+
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error,
99+
>,
100+
) {
101+
}
102+
103+
fn connection_keep_alive(&self) -> KeepAlive {
104+
KeepAlive::No
105+
}
106+
107+
fn poll(
108+
&mut self,
109+
_: &mut Context<'_>,
110+
) -> Poll<
111+
ConnectionHandlerEvent<
112+
Self::OutboundProtocol,
113+
Self::OutboundOpenInfo,
114+
Self::OutEvent,
115+
Self::Error,
116+
>,
117+
> {
118+
Poll::Pending
119+
}
120+
}

0 commit comments

Comments
 (0)