diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index c550f7339..d4e5e9b86 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -17,15 +17,33 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; -import static com.rabbitmq.client.amqp.Resource.State.*; +import static com.rabbitmq.client.amqp.Resource.State.CLOSED; +import static com.rabbitmq.client.amqp.Resource.State.CLOSING; +import static com.rabbitmq.client.amqp.Resource.State.OPEN; +import static com.rabbitmq.client.amqp.Resource.State.OPENING; +import static com.rabbitmq.client.amqp.Resource.State.RECOVERING; import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert; +import static com.rabbitmq.client.amqp.impl.Tuples.pair; import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions; import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken; import static java.lang.System.nanoTime; import static java.time.Duration.ofNanos; -import com.rabbitmq.client.amqp.*; +import com.rabbitmq.client.amqp.Address; +import com.rabbitmq.client.amqp.AmqpException; +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.ConnectionSettings; +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.ConsumerBuilder; +import com.rabbitmq.client.amqp.Management; import com.rabbitmq.client.amqp.ObservationCollector; +import com.rabbitmq.client.amqp.Publisher; +import com.rabbitmq.client.amqp.PublisherBuilder; +import com.rabbitmq.client.amqp.RpcClient; +import com.rabbitmq.client.amqp.RpcClientBuilder; +import com.rabbitmq.client.amqp.RpcServer; +import com.rabbitmq.client.amqp.RpcServerBuilder; +import com.rabbitmq.client.amqp.impl.Tuples.Pair; import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException; import com.rabbitmq.client.amqp.impl.Utils.StopWatch; import com.rabbitmq.client.amqp.metrics.MetricsCollector; @@ -89,7 +107,7 @@ final class AmqpConnection extends ResourceBase implements Connection { private final List rpcClients = new CopyOnWriteArrayList<>(); private final List rpcServers = new CopyOnWriteArrayList<>(); private final TopologyListener topologyListener; - private volatile EntityRecovery entityRecovery; + private final EntityRecovery entityRecovery; private final AtomicBoolean recoveringConnection = new AtomicBoolean(false); private final DefaultConnectionSettings connectionSettings; private final Supplier sessionHandlerSupplier; @@ -118,7 +136,9 @@ final class AmqpConnection extends ResourceBase implements Connection { AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration = builder.recoveryConfiguration(); - this.topologyListener = createTopologyListener(builder); + Pair topologyInfra = createTopologyInfrastructure(builder); + this.topologyListener = topologyInfra.v1(); + this.entityRecovery = topologyInfra.v2(); Executor de = builder.dispatchingExecutor() == null @@ -344,20 +364,25 @@ private static String extractNode(org.apache.qpid.protonj2.client.Connection con return node; } - TopologyListener createTopologyListener(AmqpConnectionBuilder builder) { + Pair createTopologyInfrastructure( + AmqpConnectionBuilder builder) { TopologyListener topologyListener; + EntityRecovery entityRecovery; if (builder.recoveryConfiguration().topology()) { RecordingTopologyListener rtl = new RecordingTopologyListener( "topology-listener-connection-" + this.name(), this.environment.recoveryEventLoop()); - this.entityRecovery = new EntityRecovery(this, rtl); + entityRecovery = new EntityRecovery(this, rtl); topologyListener = rtl; } else { topologyListener = TopologyListener.NO_OP; + entityRecovery = null; } - return builder.topologyListener() == null - ? topologyListener - : TopologyListener.compose(List.of(builder.topologyListener(), topologyListener)); + topologyListener = + builder.topologyListener() == null + ? topologyListener + : TopologyListener.compose(List.of(builder.topologyListener(), topologyListener)); + return pair(topologyListener, entityRecovery); } private BiConsumer @@ -455,12 +480,21 @@ private void recoverAfterConnectionFailure( LOGGER.debug("Reconnected '{}' to {}", this.name(), this.currentConnectionLabel()); try { if (recoveryConfiguration.topology()) { + boolean managementPreviouslyClosed = this.management.isClosed(); this.management.init(); LOGGER.debug("Recovering topology of connection '{}'...", this.name()); this.recoverTopology(); this.recoverConsumers(); this.recoverPublishers(); LOGGER.debug("Recovered topology of connection '{}'.", this.name()); + if (managementPreviouslyClosed) { + LOGGER.debug("Management was closed before recovery, closing it again"); + try { + this.closeManagement(); + } catch (Exception e) { + LOGGER.info("Error while (re)closing management after recovery"); + } + } } LOGGER.info( "Recovered connection '{}' to {}", this.name(), this.currentConnectionLabel()); @@ -924,6 +958,22 @@ public String toString() { return this.name(); } + private void authenticate(String username, String password) { + State state = this.state(); + if (state == OPEN) { + LOGGER.debug("Setting new token for connection {}", this.name); + long start = nanoTime(); + ((AmqpManagement) management()).setToken(password); + LOGGER.debug( + "Set new token for connection {} in {} ms", + this.name, + ofNanos(nanoTime() - start).toMillis()); + } else { + LOGGER.debug( + "Could not set new token for connection {} because its state is {}", this.name(), state); + } + } + static class NativeConnectionWrapper { private final org.apache.qpid.protonj2.client.Connection connection; diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 1d148dcba..af69eb10c 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -17,7 +17,9 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; -import static com.rabbitmq.client.amqp.Resource.State.*; +import static com.rabbitmq.client.amqp.Resource.State.CLOSED; +import static com.rabbitmq.client.amqp.Resource.State.CLOSING; +import static com.rabbitmq.client.amqp.Resource.State.OPEN; import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*; import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*; import static java.time.Duration.ofSeconds; diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java index 6ea2e476f..24eaac0ea 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java @@ -220,8 +220,7 @@ public void close() { "Management is initializing, retry closing later."); } if (this.closed.compareAndSet(false, true)) { - this.state(CLOSED); - this.releaseResources(null); + this.releaseResources(null, CLOSED); if (this.receiver != null) { try { this.receiver.close(); @@ -293,6 +292,7 @@ void init() { this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS); LOGGER.debug("Management receiver created ({}).", this); this.state(OPEN); + this.closed.set(false); } catch (Exception e) { LOGGER.info("Error during management {} initialization: {}", cName, e.getMessage()); throw ExceptionUtils.convert(e); @@ -381,7 +381,15 @@ private void failRequests(AmqpException exception) { } void releaseResources(AmqpException e) { - this.markUnavailable(); + this.releaseResources(e, null); + } + + void releaseResources(AmqpException e, State state) { + if (state == null) { + this.markUnavailable(); + } else { + this.state(state); + } if (this.receiveLoop != null) { this.receiveLoop.cancel(true); this.receiveLoop = null; @@ -872,4 +880,8 @@ public long messageCount() { return this.messageCount; } } + + boolean isClosed() { + return this.closed.get(); + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Tuples.java b/src/main/java/com/rabbitmq/client/amqp/impl/Tuples.java similarity index 97% rename from src/test/java/com/rabbitmq/client/amqp/impl/Tuples.java rename to src/main/java/com/rabbitmq/client/amqp/impl/Tuples.java index e6bf34100..62f2b2ae0 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Tuples.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Tuples.java @@ -17,7 +17,7 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; -public final class Tuples { +final class Tuples { private Tuples() {} diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java index 0817e8f35..385bc3a6f 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java @@ -97,6 +97,22 @@ void receiveLoopShouldStopAfterBeingIdle() { assertThat(management.queueInfo(info2.name())).hasName(info2.name()); } + @Test + void getManagementFromConnectionAfterManagementHasBeenClosed() { + AmqpManagement m1 = (AmqpManagement) connection.management(); + String q = m1.queue().exclusive(true).declare().name(); + assertThat(m1.queueInfo(q)).isEmpty(); + assertThat(m1.isClosed()).isFalse(); + m1.close(); + assertThat(m1.isClosed()).isTrue(); + assertThatThrownBy(() -> m1.queueInfo(q)) + .isInstanceOf(AmqpException.AmqpResourceClosedException.class); + AmqpManagement m2 = (AmqpManagement) connection.management(); + assertThat(m2.isClosed()).isFalse(); + assertThat(m2.queueInfo(q)).isEmpty(); + assertThat(m2).isSameAs(m1); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) @BrokerVersionAtLeast(RABBITMQ_4_1_0) diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java index bbc91c46b..2aa8f09ae 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.rabbitmq.client.amqp.*; +import com.rabbitmq.client.amqp.AmqpException.AmqpResourceClosedException; import java.util.List; import java.util.Set; import java.util.concurrent.*; @@ -656,7 +657,26 @@ void shouldRecoverEvenIfManagementIsClosed() { closeConnectionAndWaitForRecovery(); publisher.publish(publisher.message(), ctx -> {}); assertThat(consumeSync).completes(); - management.queueInfo(queueInfo.name()); + assertThatThrownBy(() -> management.queueInfo(queueInfo.name())) + .isInstanceOf(AmqpResourceClosedException.class); + assertThat(connection.management().queueInfo(queueInfo.name())).isEmpty(); + } + } + + @Test + void managementShouldStayClosedAfterRecoveryIfClosedBefore() { + try (Connection connection = connection()) { + AmqpManagement management = (AmqpManagement) connection.management(); + + String q = management.queue().exclusive(true).declare().name(); + + management.close(); + + closeConnectionAndWaitForRecovery(); + assertThat(management.isClosed()).isTrue(); + assertThatThrownBy(() -> management.queueInfo(q)) + .isInstanceOf(AmqpResourceClosedException.class); + assertThat(connection.management().queueInfo(q)).isEmpty(); } } diff --git a/src/test/java/com/rabbitmq/client/amqp/oauth2/TokenCredentialsManagerTest.java b/src/test/java/com/rabbitmq/client/amqp/oauth2/TokenCredentialsManagerTest.java index 5a3f9b691..166582df6 100644 --- a/src/test/java/com/rabbitmq/client/amqp/oauth2/TokenCredentialsManagerTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/oauth2/TokenCredentialsManagerTest.java @@ -18,8 +18,8 @@ package com.rabbitmq.client.amqp.oauth2; import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost; -import static com.rabbitmq.client.amqp.impl.Tuples.pair; import static com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager.DEFAULT_REFRESH_DELAY_STRATEGY; +import static com.rabbitmq.client.amqp.oauth2.Tuples.pair; import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; import static java.util.stream.Collectors.toList; @@ -30,7 +30,6 @@ import com.rabbitmq.client.amqp.impl.Assertions; import com.rabbitmq.client.amqp.impl.TestUtils; import com.rabbitmq.client.amqp.impl.TestUtils.Sync; -import com.rabbitmq.client.amqp.impl.Tuples; import java.time.Duration; import java.time.Instant; import java.util.List; diff --git a/src/test/java/com/rabbitmq/client/amqp/oauth2/Tuples.java b/src/test/java/com/rabbitmq/client/amqp/oauth2/Tuples.java new file mode 100644 index 000000000..f8aa7ab00 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/amqp/oauth2/Tuples.java @@ -0,0 +1,46 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.oauth2; + +final class Tuples { + + private Tuples() {} + + public static Pair pair(A v1, B v2) { + return new Pair<>(v1, v2); + } + + public static class Pair { + + private final A v1; + private final B v2; + + private Pair(A v1, B v2) { + this.v1 = v1; + this.v2 = v2; + } + + public A v1() { + return this.v1; + } + + public B v2() { + return this.v2; + } + } +}