Commit 5ccb599

[Spark] Check for null arguments in CDC read (delta-io#4489)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

The CDC read query `table_changes` does not support a `null` value as the start or end parameter. However, passing a `null` argument via the SQL API resulted in a NullPointerException. This PR adds a check for `null` arguments and throws a DeltaIllegalArgumentException with the new error class `DELTA_CDC_READ_NULL_RANGE_BOUNDARY` for the SQL API.

Similarly, providing a `null` version via the Scala API resulted in a NumberFormatException. A new error class `DELTA_VERSION_INVALID` was added for this purpose (analogous to the existing `DELTA_TIMESTAMP_INVALID` for null timestamps).

## How was this patch tested?

New unit tests were added in [DeltaCDCScalaSuite](spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala) and [DeltaCDCSQLSuite](spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala) covering the cases where either the start or the end parameter is `null`. The tests expect a proper DeltaIllegalArgumentException (SQL) or DeltaAnalysisException (Scala) to be thrown, which was not the case before. Additional unit tests were added to [DeltaErrorsSuite](spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala) to verify that the error messages are reported correctly for the new error classes.

## Does this PR introduce _any_ user-facing changes?

No
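For context, a minimal repro sketch of the two failure modes described above (the table name `events` and the surrounding session are hypothetical; assumes a CDF-enabled Delta table and delta-spark on the classpath):

```scala
// Hypothetical repro; "events" is an illustrative Delta table with CDF enabled.

// SQL API: a NULL range boundary previously surfaced as a NullPointerException;
// after this change it fails fast with DELTA_CDC_READ_NULL_RANGE_BOUNDARY.
spark.sql("SELECT * FROM table_changes('events', NULL, 2)")

// Scala API: an unparsable starting version (e.g. the string "null") previously
// surfaced as a NumberFormatException; it now fails with DELTA_VERSION_INVALID.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "null")
  .table("events")
```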
1 parent 5e02cc7 commit 5ccb599

File tree

7 files changed: +119, -2 lines changed


spark/src/main/resources/error/delta-error-classes.json

Lines changed: 12 additions & 0 deletions
```diff
@@ -359,6 +359,12 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_CDC_READ_NULL_RANGE_BOUNDARY" : {
+    "message" : [
+      "CDC read start/end parameters cannot be null. Please provide a valid version or timestamp."
+    ],
+    "sqlState" : "22004"
+  },
   "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" : {
     "message" : [
       "Retrieving table changes between version <start> and <end> failed because of an incompatible data schema.",
@@ -3031,6 +3037,12 @@
     ],
     "sqlState" : "KD00C"
   },
+  "DELTA_VERSION_INVALID" : {
+    "message" : [
+      "The provided version (<version>) is not a valid version."
+    ],
+    "sqlState" : "42815"
+  },
   "DELTA_VIOLATE_CONSTRAINT_WITH_VALUES" : {
     "message" : [
       "CHECK constraint <constraintName> <expression> violated by row with values:",
```

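Error-class messages use angle-bracket placeholders that are filled from the caller's `messageParameters`. Purely as an illustration (this is not Spark's actual substitution code), the new `DELTA_VERSION_INVALID` template renders like so:

```scala
// Illustration only: Spark's error framework performs the real substitution.
val template = "The provided version (<version>) is not a valid version."
val rendered = template.replace("<version>", "null")
println(rendered) // The provided version (null) is not a valid version.
```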
spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -498,6 +498,11 @@ trait DeltaErrorsBase
       pos = 0)
   }
 
+  /** Throwable used when a null 'start' or 'end' is provided in CDC reads. */
+  def nullRangeBoundaryInCDCRead(): Throwable = {
+    new DeltaIllegalArgumentException(errorClass = "DELTA_CDC_READ_NULL_RANGE_BOUNDARY")
+  }
+
   /**
    * Throwable used for invalid CDC 'start' and 'end' options, where end < start
    */
@@ -1508,6 +1513,13 @@ trait DeltaErrorsBase
     )
   }
 
+  def versionInvalid(version: String): Throwable = {
+    new DeltaAnalysisException(
+      errorClass = "DELTA_VERSION_INVALID",
+      messageParameters = Array(s"$version")
+    )
+  }
+
   case class TemporallyUnstableInputException(
       userTimestamp: java.sql.Timestamp,
       commitTs: java.sql.Timestamp,
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableValueFunctions.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -29,7 +29,7 @@ import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistryBase, NamedRelation, TableFunctionRegistry, UnresolvedLeafNode, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo, StringLiteral}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.catalog.V1Table
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -110,6 +110,9 @@ trait CDCStatementBase extends DeltaTableValueFunction {
 
   protected def getOptions: CaseInsensitiveStringMap = {
     def toDeltaOption(keyPrefix: String, value: Expression): (String, String) = {
+      if (value == Literal(null)) {
+        throw DeltaErrors.nullRangeBoundaryInCDCRead()
+      }
       val evaluated = DeltaTableValueFunctionsShims.evaluateTimeOption(value)
       value.dataType match {
         // We dont need to explicitly handle ShortType as it is parsed as IntegerType.
```
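The guard compares the argument expression against `Literal(null)`: in Catalyst, `Literal(null)` constructs a null literal of `NullType`, which is what a bare SQL `NULL` argument resolves to, and since `Literal` is a case class, equality compares both the value and the data type. A small sketch of that equivalence (assumes `spark-catalyst` on the classpath):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.NullType

// Literal(null) builds a null literal of NullType, matching a bare SQL NULL.
val nullArg = Literal(null)
assert(nullArg == Literal(null, NullType))
// The new guard rejects exactly this case before evaluating the option,
// throwing DeltaErrors.nullRangeBoundaryInCDCRead() in the real code.
```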

spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -241,7 +241,12 @@ trait CDCReaderImpl extends DeltaLogging {
       versionKey: String,
       timestampKey: String): Option[ResolvedCDFVersion] = {
     if (options.containsKey(versionKey)) {
-      Some(ResolvedCDFVersion(options.get(versionKey).toLong, timestamp = None))
+      val version = options.get(versionKey)
+      try {
+        Some(ResolvedCDFVersion(version.toLong, timestamp = None))
+      } catch {
+        case _: NumberFormatException => throw DeltaErrors.versionInvalid(version)
+      }
     } else if (options.containsKey(timestampKey)) {
       val ts = options.get(timestampKey)
       val spec = DeltaTimeTravelSpec(Some(Literal(ts)), None, Some("cdcReader"))
```
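The fix follows a common pattern: bind the raw option value, attempt the narrow conversion, and translate the low-level `NumberFormatException` from `String.toLong` into a descriptive, user-facing error. A self-contained sketch of the pattern (the names are illustrative stand-ins, not the Delta API):

```scala
// Illustrative stand-in for the version-parsing path in CDCReader.
def parseVersion(raw: String): Long =
  try raw.toLong
  catch {
    case _: NumberFormatException =>
      throw new IllegalArgumentException(
        s"The provided version ($raw) is not a valid version.")
  }

parseVersion("3")       // => 3L
// parseVersion("null") // IllegalArgumentException with a clear message
```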

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala

Lines changed: 26 additions & 0 deletions
```diff
@@ -101,6 +101,32 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
     }
   }
 
+  private def testNullRangeBoundary(start: Boundary, end: Boundary): Unit = {
+    test(s"range boundary cannot be null - start=$start end=$end") {
+      val tblName = "tbl"
+      withTable(tblName) {
+        createTblWithThreeVersions(tblName = Some(tblName))
+
+        checkError(intercept[DeltaIllegalArgumentException] {
+          cdcRead(new TableName(tblName), start, end)
+        }, "DELTA_CDC_READ_NULL_RANGE_BOUNDARY")
+      }
+    }
+  }
+
+  for (end <- Seq(
+      Unbounded,
+      EndingVersion("null"),
+      EndingVersion("0"),
+      EndingTimestamp(dateFormat.format(new Date(1)))
+    )) {
+    testNullRangeBoundary(StartingVersion("null"), end)
+  }
+
+  for (start <- Seq(StartingVersion("0"), StartingTimestamp(dateFormat.format(new Date(1))))) {
+    testNullRangeBoundary(start, EndingVersion("null"))
+  }
+
   test("select individual column should push down filters") {
     val tblName = "tbl"
     withTable(tblName) {
```

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala

Lines changed: 46 additions & 0 deletions
```diff
@@ -958,6 +958,52 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
     }
   }
 
+  private def testNullRangeBoundary(start: Boundary, end: Boundary): Unit = {
+    test(s"range boundary cannot be null - start=$start end=$end") {
+      val tblName = "tbl"
+      withTable(tblName) {
+        createTblWithThreeVersions(tblName = Some(tblName))
+
+        val expectedError = (start, end) match {
+          case (StartingVersion(null), _) => "DELTA_VERSION_INVALID"
+          case (StartingTimestamp(null), _) => "DELTA_TIMESTAMP_INVALID"
+          case (_, EndingVersion(null)) => "DELTA_VERSION_INVALID"
+          case (_, EndingTimestamp(null)) => "DELTA_TIMESTAMP_INVALID"
+        }
+        val expectedErrorParameters = expectedError match {
+          case "DELTA_VERSION_INVALID" => Map("version" -> "null")
+          case "DELTA_TIMESTAMP_INVALID" => Map("expr" -> "NULL")
+        }
+
+        checkError(
+          intercept[DeltaAnalysisException] {
+            cdcRead(new TableName(tblName), start, end)
+          },
+          expectedError,
+          parameters = expectedErrorParameters)
+      }
+    }
+  }
+
+  for {
+    start <- Seq(StartingVersion("0"), StartingTimestamp(dateFormat.format(new Date(1))))
+    end <- Seq(EndingVersion(null), EndingTimestamp(null))
+  } {
+    testNullRangeBoundary(start, end)
+  }
+
+  for {
+    start <- Seq(StartingVersion(null), StartingTimestamp(null))
+    end <- Seq(
+      Unbounded,
+      EndingVersion(null),
+      EndingTimestamp(null),
+      EndingVersion("0"),
+      EndingTimestamp(dateFormat.format(new Date(1))))
+  } {
+    testNullRangeBoundary(start, end)
+  }
+
   test("filters should be pushed down") {
     val tblName = "tbl"
     withTable(tblName) {
```

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 13 additions & 0 deletions
```diff
@@ -1521,6 +1521,13 @@ trait DeltaErrorsSuiteBase
       }
       checkError(e, "DELTA_TIMESTAMP_INVALID", "42816", Map("expr" -> expr.sql))
     }
+    {
+      val version = "null"
+      val e = intercept[DeltaAnalysisException] {
+        throw DeltaErrors.versionInvalid(version)
+      }
+      checkError(e, "DELTA_VERSION_INVALID", "42815", Map("version" -> version))
+    }
     {
       val e = intercept[DeltaAnalysisException] {
         throw DeltaErrors.notADeltaSourceException("sample")
@@ -2149,6 +2156,12 @@
       }
       checkError(e, "DELTA_UNRECOGNIZED_COLUMN_CHANGE", "42601", Map("otherClass" -> "change1"))
     }
+    {
+      val e = intercept[DeltaIllegalArgumentException] {
+        throw DeltaErrors.nullRangeBoundaryInCDCRead()
+      }
+      checkError(e, "DELTA_CDC_READ_NULL_RANGE_BOUNDARY", "22004", Map.empty[String, String])
+    }
     {
       val e = intercept[DeltaIllegalArgumentException] {
         throw DeltaErrors.endBeforeStartVersionInCDC(2, 1)
```

0 commit comments