@@ -92,10 +92,16 @@ public function ack(Envelope $envelope): void
9292 try {
9393 $ stamp = $ this ->findAmqpStamp ($ envelope );
9494
95- $ this ->connection ->ack (
96- $ stamp ->getAmqpEnvelope (),
97- $ stamp ->getQueueName ()
98- );
95+ $ this ->connection ->ack ($ stamp ->getAmqpEnvelope (), $ stamp ->getQueueName ());
96+ } catch (\AMQPConnectionException ) {
97+ try {
98+ $ stamp = $ this ->findAmqpStamp ($ envelope );
99+
100+ $ this ->connection ->queue ($ stamp ->getQueueName ())->getConnection ()->reconnect ();
101+ $ this ->connection ->ack ($ stamp ->getAmqpEnvelope (), $ stamp ->getQueueName ());
102+ } catch (\AMQPException $ exception ) {
103+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
104+ }
99105 } catch (\AMQPException $ exception ) {
100106 throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
101107 }
@@ -124,6 +130,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa
124130 {
125131 try {
126132 $ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
133+ } catch (\AMQPConnectionException ) {
134+ try {
135+ $ this ->connection ->queue ($ queueName )->getConnection ()->reconnect ();
136+ $ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
137+ } catch (\AMQPException $ exception ) {
138+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
139+ }
127140 } catch (\AMQPException $ exception ) {
128141 throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
129142 }
0 commit comments