Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
864 changes: 549 additions & 315 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ edition = "2024"
rust-version = "1.85"
license = "MIT"
description = "A flexible and lightweight messaging library for distributed systems"
authors = ["Jonas Bostoen", "Nicolas Racchi"]
authors = ["Chainbound Developers <dev@chainbound.io>"]
homepage = "https://github.com/chainbound/msg-rs"
repository = "https://github.com/chainbound/msg-rs"
keywords = [
Expand Down Expand Up @@ -57,9 +57,9 @@ rustc-hash = "1"
rand = "0.8"

# networking
quinn = "0.10"
quinn = "0.11.9"
# (rustls needs to be the same version as the one used by quinn)
rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] }
rustls = { version = "0.23", default-features = false }
rcgen = "0.12"

# benchmarking & profiling
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ fmt:
cargo +nightly fmt --all -- --check

test:
cargo nextest run --workspace --retries 3
cargo nextest run --workspace --all-features --retries 3
1 change: 1 addition & 0 deletions msg-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ parking_lot.workspace = true

[dev-dependencies]
rand.workspace = true
msg-transport = { workspace = true, features = ["quic"] }

msg-sim.workspace = true

Expand Down
18 changes: 14 additions & 4 deletions msg-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@ async-trait.workspace = true
futures.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true

quinn.workspace = true
rustls.workspace = true
rcgen.workspace = true
thiserror = { workspace = true, optional = true }

# QUIC
quinn = { workspace = true, optional = true }
rustls = { workspace = true, optional = true, features = [
"std",
"aws_lc_rs",
"tls12",
] }
rcgen = { workspace = true, optional = true }

[dev-dependencies]
tracing-subscriber = "0.3"

[features]
default = []
quic = ["dep:quinn", "dep:rustls", "dep:rcgen", "dep:thiserror"]
1 change: 1 addition & 0 deletions msg-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::{Future, FutureExt};
use tokio::io::{AsyncRead, AsyncWrite};

pub mod ipc;
#[cfg(feature = "quic")]
pub mod quic;
pub mod tcp;

Expand Down
17 changes: 6 additions & 11 deletions msg-transport/src/quic/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::{sync::Arc, time::Duration};

use quinn::{IdleTimeout, congestion::ControllerFactory};

use super::tls::{self_signed_certificate, unsafe_client_config};
use crate::quic::tls::tls_server_config;

use super::tls::unsafe_client_config;

use msg_common::constants::MiB;

Expand Down Expand Up @@ -96,17 +98,13 @@ where
.min_mtu(self.initial_mtu)
.allow_spin(false)
.stream_receive_window((8 * stream_rwnd).into())
.congestion_controller_factory(self.cc)
.congestion_controller_factory(Arc::new(self.cc))
.initial_rtt(Duration::from_millis(self.expected_rtt.into()))
.send_window((8 * stream_rwnd).into());

let transport = Arc::new(transport);
let (cert, key) = self_signed_certificate();

let mut server_config =
quinn::ServerConfig::with_single_cert(cert, key).expect("Valid rustls config");

server_config.use_retry(true);
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config()));
server_config.transport_config(Arc::clone(&transport));

let mut client_config = quinn::ClientConfig::new(Arc::new(unsafe_client_config()));
Expand Down Expand Up @@ -154,12 +152,9 @@ impl Default for Config {
.send_window((8 * STREAM_RWND).into());

let transport = Arc::new(transport);
let (cert, key) = self_signed_certificate();

let mut server_config =
quinn::ServerConfig::with_single_cert(cert, key).expect("Valid rustls config");
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config()));

server_config.use_retry(true);
server_config.transport_config(Arc::clone(&transport));

let mut client_config = quinn::ClientConfig::new(Arc::new(unsafe_client_config()));
Expand Down
4 changes: 3 additions & 1 deletion msg-transport/src/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use stream::QuicStream;

mod tls;

pub(crate) const ALPN_PROTOCOL: &[u8] = b"msg";

/// A QUIC error.
#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -52,7 +54,7 @@ pub struct Quic {
endpoint: Option<quinn::Endpoint>,

/// A receiver for incoming connections waiting to be handled.
incoming: Option<Receiver<Result<quinn::Connecting, Error>>>,
incoming: Option<Receiver<Result<quinn::Incoming, Error>>>,
}

impl Quic {
Expand Down
95 changes: 79 additions & 16 deletions msg-transport/src/quic/tls.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,104 @@
use std::sync::Arc;

use rustls::client::{ServerCertVerified, ServerCertVerifier};
use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig};
use rustls::{
SignatureScheme,
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
pki_types::{CertificateDer, PrivateKeyDer},
};

use crate::quic::ALPN_PROTOCOL;

/// A server certificate verifier that automatically passes all checks.
#[derive(Debug)]
pub(crate) struct SkipServerVerification;
pub(crate) struct SkipServerVerification(Arc<rustls::crypto::CryptoProvider>);

impl SkipServerVerification {
fn new() -> Arc<Self> {
Arc::new(Self(Arc::new(rustls::crypto::aws_lc_rs::default_provider())))
}
}

impl ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
self.0.signature_verification_algorithms.supported_schemes()
}
}

/// Returns a TLS configuration that skips all server verification and doesn't do any client
/// authentication.
pub(crate) fn unsafe_client_config() -> rustls::ClientConfig {
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification))
/// authentication, with the correct ALPN protocol.
pub(crate) fn unsafe_client_config() -> QuicClientConfig {
let provider = Arc::new(rustls::crypto::aws_lc_rs::default_provider());

let mut rustls_config = rustls::ClientConfig::builder_with_provider(provider)
.with_protocol_versions(&[&rustls::version::TLS13])
.expect("aws_lc_rs provider supports TLS 1.3")
.dangerous()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth();

rustls_config.alpn_protocols = vec![ALPN_PROTOCOL.to_vec()];
rustls_config.enable_early_data = true;

rustls_config.try_into().expect("Valid rustls config")
}

/// Returns a self-signed TLS server configuration that doesn't do any client authentication, with
/// the correct ALPN protocol.
pub(crate) fn tls_server_config() -> QuicServerConfig {
let (cert_chain, key_der) = self_signed_certificate();
let provider = Arc::new(rustls::crypto::aws_lc_rs::default_provider());

let mut rustls_config = rustls::ServerConfig::builder_with_provider(provider)
.with_protocol_versions(&[&rustls::version::TLS13])
.expect("aws_lc_rs provider supports TLS 1.3")
.with_no_client_auth()
.with_single_cert(cert_chain, key_der)
.expect("Valid rustls config");

rustls_config.alpn_protocols = vec![ALPN_PROTOCOL.to_vec()];
rustls_config.max_early_data_size = u32::MAX;

rustls_config.try_into().expect("Valid rustls config")
}

/// Generates a self-signed certificate chain and private key.
pub(crate) fn self_signed_certificate() -> (Vec<rustls::Certificate>, rustls::PrivateKey) {
pub(crate) fn self_signed_certificate() -> (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>) {
let cert = rcgen::generate_simple_self_signed(vec![]).expect("Generates valid certificate");
let cert_der = cert.serialize_der().expect("Serializes certificate");
let priv_key = rustls::PrivateKey(cert.serialize_private_key_der());
let priv_key =
PrivateKeyDer::try_from(cert.serialize_private_key_der()).expect("Serializes private key");

(vec![rustls::Certificate(cert_der)], priv_key)
(vec![CertificateDer::from(cert_der)], priv_key)
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions msg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ rand.workspace = true
criterion.workspace = true
pprof.workspace = true

[features]
default = []
quic = ["msg-transport/quic"]


# Add jemalloc for extra perf on Linux systems.
[target.'cfg(all(not(windows), not(target_env = "musl")))'.dependencies]
jemallocator = { version = "0.5.0", features = ["profiling"] }
Expand Down
Loading