Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions rocketmq-client/benches/select_queue_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use std::cell::RefCell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::hint::black_box;

Comment on lines +1 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add the copyright header.

This file is missing the required copyright header. Per repository convention, all Rust source files must include the "Copyright 2023" header.

📝 Proposed fix
+// Copyright 2023 The RocketMQ Rust Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 use criterion::criterion_group;

Based on learnings: "In Rust source files (*.rs) across the rocketmq-rust repository, enforce using 'Copyright 2023' as the year in the header."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use std::cell::RefCell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::hint::black_box;
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use std::cell::RefCell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::hint::black_box;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/benches/select_queue_benchmark.rs` around lines 1 - 13, The
source file is missing the required copyright header; add the
repository-standard header containing "Copyright 2023" at the very top of the
Rust file (above the first use statements such as use criterion::criterion_group
and use
rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector) so
every .rs file includes the mandated license header.

thread_local! {
static HASHER: RefCell<DefaultHasher> = RefCell::new(DefaultHasher::new());
}

/// Queue selector using thread-local hasher for performance comparison.
struct SelectMessageQueueByHashCached;

impl<M, A> MessageQueueSelector<M, A> for SelectMessageQueueByHashCached
where
M: rocketmq_common::common::message::MessageTrait,
A: Hash,
{
#[inline]
fn select(&self, mqs: &[MessageQueue], _msg: &M, arg: &A) -> Option<MessageQueue> {
if mqs.is_empty() {
return None;
}

let hash_code = HASHER.with(|hasher| {
let mut h = hasher.borrow_mut();
arg.hash(&mut *h);
let code = h.finish();
*h = DefaultHasher::new();
code
});

let index = (hash_code % mqs.len() as u64) as usize;
mqs.get(index).cloned()
}
}

fn bench_hasher_creation(c: &mut Criterion) {
c.bench_function("hasher_new", |b| {
b.iter(|| {
let hasher = black_box(DefaultHasher::new());
black_box(hasher);
})
});
}

fn bench_select_original(c: &mut Criterion) {
let selector = SelectMessageQueueByHash;
let queues = vec![
MessageQueue::from_parts("test_topic", "broker-a", 0),
MessageQueue::from_parts("test_topic", "broker-a", 1),
MessageQueue::from_parts("test_topic", "broker-a", 2),
MessageQueue::from_parts("test_topic", "broker-a", 3),
];
let msg = Message::builder().build().unwrap();

c.bench_function("select_original", |b| {
b.iter(|| {
let order_id = black_box(12345);
black_box(selector.select(&queues, &msg, &order_id))
})
});
}

fn bench_select_cached(c: &mut Criterion) {
let selector = SelectMessageQueueByHashCached;
let queues = vec![
MessageQueue::from_parts("test_topic", "broker-a", 0),
MessageQueue::from_parts("test_topic", "broker-a", 1),
MessageQueue::from_parts("test_topic", "broker-a", 2),
MessageQueue::from_parts("test_topic", "broker-a", 3),
];
let msg = Message::builder().build().unwrap();

c.bench_function("select_cached", |b| {
b.iter(|| {
let order_id = black_box(12345);
black_box(selector.select(&queues, &msg, &order_id))
})
});
}

criterion_group!(
benches,
bench_hasher_creation,
bench_select_original,
bench_select_cached
);
criterion_main!(benches);
1 change: 1 addition & 0 deletions rocketmq-client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod mq_producer;
pub mod produce_accumulator;
pub mod producer_config_validation;
pub mod producer_impl;
pub mod queue_selector;
pub mod request_callback;
pub(crate) mod request_future_holder;
pub(crate) mod request_response_future;
Expand Down
17 changes: 17 additions & 0 deletions rocketmq-client/src/producer/queue_selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod select_message_queue_by_hash;

pub use select_message_queue_by_hash::SelectMessageQueueByHash;
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hash;
use std::hash::Hasher;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;

use crate::producer::message_queue_selector::MessageQueueSelector;

/// A message queue selector that uses hash-based routing.
///
/// Routes messages to queues by computing a hash of the provided argument and applying
/// modulo against the available queue count. Messages with identical argument values
/// are consistently routed to the same queue, preserving ordering semantics.
///
/// # Performance
///
/// Selection operates in O(1) time with no heap allocations. The `select` method is
/// inlined to eliminate function call overhead.
///
/// # Examples
///
/// ```rust,ignore
/// use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
/// use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
///
/// let selector = SelectMessageQueueByHash;
/// let order_id = 12345;
/// let queue = selector.select(&message_queues, &message, &order_id);
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct SelectMessageQueueByHash;

impl SelectMessageQueueByHash {
/// Returns a new instance.
pub fn new() -> Self {
Self
}
}

impl<M, A> MessageQueueSelector<M, A> for SelectMessageQueueByHash
where
M: MessageTrait,
A: Hash,
{
/// Selects a message queue by hashing the argument.
///
/// Returns `None` if the queue list is empty.
#[inline]
fn select(&self, mqs: &[MessageQueue], _msg: &M, arg: &A) -> Option<MessageQueue> {
if mqs.is_empty() {
return None;
}

let mut hasher = std::collections::hash_map::DefaultHasher::new();
arg.hash(&mut hasher);
let hash_code = hasher.finish();

let index = (hash_code % mqs.len() as u64) as usize;
mqs.get(index).cloned()
}
}

#[cfg(test)]
mod tests {
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;

use super::*;

#[test]
fn test_select_message_queue_by_hash() {
let selector = SelectMessageQueueByHash::new();

let queues = vec![
MessageQueue::from_parts("test_topic", "broker-a", 0),
MessageQueue::from_parts("test_topic", "broker-a", 1),
MessageQueue::from_parts("test_topic", "broker-a", 2),
MessageQueue::from_parts("test_topic", "broker-a", 3),
];

let msg = Message::builder().topic("test_topic").build().unwrap();

// Test with integer argument
let order_id = 12345;
let selected = selector.select(&queues, &msg, &order_id);
assert!(selected.is_some());

// Same argument should select same queue
let selected1 = selector.select(&queues, &msg, &order_id);
let selected2 = selector.select(&queues, &msg, &order_id);
assert_eq!(selected1, selected2);

// Different arguments should distribute across queues
let selected_a = selector.select(&queues, &msg, &100);
let selected_b = selector.select(&queues, &msg, &200);
assert!(selected_a.is_some());
assert!(selected_b.is_some());
}

#[test]
fn test_select_with_string_argument() {
let selector = SelectMessageQueueByHash::new();

let queues = vec![
MessageQueue::from_parts("test_topic", "broker-a", 0),
MessageQueue::from_parts("test_topic", "broker-a", 1),
MessageQueue::from_parts("test_topic", "broker-a", 2),
];

let msg = Message::builder().topic("test_topic").build().unwrap();

// Test with string argument
let user_id = "user_12345";
let selected1 = selector.select(&queues, &msg, &user_id);
let selected2 = selector.select(&queues, &msg, &user_id);

assert_eq!(selected1, selected2);
assert!(selected1.is_some());
}

#[test]
fn test_select_empty_queue_list() {
let selector = SelectMessageQueueByHash::new();
let queues: Vec<MessageQueue> = vec![];
let msg = Message::builder().topic("test_topic").build().unwrap();
let order_id = 12345;

let selected = selector.select(&queues, &msg, &order_id);
assert!(selected.is_none());
}

#[test]
fn test_select_single_queue() {
let selector = SelectMessageQueueByHash::new();
let queues = vec![MessageQueue::from_parts("test_topic", "broker-a", 0)];
let msg = Message::builder().topic("test_topic").build().unwrap();

// All arguments should select the only available queue
let selected1 = selector.select(&queues, &msg, &100);
let selected2 = selector.select(&queues, &msg, &200);
let selected3 = selector.select(&queues, &msg, &300);

assert_eq!(selected1, selected2);
assert_eq!(selected2, selected3);
assert_eq!(selected1.unwrap().queue_id(), 0);
}

#[test]
fn test_distribution_across_queues() {
let selector = SelectMessageQueueByHash::new();
let queues = vec![
MessageQueue::from_parts("test_topic", "broker-a", 0),
MessageQueue::from_parts("test_topic", "broker-a", 1),
MessageQueue::from_parts("test_topic", "broker-a", 2),
MessageQueue::from_parts("test_topic", "broker-a", 3),
];

let msg = Message::builder().topic("test_topic").build().unwrap();
let mut queue_counts = std::collections::HashMap::new();

// Test distribution with 100 different arguments
for i in 0..100 {
if let Some(queue) = selector.select(&queues, &msg, &i) {
*queue_counts.entry(queue.queue_id()).or_insert(0) += 1;
}
}

// Verify all queues received at least some messages
assert!(!queue_counts.is_empty());
for count in queue_counts.values() {
assert!(*count > 0);
}
}
}
Loading