Skip to content

Commit 0ea949e

Browse files
committed
Batch message publishing capabilities added to dev-master version. Some small fixes.
1 parent 46e9a5a commit 0ea949e

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

src/Queue/AMQPQueue.php

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Forumhouse\LaravelAmqp\Jobs\AMQPJob;
77
use Forumhouse\LaravelAmqp\Utility\ArrayUtil;
88
use Illuminate\Contracts\Queue\Queue as QueueContract;
9+
use Illuminate\Queue\InvalidPayloadException;
910
use Illuminate\Queue\Queue;
1011
use PhpAmqpLib\Channel\AMQPChannel;
1112
use PhpAmqpLib\Connection\AMQPConnection;
@@ -141,14 +142,14 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
141142
* @param mixed $data Job custom data. Usually array
142143
* @param string $queue Queue name, if different from the default one
143144
*
144-
* @throws \Illuminate\Queue\InvalidPayloadException
145+
* @throws InvalidPayloadException
145146
* @throws AMQPException
146147
* @return bool Always true
147148
*/
148149
public function push($job, $data = '', $queue = null)
149150
{
150151
$queue = $this->prepareQueue($queue);
151-
$amqpMessage = $this->priorityMessage($job, $data);
152+
$amqpMessage = $this->prepareMessage($job, $data);
152153
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
153154

154155
return true;
@@ -162,13 +163,13 @@ public function push($job, $data = '', $queue = null)
162163
* @param string $queue Queue name, if different from the default one
163164
*
164165
* @return bool
165-
* @throws \Illuminate\Queue\InvalidPayloadException
166+
* @throws InvalidPayloadException
166167
* @throws AMQPException
167168
*/
168169
public function addMessageToBatch($job, $data = '', $queue = null)
169170
{
170171
$queue = $this->prepareQueue($queue);
171-
$amqpMessage = $this->priorityMessage($job, $data);
172+
$amqpMessage = $this->prepareMessage($job, $data);
172173
$this->channel->batch_basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
173174

174175
return true;
@@ -287,7 +288,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
287288
* @param string $queue Queue name, if different from the default one
288289
*
289290
* @return bool Always true
290-
* @throws \Illuminate\Queue\InvalidPayloadException
291+
* @throws InvalidPayloadException
291292
* @throws AMQPException
292293
*/
293294
public function later($delay, $job, $data = '', $queue = null)
@@ -299,7 +300,7 @@ public function later($delay, $job, $data = '', $queue = null)
299300
$queue = $this->prepareQueue($queue);
300301
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
301302

302-
$amqpMessage = $this->priorityMessage($job, $data);
303+
$amqpMessage = $this->prepareMessage($job, $data);
303304
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $delayedQueueName);
304305
return true;
305306
}
@@ -416,13 +417,15 @@ private function prepareQueue($queue)
416417

417418

418419
// FNX - ADD DYNAMIC PRIORITY TO MESSAGE
420+
419421
/**
420-
* @param string $job
421-
* @param mixed $data
422+
* @param string $job
423+
* @param mixed $data
422424
*
423425
* @return AMQPMessage
426+
* @throws InvalidPayloadException
424427
*/
425-
protected function priorityMessage($job, $data)
428+
protected function prepareMessage($job, $data)
426429
{
427430
$payloadJson = $this->createPayload($job, $data);
428431
$arrPayload = json_decode($payloadJson, true);
@@ -433,8 +436,7 @@ protected function priorityMessage($job, $data)
433436
if (!empty($arrPayload['priority']) && $arrPayload['priority'] > 0)
434437
$props['priority'] = $arrPayload['priority'];
435438

436-
$amqpMessage = new AMQPMessage($payloadJson, $props);
437-
return $amqpMessage;
439+
return new AMQPMessage($payloadJson, $props);
438440

439441
}
440442

@@ -454,11 +456,10 @@ protected function createObjectPayload($job)
454456
],
455457
'displayName' => $this->getDisplayName($job),
456458
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
457-
'maxTries' => $job->tries ?? null,
458-
'timeout' => $job->timeout ?? null,
459+
'maxTries' => empty($job->tries) ? null : $job->tries,
460+
'timeout' => empty($job->timeout) ? null : $job->timeout,
459461
'timeoutAt' => $this->getJobExpiration($job),
460-
// PRIORITY
461-
'priority' => $job->priority ?? null,
462+
'priority' => empty($job->priority) ? null : $job->priority,
462463
];
463464
}
464465
/**
@@ -479,7 +480,7 @@ protected function createStringPayload($job, $payload)
479480
'data' => $payload['data'],
480481
// BUG-FIX FOR ATTEMPTS HERE (OTHERWISE THIS FUNCTION IS THE SAME AS THE BASE IMPLEMENTATION):
481482
'attempts' => $payload['attempts'],
482-
'priority' => $payload['priority'] ?? null,
483+
'priority' => empty($payload['priority']) ? null : $payload['priority'],
483484
];
484485
}
485-
}
486+
}

0 commit comments

Comments
 (0)