Skip to content

Commit 96b030c

Browse files
committed
[tests] Add tests about automatic purge of aborted TXs
1 parent f878fc6 commit 96b030c

File tree

5 files changed

+82
-14
lines changed

5 files changed

+82
-14
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.Executor;
4949
import java.util.concurrent.ExecutorService;
5050
import java.util.concurrent.TimeUnit;
51+
import java.util.concurrent.atomic.AtomicBoolean;
5152
import java.util.concurrent.atomic.AtomicLong;
5253
import java.util.function.Consumer;
5354
import lombok.AllArgsConstructor;
@@ -153,6 +154,8 @@ public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSche
153154

154155
private volatile EntryFormatter entryFormatter;
155156

157+
private volatile AtomicBoolean unloaded = new AtomicBoolean();
158+
156159
public PartitionLog(KafkaServiceConfiguration kafkaConfig,
157160
RequestStats requestStats,
158161
Time time,
@@ -208,6 +211,10 @@ public boolean isInitialisationFailed() {
208211
return initFuture.isDone() && initFuture.isCompletedExceptionally();
209212
}
210213

214+
public void markAsUnloaded() {
215+
unloaded.set(true);
216+
}
217+
211218
private CompletableFuture<Void> loadTopicProperties() {
212219
CompletableFuture<Optional<PersistentTopic>> persistentTopicFuture =
213220
kafkaTopicLookupService.getTopic(fullPartitionName, this);
@@ -1101,12 +1108,20 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffset() {
11011108
// nothing to do
11021109
return CompletableFuture.completedFuture(null);
11031110
}
1111+
if (unloaded.get()) {
1112+
// nothing to do
1113+
return CompletableFuture.completedFuture(null);
1114+
}
11041115
return fetchOldestAvailableIndexFromTopic()
11051116
.thenAccept(offset ->
11061117
producerStateManager.updateAbortedTxnsPurgeOffset(offset));
11071118

11081119
}
11091120
public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
1121+
if (unloaded.get()) {
1122+
return FutureUtil.failedFuture(new NotLeaderOrFollowerException());
1123+
}
1124+
11101125
final CompletableFuture<Long> future = new CompletableFuture<>();
11111126

11121127
// The future that is returned by getTopicConsumerManager is always completed normally
@@ -1122,15 +1137,17 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
11221137
}
11231138
});
11241139

1125-
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
1126-
if (managedLedger.getNumberOfEntries() == 0) {
1140+
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
1141+
long numberOfEntries = managedLedger.getNumberOfEntries();
1142+
if (numberOfEntries == 0) {
11271143
long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
11281144
log.info("First offset for topic {} is {} as the topic is empty (numberOfEntries=0)",
11291145
fullPartitionName, currentOffset);
11301146
future.complete(currentOffset);
11311147

11321148
return future;
11331149
}
1150+
log.info("{} numberOfEntries={}", fullPartitionName, numberOfEntries);
11341151
// this is a DUMMY entry with -1
11351152
PositionImpl firstPosition = managedLedger.getFirstPosition();
11361153
// look for the first entry with data
@@ -1430,4 +1447,7 @@ private void decodeEntriesForRecovery(final CompletableFuture<DecodeResult> futu
14301447
);
14311448
}
14321449

1450+
public boolean isUnloaded() {
1451+
return unloaded.get();
1452+
}
14331453
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
8686
if (error != null) {
8787
// in case of failure we have to remove the CompletableFuture from the map
8888
log.error("Recovery of {} failed", key, error);
89+
partitionLog.markAsUnloaded();
8990
logMap.remove(key, partitionLog);
9091
}
9192
});
@@ -94,14 +95,19 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
9495
});
9596
if (res.isInitialisationFailed()) {
9697
log.error("Recovery of {} failed", kopTopic, res);
98+
res.markAsUnloaded();
9799
logMap.remove(kopTopic, res);
98100
}
99101
return res;
100102
}
101103

102104
public PartitionLog removeLog(String topicName) {
103105
log.info("removePartitionLog {}", topicName);
104-
return logMap.remove(topicName);
106+
PartitionLog exists = logMap.remove(topicName);
107+
if (exists != null) {
108+
exists.markAsUnloaded();
109+
}
110+
return exists;
105111
}
106112

107113
public int size() {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void safeRun() {
150150

151151
void updateAbortedTxnsPurgeOffset(long abortedTxnsPurgeOffset) {
152152
if (log.isDebugEnabled()) {
153-
log.debug("{} updateAbortedTxnsPurgeOffset {}", topicPartition, abortedTxnsPurgeOffset);
153+
log.debug("{} updateAbortedTxnsPurgeOffset offset={}", topicPartition, abortedTxnsPurgeOffset);
154154
}
155155
if (abortedTxnsPurgeOffset < 0) {
156156
return;
@@ -164,7 +164,11 @@ long maybePurgeAbortedTx() {
164164
}
165165
long now = System.currentTimeMillis();
166166
long deltaFromLast = (now - lastPurgeAbortedTxnTime) / 1000;
167-
if (deltaFromLast < kafkaTxnPurgeAbortedTxnIntervalSeconds) {
167+
if (log.isDebugEnabled()) {
168+
log.debug("maybePurgeAbortedTx deltaFromLast {} vs kafkaTxnPurgeAbortedTxnIntervalSeconds {} ",
169+
deltaFromLast, kafkaTxnPurgeAbortedTxnIntervalSeconds);
170+
}
171+
if (deltaFromLast > kafkaTxnPurgeAbortedTxnIntervalSeconds) {
168172
return 0;
169173
}
170174
lastPurgeAbortedTxnTime = now;
@@ -340,6 +344,10 @@ public long purgeAbortedTxns(long offset) {
340344
if (toRemove) {
341345
log.info("Transaction {} can be removed (lastOffset < {})", tx, tx.lastOffset(), offset);
342346
count.incrementAndGet();
347+
} else {
348+
if (log.isDebugEnabled()) {
349+
log.info("Transaction {} cannot be removed (lastOffset >= {})", tx, tx.lastOffset(), offset);
350+
}
343351
}
344352
return toRemove;
345353
});

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.spy;
2020
import static org.testng.Assert.assertNotNull;
2121
import static org.testng.Assert.assertTrue;
22+
import static org.testng.AssertJUnit.assertEquals;
2223

2324
import com.fasterxml.jackson.databind.ObjectMapper;
2425
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -983,8 +984,13 @@ public void trimConsumedLedgers(String topic) throws Exception {
983984
mock(KopBrokerLookupManager.class));
984985
PersistentTopic topicHandle = lookupService.getTopic(topic, "test").get().get();
985986

987+
log.info("Stats {}",
988+
mapper.writeValueAsString(admin
989+
.topics()
990+
.getInternalStats(topic)));
991+
986992
Awaitility.await().untilAsserted(() -> {
987-
log.debug("Subscriptions {}", topicHandle.getSubscriptions());
993+
log.debug("Subscriptions {}", topicHandle.getSubscriptions().keys());
988994
assertTrue(topicHandle.getSubscriptions().isEmpty());
989995
});
990996

@@ -999,6 +1005,12 @@ public void trimConsumedLedgers(String topic) throws Exception {
9991005
Thread.sleep(2000);
10001006
topicHandle.getManagedLedger().trimConsumedLedgersInBackground(future);
10011007
future.get(10, TimeUnit.SECONDS);
1008+
1009+
Awaitility.await().untilAsserted(() -> {
1010+
log.debug("{} getNumberOfEntries {} id {}", topicHandle.getName(), topicHandle.getNumberOfEntries());
1011+
assertEquals(topicHandle.getNumberOfEntries(), 0);
1012+
});
1013+
10021014
} finally {
10031015
admin.namespaces().setRetention(namespace, oldRetentionPolicies);
10041016
if (deduplicationStatus != null) {

tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ public void testSnapshotEventuallyTaken() throws Exception {
14231423
}
14241424
}
14251425

1426-
@Test(timeOut = 20000, enabled = false)
1426+
@Test(timeOut = 30000)
14271427
public void testAbortedTxEventuallyPurged() throws Exception {
14281428
KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler)
14291429
pulsar.getProtocolHandlers().protocol("kafka");
@@ -1449,6 +1449,11 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14491449
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 1
14501450
producer1.abortTransaction(); // OFFSET 2
14511451

1452+
producer1.beginTransaction();
1453+
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 3
1454+
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 4
1455+
producer1.abortTransaction(); // OFFSET 5
1456+
14521457
waitForTransactionsToBeInStableState(transactionalId);
14531458

14541459
PartitionLog partitionLog = protocolHandler
@@ -1458,7 +1463,10 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14581463

14591464
List<FetchResponseData.AbortedTransaction> abortedIndexList =
14601465
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1461-
assertEquals(1, abortedIndexList.size());
1466+
assertEquals(2, abortedIndexList.size());
1467+
assertEquals(2, abortedIndexList.size());
1468+
assertEquals(0, abortedIndexList.get(0).firstOffset());
1469+
assertEquals(3, abortedIndexList.get(1).firstOffset());
14621470

14631471
takeSnapshot(topicName);
14641472

@@ -1470,29 +1478,43 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14701478
admin.topics().unload(fullTopicName);
14711479
admin.lookups().lookupTopic(fullTopicName);
14721480

1481+
assertTrue(partitionLog.isUnloaded());
1482+
14731483
trimConsumedLedgers(fullTopicName);
14741484

1475-
assertEquals(2, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
1485+
partitionLog = protocolHandler
1486+
.getReplicaManager()
1487+
.getPartitionLog(topicPartition, namespacePrefix);
1488+
partitionLog.awaitInitialisation().get();
1489+
assertEquals(5, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
14761490

14771491
abortedIndexList =
14781492
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1479-
assertEquals(1, abortedIndexList.size());
1480-
1493+
assertEquals(2, abortedIndexList.size());
1494+
assertEquals(0, abortedIndexList.get(0).firstOffset());
1495+
assertEquals(3, abortedIndexList.get(1).firstOffset());
14811496

1497+
// force reading the minimum valid offset
1498+
// the timer is not started by the PH because
1499+
// we don't want it to make noise in the other tests
14821500
partitionLog.updatePurgeAbortedTxnsOffset().get();
14831501

1484-
Thread.sleep(conf.getKafkaTxnPurgeAbortedTxnIntervalSeconds() * 1000
1485-
+ 5);
1502+
// wait for some time
1503+
Thread.sleep(conf.getKafkaTxnPurgeAbortedTxnIntervalSeconds() * 1000 + 5);
14861504

14871505
producer1.beginTransaction();
14881506
// sending a message triggers the procedure
14891507
producer1.send(new ProducerRecord<>(topicName, "test")).get();
14901508

14911509
abortedIndexList =
14921510
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1493-
assertEquals(0, abortedIndexList.size());
1511+
assertEquals(1, abortedIndexList.size());
1512+
// the second TX cannot be purged because the lastOffset is 5, that is the boundary of the
1513+
// trimmed portion of the topic
1514+
assertEquals(3, abortedIndexList.get(0).firstOffset());
14941515

14951516
producer1.close();
1517+
14961518
} finally {
14971519
conf.setKafkaTxnPurgeAbortedTxnIntervalSeconds(kafkaTxnPurgeAbortedTxnIntervalSeconds);
14981520
}

0 commit comments

Comments
 (0)