Skip to content

Commit 3a89a11

Browse files
gaoran10eolivelli
authored andcommitted
The delta time should less than aborted txn index purge interval.
1 parent 3e64d96 commit 3a89a11

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ long maybePurgeAbortedTx() {
168168
log.debug("maybePurgeAbortedTx deltaFromLast {} vs kafkaTxnPurgeAbortedTxnIntervalSeconds {} ",
169169
deltaFromLast, kafkaTxnPurgeAbortedTxnIntervalSeconds);
170170
}
171-
if (deltaFromLast > kafkaTxnPurgeAbortedTxnIntervalSeconds) {
171+
if (deltaFromLast < kafkaTxnPurgeAbortedTxnIntervalSeconds) {
172172
return 0;
173173
}
174174
lastPurgeAbortedTxnTime = now;

tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
1717
import static org.hamcrest.CoreMatchers.instanceOf;
1818
import static org.hamcrest.MatcherAssert.assertThat;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.spy;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
1923
import static org.testng.Assert.assertEquals;
2024
import static org.testng.Assert.assertFalse;
2125
import static org.testng.Assert.assertNotNull;
@@ -30,9 +34,15 @@
3034
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState;
3135
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
3236
import io.streamnative.pulsar.handlers.kop.scala.Either;
37+
import io.streamnative.pulsar.handlers.kop.storage.CompletedTxn;
3338
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
39+
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager;
3440
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshot;
41+
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
3542
import io.streamnative.pulsar.handlers.kop.storage.TxnMetadata;
43+
44+
import java.lang.reflect.InvocationTargetException;
45+
import java.lang.reflect.Method;
3646
import java.time.Duration;
3747
import java.time.temporal.ChronoUnit;
3848
import java.util.ArrayList;
@@ -51,6 +61,8 @@
5161
import java.util.concurrent.atomic.AtomicInteger;
5262
import java.util.function.BiConsumer;
5363
import java.util.function.Function;
64+
65+
import io.streamnative.pulsar.handlers.kop.utils.ReflectionUtils;
5466
import lombok.Cleanup;
5567
import lombok.extern.slf4j.Slf4j;
5668
import org.apache.commons.lang3.RandomStringUtils;
@@ -81,6 +93,7 @@
8193
import org.apache.kafka.common.serialization.StringSerializer;
8294
import org.apache.pulsar.common.naming.TopicName;
8395
import org.awaitility.Awaitility;
96+
import org.mockito.Mockito;
8497
import org.testng.Assert;
8598
import org.testng.annotations.AfterClass;
8699
import org.testng.annotations.AfterMethod;
@@ -1036,8 +1049,42 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
10361049

10371050
}
10381051

1052+
@Test(timeOut = 10_000)
1053+
public void testAbortedPurgeIntervalConfiguration() throws Exception {
1054+
Class<ProducerStateManager> clazz = ProducerStateManager.class;
1055+
Method maybePurgeMethod = clazz.getDeclaredMethod("maybePurgeAbortedTx");
1056+
maybePurgeMethod.setAccessible(true);
1057+
Method updateAbortedTxnsPurgeOffsetMethod =
1058+
clazz.getDeclaredMethod("updateAbortedTxnsPurgeOffset", long.class);
1059+
updateAbortedTxnsPurgeOffsetMethod.setAccessible(true);
1060+
1061+
ProducerStateManager producerStateManager = buildProducerStateManager(
1062+
updateAbortedTxnsPurgeOffsetMethod, Integer.MAX_VALUE);
1063+
for (int i = 0; i < 10; i++) {
1064+
// the purge interval is Integer.MAX_VALUE, the purge operation should not be triggered
1065+
assertEquals(maybePurgeMethod.invoke(producerStateManager), 0L);
1066+
Thread.sleep(500);
1067+
}
10391068

1069+
producerStateManager = buildProducerStateManager(updateAbortedTxnsPurgeOffsetMethod, 1);
1070+
assertEquals(maybePurgeMethod.invoke(producerStateManager), 0L);
1071+
Thread.sleep(1500);
1072+
assertEquals(maybePurgeMethod.invoke(producerStateManager), 1L);
1073+
}
10401074

1075+
private ProducerStateManager buildProducerStateManager(Method updateAbortedTxnsPurgeOffsetMethod,
1076+
int purgeAbortedTxnIntervalSec) throws Exception {
1077+
ProducerStateManager producerStateManager = new ProducerStateManager(
1078+
"aborted-txn-index-purge-interval-test-" + RandomStringUtils.randomAlphanumeric(5),
1079+
UUID.randomUUID().toString(),
1080+
Mockito.mock(ProducerStateManagerSnapshotBuffer.class),
1081+
1000 * 30,
1082+
purgeAbortedTxnIntervalSec);
1083+
producerStateManager.updateMapEndOffset(100L);
1084+
updateAbortedTxnsPurgeOffsetMethod.invoke(producerStateManager, 10L);
1085+
producerStateManager.updateTxnIndex(new CompletedTxn(1L, 5L, 6L, true), 6L);
1086+
return producerStateManager;
1087+
}
10411088

10421089
@Test(timeOut = 60000)
10431090
public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception {

0 commit comments

Comments
 (0)