Skip to content

Commit 4adfdd7

Browse files
dlg99eolivelli
authored andcommitted
Revert "[tests] Add tests about automatic purge of aborted TXs"
This reverts commit 9e4d07f.
1 parent f2138f3 commit 4adfdd7

File tree

5 files changed

+13
-81
lines changed

5 files changed

+13
-81
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.concurrent.Executor;
4949
import java.util.concurrent.ExecutorService;
5050
import java.util.concurrent.TimeUnit;
51-
import java.util.concurrent.atomic.AtomicBoolean;
5251
import java.util.concurrent.atomic.AtomicLong;
5352
import java.util.function.Consumer;
5453
import lombok.AllArgsConstructor;
@@ -154,8 +153,6 @@ public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSche
154153

155154
private volatile EntryFormatter entryFormatter;
156155

157-
private final AtomicBoolean unloaded = new AtomicBoolean();
158-
159156
public PartitionLog(KafkaServiceConfiguration kafkaConfig,
160157
RequestStats requestStats,
161158
Time time,
@@ -211,10 +208,6 @@ public boolean isInitialisationFailed() {
211208
return initFuture.isDone() && initFuture.isCompletedExceptionally();
212209
}
213210

214-
public void markAsUnloaded() {
215-
unloaded.set(true);
216-
}
217-
218211
private CompletableFuture<Void> loadTopicProperties() {
219212
CompletableFuture<Optional<PersistentTopic>> persistentTopicFuture =
220213
kafkaTopicLookupService.getTopic(fullPartitionName, this);
@@ -1080,20 +1073,12 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffset() {
10801073
// nothing to do
10811074
return CompletableFuture.completedFuture(null);
10821075
}
1083-
if (unloaded.get()) {
1084-
// nothing to do
1085-
return CompletableFuture.completedFuture(null);
1086-
}
10871076
return fetchOldestAvailableIndexFromTopic()
10881077
.thenAccept(offset ->
10891078
producerStateManager.updateAbortedTxnsPurgeOffset(offset));
10901079

10911080
}
10921081
public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
1093-
if (unloaded.get()) {
1094-
return FutureUtil.failedFuture(new NotLeaderOrFollowerException());
1095-
}
1096-
10971082
final CompletableFuture<Long> future = new CompletableFuture<>();
10981083

10991084
// The future that is returned by getTopicConsumerManager is always completed normally
@@ -1109,17 +1094,15 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
11091094
}
11101095
});
11111096

1112-
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
1113-
long numberOfEntries = managedLedger.getNumberOfEntries();
1114-
if (numberOfEntries == 0) {
1097+
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
1098+
if (managedLedger.getNumberOfEntries() == 0) {
11151099
long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
11161100
log.info("First offset for topic {} is {} as the topic is empty (numberOfEntries=0)",
11171101
fullPartitionName, currentOffset);
11181102
future.complete(currentOffset);
11191103

11201104
return future;
11211105
}
1122-
log.info("{} numberOfEntries={}", fullPartitionName, numberOfEntries);
11231106
// this is a DUMMY entry with -1
11241107
PositionImpl firstPosition = managedLedger.getFirstPosition();
11251108
// look for the first entry with data
@@ -1419,7 +1402,4 @@ private void decodeEntriesForRecovery(final CompletableFuture<DecodeResult> futu
14191402
);
14201403
}
14211404

1422-
public boolean isUnloaded() {
1423-
return unloaded.get();
1424-
}
14251405
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
8686
if (error != null) {
8787
// in case of failure we have to remove the CompletableFuture from the map
8888
log.error("Recovery of {} failed", key, error);
89-
partitionLog.markAsUnloaded();
9089
logMap.remove(key, partitionLog);
9190
}
9291
});
@@ -95,19 +94,14 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
9594
});
9695
if (res.isInitialisationFailed()) {
9796
log.error("Recovery of {} failed", kopTopic, res);
98-
res.markAsUnloaded();
9997
logMap.remove(kopTopic, res);
10098
}
10199
return res;
102100
}
103101

104102
public PartitionLog removeLog(String topicName) {
105103
log.info("removePartitionLog {}", topicName);
106-
PartitionLog exists = logMap.remove(topicName);
107-
if (exists != null) {
108-
exists.markAsUnloaded();
109-
}
110-
return exists;
104+
return logMap.remove(topicName);
111105
}
112106

113107
public int size() {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void safeRun() {
150150

151151
void updateAbortedTxnsPurgeOffset(long abortedTxnsPurgeOffset) {
152152
if (log.isDebugEnabled()) {
153-
log.debug("{} updateAbortedTxnsPurgeOffset offset={}", topicPartition, abortedTxnsPurgeOffset);
153+
log.debug("{} updateAbortedTxnsPurgeOffset {}", topicPartition, abortedTxnsPurgeOffset);
154154
}
155155
if (abortedTxnsPurgeOffset < 0) {
156156
return;
@@ -164,10 +164,6 @@ long maybePurgeAbortedTx() {
164164
}
165165
long now = System.currentTimeMillis();
166166
long deltaFromLast = (now - lastPurgeAbortedTxnTime) / 1000;
167-
if (log.isDebugEnabled()) {
168-
log.debug("maybePurgeAbortedTx deltaFromLast {} vs kafkaTxnPurgeAbortedTxnIntervalSeconds {} ",
169-
deltaFromLast, kafkaTxnPurgeAbortedTxnIntervalSeconds);
170-
}
171167
if (deltaFromLast < kafkaTxnPurgeAbortedTxnIntervalSeconds) {
172168
return 0;
173169
}
@@ -344,10 +340,6 @@ public long purgeAbortedTxns(long offset) {
344340
if (toRemove) {
345341
log.info("Transaction {} can be removed (lastOffset < {})", tx, tx.lastOffset(), offset);
346342
count.incrementAndGet();
347-
} else {
348-
if (log.isDebugEnabled()) {
349-
log.info("Transaction {} cannot be removed (lastOffset >= {})", tx, tx.lastOffset(), offset);
350-
}
351343
}
352344
return toRemove;
353345
});

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.mockito.Mockito.spy;
2020
import static org.testng.Assert.assertNotNull;
2121
import static org.testng.Assert.assertTrue;
22-
import static org.testng.AssertJUnit.assertEquals;
2322

2423
import com.fasterxml.jackson.databind.ObjectMapper;
2524
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -984,13 +983,8 @@ public void trimConsumedLedgers(String topic) throws Exception {
984983
mock(KopBrokerLookupManager.class));
985984
PersistentTopic topicHandle = lookupService.getTopic(topic, "test").get().get();
986985

987-
log.info("Stats {}",
988-
mapper.writeValueAsString(admin
989-
.topics()
990-
.getInternalStats(topic)));
991-
992986
Awaitility.await().untilAsserted(() -> {
993-
log.debug("Subscriptions {}", topicHandle.getSubscriptions().keys());
987+
log.debug("Subscriptions {}", topicHandle.getSubscriptions());
994988
assertTrue(topicHandle.getSubscriptions().isEmpty());
995989
});
996990

@@ -1005,12 +999,6 @@ public void trimConsumedLedgers(String topic) throws Exception {
1005999
Thread.sleep(2000);
10061000
topicHandle.getManagedLedger().trimConsumedLedgersInBackground(future);
10071001
future.get(10, TimeUnit.SECONDS);
1008-
1009-
Awaitility.await().untilAsserted(() -> {
1010-
log.debug("{} getNumberOfEntries {} id {}", topicHandle.getName(), topicHandle.getNumberOfEntries());
1011-
assertEquals(topicHandle.getNumberOfEntries(), 0);
1012-
});
1013-
10141002
} finally {
10151003
admin.namespaces().setRetention(namespace, oldRetentionPolicies);
10161004
if (deduplicationStatus != null) {

tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,7 +1421,7 @@ public void testSnapshotEventuallyTaken() throws Exception {
14211421
}
14221422
}
14231423

1424-
@Test(timeOut = 30000)
1424+
@Test(timeOut = 20000, enabled = false)
14251425
public void testAbortedTxEventuallyPurged() throws Exception {
14261426
KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler)
14271427
pulsar.getProtocolHandlers().protocol("kafka");
@@ -1447,11 +1447,6 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14471447
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 1
14481448
producer1.abortTransaction(); // OFFSET 2
14491449

1450-
producer1.beginTransaction();
1451-
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 3
1452-
producer1.send(new ProducerRecord<>(topicName, "test")).get(); // OFFSET 4
1453-
producer1.abortTransaction(); // OFFSET 5
1454-
14551450
waitForTransactionsToBeInStableState(transactionalId);
14561451

14571452
PartitionLog partitionLog = protocolHandler
@@ -1461,10 +1456,7 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14611456

14621457
List<FetchResponseData.AbortedTransaction> abortedIndexList =
14631458
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1464-
assertEquals(2, abortedIndexList.size());
1465-
assertEquals(2, abortedIndexList.size());
1466-
assertEquals(0, abortedIndexList.get(0).firstOffset());
1467-
assertEquals(3, abortedIndexList.get(1).firstOffset());
1459+
assertEquals(1, abortedIndexList.size());
14681460

14691461
takeSnapshot(topicName);
14701462

@@ -1476,43 +1468,29 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14761468
admin.topics().unload(fullTopicName);
14771469
admin.lookups().lookupTopic(fullTopicName);
14781470

1479-
assertTrue(partitionLog.isUnloaded());
1480-
14811471
trimConsumedLedgers(fullTopicName);
14821472

1483-
partitionLog = protocolHandler
1484-
.getReplicaManager()
1485-
.getPartitionLog(topicPartition, namespacePrefix);
1486-
partitionLog.awaitInitialisation().get();
1487-
assertEquals(5, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
1473+
assertEquals(2, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
14881474

14891475
abortedIndexList =
14901476
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1491-
assertEquals(2, abortedIndexList.size());
1492-
assertEquals(0, abortedIndexList.get(0).firstOffset());
1493-
assertEquals(3, abortedIndexList.get(1).firstOffset());
1477+
assertEquals(1, abortedIndexList.size());
1478+
14941479

1495-
// force reading the minimum valid offset
1496-
// the timer is not started by the PH because
1497-
// we don't want it to make noise in the other tests
14981480
partitionLog.updatePurgeAbortedTxnsOffset().get();
14991481

1500-
// wait for some time
1501-
Thread.sleep(conf.getKafkaTxnPurgeAbortedTxnIntervalSeconds() * 1000 + 5);
1482+
Thread.sleep(conf.getKafkaTxnPurgeAbortedTxnIntervalSeconds() * 1000
1483+
+ 5);
15021484

15031485
producer1.beginTransaction();
15041486
// sending a message triggers the procedure
15051487
producer1.send(new ProducerRecord<>(topicName, "test")).get();
15061488

15071489
abortedIndexList =
15081490
partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE);
1509-
assertEquals(1, abortedIndexList.size());
1510-
// the second TX cannot be purged because the lastOffset is 5, that is the boundary of the
1511-
// trimmed portion of the topic
1512-
assertEquals(3, abortedIndexList.get(0).firstOffset());
1491+
assertEquals(0, abortedIndexList.size());
15131492

15141493
producer1.close();
1515-
15161494
} finally {
15171495
conf.setKafkaTxnPurgeAbortedTxnIntervalSeconds(kafkaTxnPurgeAbortedTxnIntervalSeconds);
15181496
}

0 commit comments

Comments
 (0)