Skip to content

Commit 5356023

Browse files
AurelienPillevessenicolas-grekas
authored andcommitted
improve amqp connection issues
1 parent 717ca76 commit 5356023

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,16 @@ public function ack(Envelope $envelope): void
9292
try {
9393
$stamp = $this->findAmqpStamp($envelope);
9494

95-
$this->connection->ack(
96-
$stamp->getAmqpEnvelope(),
97-
$stamp->getQueueName()
98-
);
95+
$this->connection->ack($stamp->getAmqpEnvelope(), $stamp->getQueueName());
96+
} catch (\AMQPConnectionException) {
97+
try {
98+
$stamp = $this->findAmqpStamp($envelope);
99+
100+
$this->connection->queue($stamp->getQueueName())->getConnection()->reconnect();
101+
$this->connection->ack($stamp->getAmqpEnvelope(), $stamp->getQueueName());
102+
} catch (\AMQPException $exception) {
103+
throw new TransportException($exception->getMessage(), 0, $exception);
104+
}
99105
} catch (\AMQPException $exception) {
100106
throw new TransportException($exception->getMessage(), 0, $exception);
101107
}
@@ -124,6 +130,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa
124130
{
125131
try {
126132
$this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
133+
} catch (\AMQPConnectionException) {
134+
try {
135+
$this->connection->queue($queueName)->getConnection()->reconnect();
136+
$this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
137+
} catch (\AMQPException $exception) {
138+
throw new TransportException($exception->getMessage(), 0, $exception);
139+
}
127140
} catch (\AMQPException $exception) {
128141
throw new TransportException($exception->getMessage(), 0, $exception);
129142
}

0 commit comments

Comments
 (0)