Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions quic/s2n-quic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ exclude = ["corpus.tar.gz"]
[features]
default = ["alloc", "std"]
alloc = ["atomic-waker", "bytes", "crossbeam-utils", "s2n-codec/alloc"]
std = ["alloc", "once_cell"]
std = ["alloc", "once_cell", "futures-channel", "futures"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondering if it would make more sense to have futures-channel and futures be part of the unstable-offload-tls feature in this crate. Since you don't really need those dependencies if you are only interested in using std and not the TLS offloading feature

testing = ["std", "generator", "s2n-codec/testing", "checked-counters", "insta", "futures-test"]
generator = ["bolero-generator"]
checked-counters = []
Expand Down Expand Up @@ -47,8 +47,8 @@ tracing = { version = "0.1", default-features = false, optional = true }
zerocopy = { version = "0.8", features = ["derive"] }
futures-test = { version = "0.3", optional = true } # For testing Waker interactions
once_cell = { version = "1", optional = true }
futures-channel = "0.3.31"
futures = "0.3"
futures-channel = { version = "0.3", default-features = false, optional=true, features = ["std", "alloc"]}
futures = { version = "0.3", default-features = false, optional=true, features = ["std", "alloc"]}
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like there was an intention to have the dependencies be alphabetically ordered (through the last two in the list broke that). Could you move these and future-test and once_cell?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


[dev-dependencies]
bolero = "0.13"
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/crypto/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod null;
#[cfg(feature = "alloc")]
pub mod slow_tls;

#[cfg(feature = "std")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[cfg(feature = "std")]
#[cfg(feature = "unstable-offload-tls")]

pub mod offload;

/// Holds all application parameters which are exchanged within the TLS handshake.
Expand Down
61 changes: 40 additions & 21 deletions quic/s2n-quic-core/src/crypto/tls/offload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ use crate::{
},
transport,
};
use alloc::{sync::Arc, task::Wake, vec, vec::Vec};
use core::{
any::Any,
future::Future,
pin::Pin,
task::{Poll, Waker},
task::{Context, Poll, Waker},
};
use futures::{prelude::Stream, task};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot::{Receiver, Sender},
};
use std::{sync::Arc, task::Wake, thread};
use std::thread;

type SessionProducer<E> = (
<E as tls::Endpoint>::Session,
UnboundedSender<Request<<E as tls::Endpoint>::Session>>,
);
pub struct OffloadEndpoint<E: tls::Endpoint> {
new_session: UnboundedSender<SessionProducer<E>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called new_session?

_thread: std::thread::JoinHandle<()>,
_thread: thread::JoinHandle<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed?

inner: E,
remote_thread_waker: Waker,
}
Expand All @@ -37,10 +39,10 @@ impl<E: tls::Endpoint> OffloadEndpoint<E> {

let handle = thread::spawn(move || {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to spawn a new thread per session. This will essentially be an unbounded queue of spawning threads.

let mut sessions = vec![];
let waker = Waker::from(Arc::new(ThreadWaker(std::thread::current())));
let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));

loop {
let mut cx = task::Context::from_waker(&waker);
let mut cx = Context::from_waker(&waker);

// Add incoming sessions to queue
while let Poll::Ready(Some((new_session, tx))) =
Expand Down Expand Up @@ -76,7 +78,8 @@ impl<E: tls::Endpoint> OffloadEndpoint<E> {
}
}
sessions = next_sessions;
std::thread::park();

thread::park();
}
});

Expand All @@ -92,7 +95,7 @@ impl<E: tls::Endpoint> OffloadEndpoint<E> {
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
fn wake(self: std::sync::Arc<Self>) {
fn wake(self: Arc<Self>) {
self.0.unpark();
}
}
Expand Down Expand Up @@ -131,6 +134,7 @@ impl<E: tls::Endpoint> tls::Endpoint for OffloadEndpoint<E> {

#[derive(Debug)]
pub struct OffloadSession<S: tls::Session> {
// Inner is none while remote thread has the session
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Inner is none while remote thread has the session
// Inner is `None` while remote thread has the session

inner: Option<S>,
is_poll_done: Option<Result<(), crate::transport::Error>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming on this seems a little off, since its not a boolean. Is there something better?

pending_requests: UnboundedReceiver<Request<S>>,
Expand All @@ -146,7 +150,7 @@ impl<S: tls::Session> OffloadSession<S> {
// Channel to pass requests from remote TLS thread to main thread
let (tx, rx) = futures_channel::mpsc::unbounded::<Request<S>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we really want to avoid any kind of unbounded queues


// Send the session to the TLS thread
// Send the session to the TLS thread. It will pass it back when the handshake has finished.
let _ = new_session.unbounded_send((inner, tx));

Self {
Expand All @@ -167,10 +171,11 @@ impl<S: tls::Session> tls::Session for OffloadSession<S> {
if let Some(finished) = self.is_poll_done {
return Poll::Ready(finished);
}

// This will wake up the TLS remote thread
self.waker.wake_by_ref();

loop {
let mut cx = std::task::Context::from_waker(context.waker());
let mut cx = Context::from_waker(context.waker());

let req = match Pin::new(&mut self.pending_requests).poll_next(&mut cx) {
Poll::Ready(Some(request)) => request,
Expand Down Expand Up @@ -222,7 +227,9 @@ impl<S: tls::Session> tls::Session for OffloadSession<S> {
Request::ApplicationProtocol(application_protocol) => {
context.on_application_protocol(application_protocol)?;
}
Request::HandshakeComplete => context.on_handshake_complete()?,
Request::HandshakeComplete => {
context.on_handshake_complete()?;
}
Request::CanSendInitial(sender) => {
let _ = sender.send(context.can_send_initial());
}
Expand All @@ -231,19 +238,31 @@ impl<S: tls::Session> tls::Session for OffloadSession<S> {
let _ = sender.send(resp);
}
Request::ReceiveApplication(max_len, sender) => {
let _ = sender.send(context.receive_application(max_len));
let resp = context.receive_application(max_len);
let _ = sender.send(resp);
}
Request::ReceiveHandshake(max_len, sender) => {
let _ = sender.send(context.receive_handshake(max_len));
let resp = context.receive_handshake(max_len);
if resp.is_some() {
// We need to wake up the s2n-quic endpoint after providing
// handshake packets to the TLS provider as there may now be
// handshake data that needs to be sent in response.
Comment on lines +247 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is special about the handshake packets that require this? ie why doesn't ReceiveInitial need it too?

context.waker().wake_by_ref();
}
let _ = sender.send(resp);
}
Request::CanSendHandshake(sender) => {
let _ = sender.send(context.can_send_handshake());
}
Request::CanSendApplication(sender) => {
let _ = sender.send(context.can_send_application());
}
Request::SendApplication(bytes) => context.send_application(bytes),
Request::SendHandshake(bytes) => context.send_handshake(bytes),
Request::SendApplication(bytes) => {
context.send_application(bytes);
}
Request::SendHandshake(bytes) => {
context.send_handshake(bytes);
}
Request::SendInitial(bytes) => context.send_initial(bytes),
Request::KeyExchangeGroup(named_group) => {
context.on_key_exchange_group(named_group)?;
Expand Down Expand Up @@ -422,7 +441,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn receive_initial(&mut self, max_len: Option<usize>) -> Option<bytes::Bytes> {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.receive_initial.poll_request(&mut cx, |tx| {
let _ = self.tx.unbounded_send(Request::ReceiveInitial(max_len, tx));
}) {
Expand All @@ -433,7 +452,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn receive_handshake(&mut self, max_len: Option<usize>) -> Option<bytes::Bytes> {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.receive_handshake.poll_request(&mut cx, |tx| {
let _ = self
.tx
Expand All @@ -446,7 +465,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn receive_application(&mut self, max_len: Option<usize>) -> Option<bytes::Bytes> {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.receive_application.poll_request(&mut cx, |tx| {
let _ = self
.tx
Expand All @@ -459,7 +478,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn can_send_initial(&mut self) -> bool {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.can_send_initial.poll_request(&mut cx, |tx| {
let _ = self.tx.unbounded_send(Request::CanSendInitial(tx));
}) {
Expand All @@ -475,7 +494,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn can_send_handshake(&mut self) -> bool {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.can_send_handshake.poll_request(&mut cx, |tx| {
let _ = self.tx.unbounded_send(Request::CanSendHandshake(tx));
}) {
Expand All @@ -491,7 +510,7 @@ impl<S: CryptoSuite> tls::Context<S> for RemoteContext<S> {
}

fn can_send_application(&mut self) -> bool {
let mut cx = core::task::Context::from_waker(&self.waker);
let mut cx = Context::from_waker(&self.waker);
if let Poll::Ready(resp) = self.can_send_application.poll_request(&mut cx, |tx| {
let _ = self.tx.unbounded_send(Request::CanSendApplication(tx));
}) {
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
mod recorder;

mod connection_limits;
mod offload_tls;
mod resumption;
mod setup;
mod slow_tls;
Expand Down
43 changes: 43 additions & 0 deletions quic/s2n-quic/src/tests/offload_tls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[test]
#[cfg(feature = "unstable-offload-tls")]
fn offload_tls() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks pretty straightforward to use, but might still be worth an example in the examples directory. And that can contain some documentation on the feature in a readme

use super::*;
use crate::provider::tls::{default, offload::Offload};
use s2n_quic_core::crypto::tls::testing::certificates::{CERT_PEM, KEY_PEM};

let model = Model::default();

let server_endpoint = default::Server::builder()
.with_certificate(CERT_PEM, KEY_PEM)
.unwrap()
.build()
.unwrap();
let client_endpoint = default::Client::builder()
.with_certificate(CERT_PEM)
.unwrap()
.build()
.unwrap();
let server_endpoint = Offload(server_endpoint);
let client_endpoint = Offload(client_endpoint);
test(model, |handle| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised this is working with bach. I won't like interacting with an actual async operation (interacting with a thread). My guess is it's a race condition of things actually working or not, at the very least non-deterministic.

let server = Server::builder()
.with_io(handle.builder().build()?)?
.with_event(tracing_events())?
.with_tls(server_endpoint)?
.start()?;

let client = Client::builder()
.with_io(handle.builder().build()?)?
.with_tls(client_endpoint)?
.with_event(tracing_events())?
.start()?;
let addr = start_server(server)?;
start_client(client, addr, Data::new(1000))?;

Ok(addr)
})
.unwrap();
}
Loading