From 38a80298d285991440a5bc4096dfcf996bb7baa7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 16 Mar 2024 09:39:34 +0800 Subject: [PATCH 1/7] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet --- ...PersistentDispatcherMultipleConsumers.java | 8 +- ...ProducerConsumerMLInitializeDelayTest.java | 106 ++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index be82b190ffb32..1a7433ed28c63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -190,9 +190,15 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } if (isConsumersExceededOnSubscription()) { - log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); + log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}", + name, consumer); return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } + if (consumerSet.contains(consumer)) { + log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer); + return FutureUtil.failedFuture(new ConsumerBusyException("Attempting to add a consumer that already" + + " registered")); + } consumerList.add(consumer); if (consumerList.size() > 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java new file mode 100644 index 0000000000000..e939a8283fd44 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.carrotsearch.hppc.ObjectSet; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.common.naming.TopicName; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLoadTimeoutSeconds(60 * 5); + } + + @Test(timeOut = 30 * 1000) + public void testConsumerListMatchesConsumerSet() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subName = "sub"; + final int clientOperationTimeout = 3; + final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000; + final int clientMaxBackoffSeconds = clientOperationTimeout * 2; + admin.topics().createNonPartitionedTopic(topicName); + // Create a client with a low operation timeout. + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS) + .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS) + .build(); + Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + // Inject a delay for the initialization of ML, to make the consumer to register twice. + // Consumer register twice: the first will be timeout, and try again. + AtomicInteger delayTimes = new AtomicInteger(); + mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> { + if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) { + return delayTimes.incrementAndGet() == 1; + } + return false; + }); + admin.topics().unload(topicName); + // Verify: at last, "dispatcher.consumers.size" equals "dispatcher.consumerList.size". + Awaitility.await().atMost(Duration.ofSeconds(clientMaxBackoffSeconds * 3)) + .ignoreExceptions().untilAsserted(() -> { + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subName).getDispatcher(); + ObjectSet consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + List consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList"); + log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size()); + Assert.assertEquals(consumerList.size(), 1); + Assert.assertEquals(consumerSet.size(), 1); + }); + + // Verify: the topic can be deleted. + consumer.close(); + admin.topics().delete(topicName); + } +} From cbb527e6e63d6d3730750f5c40216ff6524be05a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 20 Mar 2024 13:57:22 +0800 Subject: [PATCH 2/7] cleanup for the test --- .../client/api/SimpleProducerConsumerMLInitializeDelayTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java index e939a8283fd44..15eef949ba727 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -102,5 +102,7 @@ public void testConsumerListMatchesConsumerSet() throws Exception { // Verify: the topic can be deleted. consumer.close(); admin.topics().delete(topicName); + // cleanup. + client.close(); } } From 92b7f2cf8520a1e56551a694e2985bda12d20df3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 21 Mar 2024 21:58:20 +0800 Subject: [PATCH 3/7] address comments --- .../apache/pulsar/broker/service/ServerCnx.java | 16 +++++++++++++--- .../PersistentDispatcherMultipleConsumers.java | 2 -- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3ab25eb098cdf..e537e0854bf83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection"); } else if (existingConsumerFuture.isCompletedExceptionally()){ + log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection," + + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, - String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s", + String.format("A failed consumer with id is already present on the connection." + + " remoteAddress: %s, subscription: %s", remoteAddress, subscriptionName)); - consumers.remove(consumerId, existingConsumerFuture); + /** + * This feature may was failed due to the client closed a in-progress subscribing. + * See {@link #handleCloseConsumer(CommandCloseConsumer)} + * Do not remove the failed feature at current line, it will be removed after the progress of + * the previous subscribing is done. + * Before the previous subscribing is done, the new subscribe request will always fail. + * This mechanism is in order to prevent more complex logic to handle the race conditions. + */ commandSender.sendErrorResponse(requestId, error, - "Consumer that failed is already present on the connection"); + "A failed consumer is already present on the connection"); } else { Consumer consumer = existingConsumerFuture.getNow(null); log.warn("[{}] Consumer with the same id is already created:" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 1a7433ed28c63..cccb614afb17d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -196,8 +196,6 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } if (consumerSet.contains(consumer)) { log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer); - return FutureUtil.failedFuture(new ConsumerBusyException("Attempting to add a consumer that already" - + " registered")); } consumerList.add(consumer); From 11cd3d8dfd7a9c909c6cb9be61bde2bdd508d3ab Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 21 Mar 2024 22:01:33 +0800 Subject: [PATCH 4/7] address comments --- .../client/api/SimpleProducerConsumerMLInitializeDelayTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java index 15eef949ba727..d503f9a9398ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -87,7 +87,7 @@ public void testConsumerListMatchesConsumerSet() throws Exception { }); admin.topics().unload(topicName); // Verify: at last, "dispatcher.consumers.size" equals "dispatcher.consumerList.size". - Awaitility.await().atMost(Duration.ofSeconds(clientMaxBackoffSeconds * 3)) + Awaitility.await().atMost(Duration.ofSeconds(loadMLDelayMillis * 3)) .ignoreExceptions().untilAsserted(() -> { Dispatcher dispatcher = pulsar.getBrokerService() .getTopic(topicName, false).join().get() From d36a83205235c42e20a72bff1ed6c996e42bb2a0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 22 Mar 2024 14:34:16 +0800 Subject: [PATCH 5/7] address comments --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 6 +++--- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index e537e0854bf83..2f32af3ecdf2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1231,15 +1231,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { + " remoteAddress: %s, subscription: %s", remoteAddress, subscriptionName)); /** - * This feature may was failed due to the client closed a in-progress subscribing. + * This future may was failed due to the client closed a in-progress subscribing. * See {@link #handleCloseConsumer(CommandCloseConsumer)} - * Do not remove the failed feature at current line, it will be removed after the progress of + * Do not remove the failed future at current line, it will be removed after the progress of * the previous subscribing is done. * Before the previous subscribing is done, the new subscribe request will always fail. * This mechanism is in order to prevent more complex logic to handle the race conditions. */ commandSender.sendErrorResponse(requestId, error, - "A failed consumer is already present on the connection"); + "Consumer that failed is already present on the connection"); } else { Consumer consumer = existingConsumerFuture.getNow(null); log.warn("[{}] Consumer with the same id is already created:" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index cccb614afb17d..bf08274523ff8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -194,6 +194,8 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { name, consumer); return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } + // This is not an expected scenario, it will never happen in expected. Just print a warn log if the unexpected + // scenario happens. See more detail: https://github.com/apache/pulsar/pull/22283. if (consumerSet.contains(consumer)) { log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer); } From 1a06fa92e323f7ca6712724c4e4afce6484cc8ce Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 23 Mar 2024 14:56:22 +0800 Subject: [PATCH 6/7] check style fix --- .../api/SimpleProducerConsumerMLInitializeDelayTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java index d503f9a9398ef..ab4e063ae3d83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -27,8 +27,8 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.common.naming.TopicName; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; From 1f6d2a0fdb53620f5feebb662b6c8f676fc6440b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 25 Mar 2024 15:43:05 +0800 Subject: [PATCH 7/7] fix test --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- .../java/org/apache/pulsar/broker/service/ServerCnxTest.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2f32af3ecdf2a..4f82f416ed2a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1228,8 +1228,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, String.format("A failed consumer with id is already present on the connection." - + " remoteAddress: %s, subscription: %s", - remoteAddress, subscriptionName)); + + " consumerId: %s, remoteAddress: %s, subscription: %s", + consumerId, remoteAddress, subscriptionName)); /** * This future may was failed due to the client closed a in-progress subscribing. * See {@link #handleCloseConsumer(CommandCloseConsumer)} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index e195f220f87dd..1cb2f76c5e2b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3382,8 +3382,9 @@ public boolean isCompletedExceptionally() { }; // assert error response assertTrue(responseAssert.test(responseAssert)); - // assert consumer-delete event occur - assertEquals(1L, + // The delete event will only occur after the future is completed. + // assert consumer-delete event will not occur. + assertEquals(0L, deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count()); // Server will not close the connection assertTrue(channel.isOpen());