|
14 | 14 | use Enqueue\AmqpTools\DelayStrategyAware; |
15 | 15 | use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; |
16 | 16 | use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; |
| 17 | +use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp; |
17 | 18 | use Interop\Queue\Consumer; |
18 | 19 | use Symfony\Component\Messenger\Envelope; |
19 | 20 | use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; |
@@ -77,13 +78,18 @@ public function get(): iterable |
77 | 78 | throw $e; |
78 | 79 | } |
79 | 80 |
|
80 | | - return array( |
81 | | - $this->serializer->decode(array( |
82 | | - 'body' => $interopMessage->getBody(), |
83 | | - 'headers' => $interopMessage->getHeaders(), |
84 | | - 'properties' => $interopMessage->getProperties(), |
85 | | - )), |
86 | | - ); |
| 81 | + /** @var Envelope $envelope */ |
| 82 | + $envelope = $this->serializer->decode(array( |
| 83 | + 'body' => $interopMessage->getBody(), |
| 84 | + 'headers' => $interopMessage->getHeaders(), |
| 85 | + 'properties' => $interopMessage->getProperties(), |
| 86 | + )); |
| 87 | + |
| 88 | + if ($envelope) { |
| 89 | + $envelope = $envelope->with(new InteropMessageStamp($interopMessage)); |
| 90 | + } |
| 91 | + |
| 92 | + return array($envelope); |
87 | 93 | } |
88 | 94 |
|
89 | 95 | /** |
@@ -217,6 +223,13 @@ private function setMessageMetadata(Message $interopMessage, Envelope $envelope) |
217 | 223 |
|
218 | 224 | private function encodeMessage(Envelope $envelope): Message |
219 | 225 | { |
| 226 | + /** @var InteropMessageStamp $interopStamp */ |
| 227 | + $interopStamp = $envelope->last(InteropMessageStamp::class); |
| 228 | + |
| 229 | + if ($interopStamp) { |
| 230 | + return $interopStamp->getMessage(); |
| 231 | + } |
| 232 | + |
220 | 233 | $context = $this->contextManager->context(); |
221 | 234 | $encodedMessage = $this->serializer->encode($envelope); |
222 | 235 |
|
|
0 commit comments