Skip to content

Commit cab792d

Browse files
committed
implemented dynamic message priority
1 parent bde2db2 commit cab792d

File tree

1 file changed

+58
-7
lines changed

1 file changed

+58
-7
lines changed

src/Queue/AMQPQueue.php

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
148148
public function push($job, $data = '', $queue = null)
149149
{
150150
$queue = $this->prepareQueue($queue);
151-
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
152-
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
151+
$amqpMessage = $this->priorityMessage($job, $data);
152+
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
153153

154154
return true;
155155
}
@@ -168,8 +168,8 @@ public function push($job, $data = '', $queue = null)
168168
public function addMessageToBatch($job, $data = '', $queue = null)
169169
{
170170
$queue = $this->prepareQueue($queue);
171-
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
172-
$this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
171+
$amqpMessage = $this->priorityMessage($job, $data);
172+
$this->channel->batch_basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
173173

174174
return true;
175175
}
@@ -269,6 +269,9 @@ protected function getRoutingKey($queue)
269269
*/
270270
public function pushRaw($payload, $queue = null, array $options = [])
271271
{
272+
// NB: DYNAMIC PRIORITY IS NOT IMPLEMENTED FOR RAW MESSAGES,
273+
// NB: NEED TO SET THE $this->messageProperties['priority'] FIELD
274+
272275
$queue = $this->prepareQueue($queue);
273276
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
274277
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
@@ -296,8 +299,8 @@ public function later($delay, $job, $data = '', $queue = null)
296299
$queue = $this->prepareQueue($queue);
297300
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
298301

299-
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
300-
$this->channel->basic_publish($payload, $this->exchangeName, $delayedQueueName);
302+
$amqpMessage = $this->priorityMessage($job, $data);
303+
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $delayedQueueName);
301304
return true;
302305
}
303306

@@ -410,7 +413,54 @@ private function prepareQueue($queue)
410413
return $queue;
411414
}
412415

413-
// FNX OVERRIDE STRING PAYLOAD GENERATION
416+
417+
418+
// FNX - ADD DYNAMIC PRIORITY TO MESSAGE
419+
/**
420+
* @param string $job
421+
* @param mixed $data
422+
*
423+
* @return AMQPMessage
424+
*/
425+
protected function priorityMessage($job, $data)
426+
{
427+
$payloadJson = $this->createPayload($job, $data);
428+
$arrPayload = json_decode($payloadJson, true);
429+
430+
// OVERRIDE PRIORITY
431+
// NB: IMPLEMENT QUEUE DEFAULT PRIORITY USING THE FIELD `message_properties['priority']` IN THE CONFIG
432+
$props = $this->messageProperties;
433+
if (!empty($arrPayload['priority']) && $arrPayload['priority'] > 0)
434+
$props['priority'] = $arrPayload['priority'];
435+
436+
$amqpMessage = new AMQPMessage($payloadJson, $props);
437+
return $amqpMessage;
438+
439+
}
440+
441+
// FNX - OVERRIDE PAYLOAD GENERATION
442+
/**
443+
* Create a payload for an object-based queue handler.
444+
*
445+
* @param mixed $job
446+
* @return array
447+
*/
448+
protected function createObjectPayload($job)
449+
{
450+
return [
451+
'data' => [
452+
'command' => serialize(clone $job),
453+
'commandName' => get_class($job),
454+
],
455+
'displayName' => $this->getDisplayName($job),
456+
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
457+
'maxTries' => $job->tries ?? null,
458+
'timeout' => $job->timeout ?? null,
459+
'timeoutAt' => $this->getJobExpiration($job),
460+
// PRIORITY
461+
'priority' => $job->priority ?? null,
462+
];
463+
}
414464
/**
415465
* Create a typical, string based queue payload array.
416466
*
@@ -429,6 +479,7 @@ protected function createStringPayload($job, $payload)
429479
'data' => $payload['data'],
430480
// BUG-FIX FOR ATTEMPTS HERE (OTHERWISE THIS FUNCTION IS THE SAME AS THE BASE IMPLEMENTATION):
431481
'attempts' => $payload['attempts'],
482+
'priority' => $payload['priority'] ?? null,
432483
];
433484

434485
return $x;

0 commit comments

Comments
 (0)