1
1
<?php
2
2
namespace Codeception \Module ;
3
3
4
+ use Codeception \Exception \ModuleException as ModuleException ;
4
5
use Codeception \Lib \Interfaces \RequiresPackage ;
5
6
use Codeception \Module as CodeceptionModule ;
6
- use Codeception \Exception \ModuleException as ModuleException ;
7
7
use Codeception \TestInterface ;
8
8
use Exception ;
9
9
use PhpAmqpLib \Channel \AMQPChannel ;
10
10
use PhpAmqpLib \Connection \AMQPStreamConnection ;
11
- use PhpAmqpLib \Message \AMQPMessage ;
12
11
use PhpAmqpLib \Exception \AMQPProtocolChannelException ;
12
+ use PhpAmqpLib \Message \AMQPMessage ;
13
13
14
14
/**
15
15
* This module interacts with message broker software that implements
27
27
* * vhost: '/' - vhost to connect
28
28
* * cleanup: true - defined queues will be purged before running every test.
29
29
* * queues: [mail, twitter] - queues to cleanup
30
+ * * single_channel - create and use only one channel during test execution
30
31
*
31
32
* ### Example
32
33
*
39
40
* password: 'guest'
40
41
* vhost: '/'
41
42
* queues: [queue1, queue2]
43
+ * single_channel: false
42
44
*
43
45
* ## Public Properties
44
46
*
47
49
class AMQP extends CodeceptionModule implements RequiresPackage
48
50
{
49
51
protected $ config = [
50
- 'host ' => 'localhost ' ,
51
- 'username ' => 'guest ' ,
52
- 'password ' => 'guest ' ,
53
- 'port ' => '5672 ' ,
54
- 'vhost ' => '/ ' ,
55
- 'cleanup ' => true ,
52
+ 'host ' => 'localhost ' ,
53
+ 'username ' => 'guest ' ,
54
+ 'password ' => 'guest ' ,
55
+ 'port ' => '5672 ' ,
56
+ 'vhost ' => '/ ' ,
57
+ 'cleanup ' => true ,
58
+ 'single_channel ' => false
56
59
];
57
60
58
61
/**
@@ -61,9 +64,9 @@ class AMQP extends CodeceptionModule implements RequiresPackage
61
64
public $ connection ;
62
65
63
66
/**
64
- * @var AMQPChannel
67
+ * @var int
65
68
*/
66
- protected $ channel ;
69
+ protected $ channelId ;
67
70
68
71
protected $ requiredFields = ['host ' , 'username ' , 'password ' , 'vhost ' ];
69
72
@@ -115,7 +118,7 @@ public function pushToExchange($exchange, $message, $routing_key = null)
115
118
$ message = $ message instanceof AMQPMessage
116
119
? $ message
117
120
: new AMQPMessage ($ message );
118
- $ this ->connection -> channel ()->basic_publish ($ message , $ exchange , $ routing_key );
121
+ $ this ->getChannel ()->basic_publish ($ message , $ exchange , $ routing_key );
119
122
}
120
123
121
124
/**
@@ -137,8 +140,8 @@ public function pushToQueue($queue, $message)
137
140
? $ message
138
141
: new AMQPMessage ($ message );
139
142
140
- $ this ->connection -> channel ()->queue_declare ($ queue );
141
- $ this ->connection -> channel ()->basic_publish ($ message , '' , $ queue );
143
+ $ this ->getChannel ()->queue_declare ($ queue );
144
+ $ this ->getChannel ()->basic_publish ($ message , '' , $ queue );
142
145
}
143
146
144
147
/**
@@ -176,7 +179,7 @@ public function declareExchange(
176
179
$ arguments = null ,
177
180
$ ticket = null
178
181
) {
179
- return $ this ->connection -> channel ()->exchange_declare (
182
+ return $ this ->getChannel ()->exchange_declare (
180
183
$ exchange ,
181
184
$ type ,
182
185
$ passive ,
@@ -221,7 +224,7 @@ public function declareQueue(
221
224
$ arguments = null ,
222
225
$ ticket = null
223
226
) {
224
- return $ this ->connection -> channel ()->queue_declare (
227
+ return $ this ->getChannel ()->queue_declare (
225
228
$ queue ,
226
229
$ passive ,
227
230
$ durable ,
@@ -263,7 +266,7 @@ public function bindQueueToExchange(
263
266
$ arguments = null ,
264
267
$ ticket = null
265
268
) {
266
- return $ this ->connection -> channel ()->queue_bind (
269
+ return $ this ->getChannel ()->queue_bind (
267
270
$ queue ,
268
271
$ exchange ,
269
272
$ routing_key ,
@@ -291,7 +294,7 @@ public function bindQueueToExchange(
291
294
*/
292
295
public function seeMessageInQueueContainsText ($ queue , $ text )
293
296
{
294
- $ msg = $ this ->connection -> channel ()->basic_get ($ queue );
297
+ $ msg = $ this ->getChannel ()->basic_get ($ queue );
295
298
if (!$ msg ) {
296
299
$ this ->fail ("Message was not received " );
297
300
}
@@ -316,7 +319,7 @@ public function seeMessageInQueueContainsText($queue, $text)
316
319
*/
317
320
public function grabMessageFromQueue ($ queue )
318
321
{
319
- $ message = $ this ->connection -> channel ()->basic_get ($ queue );
322
+ $ message = $ this ->getChannel ()->basic_get ($ queue );
320
323
return $ message ;
321
324
}
322
325
@@ -337,7 +340,7 @@ public function purgeQueue($queueName = '')
337
340
throw new ModuleException (__CLASS__ , "' $ queueName' doesn't exist in queues config list " );
338
341
}
339
342
340
- $ this ->connection -> channel ()->queue_purge ($ queueName , true );
343
+ $ this ->getChannel ()->queue_purge ($ queueName , true );
341
344
}
342
345
343
346
/**
@@ -354,6 +357,17 @@ public function purgeAllQueues()
354
357
$ this ->cleanup ();
355
358
}
356
359
360
+ /**
361
+ * @return AMQPChannel
362
+ */
363
+ protected function getChannel ()
364
+ {
365
+ if ($ this ->config ['single_channel ' ] && $ this ->channelId === null ) {
366
+ $ this ->channelId = $ this ->connection ->get_free_channel_id ();
367
+ }
368
+ return $ this ->connection ->channel ($ this ->channelId );
369
+ }
370
+
357
371
protected function cleanup ()
358
372
{
359
373
if (!isset ($ this ->config ['queues ' ])) {
@@ -364,7 +378,7 @@ protected function cleanup()
364
378
}
365
379
foreach ($ this ->config ['queues ' ] as $ queue ) {
366
380
try {
367
- $ this ->connection -> channel ()->queue_purge ($ queue );
381
+ $ this ->getChannel ()->queue_purge ($ queue );
368
382
} catch (AMQPProtocolChannelException $ e ) {
369
383
// ignore if exchange/queue doesn't exist and rethrow exception if it's something else
370
384
if ($ e ->getCode () !== 404 ) {
0 commit comments