A simple and reliable RabbitMQ client library for Rust. Easy to use with minimal configuration and flexible retry mechanisms.
- Simple API with just Publisher and Consumer
- Flexible retry mechanisms: exponential, linear, or custom delays
- Automatic queue and exchange declaration
- Built-in reliability with intelligent error handling
- MassTransit integration for C# interoperability
- Production-ready with persistent messages and proper ACK handling
Add to your Cargo.toml:
[dependencies]
rust-rabbit = "1.2"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

use rust_rabbit::{Connection, Publisher};
use serde::Serialize;

#[derive(Serialize)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    let publisher = Publisher::new(connection);
    let order = Order { id: 123, amount: 99.99 };

    // Publish to exchange
    publisher.publish_to_exchange("orders", "new.order", &order, None).await?;

    // Publish to queue
    publisher.publish_to_queue("order_queue", &order, None).await?;

    Ok(())
}

use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;

    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default())
        .bind_to_exchange("orders", "new.order")
        .with_prefetch(5)
        .build();

    consumer.consume(|msg: Order| async move {
        println!("Processing order {}: ${}", msg.id, msg.amount);
        // Returning an error marks the message as failed so the retry policy applies
        if msg.amount > 1000.0 {
            return Err("Amount too high".into());
        }
        Ok(())
    }).await?;

    Ok(())
}

Comprehensive guides for common use cases and patterns:
| Guide | Description |
|---|---|
| Retry Configuration Guide | Learn about retry mechanisms, delay strategies, and DLQ configuration |
| Queues and Exchanges Guide | Understanding queue binding, exchange types, and routing patterns |
| Error Handling Guide | Error types, classification, and recovery strategies |
| Best Practices Guide | Production patterns, performance optimization, and operational tips |
Full API documentation is available at docs.rs/rust-rabbit.
See the examples/ directory for complete working examples:
- basic_publisher.rs - Simple message publishing
- basic_consumer.rs - Simple message consumption
- retry_examples.rs - Different retry configurations
- delayed_exchange_example.rs - Using delayed exchange plugin
- dlq_ttl_example.rs - Auto-cleanup DLQ with TTL
- masstransit_option_example.rs - MassTransit integration
- production_setup.rs - Production-ready configuration
rust-rabbit provides flexible retry mechanisms for handling message processing failures:
use rust_rabbit::RetryConfig;
use std::time::Duration;
// Exponential backoff: 1s, 2s, 4s, 8s, 16s
let exponential = RetryConfig::exponential_default();
// Custom exponential with base and max delay
let custom_exp = RetryConfig::exponential(
    5,
    Duration::from_secs(2),
    Duration::from_secs(60)
);

// Linear retry: same delay for each attempt
let linear = RetryConfig::linear(3, Duration::from_secs(10));

// Custom delays for each retry
let custom = RetryConfig::custom(vec![
    Duration::from_secs(1),
    Duration::from_secs(5),
    Duration::from_secs(30),
]);

// No retries
let no_retry = RetryConfig::no_retry();

See the Retry Configuration Guide for detailed information.
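
To make the arithmetic behind these schedules concrete, here is a minimal sketch of how an exponential schedule with a base delay and a cap could be computed. It uses only the standard library. `exponential_delay` and `exponential_schedule` are hypothetical helper names for illustration and are not part of the rust-rabbit API. The doubling-with-cap rule is an assumption inferred from the `1s, 2s, 4s, 8s, 16s` comment above, not a description of the crate's internals.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based), or `None` once
/// `max_retries` attempts have been used up.
/// Assumption: the delay doubles on each attempt and is capped at `max_delay`.
fn exponential_delay(
    attempt: u32,
    max_retries: u32,
    base: Duration,
    max_delay: Duration,
) -> Option<Duration> {
    if attempt >= max_retries {
        return None; // retries exhausted
    }
    // 2^attempt, saturating so very large attempt numbers cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    let delay = base.checked_mul(factor).unwrap_or(max_delay);
    Some(delay.min(max_delay))
}

/// Full list of delays for `max_retries` attempts (illustrative helper).
fn exponential_schedule(max_retries: u32, base: Duration, max_delay: Duration) -> Vec<Duration> {
    (0..max_retries)
        .filter_map(|attempt| exponential_delay(attempt, max_retries, base, max_delay))
        .collect()
}
```

Under these assumptions, `exponential_schedule(5, Duration::from_secs(2), Duration::from_secs(60))` yields 2s, 4s, 8s, 16s, 32s, and a smaller cap such as 10s would flatten the tail of the schedule to 10s.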
Two strategies for implementing message delays:
TTL Strategy (Default)
- Uses RabbitMQ's TTL feature
- No plugin required
- Works out-of-the-box
DelayedExchange Strategy
- Uses rabbitmq_delayed_message_exchange plugin
- More precise timing
- Better for high-volume scenarios
- Requires plugin installation
use rust_rabbit::{RetryConfig, DelayStrategy};
// TTL strategy (default)
let config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::TTL);

// Delayed exchange strategy (requires plugin)
let config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::DelayedExchange);

See the Retry Configuration Guide for setup instructions.
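
For readers unfamiliar with how the two approaches differ at the broker level, the sketch below shows where the delay value typically ends up in each case. With a TTL-based delay, a holding queue is declared with `x-message-ttl` and dead-letter arguments that route expired messages back to the target queue. With the delayed message exchange plugin, the delay travels as an `x-delay` header on the message. These argument and header names are standard RabbitMQ and plugin conventions, but whether rust-rabbit wires things up exactly this way is an assumption. `Strategy`, `DelaySettings`, and `delay_settings` are hypothetical names that do not exist in the crate.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Hypothetical stand-in for the two strategies described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Strategy {
    Ttl,
    DelayedExchange,
}

/// Where the delay setting lives for a given strategy.
/// Values are kept as strings for simplicity; a real client would send
/// typed AMQP field values (for example integers for millisecond counts).
#[derive(Debug, PartialEq)]
struct DelaySettings {
    /// Arguments used when declaring the holding queue (TTL strategy only).
    queue_arguments: BTreeMap<&'static str, String>,
    /// Headers set on the republished message (delayed exchange only).
    message_headers: BTreeMap<&'static str, String>,
}

fn delay_settings(strategy: Strategy, delay: Duration, target_queue: &str) -> DelaySettings {
    let millis = delay.as_millis().to_string();
    let mut queue_arguments = BTreeMap::new();
    let mut message_headers = BTreeMap::new();

    match strategy {
        Strategy::Ttl => {
            // Messages wait in the holding queue until the TTL expires...
            queue_arguments.insert("x-message-ttl", millis);
            // ...and are then dead-lettered through the default exchange ("")
            // back to the queue they should be retried on.
            queue_arguments.insert("x-dead-letter-exchange", String::new());
            queue_arguments.insert("x-dead-letter-routing-key", target_queue.to_string());
        }
        Strategy::DelayedExchange => {
            // The plugin holds the message inside the exchange for `x-delay` ms.
            message_headers.insert("x-delay", millis);
        }
    }

    DelaySettings { queue_arguments, message_headers }
}
```

One practical consequence of this difference: in the TTL layout the delay is a property of the holding queue, while with the plugin each message carries its own delay.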
Failed messages that exceed max retries are automatically sent to a Dead Letter Queue. You can configure automatic cleanup:
let retry_config = RetryConfig::exponential_default()
    .with_dlq_ttl(Duration::from_secs(86400)); // Auto-cleanup after 1 day

let consumer = Consumer::builder(connection, "orders")
    .with_retry(retry_config)
    .build();

rust-rabbit seamlessly integrates with C# services using MassTransit.
Publishing to MassTransit services:
use rust_rabbit::PublishOptions;
publisher.publish_to_exchange(
    "order-exchange",
    "order.created",
    &order,
    Some(PublishOptions::new().with_masstransit("Contracts:OrderCreated"))
).await?;

Consuming MassTransit messages:
Messages published by MassTransit are automatically detected and unwrapped. Your handler receives just the payload:
consumer.consume(|msg: OrderMessage| async move {
    println!("Order ID: {}", msg.order_id);
    Ok(())
}).await?;

Access envelope metadata:
Use consume_envelopes() to access correlation IDs, timestamps, and other metadata:
use rust_rabbit::MessageEnvelope;
consumer.consume_envelopes(|envelope: MessageEnvelope<OrderMessage>| async move {
    println!("Correlation ID: {:?}", envelope.metadata.correlation_id);
    println!("Timestamp: {:?}", envelope.metadata.timestamp);
    let order = envelope.payload;
    process_order(&order).await?;
    Ok(())
}).await?;

use rust_rabbit::PublishOptions;
let options = PublishOptions::new()
    .mandatory()
    .priority(5);

publisher.publish_to_queue("orders", &message, Some(options)).await?;

let consumer = Consumer::builder(connection, "order_queue")
    .with_retry(RetryConfig::exponential_default())
    .bind_to_exchange("order_exchange", "new.order")
    .with_prefetch(10)
    .build();

- Rust 1.70 or higher
- RabbitMQ 3.8 or higher
- Tokio async runtime
Run the tests:
cargo test

For integration tests with real RabbitMQ:
# Start RabbitMQ with Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Run tests
cargo test

MIT License - see LICENSE for details.
Contributions are welcome. Please read our contributing guide and submit pull requests.
- Issues: GitHub Issues
- Documentation: docs.rs