Skip to content

Commit 256eb84

Browse files
committed
Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportException
1 parent 0a30f7b commit 256eb84

File tree

3 files changed

+91
-3
lines changed

3 files changed

+91
-3
lines changed

Tests/Transport/AmqpExt/AmqpReceiverTest.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,83 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
101101
throw new WillNeverWorkException('Well...');
102102
});
103103
}
104+
105+
/**
106+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
107+
*/
108+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
109+
{
110+
$serializer = new Serializer(
111+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
112+
);
113+
114+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
115+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
116+
$envelope->method('getHeaders')->willReturn([
117+
'type' => DummyMessage::class,
118+
]);
119+
120+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
121+
$connection->method('get')->willReturn($envelope);
122+
123+
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
124+
125+
$receiver = new AmqpReceiver($connection, $serializer);
126+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
127+
$receiver->stop();
128+
});
129+
}
130+
131+
/**
132+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
133+
*/
134+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
135+
{
136+
$serializer = new Serializer(
137+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
138+
);
139+
140+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
141+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
142+
$envelope->method('getHeaders')->willReturn([
143+
'type' => DummyMessage::class,
144+
]);
145+
146+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
147+
$connection->method('get')->willReturn($envelope);
148+
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
149+
150+
$receiver = new AmqpReceiver($connection, $serializer);
151+
$receiver->receive(function () {
152+
throw new WillNeverWorkException('Well...');
153+
});
154+
}
155+
156+
/**
157+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
158+
*/
159+
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
160+
{
161+
$serializer = new Serializer(
162+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
163+
);
164+
165+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
166+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
167+
$envelope->method('getHeaders')->willReturn([
168+
'type' => DummyMessage::class,
169+
]);
170+
171+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
172+
$connection->method('get')->willReturn($envelope);
173+
174+
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
175+
176+
$receiver = new AmqpReceiver($connection, $serializer);
177+
$receiver->receive(function () {
178+
throw new InterruptException('Well...');
179+
});
180+
}
104181
}
105182

106183
class InterruptException extends \Exception

Transport/AmqpExt/AmqpReceiver.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Symfony\Component\Messenger\Exception\TransportException;
1415
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -61,11 +62,21 @@ public function receive(callable $handler): void
6162

6263
$this->connection->ack($AMQPEnvelope);
6364
} catch (RejectMessageExceptionInterface $e) {
64-
$this->connection->reject($AMQPEnvelope);
65+
try {
66+
$this->connection->reject($AMQPEnvelope);
67+
} catch (\AMQPException $exception) {
68+
throw new TransportException($exception->getMessage(), 0, $exception);
69+
}
6570

6671
throw $e;
72+
} catch (\AMQPException $e) {
73+
throw new TransportException($e->getMessage(), 0, $e);
6774
} catch (\Throwable $e) {
68-
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
75+
try {
76+
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
77+
} catch (\AMQPException $exception) {
78+
throw new TransportException($exception->getMessage(), 0, $exception);
79+
}
6980

7081
throw $e;
7182
} finally {

Transport/AmqpExt/AmqpSender.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function send(Envelope $envelope): Envelope
4545
try {
4646
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
4747
} catch (\AMQPException $e) {
48-
throw new TransportException('Current transport was not able to send given message, please try again', 0, $e);
48+
throw new TransportException($e->getMessage(), 0, $e);
4949
}
5050

5151
return $envelope;

0 commit comments

Comments
 (0)