Skip to content

Commit a1a1505

Browse files
authored
[ISSUE #6323]🚀Add producer_with_timeout example for RocketMQ (#6324)
1 parent 870da38 commit a1a1505

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

rocketmq-example/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ path = "examples/consumer/pop_consumer.rs"
4848

4949
[[example]]
5050
name = "producer-simple"
51-
path = "examples/producer/producer_simple.rs"
51+
path = "examples/producer/producer_simple.rs"
52+
53+
[[example]]
54+
name = "producer-with-timeout"
55+
path = "examples/producer/producer_with_timeout.rs"
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
16+
use rocketmq_client_rust::producer::mq_producer::MQProducer;
17+
use rocketmq_common::common::message::message_single::Message;
18+
use rocketmq_error::RocketMQResult;
19+
use rocketmq_rust::rocketmq;
20+
21+
pub const MESSAGE_COUNT: usize = 1;
22+
pub const PRODUCER_GROUP: &str = "producer_with_timeout";
23+
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
24+
pub const TOPIC: &str = "ProducerTimeoutTest";
25+
pub const TAG: &str = "TagA";
26+
27+
// Send timeout in milliseconds
28+
pub const SEND_TIMEOUT_MS: u64 = 3000;
29+
30+
#[rocketmq::main]
31+
pub async fn main() -> RocketMQResult<()> {
32+
// init logger
33+
rocketmq_common::log::init_logger()?;
34+
35+
// create a producer builder with default configuration
36+
let mut producer = DefaultMQProducer::builder()
37+
.producer_group(PRODUCER_GROUP)
38+
.name_server_addr(DEFAULT_NAMESRVADDR)
39+
.build();
40+
41+
producer.start().await?;
42+
43+
// Example 1: Set delay level (1-18 for predefined levels)
44+
let message = Message::builder()
45+
.topic(TOPIC)
46+
.tags(TAG)
47+
.body("Hello RocketMQ with delay level")
48+
.delay_level(3)
49+
.build()?;
50+
let send_result = producer.send_with_timeout(message, SEND_TIMEOUT_MS).await?;
51+
println!("send result with delay level: {:?}", send_result);
52+
53+
// Example 2: Set delay time in seconds
54+
let message = Message::builder()
55+
.topic(TOPIC)
56+
.tags(TAG)
57+
.body("Hello RocketMQ with delay seconds")
58+
.delay_secs(60)
59+
.build()?;
60+
let send_result = producer.send_with_timeout(message, SEND_TIMEOUT_MS).await?;
61+
println!("send result with delay seconds: {:?}", send_result);
62+
63+
// Example 3: Set delay time in milliseconds
64+
let message = Message::builder()
65+
.topic(TOPIC)
66+
.tags(TAG)
67+
.body("Hello RocketMQ with delay millis")
68+
.delay_millis(5000)
69+
.build()?;
70+
let send_result = producer.send_with_timeout(message, SEND_TIMEOUT_MS).await?;
71+
println!("send result with delay millis: {:?}", send_result);
72+
73+
// Example 4: Set delivery time (absolute timestamp in milliseconds)
74+
let current_time = std::time::SystemTime::now()
75+
.duration_since(std::time::UNIX_EPOCH)
76+
.unwrap()
77+
.as_millis() as u64;
78+
let deliver_time = current_time + 10_000; // deliver after 10 seconds
79+
let message = Message::builder()
80+
.topic(TOPIC)
81+
.tags(TAG)
82+
.body("Hello RocketMQ with deliver time")
83+
.deliver_time_ms(deliver_time)
84+
.build()?;
85+
let send_result = producer.send_with_timeout(message, SEND_TIMEOUT_MS).await?;
86+
println!("send result with deliver time: {:?}", send_result);
87+
88+
producer.shutdown().await;
89+
Ok(())
90+
}

0 commit comments

Comments
 (0)