
Commit 85e95b7

HeartSaVioR authored and HyukjinKwon committed
[SPARK-28142][SS] Use CaseInsensitiveStringMap for KafkaContinuousStream
## What changes were proposed in this pull request?

This patch addresses a spot that was missed when the options map was switched to CaseInsensitiveStringMap - KafkaContinuousStream appears to be the only one left. Before this fix there was a related bug: `pollTimeoutMs` was always set to its default value, because `KafkaSourceProvider.CONSUMER_POLL_TIMEOUT` is the mixed-case key `kafkaConsumer.pollTimeoutMs`, while the map provided as `sourceOptions` had lower-cased keys, so the lookup never matched.

## How was this patch tested?

N/A.

Closes apache#24942 from HeartSaVioR/MINOR-use-case-insensitive-map-for-kafka-continuous-source.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
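To make the root cause concrete, here is a minimal Scala sketch of the lookup mismatch. It is not code from this patch; the object name and the option value "1000" are invented for illustration.

```scala
// Hypothetical sketch of the bug described above; not code from the patch.
object PollTimeoutLookupSketch {
  // Mirrors KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, a mixed-case key.
  val CONSUMER_POLL_TIMEOUT = "kafkaConsumer.pollTimeoutMs"

  def main(args: Array[String]): Unit = {
    // The source is handed a map whose keys have been lower-cased.
    val lowerCasedOptions = Map("kafkaconsumer.polltimeoutms" -> "1000")

    // A case-sensitive lookup never matches the mixed-case key,
    // so the default "512" is always used and the user setting is ignored.
    val buggy = lowerCasedOptions.getOrElse(CONSUMER_POLL_TIMEOUT, "512").toLong
    println(buggy) // 512

    // A case-insensitive lookup (what CaseInsensitiveStringMap provides)
    // does find the user-provided value.
    val fixed = lowerCasedOptions.getOrElse(CONSUMER_POLL_TIMEOUT.toLowerCase, "512").toLong
    println(fixed) // 1000
  }
}
```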
1 parent b5e183c commit 85e95b7

File tree

2 files changed, +5 -4 lines


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala

Lines changed: 4 additions & 3 deletions
@@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
  * A [[ContinuousStream]] for data from kafka.
  *
  * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
  *                     read by per-task consumers generated later.
  * @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions Params which are not Kafka consumer params.
+ * @param options Params which are not Kafka consumer params.
  * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
  * @param failOnDataLoss Flag indicating whether reading should fail in data loss
@@ -47,14 +48,14 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
 class KafkaContinuousStream(
     offsetReader: KafkaOffsetReader,
     kafkaParams: ju.Map[String, Object],
-    sourceOptions: Map[String, String],
+    options: CaseInsensitiveStringMap,
     metadataPath: String,
     initialOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
   extends ContinuousStream with Logging {

   private val pollTimeoutMs =
-    sourceOptions.getOrElse(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, "512").toLong
+    options.getLong(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, 512)

   // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -449,7 +449,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     new KafkaContinuousStream(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-      parameters,
+      options,
       checkpointLocation,
       startingStreamOffsets,
       failOnDataLoss(caseInsensitiveParams))
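For reference, a small self-contained sketch of how `CaseInsensitiveStringMap.getLong` resolves the mixed-case constant after the fix. It assumes the Spark SQL artifact is on the classpath; the object name and the option value "1000" are made up for illustration.

```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.util.CaseInsensitiveStringMap

object CaseInsensitiveLookupSketch {
  def main(args: Array[String]): Unit = {
    // Options as the source receives them, keyed in lower case.
    val raw = new JHashMap[String, String]()
    raw.put("kafkaconsumer.polltimeoutms", "1000")
    val options = new CaseInsensitiveStringMap(raw)

    // Lookups ignore case, so the mixed-case key now resolves,
    // and getLong parses the value, falling back to 512 only when absent.
    println(options.getLong("kafkaConsumer.pollTimeoutMs", 512L)) // 1000
    println(options.getLong("some.other.key", 512L))              // 512
  }
}
```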
