Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ crc = "^3.3.0"
data-url = { version = "^0.3.1", optional = true }
flate2 = { version = "^1.1.1", optional = true }
futures = "^0.3.31"
futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
futures-rustls = { version = "^0.26.0", default-features = false, features = ["tls12", "logging"], optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
log = "^0.4.27"
lz4 = { version = "^1.28.0", optional = true }
native-tls = { version = "^0.2.12", optional = true }
Expand All @@ -39,13 +39,13 @@ prost = "^0.13.4"
prost-derive = "^0.13.4"
rand = "^0.8.5"
regex = "^1.11.1"
rustls = { version = "^0.23.27", optional = true }
rustls = { version = "^0.23.27", default-features = false, features = ["log", "std"] , optional = true }
snap = { version = "^1.1.1", optional = true }
serde = { version = "^1.0.219", features = ["derive"], optional = true }
serde_json = { version = "^1.0.140", optional = true }
tokio = { version = "^1.45.0", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.15", features = ["codec"], optional = true }
tokio-rustls = { version = "0.26.2", optional = true }
tokio-rustls = { version = "0.26.2", default-features = false, features = ["logging", "tls12"], optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
tracing = { version = "^0.1.41", optional = true }
url = "^2.5.4"
Expand All @@ -67,11 +67,15 @@ protobuf-src = { version = "^2.1.0", optional = true }

[features]
async-std-runtime = ["async-std", "asynchronous-codec", "native-tls", "async-native-tls"]
async-std-rustls-runtime = ["async-std", "asynchronous-codec", "futures-rustls", "rustls", "webpki-roots"]
async-std-rustls-runtime = ["async-std-rustls-runtime-aws-lc-rs"]
async-std-rustls-runtime-aws-lc-rs = ["async-std", "asynchronous-codec", "webpki-roots", "rustls/aws-lc-rs", "futures-rustls/aws-lc-rs"]
async-std-rustls-runtime-ring = ["async-std", "asynchronous-codec", "webpki-roots", "rustls/ring", "futures-rustls/ring"]
auth-oauth2 = ["openidconnect", "oauth2", "serde", "serde_json", "data-url"]
compression = ["lz4", "flate2", "zstd", "snap"]
default = ["compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"]
protobuf-src = ["dep:protobuf-src"]
telemetry = ["tracing"]
tokio-runtime = ["tokio", "tokio-util", "native-tls", "tokio-native-tls"]
tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots"]
tokio-rustls-runtime = ["tokio-rustls-runtime-aws-lc-rs"]
tokio-rustls-runtime-aws-lc-rs = ["tokio", "tokio-util", "webpki-roots", "rustls/aws-lc-rs", "tokio-rustls/aws-lc-rs"]
tokio-rustls-runtime-ring = ["tokio", "tokio-util", "webpki-roots", "rustls/ring", "tokio-rustls/ring"]
40 changes: 33 additions & 7 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,13 @@ impl<Exe: Executor> Connection<Exe> {
.await
}
}
#[cfg(all(feature = "tokio-rustls-runtime", not(feature = "tokio-runtime")))]
#[cfg(all(
any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
),
not(feature = "tokio-runtime")
))]
ExecutorKind::Tokio => {
if tls {
let stream = tokio::net::TcpStream::connect(&address).await?;
Expand Down Expand Up @@ -1061,7 +1067,11 @@ impl<Exe: Executor> Connection<Exe> {
.await
}
}
#[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime")))]
#[cfg(all(
not(feature = "tokio-runtime"),
not(feature = "tokio-rustls-runtime-aws-lc-rs"),
not(feature = "tokio-rustls-runtime-ring")
))]
ExecutorKind::Tokio => {
unimplemented!("the tokio-runtime cargo feature is not active");
}
Expand Down Expand Up @@ -1110,7 +1120,10 @@ impl<Exe: Executor> Connection<Exe> {
}
}
#[cfg(all(
feature = "async-std-rustls-runtime",
any(
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring"
),
not(feature = "async-std-runtime")
))]
#[allow(deprecated)]
Expand Down Expand Up @@ -1166,7 +1179,8 @@ impl<Exe: Executor> Connection<Exe> {
}
#[cfg(all(
not(feature = "async-std-runtime"),
not(feature = "async-std-rustls-runtime")
not(feature = "async-std-rustls-runtime-aws-lc-rs"),
not(feature = "async-std-rustls-runtime-ring")
))]
ExecutorKind::AsyncStd => {
unimplemented!("the async-std-runtime cargo feature is not active");
Expand Down Expand Up @@ -1811,7 +1825,11 @@ mod tests {
use uuid::Uuid;

use super::{Connection, Receiver};
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
use crate::TokioExecutor;
use crate::{
authentication::Authentication,
Expand All @@ -1821,7 +1839,11 @@ mod tests {
};

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn receiver_auth_challenge_test() {
let (message_tx, message_rx) = mpsc::unbounded();
let (tx, _) = async_channel::bounded(10);
Expand Down Expand Up @@ -1879,7 +1901,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn connection_auth_challenge_test() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

Expand Down
6 changes: 4 additions & 2 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ impl<Exe: Executor> ConnectionManager<Exe> {

#[cfg(all(
any(
feature = "tokio-rustls-runtime",
feature = "async-std-rustls-runtime"
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring"
),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
Expand Down
54 changes: 45 additions & 9 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,19 @@ mod tests {
};
use log::LevelFilter;
use regex::Regex;
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
use tokio::time::timeout;

use super::*;
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
use crate::executor::TokioExecutor;
use crate::{
consumer::initial_position::InitialPosition, producer, proto, tests::TEST_LOGGER,
Expand Down Expand Up @@ -493,7 +501,11 @@ mod tests {
tag: "multi_consumer",
};
#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn multi_consumer() {
let _result = log::set_logger(&MULTI_LOGGER);
log::set_max_level(LevelFilter::Debug);
Expand Down Expand Up @@ -584,7 +596,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn consumer_dropped_with_lingering_acks() {
use rand::{distributions::Alphanumeric, Rng};
let _result = log::set_logger(&TEST_LOGGER);
Expand Down Expand Up @@ -681,7 +697,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn dead_letter_queue() {
let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
Expand Down Expand Up @@ -761,7 +781,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn dead_letter_queue_batched() {
use crate::ProducerOptions;

Expand Down Expand Up @@ -860,7 +884,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn failover() {
let _result = log::set_logger(&MULTI_LOGGER);
log::set_max_level(LevelFilter::Debug);
Expand Down Expand Up @@ -920,7 +948,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn seek_single_consumer() {
let _result = log::set_logger(&MULTI_LOGGER);
log::set_max_level(LevelFilter::Debug);
Expand Down Expand Up @@ -1031,7 +1063,11 @@ mod tests {
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn schema_test() {
#[derive(Serialize, Deserialize)]
struct TestData {
Expand Down
35 changes: 30 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,21 @@ pub enum ConnectionError {
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
Tls(native_tls::Error),
#[cfg(all(
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring",
),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
Tls(rustls::Error),
#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))]
#[cfg(any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring",
))]
DnsName(rustls::pki_types::InvalidDnsNameError),
Authentication(AuthenticationError),
NotFound,
Expand Down Expand Up @@ -131,7 +141,12 @@ impl From<native_tls::Error> for ConnectionError {
}

#[cfg(all(
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring",
),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
impl From<rustls::Error> for ConnectionError {
Expand All @@ -141,7 +156,12 @@ impl From<rustls::Error> for ConnectionError {
}
}

#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))]
#[cfg(any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring",
))]
impl From<rustls::pki_types::InvalidDnsNameError> for ConnectionError {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(err: rustls::pki_types::InvalidDnsNameError) -> Self {
Expand Down Expand Up @@ -188,7 +208,12 @@ impl fmt::Display for ConnectionError {
ConnectionError::Encoding(e) => write!(f, "Error encoding message: {e}"),
ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {e}"),
ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {e}"),
#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))]
#[cfg(any(
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring",
feature = "async-std-rustls-runtime-aws-lc-rs",
feature = "async-std-rustls-runtime-ring",
))]
ConnectionError::DnsName(e) => write!(f, "Error resolving hostname: {e}"),
ConnectionError::Authentication(e) => write!(f, "Error authentication: {e}"),
ConnectionError::UnexpectedResponse(e) => {
Expand Down
Loading
Loading