Skip to content

Commit 45327fd

Browse files
KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWithTV2Disabled failed on trunk (#20102)
Backports the flakyness fix in #18451 to 4.0 branch > Sometimes we didn't get into abortable state before aborting, so the epoch didn't get bumped. Now we force abortable state with an attempt to send before aborting so the epoch bump occurs as expected. > > Reviewers: Jeff Kim <[email protected]> Reviewers: Chia-Ping Tsai <[email protected]> Co-authored-by: Justine Olshan <[email protected]>
1 parent 4ce6f5c commit 45327fd

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

core/src/test/scala/integration/kafka/api/TransactionsTest.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
2121
import kafka.utils.{TestInfoUtils, TestUtils}
2222
import org.apache.kafka.clients.consumer._
2323
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
24-
import org.apache.kafka.common.TopicPartition
24+
import org.apache.kafka.common.{KafkaException, TopicPartition}
2525
import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException}
2626
import org.apache.kafka.common.test.api.Flaky
2727
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@@ -738,6 +738,19 @@ class TransactionsTest extends IntegrationTestHarness {
738738
restartDeadBrokers()
739739

740740
org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[TimeoutException])
741+
// Ensure the producer transitions to abortable_error state.
742+
TestUtils.waitUntilTrue(() => {
743+
var failed = false
744+
try {
745+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false))
746+
} catch {
747+
case e: Exception =>
748+
if (e.isInstanceOf[KafkaException])
749+
failed = true
750+
}
751+
failed
752+
}, "The send request never failed as expected.")
753+
assertThrows(classOf[KafkaException], () => producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false)))
741754
producer.abortTransaction()
742755

743756
producer.beginTransaction()
@@ -760,7 +773,7 @@ class TransactionsTest extends IntegrationTestHarness {
760773
producerStateEntry =
761774
brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId)
762775
assertNotNull(producerStateEntry)
763-
assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch)
776+
assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch, "InitialProduceEpoch: " + initialProducerEpoch + " ProducerStateEntry: " + producerStateEntry)
764777
} finally {
765778
producer.close(Duration.ZERO)
766779
}

0 commit comments

Comments
 (0)