Skip to content

Commit 80732c8

Browse files
committed
fix code related to reviewer comments, move AsynchronousOperations multistore issue fix to separate plugin, move callable function to separate class
1 parent fd6cf5e commit 80732c8

File tree

11 files changed

+408
-133
lines changed

11 files changed

+408
-133
lines changed

app/code/Magento/Amqp/Plugin/Framework/MessageQueue/EnvelopeFactoryPlugin.php

Lines changed: 0 additions & 74 deletions
This file was deleted.

app/code/Magento/Amqp/etc/di.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,4 @@
7272
<argument name="instanceName" xsi:type="string">\Magento\Framework\Amqp\Bulk\Exchange</argument>
7373
</arguments>
7474
</virtualType>
75-
<type name="Magento\Framework\MessageQueue\EnvelopeFactory">
76-
<plugin name="amqpStoreIdFieldForMessageQueueEnvelopeFactory" type="Magento\Amqp\Plugin\Framework\MessageQueue\EnvelopeFactoryPlugin" />
77-
</type>
7875
</config>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
declare(strict_types=1);
8+
9+
namespace Magento\AmqpStore\Plugin\AsynchronousOperations;
10+
11+
use Magento\Framework\Exception\NoSuchEntityException;
12+
use Magento\Store\Model\StoreManagerInterface;
13+
use Magento\Framework\MessageQueue\EnvelopeFactory;
14+
use PhpAmqpLib\Wire\AMQPTable;
15+
use Magento\Framework\Amqp\Queue;
16+
use Magento\Framework\MessageQueue\EnvelopeInterface;
17+
use Magento\AsynchronousOperations\Model\MassConsumerEnvelopeCallback;
18+
use Psr\Log\LoggerInterface;
19+
20+
/**
21+
* Plugin to get 'store_id' from the new custom header 'store_id' in amqp
22+
* 'application_headers' properties and setCurrentStore by value 'store_id'.
23+
*/
24+
class MassConsumerEnvelopeCallbackPlugin
25+
{
26+
/**
27+
* @var \Magento\Store\Model\StoreManagerInterface
28+
*/
29+
private $storeManager;
30+
/**
31+
* @var \Magento\Framework\MessageQueue\EnvelopeFactory
32+
*/
33+
private $envelopeFactory;
34+
/**
35+
* @var \Psr\Log\LoggerInterface
36+
*/
37+
private $logger;
38+
39+
/**
40+
* @param \Magento\Framework\MessageQueue\EnvelopeFactory $envelopeFactory
41+
* @param \Magento\Store\Model\StoreManagerInterface $storeManager
42+
* @param \Psr\Log\LoggerInterface $logger
43+
*/
44+
public function __construct(
45+
EnvelopeFactory $envelopeFactory,
46+
StoreManagerInterface $storeManager,
47+
LoggerInterface $logger
48+
) {
49+
$this->storeManager = $storeManager;
50+
$this->envelopeFactory = $envelopeFactory;
51+
$this->logger = $logger;
52+
}
53+
54+
/**
55+
* @param MassConsumerEnvelopeCallback $subject
56+
* @param EnvelopeInterface $message
57+
* @return array|null
58+
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
59+
*/
60+
public function beforeExecute(MassConsumerEnvelopeCallback $subject, EnvelopeInterface $message)
61+
{
62+
$amqpProperties = $message->getProperties();
63+
if (isset($amqpProperties['application_headers'])) {
64+
$headers = $amqpProperties['application_headers'];
65+
if ($headers instanceof AMQPTable) {
66+
$headers = $headers->getNativeData();
67+
}
68+
if (isset($headers['store_id'])) {
69+
$storeId = $headers['store_id'];
70+
try {
71+
$currentStoreId = $this->storeManager->getStore()->getId();
72+
} catch (NoSuchEntityException $e) {
73+
$this->logger->error(
74+
sprintf("Can't set currentStoreId during processing queue. Error %s.", $e->getMessage())
75+
);
76+
return null;
77+
}
78+
if (isset($storeId) && $storeId !== $currentStoreId) {
79+
$this->storeManager->setCurrentStore($storeId);
80+
}
81+
}
82+
}
83+
return [$message];
84+
}
85+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
declare(strict_types=1);
8+
9+
namespace Magento\AmqpStore\Plugin\Framework\Amqp\Bulk;
10+
11+
use Magento\Framework\Exception\NoSuchEntityException;
12+
use Magento\Store\Model\StoreManagerInterface;
13+
use Magento\Framework\MessageQueue\EnvelopeFactory;
14+
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
15+
use PhpAmqpLib\Wire\AMQPTable;
16+
use Magento\Framework\Amqp\Bulk\Exchange;
17+
use Magento\Framework\MessageQueue\EnvelopeInterface;
18+
use Psr\Log\LoggerInterface;
19+
20+
/**
21+
* Plugin to set 'store_id' to the new custom header 'store_id' in amqp
22+
* 'application_headers'.
23+
*/
24+
class ExchangePlugin
25+
{
26+
/**
27+
* @var StoreManagerInterface
28+
*/
29+
private $storeManager;
30+
/**
31+
* @var EnvelopeFactory
32+
*/
33+
private $envelopeFactory;
34+
/**
35+
* @var LoggerInterface
36+
*/
37+
private $logger;
38+
39+
/**
40+
* @param EnvelopeFactory $envelopeFactory
41+
* @param StoreManagerInterface $storeManager
42+
*/
43+
public function __construct(
44+
EnvelopeFactory $envelopeFactory,
45+
StoreManagerInterface $storeManager,
46+
LoggerInterface $logger
47+
) {
48+
$this->storeManager = $storeManager;
49+
$this->envelopeFactory = $envelopeFactory;
50+
$this->logger = $logger;
51+
}
52+
53+
/**
54+
* @param Exchange $subject
55+
* @param $topic
56+
* @param EnvelopeInterface[] $envelopes
57+
* @return array|null
58+
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
59+
*/
60+
public function beforeEnqueue(Exchange $subject, $topic, array $envelopes)
61+
{
62+
try {
63+
$storeId = $this->storeManager->getStore()->getId();
64+
} catch (NoSuchEntityException $e) {
65+
$this->logger->error(
66+
sprintf("Can't get current storeId and inject to amqp message. Error %s.", $e->getMessage())
67+
);
68+
return null;
69+
}
70+
71+
$updatedEnvelopes = [];
72+
foreach ($envelopes as $envelope) {
73+
$properties = $envelope->getProperties();
74+
if (!isset($properties)) {
75+
$properties = [];
76+
}
77+
if (isset($properties['application_headers'])) {
78+
$headers = $properties['application_headers'];
79+
if ($headers instanceof AMQPTable) {
80+
try {
81+
$headers->set('store_id', $storeId);
82+
} catch (AMQPInvalidArgumentException $ea) {
83+
$this->logger->error(
84+
sprintf("Can't set storeId to amqp message. Error %s.", $ea->getMessage())
85+
);
86+
return null;
87+
}
88+
$properties['application_headers'] = $headers;
89+
}
90+
} else {
91+
$properties['application_headers'] = new AMQPTable(['store_id' => $storeId]);
92+
}
93+
$updatedEnvelopes[] = $this->envelopeFactory->create(
94+
[
95+
'body' => $envelope->getBody(),
96+
'properties' => $properties
97+
]
98+
);
99+
}
100+
if (!empty($updatedEnvelopes)) {
101+
$envelopes = $updatedEnvelopes;
102+
}
103+
return [$topic, $envelopes];
104+
}
105+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "magento/module-amqp-store",
3+
"description": "N/A",
4+
"config": {
5+
"sort-packages": true
6+
},
7+
"require": {
8+
"magento/framework": "*",
9+
"magento/framework-amqp": "*",
10+
"magento/framework-message-queue": "*",
11+
"php": "~7.1.3||~7.2.0"
12+
},
13+
"type": "magento2-module",
14+
"license": [
15+
"OSL-3.0",
16+
"AFL-3.0"
17+
],
18+
"autoload": {
19+
"files": [
20+
"registration.php"
21+
],
22+
"psr-4": {
23+
"Magento\\AmqpStore\\": ""
24+
}
25+
}
26+
}

app/code/Magento/AmqpStore/etc/di.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
9+
<type name="Magento\Framework\Amqp\Bulk\Exchange">
10+
<plugin name="amqpStoreIdFieldForAmqpBulkExchange" type="Magento\AmqpStore\Plugin\Framework\Amqp\Bulk\ExchangePlugin"/>
11+
</type>
12+
<type name="Magento\AsynchronousOperations\Model\MassConsumerEnvelopeCallback">
13+
<plugin name="amqpStoreIdFieldForAsynchronousOperationsMassConsumerEnvelopeCallback" type="Magento\AmqpStore\Plugin\AsynchronousOperations\MassConsumerEnvelopeCallbackPlugin"/>
14+
</type>
15+
</config>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
9+
<module name="Magento_AmqpStore">
10+
<sequence>
11+
<module name="Magento_Amqp"/>
12+
<module name="Magento_Store"/>
13+
</sequence>
14+
</module>
15+
</config>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
use \Magento\Framework\Component\ComponentRegistrar;
8+
9+
ComponentRegistrar::register(ComponentRegistrar::MODULE, 'Magento_AmqpStore', __DIR__);

0 commit comments

Comments
 (0)