Skip to content

Commit 4d384f9

Browse files
authored
test(s2n-quic-dc): implement tokio stream tests for dcQUIC (#2772)
1 parent 3b42424 commit 4d384f9

File tree

18 files changed

+1942
-12
lines changed

18 files changed

+1942
-12
lines changed

dc/s2n-quic-dc/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ bolero = "0.13"
6868
bolero-generator = "0.13"
6969
insta = "1"
7070
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
71+
s2n-quic = { path = "../../quic/s2n-quic" }
7172
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
7273
s2n-quic-platform = { path = "../../quic/s2n-quic-platform", features = [
7374
"testing",
@@ -77,4 +78,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
7778

7879
[lints.rust.unexpected_cfgs]
7980
level = "warn"
80-
check-cfg = ['cfg(fuzzing)', 'cfg(kani)', 'cfg(todo)']
81+
check-cfg = ['cfg(future)', 'cfg(fuzzing)', 'cfg(kani)', 'cfg(todo)']

dc/s2n-quic-dc/src/psk/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl Provider {
201201
Ok((peer, HandshakeKind::Fresh))
202202
}
203203

204-
/// Handshake with a peer in the background.
204+
/// Handshake with a peer in the background.
205205
#[inline]
206206
pub fn background_handshake_with(
207207
&self,
@@ -215,7 +215,6 @@ impl Provider {
215215

216216
let client = self.state.client.clone();
217217
if let Some((runtime, _)) = self.state.runtime.as_ref() {
218-
let server_name = server_name.clone();
219218
// Drop the JoinHandle -- we're not actually going to block on the join handle's
220219
// result. The future will keep running in the background.
221220
runtime.spawn(async move {

dc/s2n-quic-dc/src/stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
88
/// The maximum length of a single packet written to a stream
99
pub const MAX_DATAGRAM_SIZE: usize = 1 << 15; // 32k
1010

11+
pub use crate::stream::socket::Protocol;
12+
1113
pub mod application;
1214
pub mod client;
1315
pub mod crypto;

dc/s2n-quic-dc/src/stream/client/tokio.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ pub mod rpc {
2626
pub use crate::stream::client::rpc::{InMemoryResponse, Request, Response};
2727
}
2828

29-
// This trait is a temporary solution to abstract handshake_with_entry,
30-
// local_addr, and map methods until we implement the handshake provider
29+
// This trait is a solution to abstract handshake_with_entry,
30+
// local_addr, and map methods
3131
#[allow(async_fn_in_trait)]
3232
pub trait Handshake: Clone {
3333
/// Handshake with the remote peer

dc/s2n-quic-dc/src/stream/server/tokio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use tracing::{trace, Instrument as _};
2525
pub mod tcp;
2626
pub mod udp;
2727

28-
// This trait is a temporary solution to abstract local_addr and map methods until we implement the handshake provider
28+
// This trait is a solution to abstract local_addr and map methods
2929
pub trait Handshake: Clone {
3030
fn local_addr(&self) -> SocketAddr;
3131

dc/s2n-quic-dc/src/stream/testing.rs

Lines changed: 261 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use super::{client::tokio::Client as ClientTokio, server::tokio::Server as ServerTokio};
45
use crate::{
56
either::Either,
67
event::{self, testing},
78
path::secret,
9+
psk::{client::Provider as ClientProvider, server::Provider as ServerProvider},
810
stream::{
911
application, client as stream_client,
1012
environment::{bach, tokio, udp, Environment},
11-
recv, send,
1213
server::{self as stream_server, accept, stats},
1314
socket::Protocol,
1415
},
16+
testing::NoopSubscriber,
1517
};
1618
use s2n_quic_core::dc::{self, ApplicationParams};
1719
use s2n_quic_platform::socket;
@@ -31,8 +33,6 @@ thread_local! {
3133
pub type Subscriber = (Arc<event::testing::Subscriber>, event::tracing::Subscriber);
3234

3335
pub type Stream = application::Stream<Subscriber>;
34-
pub type Writer = send::application::Writer<Subscriber>;
35-
pub type Reader = recv::application::Reader<Subscriber>;
3636

3737
const DEFAULT_POOLED: bool = true;
3838

@@ -47,6 +47,264 @@ pub(crate) const MAX_DATAGRAM_SIZE: u16 = if cfg!(target_os = "linux") {
4747

4848
type Env = Either<tokio::Environment<Subscriber>, bach::Environment<Subscriber>>;
4949

50+
pub fn bind_pair(
51+
protocol: Protocol,
52+
server_addr: SocketAddr,
53+
client: ClientProvider,
54+
server: ServerProvider,
55+
) -> (
56+
ClientTokio<ClientProvider, NoopSubscriber>,
57+
ServerTokio<ServerProvider, NoopSubscriber>,
58+
) {
59+
let test_subscriber = NoopSubscriber {};
60+
let client = ClientTokio::<ClientProvider, NoopSubscriber>::builder()
61+
.with_default_protocol(protocol)
62+
.build(client, test_subscriber.clone())
63+
.unwrap();
64+
65+
let server = ServerTokio::<ServerProvider, NoopSubscriber>::builder()
66+
.with_address(server_addr)
67+
.with_protocol(protocol)
68+
.with_workers(1.try_into().unwrap())
69+
.build(server, test_subscriber)
70+
.unwrap();
71+
72+
(client, server)
73+
}
74+
75+
macro_rules! check_pair_addrs {
76+
($local:ident, $peer:ident) => {
77+
debug_assert_eq!(
78+
$local.local_addr().ok().map(|addr| addr.port()),
79+
$peer.peer_addr().ok().map(|addr| addr.port())
80+
);
81+
};
82+
}
83+
84+
macro_rules! dcquic_context {
85+
($protocol:ident) => {
86+
use super::Protocol;
87+
use std::net::SocketAddr;
88+
89+
pub type Stream = crate::stream::application::Stream<super::NoopSubscriber>;
90+
91+
pub struct Context(super::Context);
92+
93+
#[allow(dead_code)]
94+
impl Context {
95+
pub async fn new() -> Self {
96+
Self(super::Context::new(Protocol::$protocol).await)
97+
}
98+
99+
pub fn new_sync(addr: SocketAddr) -> Self {
100+
Self(super::Context::new_sync(Protocol::$protocol, addr))
101+
}
102+
103+
pub async fn bind(addr: SocketAddr) -> Self {
104+
Self(super::Context::bind(Protocol::$protocol, addr).await)
105+
}
106+
107+
pub fn acceptor_addr(&self) -> SocketAddr {
108+
self.0.acceptor_addr()
109+
}
110+
111+
pub fn handshake_addr(&self) -> SocketAddr {
112+
self.0.handshake_addr()
113+
}
114+
115+
pub async fn pair(&self) -> (Stream, Stream) {
116+
self.0.pair().await
117+
}
118+
119+
pub async fn pair_with(&self, acceptor_addr: SocketAddr) -> (Stream, Stream) {
120+
self.0.pair_with(acceptor_addr).await
121+
}
122+
123+
pub fn protocol(&self) -> Protocol {
124+
self.0.protocol
125+
}
126+
}
127+
};
128+
}
129+
130+
pub mod dcquic {
131+
use crate::{
132+
psk::{client::Provider as ClientProvider, server::Provider as ServerProvider},
133+
stream::{
134+
client::tokio::Client as ClientTokio,
135+
server::tokio::Server as ServerTokio,
136+
socket::Protocol,
137+
testing::{bind_pair, NoopSubscriber},
138+
},
139+
testing::server_name,
140+
};
141+
use std::net::SocketAddr;
142+
143+
pub type Stream = crate::stream::application::Stream<NoopSubscriber>;
144+
145+
pub mod tcp {
146+
dcquic_context!(Tcp);
147+
}
148+
149+
pub mod udp {
150+
dcquic_context!(Udp);
151+
}
152+
153+
pub struct Context {
154+
pub(crate) server: ServerTokio<ServerProvider, NoopSubscriber>,
155+
pub(crate) client: ClientTokio<ClientProvider, NoopSubscriber>,
156+
protocol: Protocol,
157+
}
158+
159+
impl Context {
160+
pub async fn new(protocol: Protocol) -> Self {
161+
if protocol.is_udp() {
162+
Self::bind(protocol, "[::1]:0".parse().unwrap()).await
163+
} else {
164+
Self::bind(protocol, "127.0.0.1:0".parse().unwrap()).await
165+
}
166+
}
167+
168+
pub fn new_sync(protocol: Protocol, addr: SocketAddr) -> Self {
169+
let (client, server) = crate::testing::pair_sync();
170+
let (client, server) = bind_pair(protocol, addr, client, server);
171+
Self {
172+
client,
173+
server,
174+
protocol,
175+
}
176+
}
177+
178+
pub async fn bind(protocol: Protocol, addr: SocketAddr) -> Self {
179+
Self::bind_with(protocol, addr, crate::testing::Pair::default()).await
180+
}
181+
182+
pub async fn bind_with(
183+
protocol: Protocol,
184+
addr: SocketAddr,
185+
pair: crate::testing::Pair,
186+
) -> Self {
187+
let (client, server) = pair.build().await;
188+
let (client, server) = bind_pair(protocol, addr, client, server);
189+
Self {
190+
client,
191+
server,
192+
protocol,
193+
}
194+
}
195+
196+
pub fn acceptor_addr(&self) -> SocketAddr {
197+
self.server.acceptor_addr().expect("acceptor_addr")
198+
}
199+
200+
pub fn handshake_addr(&self) -> SocketAddr {
201+
self.server.handshake_addr().expect("handshake_addr")
202+
}
203+
204+
pub async fn pair(&self) -> (Stream, Stream) {
205+
self.pair_with(self.acceptor_addr()).await
206+
}
207+
208+
pub async fn pair_with(&self, acceptor_addr: SocketAddr) -> (Stream, Stream) {
209+
let handshake_addr = self.handshake_addr();
210+
let (client, server) = tokio::join!(
211+
async move {
212+
self.client
213+
.connect(handshake_addr, acceptor_addr, server_name())
214+
.await
215+
.expect("connect")
216+
},
217+
async move {
218+
let (conn, _) = self.server.accept().await.expect("accept");
219+
conn
220+
}
221+
);
222+
223+
check_pair_addrs!(client, server);
224+
225+
// the client doesn't have a response from the server so it might not know its
226+
// port yet
227+
if self.protocol().is_udp() {
228+
let acceptor_port = acceptor_addr.port();
229+
let server_local_port = server.local_addr().unwrap().port();
230+
let client_peer_port = client.peer_addr().unwrap().port();
231+
assert!(
232+
[acceptor_port, server_local_port].contains(&client_peer_port),
233+
"acceptor_port={acceptor_port}, server_local_port={server_local_port}, client_peer_port={client_peer_port}"
234+
);
235+
} else {
236+
check_pair_addrs!(server, client);
237+
}
238+
239+
(client, server)
240+
}
241+
242+
pub fn protocol(&self) -> Protocol {
243+
self.protocol
244+
}
245+
246+
#[allow(dead_code)]
247+
pub fn split(
248+
self,
249+
) -> (
250+
ClientTokio<ClientProvider, NoopSubscriber>,
251+
ServerTokio<ServerProvider, NoopSubscriber>,
252+
) {
253+
(self.client, self.server)
254+
}
255+
}
256+
}
257+
258+
pub mod tcp {
259+
use super::Protocol;
260+
use std::net::SocketAddr;
261+
use tokio::net::{TcpListener, TcpStream};
262+
263+
pub type Stream = TcpStream;
264+
265+
pub struct Context {
266+
acceptor: TcpListener,
267+
}
268+
269+
impl Context {
270+
pub async fn new() -> Self {
271+
Self::bind("127.0.0.1:0".parse().unwrap()).await
272+
}
273+
274+
pub async fn bind(addr: SocketAddr) -> Self {
275+
let acceptor = TcpListener::bind(addr).await.expect("bind");
276+
Self { acceptor }
277+
}
278+
279+
pub fn acceptor_addr(&self) -> SocketAddr {
280+
self.acceptor.local_addr().expect("acceptor_addr")
281+
}
282+
283+
pub async fn pair(&self) -> (Stream, Stream) {
284+
self.pair_with(self.acceptor_addr()).await
285+
}
286+
287+
pub async fn pair_with(&self, acceptor_addr: SocketAddr) -> (Stream, Stream) {
288+
let (client, server) = tokio::join!(
289+
async move { Stream::connect(acceptor_addr).await.expect("connect") },
290+
async move {
291+
let (conn, _addr) = self.acceptor.accept().await.expect("accept");
292+
conn
293+
}
294+
);
295+
296+
check_pair_addrs!(client, server);
297+
check_pair_addrs!(server, client);
298+
299+
(client, server)
300+
}
301+
302+
pub fn protocol(&self) -> Protocol {
303+
Protocol::Tcp
304+
}
305+
}
306+
}
307+
50308
#[derive(Clone)]
51309
pub struct Client {
52310
map: secret::Map,

dc/s2n-quic-dc/src/stream/tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
mod accept_queue;
5+
mod behavior;
6+
/// A set of tests ensuring we support a large number of peers.
7+
#[cfg(future)] // TODO remove this since they're quite expensive
8+
mod cardinality;
59
mod deterministic;
610
mod idle_timeout;
711
mod key_update;
812
mod request_response;
13+
mod restart;
914
mod rpc;

0 commit comments

Comments
 (0)