1616package com .rabbitmq .client .impl .recovery ;
1717
1818import com .rabbitmq .client .*;
19- import com .rabbitmq .client .RecoverableChannel ;
19+ import com .rabbitmq .client .impl .AMQCommand ;
20+ import org .slf4j .Logger ;
21+ import org .slf4j .LoggerFactory ;
2022
2123import java .io .IOException ;
2224import java .util .*;
3133 * @since 3.3.0
3234 */
3335public class AutorecoveringChannel implements RecoverableChannel {
36+
37+ private static final Logger LOGGER = LoggerFactory .getLogger (AutorecoveringChannel .class );
38+
3439 private volatile RecoveryAwareChannelN delegate ;
3540 private volatile AutorecoveringConnection connection ;
3641 private final List <ShutdownListener > shutdownHooks = new CopyOnWriteArrayList <ShutdownListener >();
@@ -235,12 +240,7 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeT
235240 @ Override
236241 public AMQP .Exchange .DeclareOk exchangeDeclare (String exchange , String type , boolean durable , boolean autoDelete , boolean internal , Map <String , Object > arguments ) throws IOException {
237242 final AMQP .Exchange .DeclareOk ok = delegate .exchangeDeclare (exchange , type , durable , autoDelete , internal , arguments );
238- RecordedExchange x = new RecordedExchange (this , exchange ).
239- type (type ).
240- durable (durable ).
241- autoDelete (autoDelete ).
242- arguments (arguments );
243- recordExchange (exchange , x );
243+ recordExchange (ok , exchange , type , durable , autoDelete , arguments );
244244 return ok ;
245245 }
246246
@@ -331,15 +331,7 @@ public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
331331 @ Override
332332 public AMQP .Queue .DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map <String , Object > arguments ) throws IOException {
333333 final AMQP .Queue .DeclareOk ok = delegate .queueDeclare (queue , durable , exclusive , autoDelete , arguments );
334- RecordedQueue q = new RecordedQueue (this , ok .getQueue ()).
335- durable (durable ).
336- exclusive (exclusive ).
337- autoDelete (autoDelete ).
338- arguments (arguments );
339- if (queue .equals (RecordedQueue .EMPTY_STRING )) {
340- q .serverNamed (true );
341- }
342- recordQueue (ok , q );
334+ recordQueue (ok , queue , durable , exclusive , autoDelete , arguments );
343335 return ok ;
344336 }
345337
@@ -714,7 +706,10 @@ public void asyncRpc(Method method) throws IOException {
714706
715707 @ Override
716708 public Command rpc (Method method ) throws IOException {
717- return delegate .rpc (method );
709+ recordOnRpcRequest (method );
710+ AMQCommand response = delegate .rpc (method );
711+ recordOnRpcResponse (response .getMethod (), method );
712+ return response ;
718713 }
719714
720715 /**
@@ -840,6 +835,18 @@ private boolean deleteRecordedExchangeBinding(String destination, String source,
840835 return this .connection .deleteRecordedExchangeBinding (this , destination , source , routingKey , arguments );
841836 }
842837
838+ private void recordQueue (AMQP .Queue .DeclareOk ok , String queue , boolean durable , boolean exclusive , boolean autoDelete , Map <String , Object > arguments ) {
839+ RecordedQueue q = new RecordedQueue (this , ok .getQueue ()).
840+ durable (durable ).
841+ exclusive (exclusive ).
842+ autoDelete (autoDelete ).
843+ arguments (arguments );
844+ if (queue .equals (RecordedQueue .EMPTY_STRING )) {
845+ q .serverNamed (true );
846+ }
847+ recordQueue (ok , q );
848+ }
849+
843850 private void recordQueue (AMQP .Queue .DeclareOk ok , RecordedQueue q ) {
844851 this .connection .recordQueue (ok , q );
845852 }
@@ -852,6 +859,15 @@ private void deleteRecordedQueue(String queue) {
852859 this .connection .deleteRecordedQueue (queue );
853860 }
854861
862+ private void recordExchange (AMQP .Exchange .DeclareOk ok , String exchange , String type , boolean durable , boolean autoDelete , Map <String , Object > arguments ) {
863+ RecordedExchange x = new RecordedExchange (this , exchange ).
864+ type (type ).
865+ durable (durable ).
866+ autoDelete (autoDelete ).
867+ arguments (arguments );
868+ recordExchange (exchange , x );
869+ }
870+
855871 private void recordExchange (String exchange , RecordedExchange x ) {
856872 this .connection .recordExchange (exchange , x );
857873 }
@@ -898,7 +914,82 @@ void updateConsumerTag(String tag, String newTag) {
898914
899915 @ Override
900916 public CompletableFuture <Command > asyncCompletableRpc (Method method ) throws IOException {
901- return this .delegate .asyncCompletableRpc (method );
917+ recordOnRpcRequest (method );
918+ CompletableFuture <Command > future = this .delegate .asyncCompletableRpc (method );
919+ future .thenAccept (command -> {
920+ if (command != null ) {
921+ recordOnRpcResponse (command .getMethod (), method );
922+ }
923+ });
924+ return future ;
925+ }
926+
927+ private void recordOnRpcRequest (Method method ) {
928+ if (method instanceof AMQP .Queue .Delete ) {
929+ deleteRecordedQueue (((AMQP .Queue .Delete ) method ).getQueue ());
930+ } else if (method instanceof AMQP .Exchange .Delete ) {
931+ deleteRecordedExchange (((AMQP .Exchange .Delete ) method ).getExchange ());
932+ } else if (method instanceof AMQP .Queue .Unbind ) {
933+ AMQP .Queue .Unbind unbind = (AMQP .Queue .Unbind ) method ;
934+ deleteRecordedQueueBinding (
935+ unbind .getQueue (), unbind .getExchange (),
936+ unbind .getRoutingKey (), unbind .getArguments ()
937+ );
938+ this .maybeDeleteRecordedAutoDeleteExchange (unbind .getExchange ());
939+ } else if (method instanceof AMQP .Exchange .Unbind ) {
940+ AMQP .Exchange .Unbind unbind = (AMQP .Exchange .Unbind ) method ;
941+ deleteRecordedExchangeBinding (
942+ unbind .getDestination (), unbind .getSource (),
943+ unbind .getRoutingKey (), unbind .getArguments ()
944+ );
945+ this .maybeDeleteRecordedAutoDeleteExchange (unbind .getSource ());
946+ }
947+ }
948+
949+ private void recordOnRpcResponse (Method response , Method request ) {
950+ if (response instanceof AMQP .Queue .DeclareOk ) {
951+ if (request instanceof AMQP .Queue .Declare ) {
952+ AMQP .Queue .DeclareOk ok = (AMQP .Queue .DeclareOk ) response ;
953+ AMQP .Queue .Declare declare = (AMQP .Queue .Declare ) request ;
954+ recordQueue (
955+ ok , declare .getQueue (),
956+ declare .getDurable (), declare .getExclusive (), declare .getAutoDelete (),
957+ declare .getArguments ()
958+ );
959+ } else {
960+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
961+ response .getClass (), request .getClass ());
962+ }
963+ } else if (response instanceof AMQP .Exchange .DeclareOk ) {
964+ if (request instanceof AMQP .Exchange .Declare ) {
965+ AMQP .Exchange .DeclareOk ok = (AMQP .Exchange .DeclareOk ) response ;
966+ AMQP .Exchange .Declare declare = (AMQP .Exchange .Declare ) request ;
967+ recordExchange (
968+ ok , declare .getExchange (), declare .getType (),
969+ declare .getDurable (), declare .getAutoDelete (),
970+ declare .getArguments ()
971+ );
972+ } else {
973+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
974+ response .getClass (), request .getClass ());
975+ }
976+ } else if (response instanceof AMQP .Queue .BindOk ) {
977+ if (request instanceof AMQP .Queue .Bind ) {
978+ AMQP .Queue .Bind bind = (AMQP .Queue .Bind ) request ;
979+ recordQueueBinding (bind .getQueue (), bind .getExchange (), bind .getRoutingKey (), bind .getArguments ());
980+ } else {
981+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
982+ response .getClass (), request .getClass ());
983+ }
984+ } else if (response instanceof AMQP .Exchange .BindOk ) {
985+ if (request instanceof AMQP .Exchange .Bind ) {
986+ AMQP .Exchange .Bind bind = (AMQP .Exchange .Bind ) request ;
987+ recordExchangeBinding (bind .getDestination (), bind .getSource (), bind .getRoutingKey (), bind .getArguments ());
988+ } else {
989+ LOGGER .warn ("RPC response {} and RPC request {} not compatible, topology not recorded." ,
990+ response .getClass (), request .getClass ());
991+ }
992+ }
902993 }
903994
904995 @ Override
0 commit comments