Skip to content

Commit f03a9fd

Browse files
BewareMyPowersrinath-ctds
authored andcommitted
[fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (apache#24554) (apache#24571)
(cherry picked from commit 16271dc)
1 parent 2b4b96b commit f03a9fd

File tree

4 files changed

+72
-10
lines changed

4 files changed

+72
-10
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3058,7 +3058,7 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
30583058

30593059
// Truncate to ensure the offloaded data is not orphaned.
30603060
// Also ensures the BK ledgers are deleted and not just scheduled for deletion
3061-
CompletableFuture<Void> truncateFuture = asyncTruncate();
3061+
CompletableFuture<Void> truncateFuture = asyncTruncate(true);
30623062
truncateFuture.whenComplete((ignore, exc) -> {
30633063
if (exc != null) {
30643064
log.error("[{}] Error truncating ledger for deletion", name, exc);
@@ -4461,6 +4461,12 @@ public void setEntriesAddedCounter(long count) {
44614461

44624462
@Override
44634463
public CompletableFuture<Void> asyncTruncate() {
4464+
return asyncTruncate(false);
4465+
}
4466+
4467+
// When asyncTruncate is called by asyncDelete, the argument should be true because cursors will not be accessed
4468+
// after the managed ledger is deleted.
4469+
private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) {
44644470

44654471
final List<CompletableFuture<Void>> futures = new ArrayList();
44664472
for (ManagedCursor cursor : cursors) {
@@ -4473,7 +4479,12 @@ public void clearBacklogComplete(Object ctx) {
44734479

44744480
@Override
44754481
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
4476-
future.completeExceptionally(exception);
4482+
if (ignoreCursorFailure) {
4483+
log.warn("Failed to clear backlog for cursor {}", cursor.getName(), exception);
4484+
future.complete(null);
4485+
} else {
4486+
future.completeExceptionally(exception);
4487+
}
44774488
}
44784489
}, null);
44794490
futures.add(future);

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import static org.mockito.Mockito.when;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertTrue;
27+
import java.io.Closeable;
2728
import java.net.InetSocketAddress;
29+
import java.time.Duration;
2830
import java.util.List;
2931
import java.util.UUID;
3032
import java.util.concurrent.CompletableFuture;
@@ -36,26 +38,29 @@
3638
import org.apache.pulsar.client.admin.PulsarAdminException;
3739
import org.apache.pulsar.client.api.ClientBuilder;
3840
import org.apache.pulsar.client.api.Consumer;
41+
import org.apache.pulsar.client.api.MessageId;
3942
import org.apache.pulsar.client.api.Producer;
4043
import org.apache.pulsar.client.api.ProducerConsumerBase;
44+
import org.apache.pulsar.client.api.PulsarClient;
4145
import org.apache.pulsar.client.api.PulsarClientException;
4246
import org.apache.pulsar.client.impl.LookupService;
4347
import org.apache.pulsar.client.impl.LookupTopicResult;
4448
import org.apache.pulsar.client.impl.PulsarClientImpl;
4549
import org.apache.pulsar.common.naming.NamespaceName;
50+
import org.apache.pulsar.common.naming.TopicName;
4651
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
4752
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
4853
import org.apache.pulsar.common.policies.data.TopicType;
49-
import org.testng.annotations.AfterMethod;
50-
import org.testng.annotations.BeforeMethod;
54+
import org.testng.annotations.AfterClass;
55+
import org.testng.annotations.BeforeClass;
5156
import org.testng.annotations.Test;
5257

5358
@Test(groups = "broker-admin")
5459
@Slf4j
5560
public class TopicAutoCreationTest extends ProducerConsumerBase {
5661

5762
@Override
58-
@BeforeMethod
63+
@BeforeClass
5964
protected void setup() throws Exception {
6065
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
6166
conf.setAllowAutoTopicCreation(true);
@@ -71,7 +76,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
7176
}
7277

7378
@Override
74-
@AfterMethod(alwaysRun = true)
79+
@AfterClass(alwaysRun = true)
7580
protected void cleanup() throws Exception {
7681
super.internalCleanup();
7782
}
@@ -87,9 +92,11 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls
8792
.create();
8893

8994
List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName);
95+
assertTrue(partitionedTopics.contains(topic));
9096
List<String> topics = admin.topics().getList(namespaceName);
91-
assertEquals(partitionedTopics.size(), 1);
92-
assertEquals(topics.size(), 3);
97+
for (int i = 0; i < conf.getDefaultNumPartitions(); i++) {
98+
assertTrue(topics.contains(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
99+
}
93100

94101
producer.close();
95102
for (String t : topics) {
@@ -248,4 +255,48 @@ public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminE
248255
admin.namespaces().deleteNamespace(namespace, true);
249256
}
250257

258+
@Test
259+
public void testPartitionsNotCreatedAfterDeletion() throws Exception {
260+
@Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
261+
final var topicName = TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion");
262+
final var topic = topicName.toString();
263+
final var interval = Duration.ofSeconds(1);
264+
final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = creator -> {
265+
admin.topics().createPartitionedTopic(topic, 1);
266+
boolean needCleanup = false;
267+
try (final var ignored = creator.get()) {
268+
admin.topics().terminatePartitionedTopic(topic);
269+
admin.topics().deletePartitionedTopic(topic, true);
270+
Thread.sleep(interval.toMillis() + 500); // wait until the auto update partitions task has run
271+
272+
final var topics = admin.topics().getList(topicName.getNamespace()).stream()
273+
.filter(__ -> __.contains(topicName.getLocalName())).toList();
274+
// Without https://github.com/apache/pulsar/pull/24118, the producer or consumer on partition 0 could be
275+
// automatically created.
276+
if (!topics.isEmpty()) {
277+
assertEquals(topics, List.of(topicName.getPartition(0).toString()));
278+
needCleanup = true;
279+
}
280+
}
281+
if (needCleanup) {
282+
admin.topics().delete(topicName.getPartition(0).toString());
283+
}
284+
};
285+
verifier.accept(() -> client.newProducer().topic(topic)
286+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
287+
verifier.accept(() -> client.newConsumer().topic(topic).subscriptionName("sub")
288+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).subscribe());
289+
verifier.accept(() -> client.newReader().topic(topic).startMessageId(MessageId.earliest)
290+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
291+
}
292+
293+
private interface ThrowableConsumer<T> {
294+
295+
void accept(T value) throws Exception;
296+
}
297+
298+
public interface ThrowableSupplier<T> {
299+
300+
T get() throws Exception;
301+
}
251302
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,7 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
14011401
private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
14021402
int oldPartitionNumber = partitionedTopics.get(topicName);
14031403

1404-
return client.getPartitionsForTopic(topicName).thenCompose(list -> {
1404+
return client.getPartitionsForTopic(topicName, false).thenCompose(list -> {
14051405
int currentPartitionNumber = Long.valueOf(list.stream()
14061406
.filter(t -> TopicName.get(t).isPartitioned()).count()).intValue();
14071407

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
390390
return future;
391391
}
392392

393-
client.getPartitionsForTopic(topic).thenCompose(list -> {
393+
client.getPartitionsForTopic(topic, false).thenCompose(list -> {
394394
int oldPartitionNumber = topicMetadata.numPartitions();
395395
int currentPartitionNumber = list.size();
396396

0 commit comments

Comments
 (0)