Skip to content

Commit 8a40b30

Browse files
authored
[fix][broker] Closed topics won't be removed from the cache (#23884)
1 parent 144fe2e commit 8a40b30

File tree

4 files changed

+188
-46
lines changed

4 files changed

+188
-46
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.function.ToLongFunction;
4949
import javax.annotation.Nonnull;
5050
import lombok.Getter;
51+
import lombok.Setter;
5152
import org.apache.bookkeeper.mledger.util.StatsBuckets;
5253
import org.apache.commons.collections4.CollectionUtils;
5354
import org.apache.commons.collections4.MapUtils;
@@ -96,6 +97,13 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
9697

9798
protected final String topic;
9899

100+
// Reference to the CompletableFuture returned when creating this topic in BrokerService.
101+
// Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact
102+
// topic instance that was created.
103+
@Getter
104+
@Setter
105+
protected volatile CompletableFuture<Optional<Topic>> createFuture;
106+
99107
// Producers currently connected to this topic
100108
protected final ConcurrentHashMap<String, Producer> producers;
101109

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,6 +1326,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
13261326
NonPersistentTopic nonPersistentTopic;
13271327
try {
13281328
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
1329+
nonPersistentTopic.setCreateFuture(topicFuture);
13291330
} catch (Throwable e) {
13301331
log.warn("Failed to create topic {}", topic, e);
13311332
topicFuture.completeExceptionally(e);
@@ -1800,6 +1801,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
18001801
PersistentTopic persistentTopic = isSystemTopic(topic)
18011802
? new SystemTopic(topic, ledger, BrokerService.this)
18021803
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
1804+
persistentTopic.setCreateFuture(topicFuture);
18031805
persistentTopic
18041806
.initialize()
18051807
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -2409,47 +2411,18 @@ public AuthorizationService getAuthorizationService() {
24092411
return authorizationService;
24102412
}
24112413

2412-
public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
2413-
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
2414-
if (createTopicFuture.isEmpty()){
2415-
return CompletableFuture.completedFuture(null);
2416-
}
2417-
return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get());
2418-
}
2419-
2420-
private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic){
2421-
if (topic == null){
2422-
return Optional.empty();
2423-
}
2424-
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topic.getName());
2425-
// If not exists in cache, do nothing.
2426-
if (createTopicFuture == null){
2427-
return Optional.empty();
2428-
}
2429-
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic.
2430-
if (!createTopicFuture.isDone()){
2431-
return Optional.empty();
2432-
}
2433-
// If the future in cache has exception complete,
2434-
// the topic instance in the cache is not the same with the topic.
2435-
if (createTopicFuture.isCompletedExceptionally()){
2436-
return Optional.empty();
2437-
}
2438-
Optional<Topic> optionalTopic = createTopicFuture.join();
2439-
Topic topicInCache = optionalTopic.orElse(null);
2440-
if (topicInCache == null || topicInCache != topic){
2441-
return Optional.empty();
2442-
} else {
2443-
return Optional.of(createTopicFuture);
2444-
}
2445-
}
2446-
2447-
private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
2448-
CompletableFuture<Optional<Topic>> createTopicFuture) {
2449-
TopicName topicName = TopicName.get(topic);
2414+
/**
2415+
* Removes the topic from the cache only if the topicName and associated createFuture match exactly.
2416+
* The TopicEvent.UNLOAD event will be triggered before and after removal.
2417+
*
2418+
* @param topic The topic to be removed.
2419+
* @return A CompletableFuture that completes when the operation is done.
2420+
*/
2421+
public CompletableFuture<Void> removeTopicFromCache(AbstractTopic topic) {
2422+
TopicName topicName = TopicName.get(topic.getName());
24502423
return pulsar.getNamespaceService().getBundleAsync(topicName)
24512424
.thenAccept(namespaceBundle -> {
2452-
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
2425+
removeTopicFromCache(topic.getName(), namespaceBundle, topic.getCreateFuture());
24532426
});
24542427
}
24552428

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
109109
private final AbortedTxnProcessor.SnapshotType snapshotType;
110110
private final MaxReadPositionCallBack maxReadPositionCallBack;
111111

112+
private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) {
113+
return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
114+
? new SnapshotSegmentAbortedTxnProcessorImpl(topic)
115+
: new SingleSnapshotAbortedTxnProcessorImpl(topic);
116+
}
117+
118+
private static AbortedTxnProcessor.SnapshotType determineSnapshotType(PersistentTopic topic) {
119+
return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
120+
? AbortedTxnProcessor.SnapshotType.Segment
121+
: AbortedTxnProcessor.SnapshotType.Single;
122+
}
123+
112124
public TopicTransactionBuffer(PersistentTopic topic) {
125+
this(topic, createSnapshotProcessor(topic), determineSnapshotType(topic));
126+
}
127+
128+
@VisibleForTesting
129+
TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor,
130+
AbortedTxnProcessor.SnapshotType snapshotType) {
113131
super(State.None);
114132
this.topic = topic;
115133
this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
@@ -118,13 +136,8 @@ public TopicTransactionBuffer(PersistentTopic topic) {
118136
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
119137
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
120138
this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
121-
if (topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) {
122-
snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(topic);
123-
snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
124-
} else {
125-
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
126-
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
127-
}
139+
this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor;
140+
this.snapshotType = snapshotType;
128141
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
129142
this.recover();
130143
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.transaction.buffer.impl;
20+
21+
import static org.mockito.Mockito.doAnswer;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
import static org.testng.Assert.assertFalse;
25+
import static org.testng.Assert.assertTrue;
26+
import java.io.IOException;
27+
import java.util.Optional;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import lombok.SneakyThrows;
32+
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.bookkeeper.mledger.ManagedLedger;
34+
import org.apache.pulsar.broker.BrokerTestUtil;
35+
import org.apache.pulsar.broker.service.BrokerService;
36+
import org.apache.pulsar.broker.service.Topic;
37+
import org.apache.pulsar.broker.service.TopicFactory;
38+
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
39+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
40+
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
41+
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
42+
import org.apache.pulsar.client.api.ProducerConsumerBase;
43+
import org.awaitility.Awaitility;
44+
import org.testng.annotations.AfterClass;
45+
import org.testng.annotations.BeforeClass;
46+
import org.testng.annotations.Test;
47+
48+
@Slf4j
49+
@Test(groups = "broker")
50+
public class TransactionPersistentTopicTest extends ProducerConsumerBase {
51+
52+
private static CountDownLatch topicInitSuccessSignal = new CountDownLatch(1);
53+
54+
@BeforeClass(alwaysRun = true)
55+
@Override
56+
protected void setup() throws Exception {
57+
// Intercept when the `topicFuture` is about to complete and wait until the topic close operation finishes.
58+
conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
59+
conf.setTransactionCoordinatorEnabled(true);
60+
conf.setBrokerDeduplicationEnabled(false);
61+
super.internalSetup();
62+
super.producerBaseSetup();
63+
}
64+
65+
@AfterClass(alwaysRun = true)
66+
@Override
67+
protected void cleanup() throws Exception {
68+
super.internalCleanup();
69+
}
70+
71+
@Test
72+
public void testNoOrphanClosedTopicIfTxnInternalFailed() {
73+
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
74+
75+
BrokerService brokerService = pulsar.getBrokerService();
76+
77+
// 1. Mock close topic when create transactionBuffer
78+
TransactionBufferProvider mockTransactionBufferProvider = originTopic -> {
79+
AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class);
80+
doAnswer(invocation -> {
81+
topicInitSuccessSignal.await();
82+
return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed"));
83+
}).when(abortedTxnProcessor).recoverFromSnapshot();
84+
when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
85+
return new TopicTransactionBuffer(
86+
(PersistentTopic) originTopic, abortedTxnProcessor, AbortedTxnProcessor.SnapshotType.Single);
87+
};
88+
TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
89+
pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);
90+
91+
// 2. Trigger create topic and assert topic load success.
92+
CompletableFuture<Optional<Topic>> firstLoad = brokerService.getTopic(tpName, true);
93+
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
94+
.pollInterval(200, TimeUnit.MILLISECONDS)
95+
.untilAsserted(() -> {
96+
assertTrue(firstLoad.isDone());
97+
assertFalse(firstLoad.isCompletedExceptionally());
98+
});
99+
100+
// 3. Assert topic removed from cache
101+
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
102+
.pollInterval(500, TimeUnit.MILLISECONDS)
103+
.untilAsserted(() -> {
104+
assertFalse(brokerService.getTopics().containsKey(tpName));
105+
});
106+
107+
// 4. Set txn provider to back
108+
pulsar.setTransactionBufferProvider(originalTransactionBufferProvider);
109+
}
110+
111+
public static class MyTopicFactory implements TopicFactory {
112+
@Override
113+
public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
114+
Class<T> topicClazz) {
115+
try {
116+
if (topicClazz == NonPersistentTopic.class) {
117+
return (T) new NonPersistentTopic(topic, brokerService);
118+
} else {
119+
return (T) new MyPersistentTopic(topic, ledger, brokerService);
120+
}
121+
} catch (Exception e) {
122+
throw new IllegalStateException(e);
123+
}
124+
}
125+
126+
@Override
127+
public void close() throws IOException {
128+
// No-op
129+
}
130+
}
131+
132+
public static class MyPersistentTopic extends PersistentTopic {
133+
134+
public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
135+
super(topic, ledger, brokerService);
136+
}
137+
138+
@SneakyThrows
139+
@Override
140+
public CompletableFuture<Void> checkDeduplicationStatus() {
141+
topicInitSuccessSignal.countDown();
142+
// Sleep 1s pending txn buffer recover failed and close topic
143+
Thread.sleep(1000);
144+
return CompletableFuture.completedFuture(null);
145+
}
146+
}
147+
148+
}

0 commit comments

Comments
 (0)