@@ -127,7 +127,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       logDebug(s"Assigned partitions: $partitions. Seeking to $partitionOffsets")
       partitionOffsets
     }
-    val partitions = consumerStrategy.assignedTopicPartitions(admin)
+    val partitions = withRetries { consumerStrategy.assignedTopicPartitions(admin) }
     // Obtain TopicPartition offsets with late binding support
     offsetRangeLimit match {
       case EarliestOffsetRangeLimit => partitions.map {
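
Note the type shift this wrap introduces: assignedTopicPartitions returns a Set[TopicPartition], while withRetries previously accepted only Map[TopicPartition, Long], which is exactly why the next hunk generifies the helper. A minimal, runnable illustration of the two result shapes (stand-in values only, not code from the PR):

import org.apache.kafka.common.TopicPartition

object ResultShapes {
  def main(args: Array[String]): Unit = {
    // Old shape: offset fetches produce a partition -> offset map.
    val offsets: Map[TopicPartition, Long] = Map(new TopicPartition("t", 0) -> 42L)
    // New shape: the assignment fetch wrapped above produces a partition set.
    val assignment: Set[TopicPartition] = Set(new TopicPartition("t", 0))
    println(s"offsets=$offsets assignment=$assignment")
  }
}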
@@ -455,9 +455,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Retries are needed to handle transient failures. For example, race conditions between
    * getting assignment and getting position while topics/partitions are deleted can cause NPEs.
    */
-  private def withRetries(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+  private def withRetries[T](body: => T): T = {
     synchronized {
-      var result: Option[Map[TopicPartition, Long]] = None
+      var result: Option[T] = None
       var attempt = 1
       var lastException: Throwable = null
       while (result.isEmpty && attempt <= maxOffsetFetchAttempts
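
The hunk is cut off below the while condition. Based on the declarations visible above (result, attempt, lastException, maxOffsetFetchAttempts) and the retry options the new tests configure, the helper plausibly completes along these lines; this is a sketch under those assumptions, not the PR's verbatim body:

import scala.util.control.NonFatal

object WithRetriesSketch {
  // Stand-in for KafkaOffsetReaderAdmin.withRetries[T]: run the body up to
  // maxAttempts times, sleep intervalMs between failed attempts, and rethrow
  // the last failure once attempts are exhausted. Parameter names are
  // assumptions modeled on the fields referenced in the diff.
  def withRetries[T](maxAttempts: Int, intervalMs: Long)(body: => T): T = {
    var result: Option[T] = None
    var attempt = 1
    var lastException: Throwable = null
    while (result.isEmpty && attempt <= maxAttempts) {
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastException = e
          attempt += 1
          if (attempt <= maxAttempts) Thread.sleep(intervalMs)
      }
    }
    // Matches the second test's expectation: the last error surfaces unchanged.
    result.getOrElse(throw lastException)
  }
}

With maxAttempts = 3 and intervalMs = 0 (the values the suite configures below), two failures followed by a success return normally, while three straight failures rethrow the final RuntimeException.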
@@ -22,8 +22,11 @@ import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger

 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when}

 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
@@ -263,4 +266,60 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
       }
     }
   }
+
+  private def createReaderWithMockedStrategy(
+      mockStrategy: ConsumerStrategy): KafkaOffsetReaderAdmin = {
+    new KafkaOffsetReaderAdmin(
+      mockStrategy,
+      KafkaSourceProvider.kafkaParamsForDriver(Map(
+        "bootstrap.servers" -> testUtils.brokerAddress
+      )),
+      CaseInsensitiveMap(Map(
+        KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY -> "3",
+        KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS -> "0"
+      )),
+      ""
+    )
+  }
+
+  test("SPARK-55561: fetchPartitionOffsets retries on transient failures") {
+    val tp0 = new TopicPartition("topic", 0)
+    val tp1 = new TopicPartition("topic", 1)
+    val expectedPartitions = Set(tp0, tp1)
+
+    val mockStrategy = mock(classOf[ConsumerStrategy])
+    val mockAdmin = mock(classOf[Admin])
+    when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+    when(mockStrategy.assignedTopicPartitions(any()))
+      .thenThrow(new RuntimeException("Transient error"))
+      .thenThrow(new RuntimeException("Transient error"))
+      .thenReturn(expectedPartitions)
+
+    val reader = createReaderWithMockedStrategy(mockStrategy)
+    try {
+      val result = reader.fetchPartitionOffsets(
+        EarliestOffsetRangeLimit, isStartingOffsets = true)
+      assert(result === expectedPartitions.map(tp => tp -> KafkaOffsetRangeLimit.EARLIEST).toMap)
+    } finally {
+      reader.close()
+    }
+  }
+
+  test("SPARK-55561: fetchPartitionOffsets throws after all retries exhausted") {
+    val mockStrategy = mock(classOf[ConsumerStrategy])
+    val mockAdmin = mock(classOf[Admin])
+    when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+    when(mockStrategy.assignedTopicPartitions(any()))
+      .thenThrow(new RuntimeException("Persistent error"))
+
+    val reader = createReaderWithMockedStrategy(mockStrategy)
+    try {
+      val ex = intercept[RuntimeException] {
+        reader.fetchPartitionOffsets(EarliestOffsetRangeLimit, isStartingOffsets = true)
+      }
+      assert(ex.getMessage === "Persistent error")
+    } finally {
+      reader.close()
+    }
+  }
 }
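
A note on the test configuration above: FETCH_OFFSET_RETRY_INTERVAL_MS is set to "0", so the retry loop does not sleep between attempts and both tests stay fast even when all retries are exhausted. One plausible way to run just this suite locally, assuming Spark's usual sbt module name for connector/kafka-0-10-sql:

build/sbt "sql-kafka-0-10/testOnly org.apache.spark.sql.kafka010.KafkaOffsetReaderSuite"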