Skip to content

Commit 4715acb

Browse files
committed
Batch message publishing capabilities added to dev-master version. Some small fixes.
1 parent 5d25b20 commit 4715acb

File tree

4 files changed

+60
-20
lines changed

4 files changed

+60
-20
lines changed

src/Jobs/AMQPJob.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public function getQueue()
121121
* Get the job identifier.
122122
*
123123
* @return string
124+
* @throws \OutOfBoundsException
124125
*/
125126
public function getJobId()
126127
{

src/Queue/AMQPQueue.php

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,44 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
147147
*/
148148
public function push($job, $data = '', $queue = null)
149149
{
150-
$queue = $this->getQueueName($queue);
151-
if ($this->declareQueues) {
152-
$this->declareQueue($queue);
153-
}
150+
$queue = $this->prepareQueue($queue);
154151
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
155152
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
153+
154+
return true;
155+
}
156+
157+
158+
/**
159+
* Adds the message to internal buffer to be sent later in a batch
160+
*
161+
* @param string $job Job implementation class name
162+
* @param mixed $data Job custom data. Usually array
163+
* @param string $queue Queue name, if different from the default one
164+
*
165+
* @return bool
166+
* @throws \Illuminate\Queue\InvalidPayloadException
167+
* @throws AMQPException
168+
*/
169+
public function addMessageToBatch($job, $data = '', $queue = null)
170+
{
171+
$queue = $this->prepareQueue($queue);
172+
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
173+
$this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
174+
156175
return true;
157176
}
158177

178+
/**
179+
* Publishes and internal buffer of messages, that were added before with {@see addMessageToBatch}
180+
*
181+
* @return void
182+
*/
183+
public function pushBatch()
184+
{
185+
$this->channel->publish_batch();
186+
}
187+
159188
/**
160189
* Helper to return a default queue name in case passed param is empty
161190
*
@@ -241,10 +270,7 @@ protected function getRoutingKey($queue)
241270
*/
242271
public function pushRaw($payload, $queue = null, array $options = [])
243272
{
244-
$queue = $this->getQueueName($queue);
245-
if ($this->declareQueues) {
246-
$this->declareQueue($queue);
247-
}
273+
$queue = $this->prepareQueue($queue);
248274
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
249275
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
250276
return true;
@@ -259,6 +285,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
259285
* @param string $queue Queue name, if different from the default one
260286
*
261287
* @return bool Always true
288+
* @throws \Illuminate\Queue\InvalidPayloadException
262289
* @throws AMQPException
263290
*/
264291
public function later($delay, $job, $data = '', $queue = null)
@@ -267,10 +294,7 @@ public function later($delay, $job, $data = '', $queue = null)
267294
$delay = $delay->getTimestamp() - time();
268295
}
269296

270-
$queue = $this->getQueueName($queue);
271-
if ($this->declareQueues) {
272-
$this->declareQueue($queue);
273-
}
297+
$queue = $this->prepareQueue($queue);
274298
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
275299

276300
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
@@ -325,10 +349,7 @@ public function declareDelayedQueue($destinationQueueName, $delay)
325349
*/
326350
public function pop($queue = null)
327351
{
328-
$queue = $this->getQueueName($queue);
329-
if ($this->declareQueues) {
330-
$this->declareQueue($queue);
331-
}
352+
$queue = $this->prepareQueue($queue);
332353
$envelope = $this->channel->basic_get($queue);
333354

334355
if ($envelope instanceof AMQPMessage) {
@@ -371,4 +392,22 @@ public function setMessageProperties(array $messageProperties)
371392

372393
return $this;
373394
}
395+
396+
/**
397+
* Prepares a queue for later use. Declares it if needed
398+
*
399+
* @param string $queue
400+
*
401+
* @return string
402+
* @throws AMQPException
403+
*/
404+
private function prepareQueue($queue)
405+
{
406+
$queue = $this->getQueueName($queue);
407+
if ($this->declareQueues) {
408+
$this->declareQueue($queue);
409+
}
410+
411+
return $queue;
412+
}
374413
}

src/Queue/QueueInfo.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,23 @@ public static function createFromDeclareOk(array $declareOkData)
4545
/**
4646
* @return string
4747
*/
48-
public function getName(): string
48+
public function getName()
4949
{
5050
return $this->name;
5151
}
5252

5353
/**
5454
* @return int
5555
*/
56-
public function getJobs(): int
56+
public function getJobs()
5757
{
5858
return $this->jobs;
5959
}
6060

6161
/**
6262
* @return int
6363
*/
64-
public function getConsumers(): int
64+
public function getConsumers()
6565
{
6666
return $this->consumers;
6767
}

src/Utility/ArrayUtil.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public static function arrayFilterRecursive(array $input, callable $callback)
5454
public static function removeNullsRecursive(array $arr)
5555
{
5656
return static::arrayFilterRecursive($arr, function ($var) {
57-
return !is_null($var);
57+
return null !== $var;
5858
});
5959
}
6060
}

0 commit comments

Comments
 (0)