Skip to content

Commit 33c0936

Browse files
committed
Merge remote-tracking branch 'base-origin/master'
# Conflicts: # src/Queue/AMQPQueue.php
2 parents c0a563f + 4715acb commit 33c0936

File tree

9 files changed

+234
-78
lines changed

9 files changed

+234
-78
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ In your ```config/queue.php``` file you have to provide the following:
3535
'vhost' => '/',
3636
'queue' => null,
3737
'queue_flags' => ['durable' => true, 'routing_key' => null], //Durable queue (survives server crash)
38+
'declare_queues' => true, //If we need to declare queues each time before sending a message. If not, you will have to declare them manually elsewhere
3839
'message_properties' => ['delivery_mode' => 2], //Persistent messages (survives server crash)
3940
'channel_id' => null,
4041
'exchange_name' => null,

composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
"require-dev": {
1111
"laravel/framework": ">=5.3.0",
1212
"squizlabs/php_codesniffer": "1.*",
13-
"orchestra/testbench": "3.3.*",
14-
"phpunit/phpunit": "4.3.*"
13+
"orchestra/testbench": "~3.4",
14+
"phpunit/phpunit": "~6.1"
1515
},
1616
"autoload": {
1717
"psr-4": {

src/Connectors/AmqpConnector.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,9 @@ public function connect(array $config)
4848
}
4949

5050
return new AMQPQueue(
51-
$connection,
52-
$config['queue'],
53-
$config['queue_flags'],
54-
$config['message_properties'],
55-
$config['channel_id'],
56-
$config['exchange_name'],
57-
$config['exchange_type'],
58-
$config['exchange_flags']
51+
$connection, $config['queue'], $config['queue_flags'], $config['declare_queues'],
52+
$config['message_properties'], $config['channel_id'],
53+
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags']
5954
);
6055
}
6156
}

src/Jobs/AMQPJob.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public function getQueue()
121121
* Get the job identifier.
122122
*
123123
* @return string
124+
* @throws \OutOfBoundsException
124125
*/
125126
public function getJobId()
126127
{

src/Queue/AMQPQueue.php

Lines changed: 142 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -52,32 +52,42 @@ class AMQPQueue extends Queue implements QueueContract
5252
* @var string Default channel id if needed
5353
*/
5454
private $defaultChannelId;
55+
5556
/**
5657
* @var array
5758
*/
5859
private $queueFlags;
60+
5961
/**
6062
* @var array
6163
*/
6264
private $messageProperties;
6365

66+
/**
67+
* @var bool
68+
*/
69+
private $declareQueues;
70+
6471
/**
6572
* @param AMQPStreamConnection $connection
66-
* @param string $defaultQueueName Default queue name
67-
* @param array $queueFlags Queue flags See a list of parameters to
68-
* \PhpAmqpLib\Channel\AMQPChannel::queue_declare. Parameters should be
69-
* passed like for call_user_func_array in this parameter
70-
* @param array $messageProperties This is passed as a second parameter to \PhpAmqpLib\Message\AMQPMessage
71-
* constructor
72-
* @param string $defaultChannelId Default channel id
73-
* @param string $exchangeName Exchange name
74-
* @param mixed $exchangeType Exchange type
75-
* @param mixed $exchangeFlags Exchange flags
73+
* @param string $defaultQueueName Default queue name
74+
* @param array $queueFlags Queue flags See a list of parameters to
75+
* \PhpAmqpLib\Channel\AMQPChannel::queue_declare. Parameters should
76+
* be passed like for call_user_func_array in this parameter
77+
* @param bool $declareQueues If we should declare queues before actually trying to send a
78+
* message
79+
* @param array $messageProperties This is passed as a second parameter to
80+
* \PhpAmqpLib\Message\AMQPMessage constructor
81+
* @param string $defaultChannelId Default channel id
82+
* @param string $exchangeName Exchange name
83+
* @param mixed $exchangeType Exchange type
84+
* @param mixed $exchangeFlags Exchange flags
7685
*/
7786
public function __construct(
7887
AMQPStreamConnection $connection,
7988
$defaultQueueName = null,
8089
$queueFlags = [],
90+
$declareQueues = true,
8191
$messageProperties = [],
8292
$defaultChannelId = null,
8393
$exchangeName = '',
@@ -87,6 +97,7 @@ public function __construct(
8797
$this->connection = $connection;
8898
$this->defaultQueueName = $defaultQueueName ?: 'default';
8999
$this->queueFlags = $queueFlags;
100+
$this->declareQueues = $declareQueues;
90101
$this->messageProperties = $messageProperties;
91102
$this->defaultChannelId = $defaultChannelId;
92103
$this->exchangeName = $exchangeName;
@@ -98,9 +109,9 @@ public function __construct(
98109
}
99110

100111
/**
101-
* @param string $exchangeName The name of the exchange. For example, 'logs'
102-
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
103-
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
112+
* @param string $exchangeName The name of the exchange. For example, 'logs'
113+
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
114+
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
104115
* (from third parameter onwards). Must be an assoc array. Default flags can be omitted
105116
*
106117
* @see \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
@@ -109,15 +120,15 @@ public function __construct(
109120
protected function declareExchange($exchangeName, $exchangeType, array $exchangeFlags = [])
110121
{
111122
$flags = array_replace([
112-
'exchange' => $exchangeName,
113-
'type' => $exchangeType,
114-
'passive' => false,
115-
'durable' => false,
123+
'exchange' => $exchangeName,
124+
'type' => $exchangeType,
125+
'passive' => false,
126+
'durable' => false,
116127
'auto_delete' => true,
117-
'internal' => false,
118-
'nowait' => false,
119-
'arguments' => null,
120-
'ticket' => null,
128+
'internal' => false,
129+
'nowait' => false,
130+
'arguments' => null,
131+
'ticket' => null,
121132
], $exchangeFlags);
122133

123134
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
@@ -130,18 +141,50 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
130141
* @param mixed $data Job custom data. Usually array
131142
* @param string $queue Queue name, if different from the default one
132143
*
144+
* @throws \Illuminate\Queue\InvalidPayloadException
133145
* @throws AMQPException
134146
* @return bool Always true
135147
*/
136148
public function push($job, $data = '', $queue = null)
137149
{
138-
$queue = $this->getQueueName($queue);
139-
$this->declareQueue($queue);
150+
$queue = $this->prepareQueue($queue);
140151
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
141152
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
153+
154+
return true;
155+
}
156+
157+
158+
/**
159+
* Adds the message to internal buffer to be sent later in a batch
160+
*
161+
* @param string $job Job implementation class name
162+
* @param mixed $data Job custom data. Usually array
163+
* @param string $queue Queue name, if different from the default one
164+
*
165+
* @return bool
166+
* @throws \Illuminate\Queue\InvalidPayloadException
167+
* @throws AMQPException
168+
*/
169+
public function addMessageToBatch($job, $data = '', $queue = null)
170+
{
171+
$queue = $this->prepareQueue($queue);
172+
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
173+
$this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
174+
142175
return true;
143176
}
144177

178+
/**
179+
* Publishes and internal buffer of messages, that were added before with {@see addMessageToBatch}
180+
*
181+
* @return void
182+
*/
183+
public function pushBatch()
184+
{
185+
$this->channel->publish_batch();
186+
}
187+
145188
/**
146189
* Helper to return a default queue name in case passed param is empty
147190
*
@@ -164,29 +207,30 @@ protected function getQueueName($name)
164207
*
165208
* @param string $name The name of the queue to declare
166209
*
167-
* @return void
210+
* @return QueueInfo
211+
* @throws AMQPException
168212
*/
169213
public function declareQueue($name)
170214
{
171215
$queue = $this->getQueueName($name);
172216
$flags = array_replace_recursive([
173-
'queue' => $queue,
174-
'passive' => false,
175-
'durable' => false,
176-
'exclusive' => false,
217+
'queue' => $queue,
218+
'passive' => false,
219+
'durable' => false,
220+
'exclusive' => false,
177221
'auto_delete' => true,
178-
'nowait' => false,
179-
'arguments' => null,
180-
'ticket' => null,
222+
'nowait' => false,
223+
'arguments' => null,
224+
'ticket' => null,
181225
], $this->getQueueFlags($name));
182226

183-
call_user_func_array([$this->channel, 'queue_declare'], $flags);
227+
return QueueInfo::createFromDeclareOk(call_user_func_array([$this->channel, 'queue_declare'], $flags));
184228
}
185229

186230
/**
187-
* @param string $queueName
231+
* @param string $queueName
188232
* @param null|string $deferredQueueName
189-
* @param null|int $deferredQueueDelay
233+
* @param null|int $deferredQueueDelay
190234
*
191235
* @return array
192236
*/
@@ -206,6 +250,7 @@ protected function getQueueFlags($queueName, $deferredQueueName = null, $deferre
206250
* Get routing key from config or use default one (queue name)
207251
*
208252
* @param $queue string
253+
*
209254
* @return string Routing key name
210255
*/
211256
protected function getRoutingKey($queue)
@@ -221,13 +266,13 @@ protected function getRoutingKey($queue)
221266
* @param array $options Currently unused
222267
*
223268
* @return bool Always true
269+
* @throws AMQPException
224270
*/
225271
public function pushRaw($payload, $queue = null, array $options = [])
226272
{
227-
$queue = $this->getQueueName($queue);
228-
$this->declareQueue($queue);
229-
$payload = new AMQPMessage($payload, $this->messageProperties);
230-
$this->channel->basic_publish($payload, $this->exchangeName, $queue);
273+
$queue = $this->prepareQueue($queue);
274+
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
275+
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
231276
return true;
232277
}
233278

@@ -240,15 +285,16 @@ public function pushRaw($payload, $queue = null, array $options = [])
240285
* @param string $queue Queue name, if different from the default one
241286
*
242287
* @return bool Always true
288+
* @throws \Illuminate\Queue\InvalidPayloadException
289+
* @throws AMQPException
243290
*/
244291
public function later($delay, $job, $data = '', $queue = null)
245292
{
246293
if ($delay instanceof \DateTime) {
247294
$delay = $delay->getTimestamp() - time();
248295
}
249296

250-
$queue = $this->getQueueName($queue);
251-
$this->declareQueue($queue);
297+
$queue = $this->prepareQueue($queue);
252298
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
253299

254300
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
@@ -263,28 +309,29 @@ public function later($delay, $job, $data = '', $queue = null)
263309
* @param int $delay Queue delay in seconds
264310
*
265311
* @return string Deferred queue name for the specified delay
312+
* @throws AMQPException
266313
*/
267314
public function declareDelayedQueue($destinationQueueName, $delay)
268315
{
269316
$destinationQueueName = $this->getQueueName($destinationQueueName);
270317
$deferredQueueName = $destinationQueueName . '_deferred_' . $delay;
271318

272319
$flags = array_replace_recursive([
273-
'queue' => '',
274-
'passive' => false,
275-
'durable' => false,
276-
'exclusive' => false,
320+
'queue' => '',
321+
'passive' => false,
322+
'durable' => false,
323+
'exclusive' => false,
277324
'auto_delete' => true,
278-
'nowait' => false,
279-
'arguments' => null,
280-
'ticket' => null,
325+
'nowait' => false,
326+
'arguments' => null,
327+
'ticket' => null,
281328
], $this->getQueueFlags($destinationQueueName, $deferredQueueName, $delay), [
282-
'queue' => $deferredQueueName,
283-
'durable' => true,
329+
'queue' => $deferredQueueName,
330+
'durable' => true,
284331
'arguments' => new AMQPTable([
285-
'x-dead-letter-exchange' => '',
332+
'x-dead-letter-exchange' => '',
286333
'x-dead-letter-routing-key' => $destinationQueueName,
287-
'x-message-ttl' => intval($delay * 1000),
334+
'x-message-ttl' => intval($delay * 1000),
288335
]),
289336
]);
290337

@@ -298,11 +345,11 @@ public function declareDelayedQueue($destinationQueueName, $delay)
298345
* @param string|null $queue Queue name if different from the default one
299346
*
300347
* @return \Illuminate\Queue\Jobs\Job|null Job instance or null if no unhandled jobs available
348+
* @throws AMQPException
301349
*/
302350
public function pop($queue = null)
303351
{
304-
$queue = $this->getQueueName($queue);
305-
$this->declareQueue($queue);
352+
$queue = $this->prepareQueue($queue);
306353
$envelope = $this->channel->basic_get($queue);
307354

308355
if ($envelope instanceof AMQPMessage) {
@@ -316,9 +363,51 @@ public function pop($queue = null)
316363
* Get the size of the queue.
317364
*
318365
* @param string $queue
366+
*
319367
* @return int
368+
* @throws AMQPException
320369
*/
321370
public function size($queue = null)
322371
{
372+
$data = $this->declareQueue($this->getQueueName($queue));
373+
return $data->getJobs();
374+
}
375+
376+
/**
377+
* @return array
378+
*/
379+
public function getMessageProperties()
380+
{
381+
return $this->messageProperties;
382+
}
383+
384+
/**
385+
* @param array $messageProperties
386+
*
387+
* @return AMQPQueue
388+
*/
389+
public function setMessageProperties(array $messageProperties)
390+
{
391+
$this->messageProperties = $messageProperties;
392+
393+
return $this;
394+
}
395+
396+
/**
397+
* Prepares a queue for later use. Declares it if needed
398+
*
399+
* @param string $queue
400+
*
401+
* @return string
402+
* @throws AMQPException
403+
*/
404+
private function prepareQueue($queue)
405+
{
406+
$queue = $this->getQueueName($queue);
407+
if ($this->declareQueues) {
408+
$this->declareQueue($queue);
409+
}
410+
411+
return $queue;
323412
}
324413
}

0 commit comments

Comments
 (0)