Skip to content

Commit 5aa871f

Browse files
author
farhadzand
committed
feat: enhance RabbitMQ driver with advanced features and configuration options
- Expanded README to highlight new features including connection pooling, channel management, and async processing. - Introduced advanced configuration options for connection and channel pools in `rabbitmq.php` and added an example configuration file. - Implemented a `PoolStatsCommand` for monitoring connection and channel pool statistics. - Added support for delayed messages, publisher confirms, and transaction management in the RabbitMQ driver. - Enhanced the RabbitQueue class with improved error handling and retry logic. - Introduced unit tests for new features and refactored existing tests for better coverage.
1 parent db8ed8e commit 5aa871f

34 files changed

+5284
-136
lines changed

README.md

Lines changed: 914 additions & 11 deletions
Large diffs are not rendered by default.

config/rabbitmq.example.php

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
<?php
2+
3+
/**
4+
* Laravel RabbitMQ - Advanced Configuration Example
5+
*
6+
* This file demonstrates all available configuration options
7+
* for the Laravel RabbitMQ package with advanced features.
8+
*/
9+
10+
return [
11+
'driver' => 'rabbitmq',
12+
'queue' => env('RABBITMQ_QUEUE', 'default'),
13+
14+
// ==================== Connection Settings ====================
15+
'hosts' => [
16+
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
17+
'port' => env('RABBITMQ_PORT', 5672),
18+
'user' => env('RABBITMQ_USER', 'guest'),
19+
'password' => env('RABBITMQ_PASSWORD', 'guest'),
20+
'vhost' => env('RABBITMQ_VHOST', '/'),
21+
'lazy' => env('RABBITMQ_LAZY_CONNECTION', true),
22+
'keepalive' => env('RABBITMQ_KEEPALIVE_CONNECTION', false),
23+
'heartbeat' => env('RABBITMQ_HEARTBEAT_CONNECTION', 0),
24+
'secure' => env('RABBITMQ_SECURE', false),
25+
],
26+
27+
// ==================== Connection & Channel Pool ====================
28+
'pool' => [
29+
// Connection Pool Settings
30+
'max_connections' => env('RABBITMQ_MAX_CONNECTIONS', 10),
31+
'min_connections' => env('RABBITMQ_MIN_CONNECTIONS', 2),
32+
33+
// Channel Pool Settings
34+
'max_channels_per_connection' => env('RABBITMQ_MAX_CHANNELS_PER_CONNECTION', 100),
35+
36+
// Retry Strategy
37+
'max_retries' => env('RABBITMQ_MAX_RETRIES', 3),
38+
'retry_delay' => env('RABBITMQ_RETRY_DELAY', 1000), // milliseconds
39+
40+
// Health Check Settings
41+
'health_check_enabled' => env('RABBITMQ_HEALTH_CHECK_ENABLED', true),
42+
'health_check_interval' => env('RABBITMQ_HEALTH_CHECK_INTERVAL', 30), // seconds
43+
],
44+
45+
// ==================== Exponential Backoff ====================
46+
'backoff' => [
47+
'enabled' => env('RABBITMQ_BACKOFF_ENABLED', true),
48+
'base_delay' => env('RABBITMQ_BACKOFF_BASE_DELAY', 1000), // milliseconds
49+
'max_delay' => env('RABBITMQ_BACKOFF_MAX_DELAY', 60000), // milliseconds
50+
'multiplier' => env('RABBITMQ_BACKOFF_MULTIPLIER', 2.0),
51+
'jitter' => env('RABBITMQ_BACKOFF_JITTER', true),
52+
],
53+
54+
// ==================== Exchange Configuration ====================
55+
'exchanges' => [
56+
'default' => [
57+
'name' => env('RABBITMQ_EXCHANGE', ''),
58+
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // direct, fanout, topic, headers
59+
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
60+
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTO_DELETE', false),
61+
'arguments' => [],
62+
],
63+
64+
// Example: Topic Exchange for Notifications
65+
'notifications' => [
66+
'name' => 'notifications',
67+
'type' => 'topic',
68+
'durable' => true,
69+
'auto_delete' => false,
70+
'arguments' => [],
71+
],
72+
73+
// Example: Fanout Exchange for Broadcasting
74+
'broadcasts' => [
75+
'name' => 'broadcasts',
76+
'type' => 'fanout',
77+
'durable' => true,
78+
'auto_delete' => false,
79+
],
80+
81+
// Example: Headers Exchange
82+
'documents' => [
83+
'name' => 'documents',
84+
'type' => 'headers',
85+
'durable' => true,
86+
'auto_delete' => false,
87+
],
88+
],
89+
90+
// ==================== Queue Configuration ====================
91+
'queues' => [
92+
'default' => [
93+
'name' => env('RABBITMQ_QUEUE', 'default'),
94+
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
95+
'auto_delete' => env('RABBITMQ_QUEUE_AUTO_DELETE', false),
96+
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
97+
'lazy' => env('RABBITMQ_QUEUE_LAZY', false),
98+
'priority' => env('RABBITMQ_QUEUE_PRIORITY', null), // null or max priority (1-255)
99+
'arguments' => [],
100+
'bindings' => [],
101+
],
102+
103+
// Example: High Priority Queue
104+
'high-priority' => [
105+
'name' => 'high-priority',
106+
'durable' => true,
107+
'auto_delete' => false,
108+
'exclusive' => false,
109+
'lazy' => false,
110+
'priority' => 10,
111+
'arguments' => [],
112+
'bindings' => [
113+
[
114+
'exchange' => 'tasks',
115+
'routing_key' => 'urgent.*',
116+
],
117+
],
118+
],
119+
120+
// Example: Lazy Queue for High Volume
121+
'bulk-processing' => [
122+
'name' => 'bulk-processing',
123+
'durable' => true,
124+
'auto_delete' => false,
125+
'exclusive' => false,
126+
'lazy' => true, // Messages stored on disk
127+
'priority' => null,
128+
'arguments' => [
129+
'x-max-length' => 100000,
130+
'x-overflow' => 'reject-publish',
131+
],
132+
],
133+
134+
// Example: Queue with Dead Letter Exchange
135+
'orders' => [
136+
'name' => 'orders',
137+
'durable' => true,
138+
'auto_delete' => false,
139+
'exclusive' => false,
140+
'lazy' => false,
141+
'priority' => 5,
142+
'arguments' => [
143+
'x-dead-letter-exchange' => 'dlx',
144+
'x-dead-letter-routing-key' => 'orders.failed',
145+
'x-message-ttl' => 3600000, // 1 hour
146+
],
147+
],
148+
],
149+
150+
// ==================== Dead Letter Exchange ====================
151+
'dead_letter' => [
152+
'enabled' => env('RABBITMQ_DLX_ENABLED', true),
153+
'exchange' => env('RABBITMQ_DLX_EXCHANGE', 'dlx'),
154+
'exchange_type' => env('RABBITMQ_DLX_EXCHANGE_TYPE', 'direct'),
155+
'queue_suffix' => env('RABBITMQ_DLX_QUEUE_SUFFIX', '.dlq'),
156+
'ttl' => env('RABBITMQ_DLX_TTL', null), // Message TTL in milliseconds
157+
],
158+
159+
// ==================== Delayed Messages ====================
160+
'delayed_message' => [
161+
'enabled' => env('RABBITMQ_DELAYED_MESSAGE_ENABLED', false),
162+
'exchange' => env('RABBITMQ_DELAYED_EXCHANGE', 'delayed'),
163+
'plugin_enabled' => env('RABBITMQ_DELAYED_PLUGIN_ENABLED', false), // rabbitmq_delayed_message_exchange plugin
164+
],
165+
166+
// ==================== RPC Configuration ====================
167+
'rpc' => [
168+
'enabled' => env('RABBITMQ_RPC_ENABLED', false),
169+
'timeout' => env('RABBITMQ_RPC_TIMEOUT', 30), // seconds
170+
'callback_queue_prefix' => env('RABBITMQ_RPC_CALLBACK_PREFIX', 'rpc_callback_'),
171+
],
172+
173+
// ==================== Publisher Confirms ====================
174+
'publisher_confirms' => [
175+
'enabled' => env('RABBITMQ_PUBLISHER_CONFIRMS_ENABLED', false),
176+
'timeout' => env('RABBITMQ_PUBLISHER_CONFIRMS_TIMEOUT', 5), // seconds
177+
],
178+
179+
// ==================== Transactions ====================
180+
'transactions' => [
181+
'enabled' => env('RABBITMQ_TRANSACTIONS_ENABLED', false),
182+
],
183+
184+
// ==================== Options ====================
185+
'options' => [
186+
'ssl_options' => [
187+
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
188+
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
189+
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
190+
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
191+
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
192+
],
193+
'queue' => [
194+
'job' => \iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob::class,
195+
'qos' => [
196+
'prefetch_size' => 0,
197+
'prefetch_count' => 10,
198+
'global' => false,
199+
],
200+
],
201+
],
202+
];

config/rabbitmq.php

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,110 @@
1616
'secure' => env('RABBITMQ_SECURE', false),
1717
],
1818

19+
// Connection and Channel Pool Configuration
20+
'pool' => [
21+
// Connection Pool Settings
22+
'max_connections' => env('RABBITMQ_MAX_CONNECTIONS', 10),
23+
'min_connections' => env('RABBITMQ_MIN_CONNECTIONS', 2),
24+
25+
// Channel Pool Settings
26+
'max_channels_per_connection' => env('RABBITMQ_MAX_CHANNELS_PER_CONNECTION', 100),
27+
28+
// Retry Strategy
29+
'max_retries' => env('RABBITMQ_MAX_RETRIES', 3),
30+
'retry_delay' => env('RABBITMQ_RETRY_DELAY', 1000), // milliseconds
31+
32+
// Health Check Settings
33+
'health_check_enabled' => env('RABBITMQ_HEALTH_CHECK_ENABLED', true),
34+
'health_check_interval' => env('RABBITMQ_HEALTH_CHECK_INTERVAL', 30), // seconds
35+
],
36+
37+
// Exponential Backoff Configuration
38+
'backoff' => [
39+
'enabled' => env('RABBITMQ_BACKOFF_ENABLED', true),
40+
'base_delay' => env('RABBITMQ_BACKOFF_BASE_DELAY', 1000), // milliseconds
41+
'max_delay' => env('RABBITMQ_BACKOFF_MAX_DELAY', 60000), // milliseconds
42+
'multiplier' => env('RABBITMQ_BACKOFF_MULTIPLIER', 2.0),
43+
'jitter' => env('RABBITMQ_BACKOFF_JITTER', true),
44+
],
45+
46+
// Exchange Configuration
47+
'exchanges' => [
48+
'default' => [
49+
'name' => env('RABBITMQ_EXCHANGE', ''),
50+
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // direct, fanout, topic, headers
51+
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
52+
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTO_DELETE', false),
53+
'arguments' => [],
54+
],
55+
// Add custom exchanges here
56+
// 'notifications' => [
57+
// 'name' => 'notifications',
58+
// 'type' => 'topic',
59+
// 'durable' => true,
60+
// 'auto_delete' => false,
61+
// ],
62+
],
63+
64+
// Queue Configuration
65+
'queues' => [
66+
'default' => [
67+
'name' => env('RABBITMQ_QUEUE', 'default'),
68+
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
69+
'auto_delete' => env('RABBITMQ_QUEUE_AUTO_DELETE', false),
70+
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
71+
'lazy' => env('RABBITMQ_QUEUE_LAZY', false),
72+
'priority' => env('RABBITMQ_QUEUE_PRIORITY', null), // null or max priority (1-255)
73+
'arguments' => [],
74+
'bindings' => [
75+
[
76+
'exchange' => 'default',
77+
'routing_key' => '',
78+
],
79+
],
80+
],
81+
// Add custom queues here
82+
// 'high-priority' => [
83+
// 'name' => 'high-priority',
84+
// 'durable' => true,
85+
// 'priority' => 10,
86+
// ],
87+
],
88+
89+
// Dead Letter Exchange Configuration
90+
'dead_letter' => [
91+
'enabled' => env('RABBITMQ_DLX_ENABLED', true),
92+
'exchange' => env('RABBITMQ_DLX_EXCHANGE', 'dlx'),
93+
'exchange_type' => env('RABBITMQ_DLX_EXCHANGE_TYPE', 'direct'),
94+
'queue_suffix' => env('RABBITMQ_DLX_QUEUE_SUFFIX', '.dlq'),
95+
'ttl' => env('RABBITMQ_DLX_TTL', null), // Message TTL in milliseconds
96+
],
97+
98+
// Delayed Message Configuration
99+
'delayed_message' => [
100+
'enabled' => env('RABBITMQ_DELAYED_MESSAGE_ENABLED', false),
101+
'exchange' => env('RABBITMQ_DELAYED_EXCHANGE', 'delayed'),
102+
'plugin_enabled' => env('RABBITMQ_DELAYED_PLUGIN_ENABLED', false), // rabbitmq_delayed_message_exchange plugin
103+
],
104+
105+
// RPC Configuration
106+
'rpc' => [
107+
'enabled' => env('RABBITMQ_RPC_ENABLED', false),
108+
'timeout' => env('RABBITMQ_RPC_TIMEOUT', 30), // seconds
109+
'callback_queue_prefix' => env('RABBITMQ_RPC_CALLBACK_PREFIX', 'rpc_callback_'),
110+
],
111+
112+
// Publisher Confirms Configuration
113+
'publisher_confirms' => [
114+
'enabled' => env('RABBITMQ_PUBLISHER_CONFIRMS_ENABLED', false),
115+
'timeout' => env('RABBITMQ_PUBLISHER_CONFIRMS_TIMEOUT', 5), // seconds
116+
],
117+
118+
// Transaction Configuration
119+
'transactions' => [
120+
'enabled' => env('RABBITMQ_TRANSACTIONS_ENABLED', false),
121+
],
122+
19123
'options' => [
20124
'ssl_options' => [
21125
'cafile' => env('RABBITMQ_SSL_CAFILE', null),

0 commit comments

Comments
 (0)