Skip to content

Commit 511417d

Browse files
authored
[ISSUE #6341]🚀Add producer send with selector examples (#6342)
* [ISSUE #6341]🚀Add producer send with selector examples * [ISSUE #6341]🚀Add producer send with selector examples
1 parent 1ac16a0 commit 511417d

File tree

2 files changed

+247
-1
lines changed

2 files changed

+247
-1
lines changed

rocketmq-example/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,8 @@ path = "examples/producer/batch_send.rs"
6464

6565
[[example]]
6666
name = "producer-send-to-queue"
67-
path = "examples/producer/send_to_queue.rs"
67+
path = "examples/producer/send_to_queue.rs"
68+
69+
[[example]]
70+
name = "producer-send-with-selector"
71+
path = "examples/producer/send_with_selector.rs"
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! # Send with Selector Methods
16+
//!
17+
//! This example demonstrates methods for sending messages with custom queue selection logic:
18+
//! - `send_with_selector`: Send with custom queue selector
19+
//! - `send_with_selector_timeout`: Send with selector and timeout
20+
//! - `send_with_selector_callback`: Send with selector and callback
21+
//! - `send_with_selector_callback_timeout`: Send with selector, callback, and timeout
22+
//! - `send_oneway_with_selector`: One-way send with selector
23+
24+
use std::any::Any;
25+
use std::sync::Arc;
26+
27+
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
28+
use rocketmq_client_rust::producer::mq_producer::MQProducer;
29+
use rocketmq_client_rust::producer::send_callback::SendMessageCallback;
30+
use rocketmq_client_rust::producer::send_result::SendResult;
31+
use rocketmq_common::common::message::MessageTrait;
32+
use rocketmq_common::common::message::message_queue::MessageQueue;
33+
use rocketmq_common::common::message::message_single::Message;
34+
use rocketmq_error::RocketMQResult;
35+
use rocketmq_rust::rocketmq;
36+
37+
pub const PRODUCER_GROUP: &str = "producer_send_with_selector";
38+
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
39+
pub const TOPIC: &str = "SendWithSelectorTestTopic";
40+
pub const TAG: &str = "SelectorTag";
41+
pub const TIMEOUT_MS: u64 = 3000;
42+
43+
#[rocketmq::main]
44+
pub async fn main() -> RocketMQResult<()> {
45+
rocketmq_common::log::init_logger()?;
46+
47+
let mut producer = DefaultMQProducer::builder()
48+
.producer_group(PRODUCER_GROUP)
49+
.name_server_addr(DEFAULT_NAMESRVADDR)
50+
.build();
51+
52+
producer.start().await?;
53+
54+
println!("========== RocketMQ Send with Selector Methods ==========\n");
55+
56+
// 1. Send with selector
57+
send_with_selector(&mut producer).await?;
58+
59+
// 2. Send with selector and timeout
60+
send_with_selector_timeout(&mut producer).await?;
61+
62+
// 3. Send with selector and callback
63+
send_with_selector_callback(&mut producer).await?;
64+
65+
// 4. Send with selector, callback, and timeout
66+
send_with_selector_callback_timeout(&mut producer).await?;
67+
68+
// 5. One-way send with selector
69+
send_oneway_with_selector(&mut producer).await?;
70+
71+
producer.shutdown().await;
72+
println!("\n========== All examples completed ==========");
73+
Ok(())
74+
}
75+
76+
/// 1. Send with selector
77+
///
78+
/// Uses a custom selector function to choose which queue to send the message to.
79+
/// The selector receives all available queues and chooses one based on custom logic.
80+
async fn send_with_selector(producer: &mut DefaultMQProducer) -> RocketMQResult<()> {
81+
println!("1. Send with Selector");
82+
println!(" Method: producer.send_with_selector(message, selector_fn, arg).await");
83+
84+
// Define a selector function
85+
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, _arg: &dyn Any| -> Option<MessageQueue> {
86+
// Example: Select first queue (simple selection)
87+
queues.first().cloned()
88+
89+
// Other possible selection strategies:
90+
// - Hash-based: queues[(msg_hash % queues.len())]
91+
// - Round-robin: maintain index and rotate
92+
// - Random: queues[random index]
93+
// - Attribute-based: based on message properties
94+
};
95+
96+
let message = Message::builder()
97+
.topic(TOPIC)
98+
.tags(TAG)
99+
.body("Send with selector message")
100+
.build()?;
101+
102+
let arg = "selector-arg"; // Can be any type implementing Any
103+
104+
match producer.send_with_selector(message, selector, arg).await? {
105+
Some(result) => println!(" Result: {:?}", result),
106+
None => println!(" Result: None (async delivery)"),
107+
}
108+
109+
println!(" Status: Completed\n");
110+
Ok(())
111+
}
112+
113+
/// 2. Send with selector and timeout
114+
///
115+
/// Combines custom queue selection with timeout protection.
116+
async fn send_with_selector_timeout(producer: &mut DefaultMQProducer) -> RocketMQResult<()> {
117+
println!("2. Send with Selector and Timeout");
118+
println!(" Method: producer.send_with_selector_timeout(message, selector_fn, arg, timeout_ms).await");
119+
println!(" Timeout: {}ms", TIMEOUT_MS);
120+
121+
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, _arg: &dyn Any| -> Option<MessageQueue> {
122+
queues.first().cloned()
123+
};
124+
125+
let message = Message::builder()
126+
.topic(TOPIC)
127+
.tags(TAG)
128+
.body("Send with selector and timeout message")
129+
.build()?;
130+
131+
let arg = "selector-arg";
132+
133+
match producer
134+
.send_with_selector_timeout(message, selector, arg, TIMEOUT_MS)
135+
.await?
136+
{
137+
Some(result) => println!(" Result: {:?}", result),
138+
None => println!(" Result: None (async delivery)"),
139+
}
140+
141+
println!(" Status: Completed\n");
142+
Ok(())
143+
}
144+
145+
/// 3. Send with selector and callback
146+
///
147+
/// Combines custom queue selection with asynchronous callback delivery.
148+
async fn send_with_selector_callback(producer: &mut DefaultMQProducer) -> RocketMQResult<()> {
149+
println!("3. Send with Selector and Callback");
150+
println!(" Method: producer.send_with_selector_callback(message, selector_fn, arg, callback_fn).await");
151+
152+
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, _arg: &dyn Any| -> Option<MessageQueue> {
153+
queues.first().cloned()
154+
};
155+
156+
let message = Message::builder()
157+
.topic(TOPIC)
158+
.tags(TAG)
159+
.body("Send with selector and callback message")
160+
.build()?;
161+
162+
let arg = "selector-arg";
163+
164+
let callback: SendMessageCallback = Arc::new(
165+
|result: Option<&SendResult>, error: Option<&dyn std::error::Error>| match (result, error) {
166+
(Some(r), None) => println!(" Callback: Success - {:?}", r),
167+
(None, Some(e)) => println!(" Callback: Error - {}", e),
168+
_ => println!(" Callback: Unknown state"),
169+
},
170+
);
171+
172+
producer
173+
.send_with_selector_callback(message, selector, arg, Some(callback))
174+
.await?;
175+
176+
println!(" Status: Completed\n");
177+
Ok(())
178+
}
179+
180+
/// 4. Send with selector, callback, and timeout
181+
///
182+
/// Full featured send with custom queue selection, callback delivery, and timeout protection.
183+
async fn send_with_selector_callback_timeout(producer: &mut DefaultMQProducer) -> RocketMQResult<()> {
184+
println!("4. Send with Selector, Callback, and Timeout");
185+
println!(
186+
" Method: producer.send_with_selector_callback_timeout(message, selector_fn, arg, callback_fn, \
187+
timeout_ms).await"
188+
);
189+
println!(" Timeout: {}ms", TIMEOUT_MS);
190+
191+
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, _arg: &dyn Any| -> Option<MessageQueue> {
192+
queues.first().cloned()
193+
};
194+
195+
let message = Message::builder()
196+
.topic(TOPIC)
197+
.tags(TAG)
198+
.body("Send with selector, callback, and timeout message")
199+
.build()?;
200+
201+
let arg = "selector-arg";
202+
203+
let callback: SendMessageCallback = Arc::new(
204+
|result: Option<&SendResult>, error: Option<&dyn std::error::Error>| match (result, error) {
205+
(Some(r), None) => println!(" Callback: Success - {:?}", r),
206+
(None, Some(e)) => println!(" Callback: Error - {}", e),
207+
_ => println!(" Callback: Unknown state"),
208+
},
209+
);
210+
211+
producer
212+
.send_with_selector_callback_timeout(message, selector, arg, Some(callback), TIMEOUT_MS)
213+
.await?;
214+
215+
println!(" Status: Completed\n");
216+
Ok(())
217+
}
218+
219+
/// 5. One-way send with selector
220+
///
221+
/// One-way send with custom queue selection, no response expected.
222+
async fn send_oneway_with_selector(producer: &mut DefaultMQProducer) -> RocketMQResult<()> {
223+
println!("5. One-way Send with Selector");
224+
println!(" Method: producer.send_oneway_with_selector(message, selector_fn, arg).await");
225+
226+
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, _arg: &dyn Any| -> Option<MessageQueue> {
227+
queues.first().cloned()
228+
};
229+
230+
let message = Message::builder()
231+
.topic(TOPIC)
232+
.tags(TAG)
233+
.body("One-way send with selector message")
234+
.build()?;
235+
236+
let arg = "selector-arg";
237+
238+
producer.send_oneway_with_selector(message, selector, arg).await?;
239+
240+
println!(" Status: Sent (no response expected)\n");
241+
Ok(())
242+
}

0 commit comments

Comments
 (0)