7
7
8
8
namespace Magento \AsynchronousOperations \Model ;
9
9
10
+ use Magento \Framework \App \ObjectManager ;
10
11
use Magento \Framework \MessageQueue \CallbackInvokerInterface ;
11
12
use Magento \Framework \MessageQueue \ConsumerConfigurationInterface ;
12
13
use Magento \Framework \MessageQueue \ConsumerInterface ;
13
14
use Magento \Framework \MessageQueue \EnvelopeInterface ;
14
15
use Magento \Framework \MessageQueue \QueueInterface ;
15
16
use Magento \Framework \Registry ;
17
+ use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfig ;
16
18
17
19
/**
18
20
* Class Consumer used to process OperationInterface messages.
@@ -41,24 +43,32 @@ class MassConsumer implements ConsumerInterface
41
43
*/
42
44
private $ registry ;
43
45
46
+ /**
47
+ * @var ConsumerConfig
48
+ */
49
+ private $ consumerConfig ;
50
+
44
51
/**
45
52
* Initialize dependencies.
46
53
*
47
54
* @param CallbackInvokerInterface $invoker
48
55
* @param ConsumerConfigurationInterface $configuration
49
56
* @param MassConsumerEnvelopeCallbackFactory $massConsumerEnvelopeCallback
50
57
* @param Registry $registry
58
+ * @param ConsumerConfig|null $consumerConfig
51
59
*/
52
60
public function __construct (
53
61
CallbackInvokerInterface $ invoker ,
54
62
ConsumerConfigurationInterface $ configuration ,
55
63
MassConsumerEnvelopeCallbackFactory $ massConsumerEnvelopeCallback ,
56
- Registry $ registry
64
+ Registry $ registry ,
65
+ ?ConsumerConfig $ consumerConfig = null
57
66
) {
58
67
$ this ->invoker = $ invoker ;
59
68
$ this ->configuration = $ configuration ;
60
69
$ this ->massConsumerEnvelopeCallback = $ massConsumerEnvelopeCallback ;
61
70
$ this ->registry = $ registry ;
71
+ $ this ->consumerConfig = $ consumerConfig ?: ObjectManager::getInstance ()->get (ConsumerConfig::class);
62
72
}
63
73
64
74
/**
@@ -75,12 +85,16 @@ public function process($maxNumberOfMessages = null)
75
85
if (!isset ($ maxNumberOfMessages )) {
76
86
$ queue ->subscribe ($ this ->getTransactionCallback ($ queue ));
77
87
} else {
88
+ $ connectionName = $ this ->consumerConfig
89
+ ->getConsumer ($ this ->configuration ->getConsumerName ())
90
+ ->getConnection ();
78
91
$ this ->invoker ->invoke (
79
92
$ queue ,
80
93
$ maxNumberOfMessages ,
81
94
$ this ->getTransactionCallback ($ queue ),
82
95
$ maxIdleTime ,
83
- $ sleep
96
+ $ sleep ,
97
+ $ connectionName
84
98
);
85
99
}
86
100
0 commit comments