Skip to content

Commit 9359ba3

Browse files
committed
Using latest PHPUnit 6.1 and Orchestra TestBench 3.4, feature to skip declaring queues added (hopefully addresses #1), size() implemented for queues
1 parent 3ee5817 commit 9359ba3

File tree

7 files changed

+160
-71
lines changed

7 files changed

+160
-71
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ In your ```config/queue.php``` file you have to provide the following:
3535
'vhost' => '/',
3636
'queue' => null,
3737
'queue_flags' => ['durable' => true, 'routing_key' => null], //Durable queue (survives server crash)
38+
'declare_queues' => true, //If we need to declare queues each time before sending a message. If not, you will have to declare them manually elsewhere
3839
'message_properties' => ['delivery_mode' => 2], //Persistent messages (survives server crash)
3940
'channel_id' => null,
4041
'exchange_name' => null,

composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
"require-dev": {
1111
"laravel/framework": ">=5.3.0",
1212
"squizlabs/php_codesniffer": "1.*",
13-
"orchestra/testbench": "3.3.*",
14-
"phpunit/phpunit": "4.3.*"
13+
"orchestra/testbench": "~3.4",
14+
"phpunit/phpunit": "~6.1"
1515
},
1616
"autoload": {
1717
"psr-4": {

src/Connectors/AmqpConnector.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,9 @@ public function connect(array $config)
4848
}
4949

5050
return new AMQPQueue(
51-
$connection,
52-
$config['queue'],
53-
$config['queue_flags'],
54-
$config['message_properties'],
55-
$config['channel_id'],
56-
$config['exchange_name'],
57-
$config['exchange_type'],
58-
$config['exchange_flags']
51+
$connection, $config['queue'], $config['queue_flags'], $config['declare_queues'],
52+
$config['message_properties'], $config['channel_id'],
53+
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags']
5954
);
6055
}
6156
}

src/Queue/AMQPQueue.php

Lines changed: 70 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -52,32 +52,42 @@ class AMQPQueue extends Queue implements QueueContract
5252
* @var string Default channel id if needed
5353
*/
5454
private $defaultChannelId;
55+
5556
/**
5657
* @var array
5758
*/
5859
private $queueFlags;
60+
5961
/**
6062
* @var array
6163
*/
6264
private $messageProperties;
6365

66+
/**
67+
* @var bool
68+
*/
69+
private $declareQueues;
70+
6471
/**
6572
* @param AMQPStreamConnection $connection
66-
* @param string $defaultQueueName Default queue name
67-
* @param array $queueFlags Queue flags See a list of parameters to
68-
* \PhpAmqpLib\Channel\AMQPChannel::queue_declare. Parameters should be
69-
* passed like for call_user_func_array in this parameter
70-
* @param array $messageProperties This is passed as a second parameter to \PhpAmqpLib\Message\AMQPMessage
71-
* constructor
72-
* @param string $defaultChannelId Default channel id
73-
* @param string $exchangeName Exchange name
74-
* @param mixed $exchangeType Exchange type
75-
* @param mixed $exchangeFlags Exchange flags
73+
* @param string $defaultQueueName Default queue name
74+
* @param array $queueFlags Queue flags See a list of parameters to
75+
* \PhpAmqpLib\Channel\AMQPChannel::queue_declare. Parameters should
76+
* be passed like for call_user_func_array in this parameter
77+
* @param bool $declareQueues If we should declare queues before actually trying to send a
78+
* message
79+
* @param array $messageProperties This is passed as a second parameter to
80+
* \PhpAmqpLib\Message\AMQPMessage constructor
81+
* @param string $defaultChannelId Default channel id
82+
* @param string $exchangeName Exchange name
83+
* @param mixed $exchangeType Exchange type
84+
* @param mixed $exchangeFlags Exchange flags
7685
*/
7786
public function __construct(
7887
AMQPStreamConnection $connection,
7988
$defaultQueueName = null,
8089
$queueFlags = [],
90+
$declareQueues = true,
8191
$messageProperties = [],
8292
$defaultChannelId = null,
8393
$exchangeName = '',
@@ -87,6 +97,7 @@ public function __construct(
8797
$this->connection = $connection;
8898
$this->defaultQueueName = $defaultQueueName ?: 'default';
8999
$this->queueFlags = $queueFlags;
100+
$this->declareQueues = $declareQueues;
90101
$this->messageProperties = $messageProperties;
91102
$this->defaultChannelId = $defaultChannelId;
92103
$this->exchangeName = $exchangeName;
@@ -98,9 +109,9 @@ public function __construct(
98109
}
99110

100111
/**
101-
* @param string $exchangeName The name of the exchange. For example, 'logs'
102-
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
103-
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
112+
* @param string $exchangeName The name of the exchange. For example, 'logs'
113+
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
114+
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
104115
* (from third parameter onwards). Must be an assoc array. Default flags can be omitted
105116
*
106117
* @see \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
@@ -109,15 +120,15 @@ public function __construct(
109120
protected function declareExchange($exchangeName, $exchangeType, array $exchangeFlags = [])
110121
{
111122
$flags = array_replace([
112-
'exchange' => $exchangeName,
113-
'type' => $exchangeType,
114-
'passive' => false,
115-
'durable' => false,
123+
'exchange' => $exchangeName,
124+
'type' => $exchangeType,
125+
'passive' => false,
126+
'durable' => false,
116127
'auto_delete' => true,
117-
'internal' => false,
118-
'nowait' => false,
119-
'arguments' => null,
120-
'ticket' => null,
128+
'internal' => false,
129+
'nowait' => false,
130+
'arguments' => null,
131+
'ticket' => null,
121132
], $exchangeFlags);
122133

123134
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
@@ -136,7 +147,9 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
136147
public function push($job, $data = '', $queue = null)
137148
{
138149
$queue = $this->getQueueName($queue);
139-
$this->declareQueue($queue);
150+
if ($this->declareQueues) {
151+
$this->declareQueue($queue);
152+
}
140153
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
141154
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
142155
return true;
@@ -164,29 +177,29 @@ protected function getQueueName($name)
164177
*
165178
* @param string $name The name of the queue to declare
166179
*
167-
* @return void
180+
* @return QueueInfo
168181
*/
169182
public function declareQueue($name)
170183
{
171184
$queue = $this->getQueueName($name);
172185
$flags = array_replace_recursive([
173-
'queue' => $queue,
174-
'passive' => false,
175-
'durable' => false,
176-
'exclusive' => false,
186+
'queue' => $queue,
187+
'passive' => false,
188+
'durable' => false,
189+
'exclusive' => false,
177190
'auto_delete' => true,
178-
'nowait' => false,
179-
'arguments' => null,
180-
'ticket' => null,
191+
'nowait' => false,
192+
'arguments' => null,
193+
'ticket' => null,
181194
], $this->getQueueFlags($name));
182195

183-
call_user_func_array([$this->channel, 'queue_declare'], $flags);
196+
return QueueInfo::createFromDeclareOk(call_user_func_array([$this->channel, 'queue_declare'], $flags));
184197
}
185198

186199
/**
187-
* @param string $queueName
200+
* @param string $queueName
188201
* @param null|string $deferredQueueName
189-
* @param null|int $deferredQueueDelay
202+
* @param null|int $deferredQueueDelay
190203
*
191204
* @return array
192205
*/
@@ -206,6 +219,7 @@ protected function getQueueFlags($queueName, $deferredQueueName = null, $deferre
206219
* Get routing key from config or use default one (queue name)
207220
*
208221
* @param $queue string
222+
*
209223
* @return string Routing key name
210224
*/
211225
protected function getRoutingKey($queue)
@@ -225,7 +239,9 @@ protected function getRoutingKey($queue)
225239
public function pushRaw($payload, $queue = null, array $options = [])
226240
{
227241
$queue = $this->getQueueName($queue);
228-
$this->declareQueue($queue);
242+
if ($this->declareQueues) {
243+
$this->declareQueue($queue);
244+
}
229245
$payload = new AMQPMessage($payload, $this->messageProperties);
230246
$this->channel->basic_publish($payload, $this->exchangeName, $queue);
231247
return true;
@@ -248,7 +264,9 @@ public function later($delay, $job, $data = '', $queue = null)
248264
}
249265

250266
$queue = $this->getQueueName($queue);
251-
$this->declareQueue($queue);
267+
if ($this->declareQueues) {
268+
$this->declareQueue($queue);
269+
}
252270
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
253271

254272
$payload = new AMQPMessage($this->createPayload($job, $data), $this->messageProperties);
@@ -270,21 +288,21 @@ public function declareDelayedQueue($destinationQueueName, $delay)
270288
$deferredQueueName = $destinationQueueName . '_deferred_' . $delay;
271289

272290
$flags = array_replace_recursive([
273-
'queue' => '',
274-
'passive' => false,
275-
'durable' => false,
276-
'exclusive' => false,
291+
'queue' => '',
292+
'passive' => false,
293+
'durable' => false,
294+
'exclusive' => false,
277295
'auto_delete' => true,
278-
'nowait' => false,
279-
'arguments' => null,
280-
'ticket' => null,
296+
'nowait' => false,
297+
'arguments' => null,
298+
'ticket' => null,
281299
], $this->getQueueFlags($destinationQueueName, $deferredQueueName, $delay), [
282-
'queue' => $deferredQueueName,
283-
'durable' => true,
300+
'queue' => $deferredQueueName,
301+
'durable' => true,
284302
'arguments' => new AMQPTable([
285-
'x-dead-letter-exchange' => '',
303+
'x-dead-letter-exchange' => '',
286304
'x-dead-letter-routing-key' => $destinationQueueName,
287-
'x-message-ttl' => $delay * 1000,
305+
'x-message-ttl' => $delay * 1000,
288306
]),
289307
]);
290308

@@ -302,7 +320,9 @@ public function declareDelayedQueue($destinationQueueName, $delay)
302320
public function pop($queue = null)
303321
{
304322
$queue = $this->getQueueName($queue);
305-
$this->declareQueue($queue);
323+
if ($this->declareQueues) {
324+
$this->declareQueue($queue);
325+
}
306326
$envelope = $this->channel->basic_get($queue);
307327

308328
if ($envelope instanceof AMQPMessage) {
@@ -316,9 +336,12 @@ public function pop($queue = null)
316336
* Get the size of the queue.
317337
*
318338
* @param string $queue
339+
*
319340
* @return int
320341
*/
321342
public function size($queue = null)
322343
{
344+
$data = $this->declareQueue($this->getQueueName($queue));
345+
return $data->getJobs();
323346
}
324347
}

src/Queue/QueueInfo.php

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
namespace Forumhouse\LaravelAmqp\Queue;
4+
5+
/**
6+
* Class providing information about a queue
7+
*/
8+
class QueueInfo
9+
{
10+
/**
11+
* @var string The name of the queue
12+
*/
13+
private $name;
14+
15+
/**
16+
* @var int The number of ready messages in the queue
17+
*/
18+
private $jobs;
19+
20+
/**
21+
* @var int The number of consumers currently connected to read from queue
22+
*/
23+
private $consumers;
24+
25+
public function __construct($name, $jobs, $consumers)
26+
{
27+
$this->name = $name;
28+
$this->jobs = $jobs;
29+
$this->consumers = $consumers;
30+
}
31+
32+
/**
33+
* Creates an instance from the data returned from queue_declare call
34+
*
35+
* @param array $declareOkData An array of data returned by queue_declare
36+
*
37+
* @return static
38+
*/
39+
public static function createFromDeclareOk(array $declareOkData)
40+
{
41+
list($name, $jobs, $consumers) = $declareOkData;
42+
return new static($name, $jobs, $consumers);
43+
}
44+
45+
/**
46+
* @return string
47+
*/
48+
public function getName(): string
49+
{
50+
return $this->name;
51+
}
52+
53+
/**
54+
* @return int
55+
*/
56+
public function getJobs(): int
57+
{
58+
return $this->jobs;
59+
}
60+
61+
/**
62+
* @return int
63+
*/
64+
public function getConsumers(): int
65+
{
66+
return $this->consumers;
67+
}
68+
}

tests/BaseFeaturesTest.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Forumhouse\LaravelAmqp\Tests;
44

5+
use Forumhouse\LaravelAmqp\Jobs\AMQPJob;
56
use Illuminate\Queue\Jobs\Job;
67

78
/**
@@ -32,7 +33,7 @@ public function setUp()
3233
public function testSimpleDelete()
3334
{
3435
$this->app['queue']->push(self::TEST_JOB_CLASS, ['test1' => 1, 'test2' => 2, 'delete' => true]);
35-
/** @var Job $job */
36+
/** @var AMQPJob $job */
3637
$job = $this->app['queue']->pop();
3738
$this->assertInstanceOf('Illuminate\Queue\Jobs\Job', $job);
3839
$job->fire();
@@ -51,7 +52,7 @@ public function testSimpleRelease()
5152
{
5253
$this->app['queue']->push(self::TEST_JOB_CLASS, ['test1' => 1, 'test2' => 2, 'release' => true]);
5354

54-
/** @var Job $job */
55+
/** @var AMQPJob $job */
5556
$job = $this->app['queue']->pop();
5657
$this->assertInstanceOf('Illuminate\Queue\Jobs\Job', $job);
5758
$this->assertEquals(0, $job->attempts());

0 commit comments

Comments
 (0)