Skip to content

Commit 54687ec

Browse files
committed
qos count configurable
1 parent c791dcf commit 54687ec

File tree

2 files changed

+40
-6
lines changed

2 files changed

+40
-6
lines changed

src/Provider/Amqp/AmqpQueueProvider.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class AmqpQueueProvider extends AbstractQueueProvider
4444
protected $_reconnectInterval = 1800;
4545
protected $_lastConnectTime = 0;
4646

47+
protected $_qosCount;
48+
4749
/**
4850
* @var AMQPMessage[]
4951
*/
@@ -347,6 +349,14 @@ protected function _getChannel()
347349
if($this->_channel === null)
348350
{
349351
$this->_channel = $this->_getConnection()->channel();
352+
$config = $this->config();
353+
if($config->has('qos_count') || $config->has('qos_size'))
354+
{
355+
$this->setPrefetch(
356+
$config->getItem('qos_count', 0),
357+
$config->getItem('qos_size', 0)
358+
);
359+
}
350360
}
351361
return $this->_channel;
352362
}
@@ -383,8 +393,18 @@ public function disconnect()
383393
$this->_exchange = null;
384394
}
385395

396+
public function batchConsume(callable $callback, $batchSize)
397+
{
398+
if($this->_qosCount && $batchSize > $this->_qosCount)
399+
{
400+
throw new \Exception('Cannot consume batches greater than QoS');
401+
}
402+
parent::batchConsume($callback, $batchSize);
403+
}
404+
386405
public function setPrefetch($count, $size = 0)
387406
{
407+
$this->_qosCount = $count;
388408
$this->_getChannel()->basic_qos($size, $count, false);
389409
return $this;
390410
}

tests/Provider/AmqpTest.php

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
<?php
22
namespace Packaged\Queue\Tests;
33

4+
use Packaged\Config\ConfigSectionInterface;
5+
use Packaged\Config\Provider\ConfigSection;
46
use Packaged\Queue\Provider\Amqp\AmqpQueueProvider;
57

68
class AmqpTest extends \PHPUnit_Framework_TestCase
@@ -24,12 +26,12 @@ function ($message, $deliveryTag) use ($q)
2426

2527
public function testBatchAck()
2628
{
27-
$q = AmqpQueueProvider::create('test.batch.ack')
29+
$config = new ConfigSection('', ['wait_time' => 1, 'qos_count' => 250]);
30+
$q = $this->_getQueue('test.batch.ack', $config)
2831
->declareExchange()
2932
->declareQueue()
3033
->bindQueue()
3134
->purge();
32-
$q->config()->addItem('wait_time', 1);
3335

3436
$total = 1000;
3537

@@ -68,12 +70,12 @@ function (array $messages) use ($q, &$c)
6870

6971
public function testBatchNack()
7072
{
71-
$q = AmqpQueueProvider::create('test.batch.nack')
73+
$config = new ConfigSection('', ['wait_time' => 1]);
74+
$q = $this->_getQueue('test.batch.nack', $config)
7275
->declareExchange()
7376
->declareQueue()
7477
->bindQueue()
7578
->purge();
76-
$q->config()->addItem('wait_time', 1);
7779

7880
$total = 1000;
7981

@@ -112,12 +114,12 @@ function (array $messages) use ($q, &$c)
112114

113115
public function testRequeue()
114116
{
115-
$q = AmqpQueueProvider::create('test.batch.requeue')
117+
$config = new ConfigSection('', ['wait_time' => 1]);
118+
$q = $this->_getQueue('test.batch.requeue', $config)
116119
->declareExchange()
117120
->declareQueue()
118121
->bindQueue()
119122
->purge();
120-
$q->config()->addItem('wait_time', 1);
121123

122124
$total = 250;
123125

@@ -157,4 +159,16 @@ function (array $messages) use ($q, &$count)
157159
);
158160
$this->assertEquals($total, $count);
159161
}
162+
163+
protected function _getQueue(
164+
$queueName, ConfigSectionInterface $config = null
165+
)
166+
{
167+
$q = AmqpQueueProvider::create($queueName);
168+
if($config)
169+
{
170+
$q->configure($config);
171+
}
172+
return $q;
173+
}
160174
}

0 commit comments

Comments
 (0)