@@ -143,7 +143,7 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
143143 });
144144 }
145145
146- private TopologyRecoveryFilter letAllPassFilter () {
146+ private static TopologyRecoveryFilter letAllPassFilter () {
147147 return new TopologyRecoveryFilter () {};
148148 }
149149
@@ -644,7 +644,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
644644 }
645645 }
646646
647- void recoverChannel (AutorecoveringChannel channel ) throws IOException {
647+ public void recoverChannel (AutorecoveringChannel channel ) throws IOException {
648648 channel .automaticallyRecover (this , this .delegate );
649649 }
650650
@@ -666,6 +666,38 @@ private void notifyTopologyRecoveryListenersStarted() {
666666 }
667667 }
668668
669+ /**
670+ * Recover a closed channel and all topology (i.e. RecordedEntities) associated to it.
671+ * Any errors will be sent to the {@link #getExceptionHandler()}.
672+ * @param channel channel to recover
673+ * @throws IllegalArgumentException if this channel is not owned by this connection
674+ */
675+ public void recoverChannelAndTopology (final AutorecoveringChannel channel ) {
676+ if (!channels .containsValue (channel )) {
677+ throw new IllegalArgumentException ("This channel is not owned by this connection" );
678+ }
679+ try {
680+ LOGGER .debug ("Recovering channel={}" , channel );
681+ recoverChannel (channel );
682+ LOGGER .debug ("Recovered channel={}. Now recovering its topology" , channel );
683+ Utility .copy (recordedExchanges ).values ().stream ()
684+ .filter (e -> e .getChannel () == channel )
685+ .forEach (e -> recoverExchange (e , false ));
686+ Utility .copy (recordedQueues ).values ().stream ()
687+ .filter (q -> q .getChannel () == channel )
688+ .forEach (q -> recoverQueue (q .getName (), q , false ));
689+ Utility .copy (recordedBindings ).stream ()
690+ .filter (b -> b .getChannel () == channel )
691+ .forEach (b -> recoverBinding (b , false ));
692+ Utility .copy (consumers ).values ().stream ()
693+ .filter (c -> c .getChannel () == channel )
694+ .forEach (c -> recoverConsumer (c .getConsumerTag (), c , false ));
695+ LOGGER .debug ("Recovered topology for channel={}" , channel );
696+ } catch (Exception e ) {
697+ getExceptionHandler ().handleChannelRecoveryException (channel , e );
698+ }
699+ }
700+
669701 private void recoverTopology (final ExecutorService executor ) {
670702 // The recovery sequence is the following:
671703 // 1. Recover exchanges
@@ -704,7 +736,7 @@ private void recoverTopology(final ExecutorService executor) {
704736 }
705737 }
706738
707- private void recoverExchange (RecordedExchange x , boolean retry ) {
739+ public void recoverExchange (RecordedExchange x , boolean retry ) {
708740 // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
709741 try {
710742 if (topologyRecoveryFilter .filterExchange (x )) {
@@ -722,7 +754,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) {
722754 } catch (Exception cause ) {
723755 final String message = "Caught an exception while recovering exchange " + x .getName () +
724756 ": " + cause .getMessage ();
725- TopologyRecoveryException e = new TopologyRecoveryException (message , cause );
757+ TopologyRecoveryException e = new TopologyRecoveryException (message , cause , x );
726758 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , x .getDelegateChannel (), e );
727759 }
728760 }
@@ -766,12 +798,12 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
766798 } catch (Exception cause ) {
767799 final String message = "Caught an exception while recovering queue " + oldName +
768800 ": " + cause .getMessage ();
769- TopologyRecoveryException e = new TopologyRecoveryException (message , cause );
801+ TopologyRecoveryException e = new TopologyRecoveryException (message , cause , q );
770802 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , q .getDelegateChannel (), e );
771803 }
772804 }
773805
774- private void recoverBinding (RecordedBinding b , boolean retry ) {
806+ public void recoverBinding (RecordedBinding b , boolean retry ) {
775807 try {
776808 if (this .topologyRecoveryFilter .filterBinding (b )) {
777809 if (retry ) {
@@ -788,7 +820,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) {
788820 } catch (Exception cause ) {
789821 String message = "Caught an exception while recovering binding between " + b .getSource () +
790822 " and " + b .getDestination () + ": " + cause .getMessage ();
791- TopologyRecoveryException e = new TopologyRecoveryException (message , cause );
823+ TopologyRecoveryException e = new TopologyRecoveryException (message , cause , b );
792824 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , b .getDelegateChannel (), e );
793825 }
794826 }
@@ -800,7 +832,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
800832 String newTag = null ;
801833 if (retry ) {
802834 final RecordedConsumer entity = consumer ;
803- RetryResult retryResult = wrapRetryIfNecessary (consumer , () -> entity . recover () );
835+ RetryResult retryResult = wrapRetryIfNecessary (consumer , entity :: recover );
804836 consumer = (RecordedConsumer ) retryResult .getRecordedEntity ();
805837 newTag = (String ) retryResult .getResult ();
806838 } else {
@@ -824,7 +856,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
824856 } catch (Exception cause ) {
825857 final String message = "Caught an exception while recovering consumer " + tag +
826858 ": " + cause .getMessage ();
827- TopologyRecoveryException e = new TopologyRecoveryException (message , cause );
859+ TopologyRecoveryException e = new TopologyRecoveryException (message , cause , consumer );
828860 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , consumer .getDelegateChannel (), e );
829861 }
830862 }
@@ -889,14 +921,10 @@ private void recoverEntitiesAsynchronously(ExecutorService executor, Collection<
889921
890922 private <E extends RecordedEntity > List <Callable <Object >> groupEntitiesByChannel (final Collection <E > entities ) {
891923 // map entities by channel
892- final Map <AutorecoveringChannel , List <E >> map = new LinkedHashMap <AutorecoveringChannel , List < E > >();
924+ final Map <AutorecoveringChannel , List <E >> map = new LinkedHashMap <>();
893925 for (final E entity : entities ) {
894926 final AutorecoveringChannel channel = entity .getChannel ();
895- List <E > list = map .get (channel );
896- if (list == null ) {
897- map .put (channel , list = new ArrayList <E >());
898- }
899- list .add (entity );
927+ map .computeIfAbsent (channel , c -> new ArrayList <>()).add (entity );
900928 }
901929 // now create a runnable per channel
902930 final List <Callable <Object >> callables = new ArrayList <>();
@@ -1083,7 +1111,7 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
10831111 }
10841112
10851113 Set <RecordedBinding > removeBindingsWithDestination (String s ) {
1086- final Set <RecordedBinding > result = new HashSet < RecordedBinding >();
1114+ final Set <RecordedBinding > result = new LinkedHashSet < >();
10871115 synchronized (this .recordedBindings ) {
10881116 for (Iterator <RecordedBinding > it = this .recordedBindings .iterator (); it .hasNext (); ) {
10891117 RecordedBinding b = it .next ();
0 commit comments