Skip to content

Commit 5d25b20

Browse files
committed
Added code to support queue priorities with current version of the library interface
1 parent 9359ba3 commit 5d25b20

File tree

1 file changed

+29
-2
lines changed

1 file changed

+29
-2
lines changed

src/Queue/AMQPQueue.php

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
141141
* @param mixed $data Job custom data. Usually array
142142
* @param string $queue Queue name, if different from the default one
143143
*
144+
* @throws \Illuminate\Queue\InvalidPayloadException
144145
* @throws AMQPException
145146
* @return bool Always true
146147
*/
@@ -178,6 +179,7 @@ protected function getQueueName($name)
178179
* @param string $name The name of the queue to declare
179180
*
180181
* @return QueueInfo
182+
* @throws AMQPException
181183
*/
182184
public function declareQueue($name)
183185
{
@@ -235,15 +237,16 @@ protected function getRoutingKey($queue)
235237
* @param array $options Currently unused
236238
*
237239
* @return bool Always true
240+
* @throws AMQPException
238241
*/
239242
public function pushRaw($payload, $queue = null, array $options = [])
240243
{
241244
$queue = $this->getQueueName($queue);
242245
if ($this->declareQueues) {
243246
$this->declareQueue($queue);
244247
}
245-
$payload = new AMQPMessage($payload, $this->messageProperties);
246-
$this->channel->basic_publish($payload, $this->exchangeName, $queue);
248+
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
249+
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
247250
return true;
248251
}
249252

@@ -256,6 +259,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
256259
* @param string $queue Queue name, if different from the default one
257260
*
258261
* @return bool Always true
262+
* @throws AMQPException
259263
*/
260264
public function later($delay, $job, $data = '', $queue = null)
261265
{
@@ -281,6 +285,7 @@ public function later($delay, $job, $data = '', $queue = null)
281285
* @param int $delay Queue delay in seconds
282286
*
283287
* @return string Deferred queue name for the specified delay
288+
* @throws AMQPException
284289
*/
285290
public function declareDelayedQueue($destinationQueueName, $delay)
286291
{
@@ -316,6 +321,7 @@ public function declareDelayedQueue($destinationQueueName, $delay)
316321
* @param string|null $queue Queue name if different from the default one
317322
*
318323
* @return \Illuminate\Queue\Jobs\Job|null Job instance or null if no unhandled jobs available
324+
* @throws AMQPException
319325
*/
320326
public function pop($queue = null)
321327
{
@@ -338,10 +344,31 @@ public function pop($queue = null)
338344
* @param string $queue
339345
*
340346
* @return int
347+
* @throws AMQPException
341348
*/
342349
public function size($queue = null)
343350
{
344351
$data = $this->declareQueue($this->getQueueName($queue));
345352
return $data->getJobs();
346353
}
354+
355+
/**
356+
* @return array
357+
*/
358+
public function getMessageProperties()
359+
{
360+
return $this->messageProperties;
361+
}
362+
363+
/**
364+
* @param array $messageProperties
365+
*
366+
* @return AMQPQueue
367+
*/
368+
public function setMessageProperties(array $messageProperties)
369+
{
370+
$this->messageProperties = $messageProperties;
371+
372+
return $this;
373+
}
347374
}

0 commit comments

Comments
 (0)