Skip to content

Commit 6ef94e0

Browse files
committed
[SPARK-27260][SS] Upgrade to Kafka 2.2.0
## What changes were proposed in this pull request? This PR aims to update Kafka dependency to 2.2.0 to bring the following improvement and bug fixes. - https://issues.apache.org/jira/projects/KAFKA/versions/12344063 Due to [KAFKA-4453](https://issues.apache.org/jira/browse/KAFKA-4453), data plane API and controller plane API are separated. Apache Spark needs the following changes. ```scala - servers.head.apis.metadataCache + servers.head.dataPlaneRequestProcessor.metadataCache ``` ## How was this patch tested? Pass the Jenkins with the existing tests. Closes apache#24190 from dongjoon-hyun/SPARK-27260. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 52671d6 commit 6ef94e0

File tree

3 files changed

+5
-3
lines changed

3 files changed

+5
-3
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
429429
}
430430

431431
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
432-
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
432+
def isPropagated = server.dataPlaneRequestProcessor.metadataCache
433+
.getPartitionInfo(topic, partition) match {
433434
case Some(partitionState) =>
434435
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
435436
Request.isValidBrokerId(partitionState.basePartitionState.leader) &&

external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
286286
}
287287

288288
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
289-
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
289+
def isPropagated = server.dataPlaneRequestProcessor.metadataCache
290+
.getPartitionInfo(topic, partition) match {
290291
case Some(partitionState) =>
291292
val leader = partitionState.basePartitionState.leader
292293
val isr = partitionState.basePartitionState.isr

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@
131131
<!-- Version used for internal directory structure -->
132132
<hive.version.short>1.2.1</hive.version.short>
133133
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
134-
<kafka.version>2.1.1</kafka.version>
134+
<kafka.version>2.2.0</kafka.version>
135135
<derby.version>10.12.1.1</derby.version>
136136
<parquet.version>1.10.1</parquet.version>
137137
<orc.version>1.5.5</orc.version>

0 commit comments

Comments
 (0)