Skip to content

Commit 038615b

Browse files
committed
reject message in consumer plugin instead of throw exception, add classes/methods comment, fix code style issues
1 parent c336e36 commit 038615b

File tree

6 files changed

+36
-13
lines changed

6 files changed

+36
-13
lines changed

app/code/Magento/AmqpStore/Plugin/AsynchronousOperations/MassConsumerEnvelopeCallback.php

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ class MassConsumerEnvelopeCallback
2626
* @var StoreManagerInterface
2727
*/
2828
private $storeManager;
29+
2930
/**
3031
* @var EnvelopeFactory
3132
*/
3233
private $envelopeFactory;
34+
3335
/**
3436
* @var LoggerInterface
3537
*/
@@ -52,13 +54,13 @@ public function __construct(
5254

5355
/**
5456
* Check if amqpProperties['application_headers'] have 'store_id' and use it to setCurrentStore
55-
* Restore currentStore of consumer process after execution.
57+
* Restore original store value in consumer process after execution.
58+
* Reject queue messages because of wrong store_id.
5659
*
5760
* @param SubjectMassConsumerEnvelopeCallback $subject
5861
* @param callable $proceed
5962
* @param EnvelopeInterface $message
60-
* @return array|null
61-
* @throws NoSuchEntityException
63+
* @return void
6264
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
6365
*/
6466
public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
@@ -75,19 +77,22 @@ public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, call
7577
$currentStoreId = $this->storeManager->getStore()->getId();
7678
} catch (NoSuchEntityException $e) {
7779
$this->logger->error(
78-
sprintf("Can't set currentStoreId during processing queue. Error %s.", $e->getMessage())
80+
sprintf(
81+
"Can't set currentStoreId during processing queue. Message rejected. Error %s.",
82+
$e->getMessage()
83+
)
7984
);
80-
throw new NoSuchEntityException(__($e->getMessage()));
85+
$subject->getQueue()->reject($message, false, $e->getMessage());
86+
return;
8187
}
8288
if (isset($storeId) && $storeId !== $currentStoreId) {
8389
$this->storeManager->setCurrentStore($storeId);
8490
}
8591
}
8692
}
87-
$result = $proceed($message);
93+
$proceed($message);
8894
if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
89-
$this->storeManager->setCurrentStore($currentStoreId);//restore previous current store
95+
$this->storeManager->setCurrentStore($currentStoreId);//restore original store value
9096
}
91-
return $result;
9297
}
9398
}

app/code/Magento/AmqpStore/Plugin/Framework/Amqp/Bulk/Exchange.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ class Exchange
2727
* @var StoreManagerInterface
2828
*/
2929
private $storeManager;
30+
3031
/**
3132
* @var EnvelopeFactory
3233
*/
3334
private $envelopeFactory;
35+
3436
/**
3537
* @var LoggerInterface
3638
*/
@@ -51,11 +53,14 @@ public function __construct(
5153
}
5254

5355
/**
56+
* Set current store_id in amqpProperties['application_headers']
57+
* so consumer may check store_id and execute operation in correct store scope.
58+
* Prevent publishing inconsistent messages because of store_id not defined or wrong.
59+
*
5460
* @param SubjectExchange $subject
5561
* @param $topic
5662
* @param EnvelopeInterface[] $envelopes
57-
* @return array|null
58-
* @throws NoSuchEntityException
63+
* @return array
5964
* @throws AMQPInvalidArgumentException
6065
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
6166
*/
@@ -67,7 +72,7 @@ public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes
6772
$this->logger->error(
6873
sprintf("Can't get current storeId and inject to amqp message. Error %s.", $e->getMessage())
6974
);
70-
throw new NoSuchEntityException(__($e->getMessage()));
75+
throw new \Exception($e->getMessage());
7176
}
7277

7378
$updatedEnvelopes = [];

app/code/Magento/AmqpStore/composer.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"magento/module-store": "*",
1212
"php": "~7.1.3||~7.2.0"
1313
},
14+
"suggest": {
15+
"magento/module-asynchronous-operations": "*"
16+
},
1417
"type": "magento2-module",
1518
"license": [
1619
"OSL-3.0",

app/code/Magento/AmqpStore/etc/module.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
99
<module name="Magento_AmqpStore">
1010
<sequence>
11-
<module name="Magento_Amqp"/>
1211
<module name="Magento_Store"/>
1312
</sequence>
1413
</module>

app/code/Magento/AsynchronousOperations/Model/MassConsumer.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class MassConsumer implements ConsumerInterface
3636
* @var Registry
3737
*/
3838
private $registry;
39+
3940
/**
4041
* @var MassConsumerEnvelopeCallbackFactory
4142
*/

app/code/Magento/AsynchronousOperations/Model/MassConsumerEnvelopeCallback.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use Magento\Framework\MessageQueue\MessageController;
2121

2222
/**
23-
* Class used by \Magento\AsynchronousOperations\Model\MassConsumer as public callback function.
23+
* Class used as public callback function by async consumer.
2424
* @SuppressWarnings(PHPMD.CouplingBetweenObjects)
2525
*/
2626
class MassConsumerEnvelopeCallback
@@ -85,6 +85,7 @@ public function __construct(
8585
* Get transaction callback. This handles the case of async.
8686
*
8787
* @param EnvelopeInterface $message
88+
* @return void
8889
*/
8990
public function execute(EnvelopeInterface $message)
9091
{
@@ -121,4 +122,13 @@ public function execute(EnvelopeInterface $message)
121122
}
122123
}
123124
}
125+
126+
/**
127+
* Get message queue.
128+
* @return QueueInterface
129+
*/
130+
public function getQueue()
131+
{
132+
return $this->queue;
133+
}
124134
}

0 commit comments

Comments
 (0)