diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 888668e15b167..3a239aa2e7bbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1184,7 +1184,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s", remoteAddress, subscriptionName)); - consumers.remove(consumerId, existingConsumerFuture); commandSender.sendErrorResponse(requestId, error, "Consumer that failed is already present on the connection"); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index 61c7a98602b69..bdcc0f5c989ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -22,8 +22,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.carrotsearch.hppc.ObjectHashSet; +import com.carrotsearch.hppc.ObjectSet; import io.netty.channel.ChannelHandlerContext; +import java.net.SocketAddress; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,7 +43,10 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -293,6 +301,182 @@ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception mockBrokerService.resetHandleCloseConsumer(); } + @Test + public void testConcurrentCreatedConsumerAndRetryAfterFix() throws Exception { + concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", true); + } + + @Test + public void testConcurrentCreatedConsumerAndRetryBeforeFix() throws Exception { + concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", false); + } + + private void concurrentConsumerCreatedAndRetry(String topic, boolean isFix) throws Exception { + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(3, TimeUnit.SECONDS).build(); + final AtomicInteger subscribeCounter = new AtomicInteger(0); + final AtomicInteger closeConsumerCounter = new AtomicInteger(0); + final AtomicInteger subscribeFinishCounter = new AtomicInteger(0); + + AtomicBoolean canExecute = new AtomicBoolean(false); + + CopyOnWriteArrayList consumerList = new CopyOnWriteArrayList<>(); + ObjectSet consumerSet = new ObjectHashSet<>(); + ConcurrentLongHashMap> consumers = + ConcurrentLongHashMap.>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); + org.apache.pulsar.broker.service.Consumer mockConsumer = + Mockito.mock(org.apache.pulsar.broker.service.Consumer.class); + + AtomicReference id = new AtomicReference<>(); + AtomicReference address = new AtomicReference<>(); + + // subscribe 1: Success. Then broker restart, trigger client reconnect + // subscribe 2: Broker not fully success, but client timeout then trigger handleClose and reconnect again + // subscribe 3: Remove the failed existingConsumerFuture, then client reconnect again + // subscribe 4: Execute concurrently with subscribe 2 + mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { + final long requestId = subscribe.getRequestId(); + final long consumerId = subscribe.getConsumerId(); + SocketAddress remoteAddress = ctx.channel().remoteAddress(); + int subscribeCount = subscribeCounter.incrementAndGet(); + + // ensure consumerId and clientAddress is the same during 4 handleSubscribe requests + // so that add consumer is always the same consumer + if (id.get() == null && address.get() == null) { + id.set(consumerId); + address.set(remoteAddress); + } else { + Assert.assertEquals(id.get(), consumerId); + Assert.assertEquals(remoteAddress, address.get()); + } + + CompletableFuture consumerFuture = new CompletableFuture<>(); + CompletableFuture existingConsumerFuture = + consumers.putIfAbsent(consumerId, consumerFuture); + + if (existingConsumerFuture != null) { + if (!existingConsumerFuture.isDone()) { + ctx.writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady, + "Consumer is already present on the connection")); + } else if (existingConsumerFuture.isCompletedExceptionally()){ + ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, + "Consumer that failed is already present on the connection")); + // the fix of pr-20583 + if (!isFix) { + consumers.remove(consumerId, existingConsumerFuture); + } + // subscribe 3 finish + } else { + ctx.writeAndFlush(Commands.newSuccess(requestId)); + } + subscribeFinishCounter.incrementAndGet(); + return; + } + + // simulate add consumer + // must use asyncFuture to simulate, otherwise requests can not be concurrent + CompletableFuture.supplyAsync(() -> { + org.apache.pulsar.broker.service.Consumer consumer = mockConsumer; + consumerSet.add(consumer); + consumerList.add(consumer); + return consumer; + }).thenAccept(consumer -> { + // block second subscribe, and ensure client timeout + // make the subscribe 2 and 4 execute concurrently in broker + if (subscribeCount == 2) { + while (!canExecute.get()) { + // wait until subscribe 4 finish add consumer + } + } + + if (subscribeCount == 4) { + canExecute.set(true); + } + + if (consumerFuture.complete(consumer)) { + ctx.writeAndFlush(Commands.newSuccess(requestId)); + + if (subscribeCount == 1) { + // simulate broker restart and trigger client reconnect + if (consumerSet.removeAll(consumer) == 1) { + consumerList.remove(consumer); + } + consumers.remove(consumerId, consumerFuture); + ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1)); + // subscribe 1 finish + } else { + // subscribe 4 finish + } + } else { + // cleared consumer created after timeout on client side + // simulate close consumer + if (consumerSet.removeAll(consumer) == 1) { + consumerList.remove(consumer); + } + consumers.remove(consumerId, consumerFuture); + // subscribe 2 finish + } + }); + subscribeFinishCounter.incrementAndGet(); + }); + + mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> { + long requestId = closeConsumer.getRequestId(); + long consumerId = closeConsumer.getConsumerId(); + closeConsumerCounter.incrementAndGet(); + + CompletableFuture consumerFuture = consumers.get(consumerId); + if (consumerFuture == null) { + ctx.writeAndFlush(Commands.newSuccess(requestId)); + return; + } + + if (!consumerFuture.isDone() && consumerFuture + .completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) { + ctx.writeAndFlush(Commands.newSuccess(requestId)); + return; + } + + if (consumerFuture.isCompletedExceptionally()) { + ctx.writeAndFlush(Commands.newSuccess(requestId)); + return; + } + + Assert.fail("should not go here"); + // simulate close consumer + org.apache.pulsar.broker.service.Consumer consumer = consumerFuture.getNow(null); + if (consumerSet.removeAll(consumer) == 1) { + consumerList.remove(consumer); + } + consumers.remove(consumerId, consumerFuture); + ctx.writeAndFlush(Commands.newSuccess(requestId)); + }); + + // Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. + client.newConsumer().topic(topic).subscriptionName("test").subscribe(); + + Awaitility.await().until(() -> closeConsumerCounter.get() == 1); + Awaitility.await().until(() -> subscribeFinishCounter.get() == 4); + + mockBrokerService.resetHandleSubscribe(); + mockBrokerService.resetHandleCloseConsumer(); + + if (isFix) { + Assert.assertEquals(consumers.size(), 1); + Assert.assertEquals(consumerSet.size(), 1); + Assert.assertEquals(consumerList.size(), 1); + } else { + Assert.assertEquals(consumers.size(), 1); + Assert.assertEquals(consumerSet.size(), 0); + Assert.assertEquals(consumerList.size(), 1); + } + } + @Test public void testProducerFailDoesNotFailOtherProducer() throws Exception { producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");