Skip to content

Commit aa70a0a

Browse files
committed
[SPARK-25288][TESTS] Fix flaky Kafka transaction tests
## What changes were proposed in this pull request? Here are the failures: http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result. ## How was this patch tested? Jenkins Closes apache#22293 from zsxwing/SPARK-25288. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent f29c2b5 commit aa70a0a

File tree

3 files changed

+37
-14
lines changed

3 files changed

+37
-14
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
160160
}
161161

162162
object WithOffsetSync {
163-
def apply(topic: String)(func: () => Unit): StreamAction = {
163+
/**
164+
* Run `func` to write some Kafka messages and wait until the latest offset of the given
165+
* `TopicPartition` is not less than `expectedOffset`.
166+
*/
167+
def apply(
168+
topicPartition: TopicPartition,
169+
expectedOffset: Long)(func: () => Unit): StreamAction = {
164170
Execute("Run Kafka Producer")(_ => {
165171
func()
166172
// This is a hack for the race condition that the committed message may be not visible to
167173
// consumer for a short time.
168-
// Looks like after the following call returns, the consumer can always read the committed
169-
// messages.
170-
testUtils.getLatestOffsets(Set(topic))
174+
testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset)
171175
})
172176
}
173177
}
@@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
652656
}
653657
}
654658

659+
val topicPartition = new TopicPartition(topic, 0)
655660
// The message values are the same as their offsets to make the test easy to follow
656661
testUtils.withTranscationalProducer { producer =>
657662
testStream(mapped)(
658663
StartStream(ProcessingTime(100), clock),
659664
waitUntilBatchProcessed,
660665
CheckAnswer(),
661-
WithOffsetSync(topic) { () =>
666+
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
662667
// Send 5 messages. They should be visible only after being committed.
663668
producer.beginTransaction()
664669
(0 to 4).foreach { i =>
@@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
669674
waitUntilBatchProcessed,
670675
// Should not see any uncommitted messages
671676
CheckNewAnswer(),
672-
WithOffsetSync(topic) { () =>
677+
WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
673678
producer.commitTransaction()
674679
},
675680
AdvanceManualClock(100),
@@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
678683
AdvanceManualClock(100),
679684
waitUntilBatchProcessed,
680685
CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
681-
WithOffsetSync(topic) { () =>
686+
WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
682687
// Send 5 messages and abort the transaction. They should not be read.
683688
producer.beginTransaction()
684689
(6 to 10).foreach { i =>
@@ -692,7 +697,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
692697
AdvanceManualClock(100),
693698
waitUntilBatchProcessed,
694699
CheckNewAnswer(), // offset: 9*, 10*, 11*
695-
WithOffsetSync(topic) { () =>
700+
WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
696701
// Send 5 messages again. The consumer should skip the above aborted messages and read
697702
// them.
698703
producer.beginTransaction()
@@ -707,7 +712,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
707712
AdvanceManualClock(100),
708713
waitUntilBatchProcessed,
709714
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
710-
WithOffsetSync(topic) { () =>
715+
WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
711716
producer.beginTransaction()
712717
producer.send(new ProducerRecord[String, String](topic, "18")).get()
713718
producer.commitTransaction()
@@ -774,13 +779,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
774779
}
775780
}
776781

782+
val topicPartition = new TopicPartition(topic, 0)
777783
// The message values are the same as their offsets to make the test easy to follow
778784
testUtils.withTranscationalProducer { producer =>
779785
testStream(mapped)(
780786
StartStream(ProcessingTime(100), clock),
781787
waitUntilBatchProcessed,
782788
CheckNewAnswer(),
783-
WithOffsetSync(topic) { () =>
789+
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
784790
// Send 5 messages. They should be visible only after being committed.
785791
producer.beginTransaction()
786792
(0 to 4).foreach { i =>
@@ -790,13 +796,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
790796
AdvanceManualClock(100),
791797
waitUntilBatchProcessed,
792798
CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
793-
WithOffsetSync(topic) { () =>
799+
WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
794800
producer.commitTransaction()
795801
},
796802
AdvanceManualClock(100),
797803
waitUntilBatchProcessed,
798804
CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
799-
WithOffsetSync(topic) { () =>
805+
WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
800806
// Send 5 messages and abort the transaction. They should not be read.
801807
producer.beginTransaction()
802808
(6 to 10).foreach { i =>
@@ -810,7 +816,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
810816
AdvanceManualClock(100),
811817
waitUntilBatchProcessed,
812818
CheckNewAnswer(9, 10), // offset: 9, 10, 11*
813-
WithOffsetSync(topic) { () =>
819+
WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
814820
// Send 5 messages again. The consumer should skip the above aborted messages and read
815821
// them.
816822
producer.beginTransaction()
@@ -825,7 +831,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
825831
AdvanceManualClock(100),
826832
waitUntilBatchProcessed,
827833
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
828-
WithOffsetSync(topic) { () =>
834+
WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
829835
producer.beginTransaction()
830836
producer.send(new ProducerRecord[String, String](topic, "18")).get()
831837
producer.commitTransaction()

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
260260
producer.commitTransaction()
261261

262262
// Should read all committed messages
263+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
263264
checkAnswer(df, (1 to 5).map(_.toString).toDF)
264265

265266
producer.beginTransaction()
@@ -269,6 +270,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
269270
producer.abortTransaction()
270271

271272
// Should not read aborted messages
273+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
272274
checkAnswer(df, (1 to 5).map(_.toString).toDF)
273275

274276
producer.beginTransaction()
@@ -278,6 +280,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
278280
producer.commitTransaction()
279281

280282
// Should skip aborted messages and read new committed ones.
283+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
281284
checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF)
282285
}
283286
}
@@ -301,11 +304,13 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
301304
}
302305

303306
// "read_uncommitted" should see all messages including uncommitted ones
307+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 5)
304308
checkAnswer(df, (1 to 5).map(_.toString).toDF)
305309

306310
producer.commitTransaction()
307311

308312
// Should read all committed messages
313+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
309314
checkAnswer(df, (1 to 5).map(_.toString).toDF)
310315

311316
producer.beginTransaction()
@@ -315,6 +320,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
315320
producer.abortTransaction()
316321

317322
// "read_uncommitted" should see all messages including uncommitted or aborted ones
323+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
318324
checkAnswer(df, (1 to 10).map(_.toString).toDF)
319325

320326
producer.beginTransaction()
@@ -324,6 +330,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
324330
producer.commitTransaction()
325331

326332
// Should read all messages
333+
testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
327334
checkAnswer(df, (1 to 15).map(_.toString).toDF)
328335
}
329336
}

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
439439
}
440440
}
441441

442+
/**
443+
* Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
444+
*/
445+
def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
446+
eventually(timeout(60.seconds)) {
447+
val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
448+
assert(currentOffset.nonEmpty && currentOffset.get >= offset)
449+
}
450+
}
451+
442452
private class EmbeddedZookeeper(val zkConnect: String) {
443453
val snapshotDir = Utils.createTempDir()
444454
val logDir = Utils.createTempDir()

0 commit comments

Comments (0)