Skip to content

Commit 579edf4

Browse files
gaborgsomogyidongjoon-hyun
authored andcommitted
[SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit
## What changes were proposed in this pull request? `DirectKafkaStreamSuite.offset recovery from kafka` commits offsets to Kafka with `Consumer.commitAsync` API (and then reads it back). Since this API is asynchronous it may send notifications late(or not at all). The actual test makes the assumption if the data sent and collected then the offset must be committed as well. This is not true. In this PR I've made the following modifications: * Wait for async offset commit before context stopped * Added commit succeed log to see whether it arrived at all * Using `ConcurrentHashMap` for committed offsets because 2 threads are using the variable (`JobGenerator` and `ScalaTest...`) ## How was this patch tested? Existing unit test in a loop + jenkins runs. Closes apache#25100 from gaborgsomogyi/SPARK-28335. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a6506f0 commit 579edf4

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010
2020
import java.io.File
2121
import java.lang.{ Long => JLong }
2222
import java.util.{ Arrays, HashMap => JHashMap, Map => JMap, UUID }
23+
import java.util.concurrent.ConcurrentHashMap
2324
import java.util.concurrent.ConcurrentLinkedQueue
2425
import java.util.concurrent.atomic.AtomicLong
2526

@@ -430,7 +431,7 @@ class DirectKafkaStreamSuite
430431
)
431432

432433
val collectedData = new ConcurrentLinkedQueue[String]()
433-
val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
434+
val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()
434435

435436
// Send data to Kafka and wait for it to be received
436437
def sendDataAndWaitForReceive(data: Seq[Int]) {
@@ -458,6 +459,7 @@ class DirectKafkaStreamSuite
458459
logError("commit failed", e)
459460
} else {
460461
committed.putAll(m)
462+
logDebug(s"commit succeeded: $m")
461463
}
462464
})
463465
}
@@ -467,8 +469,10 @@ class DirectKafkaStreamSuite
467469
for (i <- (1 to 10).grouped(4)) {
468470
sendDataAndWaitForReceive(i)
469471
}
472+
eventually(timeout(10.seconds), interval(50.milliseconds)) {
473+
assert(!committed.isEmpty)
474+
}
470475
ssc.stop()
471-
assert(! committed.isEmpty)
472476
val consumer = new KafkaConsumer[String, String](kafkaParams)
473477
consumer.subscribe(Arrays.asList(topic))
474478
consumer.poll(0)

0 commit comments

Comments
 (0)