Skip to content

Commit a7b864f

Browse files
huanliwang-db authored and HeartSaVioR committed
[SPARK-51440][SS] classify the NPE when null topic field value is in kafka message data and there is no topic option
### What changes were proposed in this pull request? We are throwing out the NPE now when null topic field value is in kafka message data and there is no topic option. Introduce a new kafka error and throw out this classified exception instead. ### Why are the changes needed? Error classification for better user experience ### Does this PR introduce _any_ user-facing change? Yes, error classification ### How was this patch tested? modify the existing unit test ### Was this patch authored or co-authored using generative AI tooling? N/A Closes #50214 from huanliwang-db/kafka-error. Authored-by: huanliwang-db <huanli.wang@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 76a1413 commit a7b864f

File tree

4 files changed

+19
-5
lines changed

4 files changed

+19
-5
lines changed

connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
"Specified: <specifiedPartitions> Assigned: <assignedPartitions>"
3838
]
3939
},
40+
"KAFKA_NULL_TOPIC_IN_DATA": {
41+
"message" : [
42+
"The Kafka message data sent to the producer contains a null topic. Use the `topic` option for setting a default topic."
43+
]
44+
},
4045
"KAFKA_DATA_LOSS" : {
4146
"message" : [
4247
"Some data may have been lost because they are not available in Kafka any more;",

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ object KafkaExceptions {
177177
"specifiedPartitions" -> specifiedPartitions.toString,
178178
"assignedPartitions" -> assignedPartitions.toString))
179179
}
180+
181+
def nullTopicInData(): KafkaIllegalStateException = {
182+
new KafkaIllegalStateException(
183+
errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
184+
messageParameters = Map.empty)
185+
}
180186
}
181187

182188
/**

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader
2727

2828
import org.apache.spark.sql.catalyst.InternalRow
2929
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
30+
import org.apache.spark.sql.kafka010.KafkaExceptions.nullTopicInData
3031
import org.apache.spark.sql.kafka010.producer.{CachedKafkaProducer, InternalKafkaProducerPool}
3132
import org.apache.spark.sql.types.BinaryType
3233

@@ -95,8 +96,7 @@ private[kafka010] abstract class KafkaRowWriter(
9596
val key = projectedRow.getBinary(1)
9697
val value = projectedRow.getBinary(2)
9798
if (topic == null) {
98-
throw new NullPointerException(s"null topic present in the data. Use the " +
99-
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
99+
throw nullTopicInData()
100100
}
101101
val partition: Integer =
102102
if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4)

connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.kafka.common.Cluster
2929
import org.apache.kafka.common.serialization.ByteArraySerializer
3030
import org.scalatest.time.SpanSugar._
3131

32-
import org.apache.spark.{SparkConf, SparkContext, SparkException, TestUtils}
32+
import org.apache.spark.{SparkConf, SparkContext, TestUtils}
3333
import org.apache.spark.sql._
3434
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
3535
import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamBase}
@@ -491,14 +491,17 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
491491

492492
test("batch - null topic field value, and no topic option") {
493493
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
494-
val ex = intercept[SparkException] {
494+
val ex = intercept[KafkaIllegalStateException] {
495495
df.write
496496
.format("kafka")
497497
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
498498
.mode("append")
499499
.save()
500500
}
501-
TestUtils.assertExceptionMsg(ex, "null topic present in the data")
501+
checkError(
502+
exception = ex,
503+
condition = "KAFKA_NULL_TOPIC_IN_DATA"
504+
)
502505
}
503506

504507
protected def testUnsupportedSaveModes(msg: (SaveMode) => Seq[String]): Unit = {

0 commit comments

Comments (0)