Skip to content

Commit 73e06e4

Browse files
authored
[ISSUE #6618]🚀Add hash-based message queue selector (#6619)
1 parent da0bba5 commit 73e06e4

File tree

4 files changed

+302
-0
lines changed

4 files changed

+302
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use criterion::criterion_group;
2+
use criterion::criterion_main;
3+
use criterion::Criterion;
4+
use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
5+
use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
6+
use rocketmq_common::common::message::message_queue::MessageQueue;
7+
use rocketmq_common::common::message::message_single::Message;
8+
use std::cell::RefCell;
9+
use std::collections::hash_map::DefaultHasher;
10+
use std::hash::Hash;
11+
use std::hash::Hasher;
12+
use std::hint::black_box;
13+
14+
thread_local! {
15+
static HASHER: RefCell<DefaultHasher> = RefCell::new(DefaultHasher::new());
16+
}
17+
18+
/// Queue selector using thread-local hasher for performance comparison.
19+
struct SelectMessageQueueByHashCached;
20+
21+
impl<M, A> MessageQueueSelector<M, A> for SelectMessageQueueByHashCached
22+
where
23+
M: rocketmq_common::common::message::MessageTrait,
24+
A: Hash,
25+
{
26+
#[inline]
27+
fn select(&self, mqs: &[MessageQueue], _msg: &M, arg: &A) -> Option<MessageQueue> {
28+
if mqs.is_empty() {
29+
return None;
30+
}
31+
32+
let hash_code = HASHER.with(|hasher| {
33+
let mut h = hasher.borrow_mut();
34+
arg.hash(&mut *h);
35+
let code = h.finish();
36+
*h = DefaultHasher::new();
37+
code
38+
});
39+
40+
let index = (hash_code % mqs.len() as u64) as usize;
41+
mqs.get(index).cloned()
42+
}
43+
}
44+
45+
fn bench_hasher_creation(c: &mut Criterion) {
46+
c.bench_function("hasher_new", |b| {
47+
b.iter(|| {
48+
let hasher = black_box(DefaultHasher::new());
49+
black_box(hasher);
50+
})
51+
});
52+
}
53+
54+
fn bench_select_original(c: &mut Criterion) {
55+
let selector = SelectMessageQueueByHash;
56+
let queues = vec![
57+
MessageQueue::from_parts("test_topic", "broker-a", 0),
58+
MessageQueue::from_parts("test_topic", "broker-a", 1),
59+
MessageQueue::from_parts("test_topic", "broker-a", 2),
60+
MessageQueue::from_parts("test_topic", "broker-a", 3),
61+
];
62+
let msg = Message::builder().build().unwrap();
63+
64+
c.bench_function("select_original", |b| {
65+
b.iter(|| {
66+
let order_id = black_box(12345);
67+
black_box(selector.select(&queues, &msg, &order_id))
68+
})
69+
});
70+
}
71+
72+
fn bench_select_cached(c: &mut Criterion) {
73+
let selector = SelectMessageQueueByHashCached;
74+
let queues = vec![
75+
MessageQueue::from_parts("test_topic", "broker-a", 0),
76+
MessageQueue::from_parts("test_topic", "broker-a", 1),
77+
MessageQueue::from_parts("test_topic", "broker-a", 2),
78+
MessageQueue::from_parts("test_topic", "broker-a", 3),
79+
];
80+
let msg = Message::builder().build().unwrap();
81+
82+
c.bench_function("select_cached", |b| {
83+
b.iter(|| {
84+
let order_id = black_box(12345);
85+
black_box(selector.select(&queues, &msg, &order_id))
86+
})
87+
});
88+
}
89+
90+
criterion_group!(
91+
benches,
92+
bench_hasher_creation,
93+
bench_select_original,
94+
bench_select_cached
95+
);
96+
criterion_main!(benches);

rocketmq-client/src/producer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod mq_producer;
2020
pub mod produce_accumulator;
2121
pub mod producer_config_validation;
2222
pub mod producer_impl;
23+
pub mod queue_selector;
2324
pub mod request_callback;
2425
pub(crate) mod request_future_holder;
2526
pub(crate) mod request_response_future;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod select_message_queue_by_hash;
16+
17+
pub use select_message_queue_by_hash::SelectMessageQueueByHash;
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::hash::Hash;
16+
use std::hash::Hasher;
17+
18+
use rocketmq_common::common::message::message_queue::MessageQueue;
19+
use rocketmq_common::common::message::MessageTrait;
20+
21+
use crate::producer::message_queue_selector::MessageQueueSelector;
22+
23+
/// A message queue selector that uses hash-based routing.
24+
///
25+
/// Routes messages to queues by computing a hash of the provided argument and applying
26+
/// modulo against the available queue count. Messages with identical argument values
27+
/// are consistently routed to the same queue, preserving ordering semantics.
28+
///
29+
/// # Performance
30+
///
31+
/// Selection operates in O(1) time with no heap allocations. The `select` method is
32+
/// inlined to eliminate function call overhead.
33+
///
34+
/// # Examples
35+
///
36+
/// ```rust,ignore
37+
/// use rocketmq_client_rust::producer::queue_selector::SelectMessageQueueByHash;
38+
/// use rocketmq_client_rust::producer::message_queue_selector::MessageQueueSelector;
39+
///
40+
/// let selector = SelectMessageQueueByHash;
41+
/// let order_id = 12345;
42+
/// let queue = selector.select(&message_queues, &message, &order_id);
43+
/// ```
44+
#[derive(Debug, Clone, Copy, Default)]
45+
pub struct SelectMessageQueueByHash;
46+
47+
impl SelectMessageQueueByHash {
48+
/// Returns a new instance.
49+
pub fn new() -> Self {
50+
Self
51+
}
52+
}
53+
54+
impl<M, A> MessageQueueSelector<M, A> for SelectMessageQueueByHash
55+
where
56+
M: MessageTrait,
57+
A: Hash,
58+
{
59+
/// Selects a message queue by hashing the argument.
60+
///
61+
/// Returns `None` if the queue list is empty.
62+
#[inline]
63+
fn select(&self, mqs: &[MessageQueue], _msg: &M, arg: &A) -> Option<MessageQueue> {
64+
if mqs.is_empty() {
65+
return None;
66+
}
67+
68+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
69+
arg.hash(&mut hasher);
70+
let hash_code = hasher.finish();
71+
72+
let index = (hash_code % mqs.len() as u64) as usize;
73+
mqs.get(index).cloned()
74+
}
75+
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use rocketmq_common::common::message::message_queue::MessageQueue;
80+
use rocketmq_common::common::message::message_single::Message;
81+
82+
use super::*;
83+
84+
#[test]
85+
fn test_select_message_queue_by_hash() {
86+
let selector = SelectMessageQueueByHash::new();
87+
88+
let queues = vec![
89+
MessageQueue::from_parts("test_topic", "broker-a", 0),
90+
MessageQueue::from_parts("test_topic", "broker-a", 1),
91+
MessageQueue::from_parts("test_topic", "broker-a", 2),
92+
MessageQueue::from_parts("test_topic", "broker-a", 3),
93+
];
94+
95+
let msg = Message::builder().topic("test_topic").build().unwrap();
96+
97+
// Test with integer argument
98+
let order_id = 12345;
99+
let selected = selector.select(&queues, &msg, &order_id);
100+
assert!(selected.is_some());
101+
102+
// Same argument should select same queue
103+
let selected1 = selector.select(&queues, &msg, &order_id);
104+
let selected2 = selector.select(&queues, &msg, &order_id);
105+
assert_eq!(selected1, selected2);
106+
107+
// Different arguments should distribute across queues
108+
let selected_a = selector.select(&queues, &msg, &100);
109+
let selected_b = selector.select(&queues, &msg, &200);
110+
assert!(selected_a.is_some());
111+
assert!(selected_b.is_some());
112+
}
113+
114+
#[test]
115+
fn test_select_with_string_argument() {
116+
let selector = SelectMessageQueueByHash::new();
117+
118+
let queues = vec![
119+
MessageQueue::from_parts("test_topic", "broker-a", 0),
120+
MessageQueue::from_parts("test_topic", "broker-a", 1),
121+
MessageQueue::from_parts("test_topic", "broker-a", 2),
122+
];
123+
124+
let msg = Message::builder().topic("test_topic").build().unwrap();
125+
126+
// Test with string argument
127+
let user_id = "user_12345";
128+
let selected1 = selector.select(&queues, &msg, &user_id);
129+
let selected2 = selector.select(&queues, &msg, &user_id);
130+
131+
assert_eq!(selected1, selected2);
132+
assert!(selected1.is_some());
133+
}
134+
135+
#[test]
136+
fn test_select_empty_queue_list() {
137+
let selector = SelectMessageQueueByHash::new();
138+
let queues: Vec<MessageQueue> = vec![];
139+
let msg = Message::builder().topic("test_topic").build().unwrap();
140+
let order_id = 12345;
141+
142+
let selected = selector.select(&queues, &msg, &order_id);
143+
assert!(selected.is_none());
144+
}
145+
146+
#[test]
147+
fn test_select_single_queue() {
148+
let selector = SelectMessageQueueByHash::new();
149+
let queues = vec![MessageQueue::from_parts("test_topic", "broker-a", 0)];
150+
let msg = Message::builder().topic("test_topic").build().unwrap();
151+
152+
// All arguments should select the only available queue
153+
let selected1 = selector.select(&queues, &msg, &100);
154+
let selected2 = selector.select(&queues, &msg, &200);
155+
let selected3 = selector.select(&queues, &msg, &300);
156+
157+
assert_eq!(selected1, selected2);
158+
assert_eq!(selected2, selected3);
159+
assert_eq!(selected1.unwrap().queue_id(), 0);
160+
}
161+
162+
#[test]
163+
fn test_distribution_across_queues() {
164+
let selector = SelectMessageQueueByHash::new();
165+
let queues = vec![
166+
MessageQueue::from_parts("test_topic", "broker-a", 0),
167+
MessageQueue::from_parts("test_topic", "broker-a", 1),
168+
MessageQueue::from_parts("test_topic", "broker-a", 2),
169+
MessageQueue::from_parts("test_topic", "broker-a", 3),
170+
];
171+
172+
let msg = Message::builder().topic("test_topic").build().unwrap();
173+
let mut queue_counts = std::collections::HashMap::new();
174+
175+
// Test distribution with 100 different arguments
176+
for i in 0..100 {
177+
if let Some(queue) = selector.select(&queues, &msg, &i) {
178+
*queue_counts.entry(queue.queue_id()).or_insert(0) += 1;
179+
}
180+
}
181+
182+
// Verify all queues received at least some messages
183+
assert!(!queue_counts.is_empty());
184+
for count in queue_counts.values() {
185+
assert!(*count > 0);
186+
}
187+
}
188+
}

0 commit comments

Comments
 (0)