Skip to content

Commit bbb3219

Browse files
ENGCOM-6171: Fix for Mysql MessageQueue (2) #25289
2 parents 09574d7 + cfbe267 commit bbb3219

File tree

5 files changed

+125
-35
lines changed

5 files changed

+125
-35
lines changed

app/code/Magento/MysqlMq/Model/Driver/Bulk/Exchange.php

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,24 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
67
namespace Magento\MysqlMq\Model\Driver\Bulk;
78

89
use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
9-
use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
10+
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
11+
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1012
use Magento\MysqlMq\Model\QueueManagement;
1113

1214
/**
1315
* Used to send messages in bulk in MySQL queue.
1416
*/
1517
class Exchange implements ExchangeInterface
1618
{
19+
/**
20+
* @var ConnectionTypeResolver
21+
*/
22+
private $connectionTypeResolver;
23+
1724
/**
1825
* @var MessageQueueConfig
1926
*/
@@ -27,21 +34,39 @@ class Exchange implements ExchangeInterface
2734
/**
2835
* Initialize dependencies.
2936
*
37+
* @param ConnectionTypeResolver $connectionTypeResolver
3038
* @param MessageQueueConfig $messageQueueConfig
3139
* @param QueueManagement $queueManagement
3240
*/
33-
public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagement $queueManagement)
34-
{
41+
public function __construct(
42+
ConnectionTypeResolver $connectionTypeResolver,
43+
MessageQueueConfig $messageQueueConfig,
44+
QueueManagement $queueManagement
45+
) {
3546
$this->messageQueueConfig = $messageQueueConfig;
3647
$this->queueManagement = $queueManagement;
48+
$this->connectionTypeResolver = $connectionTypeResolver;
3749
}
3850

3951
/**
4052
* @inheritdoc
4153
*/
4254
public function enqueue($topic, array $envelopes)
4355
{
44-
$queueNames = $this->messageQueueConfig->getQueuesByTopic($topic);
56+
$queueNames = [];
57+
$exchanges = $this->messageQueueConfig->getExchanges();
58+
foreach ($exchanges as $exchange) {
59+
$connection = $exchange->getConnection();
60+
if ($this->connectionTypeResolver->getConnectionType($connection)) {
61+
foreach ($exchange->getBindings() as $binding) {
62+
// This only supports exact matching of topics.
63+
if ($binding->getTopic() === $topic) {
64+
$queueNames[] = $binding->getDestination();
65+
}
66+
}
67+
}
68+
}
69+
4570
$messages = array_map(
4671
function ($envelope) {
4772
return $envelope->getBody();

app/code/Magento/MysqlMq/Model/Driver/Exchange.php

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,25 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
67
namespace Magento\MysqlMq\Model\Driver;
78

89
use Magento\Framework\MessageQueue\EnvelopeInterface;
910
use Magento\Framework\MessageQueue\ExchangeInterface;
10-
use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
11+
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
12+
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1113
use Magento\MysqlMq\Model\QueueManagement;
1214

15+
/**
16+
* Class Exchange
17+
*/
1318
class Exchange implements ExchangeInterface
1419
{
20+
/**
21+
* @var ConnectionTypeResolver
22+
*/
23+
private $connectionTypeResolver;
24+
1525
/**
1626
* @var MessageQueueConfig
1727
*/
@@ -25,13 +35,18 @@ class Exchange implements ExchangeInterface
2535
/**
2636
* Initialize dependencies.
2737
*
38+
* @param ConnectionTypeResolver $connectionTypeResolver
2839
* @param MessageQueueConfig $messageQueueConfig
2940
* @param QueueManagement $queueManagement
3041
*/
31-
public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagement $queueManagement)
32-
{
42+
public function __construct(
43+
ConnectionTypeResolver $connectionTypeResolver,
44+
MessageQueueConfig $messageQueueConfig,
45+
QueueManagement $queueManagement
46+
) {
3347
$this->messageQueueConfig = $messageQueueConfig;
3448
$this->queueManagement = $queueManagement;
49+
$this->connectionTypeResolver = $connectionTypeResolver;
3550
}
3651

3752
/**
@@ -43,7 +58,18 @@ public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagem
4358
*/
4459
public function enqueue($topic, EnvelopeInterface $envelope)
4560
{
46-
$queueNames = $this->messageQueueConfig->getQueuesByTopic($topic);
61+
$queueNames = [];
62+
$exchanges = $this->messageQueueConfig->getExchanges();
63+
foreach ($exchanges as $exchange) {
64+
$connection = $exchange->getConnection();
65+
if ($this->connectionTypeResolver->getConnectionType($connection)) {
66+
foreach ($exchange->getBindings() as $binding) {
67+
if ($binding->getTopic() == $topic) {
68+
$queueNames[] = $binding->getDestination();
69+
}
70+
}
71+
}
72+
}
4773
$this->queueManagement->addMessageToQueues($topic, $envelope->getBody(), $queueNames);
4874
return null;
4975
}

app/code/Magento/MysqlMq/Setup/Recurring.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
67
namespace Magento\MysqlMq\Setup;
78

89
use Magento\Framework\Setup\InstallSchemaInterface;
910
use Magento\Framework\Setup\ModuleContextInterface;
1011
use Magento\Framework\Setup\SchemaSetupInterface;
11-
use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
12+
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
1213

1314
/**
1415
* Class Recurring
@@ -29,17 +30,17 @@ public function __construct(MessageQueueConfig $messageQueueConfig)
2930
}
3031

3132
/**
32-
* {@inheritdoc}
33+
* @inheritdoc
3334
*/
3435
public function install(SchemaSetupInterface $setup, ModuleContextInterface $context)
3536
{
3637
$setup->startSetup();
3738

38-
$binds = $this->messageQueueConfig->getBinds();
3939
$queues = [];
40-
foreach ($binds as $bind) {
41-
$queues[] = $bind[MessageQueueConfig::BIND_QUEUE];
40+
foreach ($this->messageQueueConfig->getQueues() as $queue) {
41+
$queues[] = $queue->getName();
4242
}
43+
4344
$connection = $setup->getConnection();
4445
$existingQueues = $connection->fetchCol($connection->select()->from($setup->getTable('queue'), 'name'));
4546
$queues = array_unique(array_diff($queues, $existingQueues));

app/code/Magento/MysqlMq/Test/Unit/Model/Driver/Bulk/ExchangeTest.php

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class ExchangeTest extends \PHPUnit\Framework\TestCase
2525
* @var \Magento\MysqlMq\Model\Driver\Bulk\Exchange
2626
*/
2727
private $exchange;
28+
/**
29+
* @var \Magento\MysqlMq\Model\ConnectionTypeResolver|\PHPUnit_Framework_MockObject_MockObject
30+
*/
31+
private $connnectionTypeResolver;
2832

2933
/**
3034
* Set up.
@@ -33,15 +37,20 @@ class ExchangeTest extends \PHPUnit\Framework\TestCase
3337
*/
3438
protected function setUp()
3539
{
36-
$this->messageQueueConfig = $this->getMockBuilder(\Magento\Framework\MessageQueue\ConfigInterface::class)
40+
$this->messageQueueConfig = $this->getMockBuilder(
41+
\Magento\Framework\MessageQueue\Topology\ConfigInterface::class
42+
)
3743
->disableOriginalConstructor()->getMock();
3844
$this->queueManagement = $this->getMockBuilder(\Magento\MysqlMq\Model\QueueManagement::class)
3945
->disableOriginalConstructor()->getMock();
46+
$this->connnectionTypeResolver = $this->getMockBuilder(\Magento\MysqlMq\Model\ConnectionTypeResolver::class)
47+
->disableOriginalConstructor()->getMock();
4048

4149
$objectManager = new \Magento\Framework\TestFramework\Unit\Helper\ObjectManager($this);
4250
$this->exchange = $objectManager->getObject(
4351
\Magento\MysqlMq\Model\Driver\Bulk\Exchange::class,
4452
[
53+
'connectionTypeResolver' => $this->connnectionTypeResolver,
4554
'messageQueueConfig' => $this->messageQueueConfig,
4655
'queueManagement' => $this->queueManagement,
4756
]
@@ -56,10 +65,46 @@ protected function setUp()
5665
public function testEnqueue()
5766
{
5867
$topicName = 'topic.name';
59-
$queueNames = ['queue0', 'queue1'];
68+
$queueNames = ['queue0'];
69+
$binding1 = $this->createMock(
70+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class
71+
);
72+
$binding1->expects($this->once())
73+
->method('getTopic')
74+
->willReturn($topicName);
75+
$binding1->expects($this->once())
76+
->method('getDestination')
77+
->willReturn($queueNames[0]);
78+
$binding2 = $this->createMock(
79+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface::class
80+
);
81+
$binding2->expects($this->once())
82+
->method('getTopic')
83+
->willReturn('different.topic');
84+
$binding2->expects($this->never())
85+
->method('getDestination');
86+
$exchange1 = $this->createMock(
87+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class
88+
);
89+
$exchange1->expects($this->once())
90+
->method('getConnection')
91+
->willReturn('db');
92+
$exchange1->expects($this->once())
93+
->method('getBindings')
94+
->willReturn([$binding1, $binding2]);
95+
$exchange2 = $this->createMock(
96+
\Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItemInterface::class
97+
);
98+
$exchange2->expects($this->once())
99+
->method('getConnection')
100+
->willReturn('amqp');
101+
$exchange2->expects($this->never())
102+
->method('getBindings');
103+
104+
$this->connnectionTypeResolver->method('getConnectionType')->willReturnOnConsecutiveCalls(['db', null]);
60105
$envelopeBody = 'serializedMessage';
61106
$this->messageQueueConfig->expects($this->once())
62-
->method('getQueuesByTopic')->with($topicName)->willReturn($queueNames);
107+
->method('getExchanges')->willReturn([$exchange1, $exchange2]);
63108
$envelope = $this->getMockBuilder(\Magento\Framework\MessageQueue\EnvelopeInterface::class)
64109
->disableOriginalConstructor()->getMock();
65110
$envelope->expects($this->once())->method('getBody')->willReturn($envelopeBody);

app/code/Magento/MysqlMq/Test/Unit/Setup/RecurringTest.php

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ class RecurringTest extends \PHPUnit\Framework\TestCase
3434
protected function setUp()
3535
{
3636
$this->objectManager = new ObjectManager($this);
37-
$this->messageQueueConfig = $this->getMockBuilder(\Magento\Framework\MessageQueue\ConfigInterface::class)
37+
$this->messageQueueConfig = $this->getMockBuilder(
38+
\Magento\Framework\MessageQueue\Topology\ConfigInterface::class
39+
)
3840
->getMockForAbstractClass();
3941
$this->model = $this->objectManager->getObject(
4042
\Magento\MysqlMq\Setup\Recurring::class,
@@ -49,23 +51,14 @@ protected function setUp()
4951
*/
5052
public function testInstall()
5153
{
52-
$binds = [
53-
'first_bind' => [
54-
'queue' => 'queue_name_1',
55-
'exchange' => 'magento-db',
56-
'topic' => 'queue.topic.1'
57-
],
58-
'second_bind' => [
59-
'queue' => 'queue_name_2',
60-
'exchange' => 'magento-db',
61-
'topic' => 'queue.topic.2'
62-
],
63-
'third_bind' => [
64-
'queue' => 'queue_name_3',
65-
'exchange' => 'magento-db',
66-
'topic' => 'queue.topic.3'
67-
]
68-
];
54+
for ($i = 1; $i <= 3; $i++) {
55+
$queue = $this->createMock(\Magento\Framework\MessageQueue\Topology\Config\QueueConfigItemInterface::class);
56+
$queue->expects($this->once())
57+
->method('getName')
58+
->willReturn('queue_name_' . $i);
59+
$queues[] = $queue;
60+
}
61+
6962
$dbQueues = [
7063
'queue_name_1',
7164
'queue_name_2',
@@ -81,7 +74,7 @@ public function testInstall()
8174
->getMockForAbstractClass();
8275

8376
$setup->expects($this->once())->method('startSetup')->willReturnSelf();
84-
$this->messageQueueConfig->expects($this->once())->method('getBinds')->willReturn($binds);
77+
$this->messageQueueConfig->expects($this->once())->method('getQueues')->willReturn($queues);
8578
$connection = $this->getMockBuilder(\Magento\Framework\DB\Adapter\AdapterInterface::class)
8679
->getMockForAbstractClass();
8780
$setup->expects($this->once())->method('getConnection')->willReturn($connection);

0 commit comments

Comments
 (0)