4747
4848final class AmqpConnection extends ResourceBase implements Connection {
4949
50+ /** Connection-related issues */
51+ private static final Predicate <Exception > CONNECTION_EXCEPTION_PREDICATE =
52+ e -> e instanceof AmqpException .AmqpConnectionException ;
53+
54+ /**
55+ * Issues related to underlying resources.
56+ *
57+ * <p>E.g. the connection used for enforcing affinity gets closed, the management is marked as
58+ * unavailable and throws an invalid state exception when it is called. The recovery process
59+ * should restart.
60+ */
61+ private static final Predicate <Exception > RESOURCE_INVALID_STATE_PREDICATE =
62+ e ->
63+ e instanceof AmqpException .AmqpResourceInvalidStateException
64+ && !(e instanceof AmqpException .AmqpResourceClosedException );
65+
5066 private static final Predicate <Exception > RECOVERY_PREDICATE =
51- t -> t instanceof AmqpException . AmqpConnectionException ;
67+ CONNECTION_EXCEPTION_PREDICATE . or ( RESOURCE_INVALID_STATE_PREDICATE ) ;
5268
5369 private static final AtomicLong ID_SEQUENCE = new AtomicLong (0 );
5470
@@ -306,14 +322,15 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
306322 // nothing to do in this listener
307323 return ;
308324 }
325+ AmqpException exception = ExceptionUtils .convert (event .failureCause ());
326+ LOGGER .debug ("Converted native exception to {}" , exception .getClass ().getSimpleName ());
309327 if (this .recoveringConnection .get ()) {
310328 LOGGER .debug (
311329 "Filtering recovery task scheduling, connection recovery of '{}' already in progress" ,
312330 this .name ());
331+ this .releaseManagementResources (exception );
313332 return ;
314333 }
315- AmqpException exception = ExceptionUtils .convert (event .failureCause ());
316- LOGGER .debug ("Converted native exception to {}" , exception .getClass ().getSimpleName ());
317334
318335 if (RECOVERY_PREDICATE .test (exception ) && this .state () != OPENING ) {
319336 LOGGER .debug (
@@ -327,6 +344,8 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
327344 if (!this .recoveringConnection .get ()) {
328345 recoverAfterConnectionFailure (
329346 recoveryConfiguration , name , exception , resultReference );
347+ } else {
348+ this .releaseManagementResources (exception );
330349 }
331350 });
332351 } else {
@@ -345,11 +364,11 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
345364 private void recoverAfterConnectionFailure (
346365 AmqpConnectionBuilder .AmqpRecoveryConfiguration recoveryConfiguration ,
347366 String connectionName ,
348- Exception failureCause ,
367+ AmqpException failureCause ,
349368 AtomicReference <BiConsumer <org .apache .qpid .protonj2 .client .Connection , DisconnectionEvent >>
350369 disconnectedHandlerReference ) {
351370 LOGGER .info (
352- "Connection '{}' to '{}' has been disconnected, trying to recover ." ,
371+ "Connection '{}' to '{}' has been disconnected, initializing recovery ." ,
353372 this .name (),
354373 this .currentConnectionLabel ());
355374 LOGGER .debug ("Notifying listeners of connection '{}'." , this .name ());
@@ -360,11 +379,11 @@ private void recoverAfterConnectionFailure(
360379 this .nativeSession = null ;
361380 this .connectionAddress = null ;
362381 LOGGER .debug ("Releasing management resource of connection '{}'." , this .name ());
363- this .releaseManagementResources ();
382+ this .releaseManagementResources (failureCause );
364383 CompletableFuture <NativeConnectionWrapper > ncwFuture ;
365384 if (this .recoveringConnection .compareAndSet (false , true )) {
366385 this .recoveringConnection .set (true );
367- LOGGER .debug ("Connection attempt for '{}'." , this .name ());
386+ LOGGER .debug ("Scheduling connection attempt for '{}'." , this .name ());
368387 ncwFuture =
369388 recoverNativeConnection (
370389 recoveryConfiguration , connectionName , disconnectedHandlerReference );
@@ -378,7 +397,6 @@ private void recoverAfterConnectionFailure(
378397 ncw -> {
379398 this .sync (ncw );
380399 LOGGER .debug ("Reconnected '{}' to {}" , this .name (), this .currentConnectionLabel ());
381- this .recoveringConnection .set (false );
382400 try {
383401 if (recoveryConfiguration .topology ()) {
384402 this .management .init ();
@@ -391,15 +409,17 @@ private void recoverAfterConnectionFailure(
391409 LOGGER .info (
392410 "Recovered connection '{}' to {}" , this .name (), this .currentConnectionLabel ());
393411 this .state (OPEN );
412+ this .recoveringConnection .set (false );
394413 } catch (Exception ex ) {
395414 // likely InterruptedException or IO exception
396415 LOGGER .warn (
397416 "Error while trying to recover topology for connection '{}': {}" ,
398417 this .name (),
399418 ex .getMessage ());
400- if (RECOVERY_PREDICATE .test (ex )) {
419+ AmqpException amqpException = ExceptionUtils .convert (ex );
420+ if (RECOVERY_PREDICATE .test (amqpException )) {
401421 LOGGER .debug (
402- "Error during topology recoverable , queueing recovery task for '{}', error is {}" ,
422+ "Error during topology recovery , queueing recovery task for '{}', error is {}" ,
403423 this .name (),
404424 ex .getMessage ());
405425 this .environment
@@ -408,7 +428,10 @@ private void recoverAfterConnectionFailure(
408428 () -> {
409429 if (!this .recoveringConnection .get ()) {
410430 recoverAfterConnectionFailure (
411- recoveryConfiguration , name , ex , disconnectedHandlerReference );
431+ recoveryConfiguration ,
432+ name ,
433+ amqpException ,
434+ disconnectedHandlerReference );
412435 }
413436 });
414437 }
@@ -452,14 +475,17 @@ private CompletableFuture<NativeConnectionWrapper> recoverNativeConnection(
452475 public <T > T maybeRetry (Supplier <T > task ) {
453476 return RetryUtils .callAndMaybeRetry (
454477 task ::get ,
455- e -> true ,
478+ // no need to retry if the connection is closed
479+ // the affinity task will fail and AsyncRetry will take care
480+ // of retrying later
481+ e -> RECOVERY_PREDICATE .negate ().test (e ),
456482 Duration .ofMillis (10 ),
457483 5 ,
458484 "Connection affinity operation" );
459485 }
460486 },
461487 connectionName ))
462- .description ("Trying to create native connection for '%s'." , connectionName )
488+ .description ("Recovering native connection for '%s'." , connectionName )
463489 .delayPolicy (recoveryConfiguration .backOffDelayPolicy ())
464490 .retry (RECOVERY_PREDICATE )
465491 .scheduler (this .scheduledExecutorService ())
@@ -493,6 +519,24 @@ private void recoverConsumers() throws InterruptedException {
493519 consumer .queue (),
494520 ex );
495521 throw ex ;
522+ } catch (AmqpException .AmqpResourceClosedException ex ) {
523+ if (ExceptionUtils .noRunningStreamMemberOnNode (ex )) {
524+ LOGGER .warn (
525+ "Could not recover consumer {} (queue '{}') because there is "
526+ + "running stream member on the node, restarting recovery" ,
527+ consumer .id (),
528+ consumer .queue (),
529+ ex );
530+ throw new AmqpException .AmqpConnectionException (
531+ "No running stream member on the node" , ex );
532+ } else {
533+ LOGGER .warn (
534+ "Error while trying to recover consumer {} (queue '{}')" ,
535+ consumer .id (),
536+ consumer .queue (),
537+ ex );
538+ failedConsumers .add (consumer );
539+ }
496540 } catch (Exception ex ) {
497541 LOGGER .warn (
498542 "Error while trying to recover consumer {} (queue '{}')" ,
@@ -521,6 +565,13 @@ private void recoverPublishers() throws InterruptedException {
521565 publisher .state (OPEN );
522566 LOGGER .debug (
523567 "Recovered publisher {} (address '{}')" , publisher .id (), publisher .address ());
568+ } catch (AmqpException .AmqpConnectionException ex ) {
569+ LOGGER .warn (
570+ "Connection error while trying to recover publisher {} (address '{}'), restarting recovery" ,
571+ publisher .id (),
572+ publisher .address (),
573+ ex );
574+ throw ex ;
524575 } catch (Exception ex ) {
525576 LOGGER .warn (
526577 "Error while trying to recover publisher {} (address '{}')" ,
@@ -538,9 +589,9 @@ private void closeManagement() {
538589 this .management .close ();
539590 }
540591
541- private void releaseManagementResources () {
592+ private void releaseManagementResources (AmqpException e ) {
542593 if (this .management != null ) {
543- this .management .releaseResources ();
594+ this .management .releaseResources (e );
544595 }
545596 }
546597
@@ -743,21 +794,43 @@ private void close(Throwable cause) {
743794 for (AmqpConsumer consumer : this .consumers ) {
744795 consumer .close (cause );
745796 }
797+ boolean locked = false ;
746798 try {
747- this .dispatchingExecutorService . shutdownNow ( );
748- } catch ( Exception e ) {
749- LOGGER .info (
750- "Error while shutting down dispatching executor service for connection '{}': {}" ,
751- this . name (),
752- e . getMessage () );
799+ locked = this .instanceLock . tryLock ( 1 , TimeUnit . SECONDS );
800+ if (! locked ) {
801+ LOGGER .info ("Could not acquire connection lock during closing" );
802+ }
803+ } catch ( InterruptedException e ) {
804+ LOGGER . info ( "Interrupted while waiting for connection lock" );
753805 }
754806 try {
755- org .apache .qpid .protonj2 .client .Connection nc = this .nativeConnection ;
756- if (nc != null ) {
757- nc .close ();
807+ ExecutorService es = this .dispatchingExecutorService ;
808+ if (es != null ) {
809+ try {
810+ es .shutdownNow ();
811+ } catch (Exception e ) {
812+ LOGGER .info (
813+ "Error while shutting down dispatching executor service for connection '{}': {}" ,
814+ this .name (),
815+ e .getMessage ());
816+ }
817+ }
818+ try {
819+ org .apache .qpid .protonj2 .client .Connection nc = this .nativeConnection ;
820+ if (nc != null ) {
821+ nc .close ();
822+ }
823+ } catch (Exception e ) {
824+ LOGGER .warn ("Error while closing native connection" , e );
825+ }
826+ } finally {
827+ if (locked ) {
828+ try {
829+ this .instanceLock .unlock ();
830+ } catch (Exception e ) {
831+ LOGGER .debug ("Error while releasing connection lock: {}" , e .getMessage ());
832+ }
758833 }
759- } catch (Exception e ) {
760- LOGGER .warn ("Error while closing native connection" , e );
761834 }
762835 this .state (CLOSED , cause );
763836 this .environment .metricsCollector ().closeConnection ();
0 commit comments