1616package com .rabbitmq .client .impl .recovery ;
1717
1818import com .rabbitmq .client .*;
19- import com .rabbitmq .client .RecoverableChannel ;
19+ import com .rabbitmq .client .impl .AMQCommand ;
20+ import org .slf4j .Logger ;
21+ import org .slf4j .LoggerFactory ;
2022
2123import java .io .IOException ;
2224import java .util .*;
3032 * @since 3.3.0
3133 */
3234public class AutorecoveringChannel implements RecoverableChannel {
35+
36+ private static final Logger LOGGER = LoggerFactory .getLogger (AutorecoveringChannel .class );
37+
3338 private volatile RecoveryAwareChannelN delegate ;
3439 private volatile AutorecoveringConnection connection ;
3540 private final List <ShutdownListener > shutdownHooks = new CopyOnWriteArrayList <ShutdownListener >();
@@ -235,12 +240,7 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeT
235240 @ Override
236241 public AMQP .Exchange .DeclareOk exchangeDeclare (String exchange , String type , boolean durable , boolean autoDelete , boolean internal , Map <String , Object > arguments ) throws IOException {
237242 final AMQP .Exchange .DeclareOk ok = delegate .exchangeDeclare (exchange , type , durable , autoDelete , internal , arguments );
238- RecordedExchange x = new RecordedExchange (this , exchange ).
239- type (type ).
240- durable (durable ).
241- autoDelete (autoDelete ).
242- arguments (arguments );
243- recordExchange (exchange , x );
243+ recordExchange (ok , exchange , type , durable , autoDelete , arguments );
244244 return ok ;
245245 }
246246
@@ -331,15 +331,7 @@ public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
331331 @ Override
332332 public AMQP .Queue .DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map <String , Object > arguments ) throws IOException {
333333 final AMQP .Queue .DeclareOk ok = delegate .queueDeclare (queue , durable , exclusive , autoDelete , arguments );
334- RecordedQueue q = new RecordedQueue (this , ok .getQueue ()).
335- durable (durable ).
336- exclusive (exclusive ).
337- autoDelete (autoDelete ).
338- arguments (arguments );
339- if (queue .equals (RecordedQueue .EMPTY_STRING )) {
340- q .serverNamed (true );
341- }
342- recordQueue (ok , q );
334+ recordQueue (ok , queue , durable , exclusive , autoDelete , arguments );
343335 return ok ;
344336 }
345337
@@ -356,7 +348,6 @@ public void queueDeclareNoWait(String queue,
356348 arguments (arguments );
357349 delegate .queueDeclareNoWait (queue , durable , exclusive , autoDelete , arguments );
358350 recordQueue (queue , meta );
359-
360351 }
361352
362353 @ Override
@@ -546,7 +537,10 @@ public void asyncRpc(Method method) throws IOException {
546537
547538 @ Override
548539 public Command rpc (Method method ) throws IOException {
549- return delegate .rpc (method );
540+ recordOnRpcRequest (method );
541+ AMQCommand response = delegate .rpc (method );
542+ recordOnRpcResponse (response .getMethod (), method );
543+ return response ;
550544 }
551545
552546 /**
@@ -680,6 +674,18 @@ private boolean deleteRecordedExchangeBinding(String destination, String source,
680674 return this .connection .deleteRecordedExchangeBinding (this , destination , source , routingKey , arguments );
681675 }
682676
677+ private void recordQueue (AMQP .Queue .DeclareOk ok , String queue , boolean durable , boolean exclusive , boolean autoDelete , Map <String , Object > arguments ) {
678+ RecordedQueue q = new RecordedQueue (this , ok .getQueue ()).
679+ durable (durable ).
680+ exclusive (exclusive ).
681+ autoDelete (autoDelete ).
682+ arguments (arguments );
683+ if (queue .equals (RecordedQueue .EMPTY_STRING )) {
684+ q .serverNamed (true );
685+ }
686+ recordQueue (ok , q );
687+ }
688+
683689 private void recordQueue (AMQP .Queue .DeclareOk ok , RecordedQueue q ) {
684690 this .connection .recordQueue (ok , q );
685691 }
@@ -692,6 +698,15 @@ private void deleteRecordedQueue(String queue) {
692698 this .connection .deleteRecordedQueue (queue );
693699 }
694700
701+ private void recordExchange (AMQP .Exchange .DeclareOk ok , String exchange , String type , boolean durable , boolean autoDelete , Map <String , Object > arguments ) {
702+ RecordedExchange x = new RecordedExchange (this , exchange ).
703+ type (type ).
704+ durable (durable ).
705+ autoDelete (autoDelete ).
706+ arguments (arguments );
707+ recordExchange (exchange , x );
708+ }
709+
695710 private void recordExchange (String exchange , RecordedExchange x ) {
696711 this .connection .recordExchange (exchange , x );
697712 }
@@ -736,6 +751,74 @@ void updateConsumerTag(String tag, String newTag) {
736751 }
737752 }
738753
754+ private void recordOnRpcRequest (Method method ) {
755+ if (method instanceof AMQP .Queue .Delete ) {
756+ deleteRecordedQueue (((AMQP .Queue .Delete ) method ).getQueue ());
757+ } else if (method instanceof AMQP .Exchange .Delete ) {
758+ deleteRecordedExchange (((AMQP .Exchange .Delete ) method ).getExchange ());
759+ } else if (method instanceof AMQP .Queue .Unbind ) {
760+ AMQP .Queue .Unbind unbind = (AMQP .Queue .Unbind ) method ;
761+ deleteRecordedQueueBinding (
762+ unbind .getQueue (), unbind .getExchange (),
763+ unbind .getRoutingKey (), unbind .getArguments ()
764+ );
765+ this .maybeDeleteRecordedAutoDeleteExchange (unbind .getExchange ());
766+ } else if (method instanceof AMQP .Exchange .Unbind ) {
767+ AMQP .Exchange .Unbind unbind = (AMQP .Exchange .Unbind ) method ;
768+ deleteRecordedExchangeBinding (
769+ unbind .getDestination (), unbind .getSource (),
770+ unbind .getRoutingKey (), unbind .getArguments ()
771+ );
772+ this .maybeDeleteRecordedAutoDeleteExchange (unbind .getSource ());
773+ }
774+ }
775+
776+ private void recordOnRpcResponse (Method response , Method request ) {
777+ if (response instanceof AMQP .Queue .DeclareOk ) {
778+ if (request instanceof AMQP .Queue .Declare ) {
779+ AMQP .Queue .DeclareOk ok = (AMQP .Queue .DeclareOk ) response ;
780+ AMQP .Queue .Declare declare = (AMQP .Queue .Declare ) request ;
781+ recordQueue (
782+ ok , declare .getQueue (),
783+ declare .getDurable (), declare .getExclusive (), declare .getAutoDelete (),
784+ declare .getArguments ()
785+ );
786+ } else {
787+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
788+ response .getClass (), request .getClass ());
789+ }
790+ } else if (response instanceof AMQP .Exchange .DeclareOk ) {
791+ if (request instanceof AMQP .Exchange .Declare ) {
792+ AMQP .Exchange .DeclareOk ok = (AMQP .Exchange .DeclareOk ) response ;
793+ AMQP .Exchange .Declare declare = (AMQP .Exchange .Declare ) request ;
794+ recordExchange (
795+ ok , declare .getExchange (), declare .getType (),
796+ declare .getDurable (), declare .getAutoDelete (),
797+ declare .getArguments ()
798+ );
799+ } else {
800+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
801+ response .getClass (), request .getClass ());
802+ }
803+ } else if (response instanceof AMQP .Queue .BindOk ) {
804+ if (request instanceof AMQP .Queue .Bind ) {
805+ AMQP .Queue .Bind bind = (AMQP .Queue .Bind ) request ;
806+ recordQueueBinding (bind .getQueue (), bind .getExchange (), bind .getRoutingKey (), bind .getArguments ());
807+ } else {
808+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
809+ response .getClass (), request .getClass ());
810+ }
811+ } else if (response instanceof AMQP .Exchange .BindOk ) {
812+ if (request instanceof AMQP .Exchange .Bind ) {
813+ AMQP .Exchange .Bind bind = (AMQP .Exchange .Bind ) request ;
814+ recordExchangeBinding (bind .getDestination (), bind .getSource (), bind .getRoutingKey (), bind .getArguments ());
815+ } else {
816+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
817+ response .getClass (), request .getClass ());
818+ }
819+ }
820+ }
821+
739822 @ Override
740823 public String toString () {
741824 return this .delegate .toString ();
0 commit comments