This project is the Rust implementation of Apache RocketMQ client. It provides feature parity with the RocketMQ Java client, supporting all major Producer capabilities.
The RocketMQ Rust client provides comprehensive Producer functionality through two main implementations:
| Feature | DefaultMQProducer | TransactionMQProducer |
|---|---|---|
| Basic Send (Sync/Async/Oneway) | ✅ | ✅ |
| Send to Specific Queue | ✅ | ✅ |
| Send with Selector | ✅ | ✅ |
| Batch Send | ✅ | ✅ |
| Request-Reply (RPC) | ✅ | ✅ |
| Message Recall | ✅ | ✅ |
| Transaction Messages | ❌ | ✅ |
| Auto Batch Sending | ✅ | ✅ |
| Backpressure Control | ✅ | ✅ |
- Synchronous Send: Blocks until receive broker response
- Asynchronous Send: Non-blocking with callback
- Oneway Send: Fire-and-forget, no response expected
- Timeout Support: Configurable per-request timeout
- Auto Selection: Default load balancing across queues
- Specific Queue: Send to designated MessageQueue
- Custom Selector: Implement
MessageQueueSelectorfor custom routing logic
- Manual Batch: Send multiple messages together
- Auto Batch: Automatic batching with configurable thresholds
- Batch to Queue: Send batches to specific queues
- Synchronous Request: Send request and wait for response
- Asynchronous Request: Non-blocking with callback
- Request with Selector: Route requests via custom selector
- Request to Queue: Send requests to specific queues
- Local Transaction Execution: Execute transaction logic locally
- Transaction Commit/Rollback: Full transaction state management
- Transaction Listener: Custom transaction behavior via
TransactionListenertrait
- Message Recall: Recall messages by topic and handle
- Compression: Automatic compression for large messages
- Backpressure: Configurable async backpressure control
- Namespace Support: Multi-tenant namespace isolation
- Trace Integration: Message tracing support
First, start the RocketMQ NameServer and Broker services.
For more examples, you can check here
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::Result;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;
pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger()?;
// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
producer.shutdown().await;
Ok(())
}use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;
pub const PRODUCER_GROUP: &str = "BatchProducerGroupName";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
//init logger
rocketmq_common::log::init_logger()?;
// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;
let mut messages = Vec::new();
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID001",
"Hello world 0".as_bytes(),
));
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID002",
"Hello world 1".as_bytes(),
));
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID003",
"Hello world 2".as_bytes(),
));
let send_result = producer.send_batch(messages).await?;
println!("send result: {}", send_result);
Ok(())
}use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::Result;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;
pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RequestTopic";
pub const TAG: &str = "TagA";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger()?;
// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;
let ttl = 3000;
let message = producer
.request(
Message::with_tags(TOPIC, "", "Hello RocketMQ".as_bytes()),
ttl,
)
.await?;
println!("send result: {:?}", message);
producer.shutdown().await;
Ok(())
}use rocketmq_client_rust::producer::transaction_mq_producer::TransactionMQProducer;
use rocketmq_client_rust::producer::transaction_listener::TransactionListener;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;
pub const PRODUCER_GROUP: &str = "transaction_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TransactionTopic";
// Custom transaction listener
struct MyTransactionListener;
impl TransactionListener for MyTransactionListener {
fn execute_local_transaction(
&mut self,
msg: &dyn rocketmq_common::common::message::MessageTrait,
arg: &dyn std::any::Any,
) -> rocketmq_client_rust::Result<rocketmq_client_rust::producer::transaction_listener::LocalTransactionState> {
// Implement local transaction logic here
println!("Executing local transaction for message: {:?}", msg.get_keys());
Ok(rocketmq_client_rust::producer::transaction_listener::LocalTransactionState::CommitMessage)
}
fn check_local_transaction(
&mut self,
msg: &dyn rocketmq_common::common::message::MessageTrait,
) -> rocketmq_client_rust::Result<rocketmq_client_rust::producer::transaction_listener::LocalTransactionState> {
// Check transaction status
Ok(rocketmq_client_rust::producer::transaction_listener::LocalTransactionState::CommitMessage)
}
}
#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
rocketmq_common::log::init_logger()?;
let mut producer = TransactionMQProducer::builder()
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.transaction_listener(MyTransactionListener)
.build()
.await?;
producer.start().await?;
let msg = Message::with_tags(TOPIC, "", "Hello Transactional RocketMQ".as_bytes());
let result = producer.send_message_in_transaction(msg, Some("transaction_arg")).await?;
println!("Transaction send result: {:?}", result);
producer.shutdown().await;
Ok(())
}use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;
pub const PRODUCER_GROUP: &str = "selector_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "SelectorTopic";
#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
rocketmq_common::log::init_logger()?;
let mut producer = DefaultMQProducer::builder()
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;
let msg = Message::with_tags(TOPIC, "", "Hello RocketMQ with Selector".as_bytes());
// Custom queue selector - routes messages based on key
let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, arg: &dyn std::any::Any| {
if let Some(order_id) = arg.downcast_ref::<String>() {
// Simple hash-based routing
let hash = order_id.chars().map(|c| c as usize).sum::<usize>();
let index = hash % queues.len();
Some(queues[index].clone())
} else {
queues.first().cloned()
}
};
let order_id = "ORDER12345".to_string();
let result = producer.send_with_selector(msg, selector, order_id).await?;
println!("Send result: {:?}", result);
producer.shutdown().await;
Ok(())
}use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
use cheetah_string::CheetahString;
use rocketmq_rust::rocketmq;
pub const PRODUCER_GROUP: &str = "recall_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RecallTopic";
#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
rocketmq_common::log::init_logger()?;
let mut producer = DefaultMQProducer::builder()
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;
// Send a message
let msg = Message::with_tags(TOPIC, "", "Hello RocketMQ - Recallable".as_bytes());
let send_result = producer.send(msg).await?;
println!("Send result: {:?}", send_result);
// Recall the message (if recall handle is available)
// The recall handle is typically returned in the send result for recallable messages
if let Some(recall_handle) = send_result.and_then(|r| r.recall_handle()) {
let recall_result = producer.recall_message(
CheetahString::from_static_str(TOPIC),
CheetahString::from(recall_handle.as_str())
).await?;
println!("Recall result: {}", recall_result);
}
producer.shutdown().await;
Ok(())
}The RocketMQ Rust client provides zero-copy polling APIs for maximum performance when consuming messages at high throughput (10,000+ msg/s).
| Method | Throughput | Memory Overhead | Use Case |
|---|---|---|---|
poll() |
~1,000 msg/s | High (clones all messages) | Simple scenarios, need to store messages |
poll_zero_copy() |
10,000+ msg/s | Minimal (no cloning) | High throughput, read-only processing |
Regular poll() overhead:
// ⚠️ Each poll with 32 messages of 2KB each = ~90KB allocation + copy
// At 100 polls/sec = ~9MB/s memory allocation rate
let messages = consumer.poll().await; // Clones every message!Zero-copy poll_zero_copy() advantage:
// ✅ Zero heap allocations, 10x+ faster
let messages = consumer.poll_zero_copy().await; // No cloning!For processing messages without storing them (forward, parse, aggregate, etc.):
use rocketmq_client::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
use rocketmq_client::consumer::lite_pull_consumer::LitePullConsumer;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let consumer = DefaultLitePullConsumer::builder()
.consumer_group("high_throughput_group")
.name_server_addr("127.0.0.1:9876")
.pull_batch_size(32)
.auto_commit(true)
.build();
consumer.start().await?;
consumer.subscribe("MyTopic").await?;
loop {
// ✅ Use zero-copy for maximum performance (10x+ faster)
let messages = consumer.poll_zero_copy().await;
for msg in &messages {
// Process without cloning - read-only access
let topic = msg.get_topic();
let body = msg.get_body();
// Your processing: parse, forward, aggregate, etc.
process_message(topic, body).await?;
}
// Messages automatically dropped here, no manual cleanup!
}
}When you need to store only some messages:
// ✅ Best practice: Zero-copy first, clone only what you need
let messages = consumer.poll_zero_copy().await;
// Filter and clone only important messages
let important: Vec<MessageExt> = messages.into_iter()
.filter(|msg| is_important(msg))
.map(|msg| (*msg).clone()) // Clone only filtered messages
.collect();
// Store only important messages
store.save(important);Use regular poll() when you need to store all messages:
// ✅ Appropriate use of poll() - need to store all messages
let messages = consumer.poll().await; // Clones all
message_store.save_all(messages); // Messages outlive poll scopeUse the zero-copy API in your application code. Here's how:
Basic high-throughput pattern: Basic high-throughput pattern:
loop {
let messages = consumer.poll_zero_copy().await;
for msg in &messages {
process_message(msg).await?;
}
}**SeImplementation Notes
**Key Takeaways:ng pattern:
let messages = consumer.poll_zero_copy().await;
let important = messages.into_iter()
.filter(|m| is_important(m))
.map(|m| (*m).clone())
.collect();- Default choice for high throughput: Use
poll_zero_copy()orpoll_with_timeout_zero_copy() - Read-only processing: No need to clone - process messages directly
- Selective storage: Clone only the messages you need to keep
- Legacy compatibility: Regular
poll()still available for simple use cases
For detailed API documentation, see the LitePullConsumer trait documentation.