Skip to content

Commit d5a8373

Browse files
KonstantinCodesweaverryan
authored andcommitted
Refactor the InteropStamp logic
1 parent 90276d9 commit d5a8373

File tree

2 files changed

+18
-15
lines changed

2 files changed

+18
-15
lines changed

EnvelopeItem/InteropMessageStamp.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
namespace Enqueue\MessengerAdapter\EnvelopeItem;
1313

1414
use Interop\Queue\Message;
15-
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
1616

17-
final class InteropMessageStamp implements StampInterface
17+
final class InteropMessageStamp implements NonSendableStampInterface
1818
{
1919
/** @var Message */
2020
private $message;

QueueInteropTransport.php

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp;
1818
use Interop\Queue\Consumer;
1919
use Symfony\Component\Messenger\Envelope;
20+
use Symfony\Component\Messenger\Exception\LogicException;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2122
use Symfony\Component\Messenger\Transport\TransportInterface;
2223
use Interop\Queue\Exception as InteropQueueException;
@@ -78,16 +79,13 @@ public function get(): iterable
7879
throw $e;
7980
}
8081

81-
/** @var Envelope $envelope */
8282
$envelope = $this->serializer->decode(array(
8383
'body' => $interopMessage->getBody(),
8484
'headers' => $interopMessage->getHeaders(),
8585
'properties' => $interopMessage->getProperties(),
8686
));
8787

88-
if ($envelope) {
89-
$envelope = $envelope->with(new InteropMessageStamp($interopMessage));
90-
}
88+
$envelope = $envelope->with(new InteropMessageStamp($interopMessage));
9189

9290
return array($envelope);
9391
}
@@ -97,7 +95,7 @@ public function get(): iterable
9795
*/
9896
public function ack(Envelope $envelope): void
9997
{
100-
$interopMessage = $this->encodeMessage($envelope);
98+
$interopMessage = $this->findMessage($envelope);
10199

102100
$this->getConsumer()->acknowledge($interopMessage);
103101
}
@@ -107,7 +105,7 @@ public function ack(Envelope $envelope): void
107105
*/
108106
public function reject(Envelope $envelope): void
109107
{
110-
$interopMessage = $this->encodeMessage($envelope);
108+
$interopMessage = $this->findMessage($envelope);
111109

112110
$this->getConsumer()->reject($interopMessage);
113111
}
@@ -223,13 +221,6 @@ private function setMessageMetadata(Message $interopMessage, Envelope $envelope)
223221

224222
private function encodeMessage(Envelope $envelope): Message
225223
{
226-
/** @var InteropMessageStamp $interopStamp */
227-
$interopStamp = $envelope->last(InteropMessageStamp::class);
228-
229-
if ($interopStamp) {
230-
return $interopStamp->getMessage();
231-
}
232-
233224
$context = $this->contextManager->context();
234225
$encodedMessage = $this->serializer->encode($envelope);
235226

@@ -242,6 +233,18 @@ private function encodeMessage(Envelope $envelope): Message
242233
return $interopMessage;
243234
}
244235

236+
private function findMessage(Envelope $envelope): Message
237+
{
238+
/** @var InteropMessageStamp $interopStamp */
239+
$interopStamp = $envelope->last(InteropMessageStamp::class);
240+
241+
if (null === $interopStamp) {
242+
throw new LogicException('No InteropMessageStamp found in the Envelope.');
243+
}
244+
245+
return $interopStamp->getMessage();
246+
}
247+
245248
private function getConsumer(): Consumer
246249
{
247250
$context = $this->contextManager->context();

0 commit comments

Comments
 (0)