Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@ keywords = ["pulsar", "api", "client"]
[dependencies]
async-channel = "^2.3.1"
async-trait = "^0.1.88"
async-std = { version = "^1.13.1", features = ["attributes", "unstable"], optional = true }
async-std = { version = "^1.13.1", features = [
"attributes",
"unstable",
], optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
asynchronous-codec = { version = "^0.7.0", optional = true }
bytes = "^1.9.0"
chrono = { version = "^0.4.41", default-features = false, features = ["clock", "std"] }
chrono = { version = "^0.4.41", default-features = false, features = [
"clock",
"std",
] }
crc = "^3.3.0"
data-url = { version = "^0.3.1", optional = true }
flate2 = { version = "^1.1.1", optional = true }
futures = "^0.3.31"
futures-rustls = { version = "^0.26.0", default-features = false, features = ["tls12", "logging"], optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
futures-rustls = { version = "^0.26.0", default-features = false, features = [
"tls12",
"logging",
], optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
log = "^0.4.27"
lz4 = { version = "^1.28.0", optional = true }
murmur3 = "0.5"
native-tls = { version = "^0.2.12", optional = true }
nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
openidconnect = { version = "^4.0.0", optional = true }
Expand All @@ -39,13 +49,23 @@ prost = "^0.13.4"
prost-derive = "^0.13.4"
rand = "^0.8.5"
regex = "^1.11.1"
rustls = { version = "^0.23.27", default-features = false, features = ["log", "std"] , optional = true }
rustls = { version = "^0.23.27", default-features = false, features = [
"log",
"std",
], optional = true }
snap = { version = "^1.1.1", optional = true }
serde = { version = "^1.0.219", features = ["derive"], optional = true }
serde_json = { version = "^1.0.140", optional = true }
tokio = { version = "^1.45.0", features = ["rt", "net", "time"], optional = true }
tokio = { version = "^1.45.0", features = [
"rt",
"net",
"time",
], optional = true }
tokio-util = { version = "^0.7.15", features = ["codec"], optional = true }
tokio-rustls = { version = "0.26.2", default-features = false, features = ["logging", "tls12"], optional = true }
tokio-rustls = { version = "0.26.2", default-features = false, features = [
"logging",
"tls12",
], optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
tracing = { version = "^0.1.41", optional = true }
url = "^2.5.4"
Expand All @@ -66,16 +86,45 @@ prost-build = "^0.13.4"
protobuf-src = { version = "^2.1.0", optional = true }

[features]
async-std-runtime = ["async-std", "asynchronous-codec", "native-tls", "async-native-tls"]
async-std-runtime = [
"async-std",
"asynchronous-codec",
"native-tls",
"async-native-tls",
]
async-std-rustls-runtime = ["async-std-rustls-runtime-aws-lc-rs"]
async-std-rustls-runtime-aws-lc-rs = ["async-std", "asynchronous-codec", "webpki-roots", "rustls/aws-lc-rs", "futures-rustls/aws-lc-rs"]
async-std-rustls-runtime-ring = ["async-std", "asynchronous-codec", "webpki-roots", "rustls/ring", "futures-rustls/ring"]
async-std-rustls-runtime-aws-lc-rs = [
"async-std",
"asynchronous-codec",
"webpki-roots",
"rustls/aws-lc-rs",
"futures-rustls/aws-lc-rs",
]
async-std-rustls-runtime-ring = [
"async-std",
"asynchronous-codec",
"webpki-roots",
"rustls/ring",
"futures-rustls/ring",
]
auth-oauth2 = ["openidconnect", "oauth2", "serde", "serde_json", "data-url"]
compression = ["lz4", "flate2", "zstd", "snap"]
default = ["compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"]
protobuf-src = ["dep:protobuf-src"]
telemetry = ["tracing"]
tokio-runtime = ["tokio", "tokio-util", "native-tls", "tokio-native-tls"]
tokio-rustls-runtime = ["tokio-rustls-runtime-aws-lc-rs"]
tokio-rustls-runtime-aws-lc-rs = ["tokio", "tokio-util", "webpki-roots", "rustls/aws-lc-rs", "tokio-rustls/aws-lc-rs"]
tokio-rustls-runtime-ring = ["tokio", "tokio-util", "webpki-roots", "rustls/ring", "tokio-rustls/ring"]
tokio-rustls-runtime-aws-lc-rs = [
"tokio",
"tokio-util",
"webpki-roots",
"rustls/aws-lc-rs",
"tokio-rustls/aws-lc-rs",
]
tokio-rustls-runtime-ring = [
"tokio",
"tokio-util",
"webpki-roots",
"rustls/ring",
"tokio-rustls/ring",
]
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub mod message;
pub mod producer;
pub mod reader;
mod retry_op;
pub mod routing_policy;
mod service_discovery;

#[cfg(all(
Expand Down
72 changes: 45 additions & 27 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
},
proto::CommandSuccess,
retry_op::retry_create_producer,
routing_policy::RoutingPolicy,
BrokerAddress, Error, Pulsar,
};

Expand Down Expand Up @@ -161,6 +162,7 @@ pub struct ProducerOptions {
/// [`crate::client::PulsarBuilder::with_outbound_channel_size`] is full, when awaiting
/// [`Producer::send_non_blocking`]. (default: false)
pub block_queue_if_full: bool,
pub routing_policy: Option<RoutingPolicy>,
}

impl ProducerOptions {
Expand Down Expand Up @@ -418,9 +420,14 @@ impl<Exe: Executor> Producer<Exe> {
&mut self,
message: T,
) -> Result<SendFuture, Error> {
let serialized_message = T::serialize_message(message)?;
match &mut self.inner {
ProducerInner::Single(p) => p.send(message).await,
ProducerInner::Partitioned(p) => p.next().send(message).await,
ProducerInner::Single(p) => p.send(serialized_message).await,
ProducerInner::Partitioned(p) => {
p.choose_partition(&serialized_message)
.send(serialized_message)
.await
}
}
}

Expand Down Expand Up @@ -464,13 +471,15 @@ impl<Exe: Executor> Producer<Exe> {
T: SerializeMessage,
I: IntoIterator<Item = T>,
{
let producer = match &mut self.inner {
ProducerInner::Single(p) => p,
ProducerInner::Partitioned(p) => p.next(),
};
let mut sends = Vec::new();
for message in messages {
sends.push(producer.send(message).await);
let serialized_message = T::serialize_message(message)?;
let producer = match &mut self.inner {
ProducerInner::Single(p) => p,
ProducerInner::Partitioned(p) => p.choose_partition(&serialized_message),
};

sends.push(producer.send(serialized_message).await);
}
if sends.iter().all(|s| s.is_ok()) {
Ok(sends.into_iter().map(|s| s.unwrap()).collect())
Expand All @@ -492,14 +501,6 @@ impl<Exe: Executor> Producer<Exe> {
}
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> {
match &mut self.inner {
ProducerInner::Single(p) => p.send_raw(message).await,
ProducerInner::Partitioned(p) => p.next().send_raw(message).await,
}
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn close(&mut self) -> Result<(), Error> {
match &mut self.inner {
Expand Down Expand Up @@ -527,9 +528,31 @@ struct PartitionedProducer<Exe: Executor> {

impl<Exe: Executor> PartitionedProducer<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn next(&mut self) -> &mut TopicProducer<Exe> {
self.producers.rotate_left(1);
self.producers.front_mut().unwrap()
pub fn choose_partition(&mut self, message: &Message) -> &mut TopicProducer<Exe> {
match &self.options.routing_policy {
Some(RoutingPolicy::RoundRobin) => {
// If the message has a partition key, use it
if let Some(partition_key) = &message.partition_key {
let index = RoutingPolicy::compute_partition_index_for_key(
partition_key,
self.producers.len(),
);
return self.producers.get_mut(index).unwrap();
}
// If not, use round robin
self.producers.rotate_left(1);
self.producers.front_mut().unwrap()
}
Some(RoutingPolicy::Single(index)) => self.producers.get_mut(*index).unwrap(),
Some(RoutingPolicy::Custom(policy)) => self
.producers
.get_mut(policy.route(message, self.producers.len()))
.unwrap(),
None => {
self.producers.rotate_left(1);
self.producers.front_mut().unwrap()
}
}
}
}

Expand Down Expand Up @@ -659,11 +682,8 @@ impl<Exe: Executor> TopicProducer<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn send<T: SerializeMessage + Sized>(&mut self, message: T) -> Result<SendFuture, Error> {
match T::serialize_message(message) {
Ok(message) => self.send_raw(message.into()).await,
Err(e) => Err(e),
}
async fn send(&mut self, message: Message) -> Result<SendFuture, Error> {
self.send_raw(message.into()).await
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
Expand Down Expand Up @@ -1390,10 +1410,8 @@ impl<T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'_, T, Exe> {
message.partition_key = partition_key;
message.ordering_key = ordering_key;
message.event_time = event_time;

let mut producer_message: ProducerMessage = message.into();
producer_message.deliver_at_time = deliver_at_time;
producer.send_raw(producer_message).await
message.deliver_at_time = deliver_at_time;
producer.send_non_blocking(message).await
}
}

Expand Down
79 changes: 79 additions & 0 deletions src/routing_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::Arc;

use murmur3::murmur3_32;

use crate::producer::Message;

#[derive(Clone, Default)]
pub enum RoutingPolicy {
#[default]
RoundRobin,
Single(usize),
Custom(Arc<dyn CustomRoutingPolicy>),
}

impl RoutingPolicy {
pub fn compute_partition_index_for_key(key: &str, partition_count: usize) -> usize {
// Use murmur3 hash for deterministic results across restarts
// Using a fixed seed (0) ensures consistent hashing
let hash = murmur3_32(&mut key.as_bytes(), 0).unwrap_or(0);
(hash % partition_count as u32) as usize
}
}

pub trait CustomRoutingPolicy: Send + Sync {
fn route(&self, message: &Message, num_producers: usize) -> usize;
}

#[cfg(test)]
mod tests {
use uuid::Uuid;

use crate::routing_policy::RoutingPolicy;

#[test]
fn test_compute_partition_index_consistency() {
let partition_count = 4;
let key = Uuid::new_v4().to_string();

let partition_index = RoutingPolicy::compute_partition_index_for_key(&key, partition_count);
assert!(partition_index < partition_count);

for _ in 0..10 {
let other_partition_index =
RoutingPolicy::compute_partition_index_for_key(&key, partition_count);
assert!(other_partition_index < partition_count);
assert_eq!(
partition_index, other_partition_index,
"partition index should be deterministic for the same key"
);
}
}

#[test]
fn test_compute_partition_index_distribution() {
let partition_count = 4;
let mut partition_counts = vec![0; partition_count];

let total = 1000;
for _ in 0..total {
let partition_index = RoutingPolicy::compute_partition_index_for_key(
&Uuid::new_v4().to_string(),
partition_count,
);
partition_counts[partition_index] += 1;
}

for count in partition_counts {
let ratio = count as f64 / total as f64;
let expected_ratio = 1.0 / partition_count as f64;

assert!(
ratio > (expected_ratio - 0.1) && ratio < (expected_ratio + 0.1),
"distribution ratio {} is not near expected ratio {}",
ratio,
expected_ratio
);
}
}
}