Skip to content

Commit fd6cf5e

Browse files
committed
fix AsynchronousOperations multistore issue by passing store_id from in amqp application_headers property
1 parent 15100ac commit fd6cf5e

File tree

4 files changed

+106
-2
lines changed

4 files changed

+106
-2
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
declare(strict_types=1);
8+
9+
namespace Magento\Amqp\Plugin\Framework\MessageQueue;
10+
11+
use Magento\Store\Model\StoreManagerInterface;
12+
use Magento\Framework\MessageQueue\EnvelopeFactory;
13+
use PhpAmqpLib\Wire\AMQPTable;
14+
15+
/**
16+
* Plugin to set 'store_id' to the new custom header 'store_id' in amqp
17+
* 'application_headers'.
18+
*/
19+
class EnvelopeFactoryPlugin
20+
{
21+
/**
22+
* @var \Magento\Store\Model\StoreManagerInterface
23+
*/
24+
private $storeManager;
25+
26+
/**
27+
* @param \Magento\Store\Model\StoreManagerInterface $storeManager
28+
*/
29+
public function __construct(
30+
StoreManagerInterface $storeManager
31+
) {
32+
$this->storeManager = $storeManager;
33+
}
34+
35+
/**
36+
* Pass current 'store_id' to the new custom header 'store_id' in amqp
37+
* 'application_headers' Magento\AsynchronousOperations\Model\MassConsumer
38+
* will use store_id to setCurrentStore and will execute messages for
39+
* correct store instead of default.
40+
*
41+
* @return array
42+
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
43+
*/
44+
public function beforeCreate(EnvelopeFactory $subject, array $data = [])
45+
{
46+
if (!isset($data['publisher_flag'])) {
47+
return null;
48+
} else {
49+
unset($data['publisher_flag']);
50+
}
51+
try {
52+
$storeId = $this->storeManager->getStore()->getId();
53+
54+
if (isset($storeId)) {
55+
if (isset($data['properties'])) {
56+
$properties = $data['properties'];
57+
if (isset($properties['application_headers'])) {
58+
$headers = $properties['application_headers'];
59+
if ($headers instanceof AMQPTable) {
60+
$headers->set('store_id', $storeId);
61+
$data['properties']['application_headers'] = $headers;
62+
}
63+
} else {
64+
$data['properties']['application_headers'] = new AMQPTable(['store_id' => $storeId]);
65+
}
66+
}
67+
}
68+
} catch (\Exception $e) {
69+
return null;
70+
}
71+
72+
return [$data];
73+
}
74+
}

app/code/Magento/Amqp/etc/di.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,7 @@
7272
<argument name="instanceName" xsi:type="string">\Magento\Framework\Amqp\Bulk\Exchange</argument>
7373
</arguments>
7474
</virtualType>
75+
<type name="Magento\Framework\MessageQueue\EnvelopeFactory">
76+
<plugin name="amqpStoreIdFieldForMessageQueueEnvelopeFactory" type="Magento\Amqp\Plugin\Framework\MessageQueue\EnvelopeFactoryPlugin" />
77+
</type>
7578
</config>

app/code/Magento/AsynchronousOperations/Model/MassConsumer.php

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
use Magento\Framework\App\ResourceConnection;
1212
use Magento\Framework\Registry;
13+
use Magento\Store\Model\StoreManagerInterface;
14+
use PhpAmqpLib\Wire\AMQPTable;
1315
use Psr\Log\LoggerInterface;
1416
use Magento\Framework\MessageQueue\MessageLockException;
1517
use Magento\Framework\MessageQueue\ConnectionLostException;
@@ -63,6 +65,10 @@ class MassConsumer implements ConsumerInterface
6365
* @var Registry
6466
*/
6567
private $registry;
68+
/**
69+
* @var \Magento\Store\Model\StoreManagerInterface
70+
*/
71+
private $storeManager;
6672

6773
/**
6874
* Initialize dependencies.
@@ -74,6 +80,7 @@ class MassConsumer implements ConsumerInterface
7480
* @param OperationProcessorFactory $operationProcessorFactory
7581
* @param LoggerInterface $logger
7682
* @param Registry $registry
83+
* @param StoreManagerInterface $storeManager
7784
*/
7885
public function __construct(
7986
CallbackInvokerInterface $invoker,
@@ -82,7 +89,8 @@ public function __construct(
8289
ConsumerConfigurationInterface $configuration,
8390
OperationProcessorFactory $operationProcessorFactory,
8491
LoggerInterface $logger,
85-
Registry $registry = null
92+
Registry $registry = null,
93+
StoreManagerInterface $storeManager = null
8694
) {
8795
$this->invoker = $invoker;
8896
$this->resource = $resource;
@@ -94,6 +102,8 @@ public function __construct(
94102
$this->logger = $logger;
95103
$this->registry = $registry ?? \Magento\Framework\App\ObjectManager::getInstance()
96104
->get(Registry::class);
105+
$this->storeManager = $storeManager ?? \Magento\Framework\App\ObjectManager::getInstance()
106+
->get(StoreManagerInterface::class);
97107
}
98108

99109
/**
@@ -126,6 +136,22 @@ private function getTransactionCallback(QueueInterface $queue)
126136
/** @var LockInterface $lock */
127137
$lock = null;
128138
try {
139+
$amqpProperties = $message->getProperties();
140+
if (isset($amqpProperties['application_headers'])) {
141+
$headers = $amqpProperties['application_headers'];
142+
if ($headers instanceof AMQPTable) {
143+
$headers = $headers->getNativeData();
144+
}
145+
if (isset($headers['store_id'])) {
146+
$storeId = $headers['store_id'];
147+
$currentStoreId = $this->storeManager->getStore()->getId();
148+
149+
if (isset($storeId) && $storeId !== $currentStoreId) {
150+
$this->storeManager->setCurrentStore($storeId);
151+
}
152+
}
153+
}
154+
129155
$topicName = $message->getProperties()['topic_name'];
130156
$lock = $this->messageController->lock($message, $this->configuration->getConsumerName());
131157

app/code/Magento/AsynchronousOperations/Model/MassPublisher.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public function publish($topicName, $data)
9393
'properties' => [
9494
'delivery_mode' => 2,
9595
'message_id' => $this->messageIdGenerator->generate($topicName),
96-
]
96+
],
97+
'publisher_flag'=>true
9798
]
9899
);
99100
}

0 commit comments

Comments
 (0)