Skip to content

Commit 3220979

Browse files
committed
Applied improvements from #25197
1 parent 8cc8edc commit 3220979

File tree

5 files changed

+81
-35
lines changed

5 files changed

+81
-35
lines changed

app/code/Magento/MysqlMq/Model/Driver/Bulk/Exchange.php

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,24 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
67
namespace Magento\MysqlMq\Model\Driver\Bulk;
78

89
use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
910
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
11+
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1012
use Magento\MysqlMq\Model\QueueManagement;
1113

1214
/**
1315
* Used to send messages in bulk in MySQL queue.
1416
*/
1517
class Exchange implements ExchangeInterface
1618
{
19+
/**
20+
* @var ConnectionTypeResolver
21+
*/
22+
private $connectionTypeResolver;
23+
1724
/**
1825
* @var MessageQueueConfig
1926
*/
@@ -27,13 +34,18 @@ class Exchange implements ExchangeInterface
2734
/**
2835
* Initialize dependencies.
2936
*
37+
* @param ConnectionTypeResolver $connectionTypeResolver
3038
* @param MessageQueueConfig $messageQueueConfig
3139
* @param QueueManagement $queueManagement
3240
*/
33-
public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagement $queueManagement)
34-
{
41+
public function __construct(
42+
ConnectionTypeResolver $connectionTypeResolver,
43+
MessageQueueConfig $messageQueueConfig,
44+
QueueManagement $queueManagement
45+
) {
3546
$this->messageQueueConfig = $messageQueueConfig;
3647
$this->queueManagement = $queueManagement;
48+
$this->connectionTypeResolver = $connectionTypeResolver;
3749
}
3850

3951
/**
@@ -44,16 +56,17 @@ public function enqueue($topic, array $envelopes)
4456
$queueNames = [];
4557
$exchanges = $this->messageQueueConfig->getExchanges();
4658
foreach ($exchanges as $exchange) {
47-
// @todo Is there a more reliable way to identify MySQL exchanges?
48-
if ($exchange->getConnection() == 'db') {
49-
foreach ($exchange->getBindings() as $binding) {
50-
// This only supports exact matching of topics.
51-
if ($binding->getTopic() == $topic) {
52-
$queueNames[] = $binding->getDestination();
53-
}
59+
$connection = $exchange->getConnection();
60+
if ($this->connectionTypeResolver->getConnectionType($connection)) {
61+
foreach ($exchange->getBindings() as $binding) {
62+
// This only supports exact matching of topics.
63+
if ($binding->getTopic() === $topic) {
64+
$queueNames[] = $binding->getDestination();
65+
}
66+
}
5467
}
55-
}
5668
}
69+
5770
$messages = array_map(
5871
function ($envelope) {
5972
return $envelope->getBody();

app/code/Magento/MysqlMq/Model/Driver/Exchange.php

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,26 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
67
namespace Magento\MysqlMq\Model\Driver;
78

89
use Magento\Framework\MessageQueue\EnvelopeInterface;
910
use Magento\Framework\MessageQueue\ExchangeInterface;
1011
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
12+
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1113
use Magento\MysqlMq\Model\QueueManagement;
1214

15+
/**
16+
* Class Exchange
17+
* @package Magento\MysqlMq\Model\Driver
18+
*/
1319
class Exchange implements ExchangeInterface
1420
{
21+
/**
22+
* @var ConnectionTypeResolver
23+
*/
24+
private $connectionTypeResolver;
25+
1526
/**
1627
* @var MessageQueueConfig
1728
*/
@@ -25,13 +36,18 @@ class Exchange implements ExchangeInterface
2536
/**
2637
* Initialize dependencies.
2738
*
39+
* @param ConnectionTypeResolver $connectionTypeResolver
2840
* @param MessageQueueConfig $messageQueueConfig
2941
* @param QueueManagement $queueManagement
3042
*/
31-
public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagement $queueManagement)
32-
{
43+
public function __construct(
44+
ConnectionTypeResolver $connectionTypeResolver,
45+
MessageQueueConfig $messageQueueConfig,
46+
QueueManagement $queueManagement
47+
) {
3348
$this->messageQueueConfig = $messageQueueConfig;
3449
$this->queueManagement = $queueManagement;
50+
$this->connectionTypeResolver = $connectionTypeResolver;
3551
}
3652

3753
/**
@@ -46,15 +62,14 @@ public function enqueue($topic, EnvelopeInterface $envelope)
4662
$queueNames = [];
4763
$exchanges = $this->messageQueueConfig->getExchanges();
4864
foreach ($exchanges as $exchange) {
49-
// @todo Is there a more reliable way to identify MySQL exchanges?
50-
if ($exchange->getConnection() == 'db') {
51-
foreach ($exchange->getBindings() as $binding) {
52-
// This only supports exact matching of topics.
53-
if ($binding->getTopic() == $topic) {
54-
$queueNames[] = $binding->getDestination();
55-
}
65+
$connection = $exchange->getConnection();
66+
if ($this->connectionTypeResolver->getConnectionType($connection)) {
67+
foreach ($exchange->getBindings() as $binding) {
68+
if ($binding->getTopic() == $topic) {
69+
$queueNames[] = $binding->getDestination();
70+
}
71+
}
5672
}
57-
}
5873
}
5974
$this->queueManagement->addMessageToQueues($topic, $envelope->getBody(), $queueNames);
6075
return null;

app/code/Magento/MysqlMq/Setup/Recurring.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ public function install(SchemaSetupInterface $setup, ModuleContextInterface $con
3737

3838
$queues = [];
3939
foreach ($this->messageQueueConfig->getQueues() as $queue) {
40-
$queues[] = $queue->getName();
40+
$queues[] = $queue->getName();
4141
}
42+
4243
$connection = $setup->getConnection();
4344
$existingQueues = $connection->fetchCol($connection->select()->from($setup->getTable('queue'), 'name'));
4445
$queues = array_unique(array_diff($queues, $existingQueues));

app/code/Magento/MysqlMq/Test/Unit/Model/Driver/Bulk/ExchangeTest.php

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
class ExchangeTest extends \PHPUnit\Framework\TestCase
1313
{
1414
/**
15-
* @var \Magento\Framework\MessageQueue\Topology\ConfigInterface|\PHPUnit_Framework_MockObject_MockObject
15+
* @var \Magento\Framework\MessageQueue\ConfigInterface|\PHPUnit_Framework_MockObject_MockObject
1616
*/
1717
private $messageQueueConfig;
1818

@@ -25,6 +25,10 @@ class ExchangeTest extends \PHPUnit\Framework\TestCase
2525
* @var \Magento\MysqlMq\Model\Driver\Bulk\Exchange
2626
*/
2727
private $exchange;
28+
/**
29+
* @var \Magento\MysqlMq\Model\ConnectionTypeResolver|\PHPUnit_Framework_MockObject_MockObject
30+
*/
31+
private $connnectionTypeResolver;
2832

2933
/**
3034
* Set up.
@@ -33,15 +37,20 @@ class ExchangeTest extends \PHPUnit\Framework\TestCase
3337
*/
3438
protected function setUp()
3539
{
36-
$this->messageQueueConfig = $this->getMockBuilder(\Magento\Framework\MessageQueue\Topology\ConfigInterface::class)
40+
$this->messageQueueConfig = $this->getMockBuilder(
41+
\Magento\Framework\MessageQueue\Topology\ConfigInterface::class
42+
)
3743
->disableOriginalConstructor()->getMock();
3844
$this->queueManagement = $this->getMockBuilder(\Magento\MysqlMq\Model\QueueManagement::class)
3945
->disableOriginalConstructor()->getMock();
46+
$this->connnectionTypeResolver = $this->getMockBuilder(\Magento\MysqlMq\Model\ConnectionTypeResolver::class)
47+
->disableOriginalConstructor()->getMock();
4048

4149
$objectManager = new \Magento\Framework\TestFramework\Unit\Helper\ObjectManager($this);
4250
$this->exchange = $objectManager->getObject(
4351
\Magento\MysqlMq\Model\Driver\Bulk\Exchange::class,
4452
[
53+
'connectionTypeResolver' => $this->connnectionTypeResolver,
4554
'messageQueueConfig' => $this->messageQueueConfig,
4655
'queueManagement' => $this->queueManagement,
4756
]
@@ -57,36 +66,42 @@ public function testEnqueue()
5766
{
5867
$topicName = 'topic.name';
5968
$queueNames = ['queue0'];
60-
61-
$binding1 = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class);
69+
$binding1 = $this->createMock(
70+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class
71+
);
6272
$binding1->expects($this->once())
6373
->method('getTopic')
6474
->willReturn($topicName);
6575
$binding1->expects($this->once())
6676
->method('getDestination')
6777
->willReturn($queueNames[0]);
68-
69-
$binding2 = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class);
78+
$binding2 = $this->createMock(
79+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class
80+
);
7081
$binding2->expects($this->once())
7182
->method('getTopic')
7283
->willReturn('different.topic');
7384
$binding2->expects($this->never())
7485
->method('getDestination');
75-
76-
$exchange1 = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class);
86+
$exchange1 = $this->createMock(
87+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class
88+
);
7789
$exchange1->expects($this->once())
7890
->method('getConnection')
7991
->willReturn('db');
8092
$exchange1->expects($this->once())
8193
->method('getBindings')
8294
->willReturn([$binding1, $binding2]);
83-
$exchange2 = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class);
95+
$exchange2 = $this->createMock(
96+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class
97+
);
8498
$exchange2->expects($this->once())
8599
->method('getConnection')
86100
->willReturn('amqp');
87101
$exchange2->expects($this->never())
88102
->method('getBindings');
89103

104+
$this->connnectionTypeResolver->method('getConnectionType')->willReturnOnConsecutiveCalls(['db', null]);
90105
$envelopeBody = 'serializedMessage';
91106
$this->messageQueueConfig->expects($this->once())
92107
->method('getExchanges')->willReturn([$exchange1, $exchange2]);

app/code/Magento/MysqlMq/Test/Unit/Setup/RecurringTest.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class RecurringTest extends \PHPUnit\Framework\TestCase
2424
private $model;
2525

2626
/**
27-
* @var \Magento\Framework\MessageQueue\Topology\ConfigInterface|\PHPUnit_Framework_MockObject_MockObject
27+
* @var \Magento\Framework\MessageQueue\ConfigInterface|\PHPUnit_Framework_MockObject_MockObject
2828
*/
2929
private $messageQueueConfig;
3030

@@ -34,7 +34,9 @@ class RecurringTest extends \PHPUnit\Framework\TestCase
3434
protected function setUp()
3535
{
3636
$this->objectManager = new ObjectManager($this);
37-
$this->messageQueueConfig = $this->getMockBuilder(\Magento\Framework\MessageQueue\Topology\ConfigInterface::class)
37+
$this->messageQueueConfig = $this->getMockBuilder(
38+
\Magento\Framework\MessageQueue\Topology\ConfigInterface::class
39+
)
3840
->getMockForAbstractClass();
3941
$this->model = $this->objectManager->getObject(
4042
\Magento\MysqlMq\Setup\Recurring::class,
@@ -45,15 +47,15 @@ protected function setUp()
4547
}
4648

4749
/**
48-
* Test for install method
50+
* {@inheritdoc}
4951
*/
5052
public function testInstall()
5153
{
52-
for ($i = 1; $i <=3; $i++) {
54+
for ($i = 1; $i <= 3; $i++) {
5355
$queue = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\QueueConfigItemInterface::class);
5456
$queue->expects($this->once())
5557
->method('getName')
56-
->willReturn('queue_name_'. $i);
58+
->willReturn('queue_name_' . $i);
5759
$queues[] = $queue;
5860
}
5961

0 commit comments

Comments
 (0)