Skip to content

Commit 7bfa946

Browse files
authored
[spark] Fix EqualNullSafe returning incorrect results when the column has null values. (#6943)
1 parent 662a566 commit 7bfa946

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,7 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging {
6262
if (literal == null) {
6363
builder.isNull(transform)
6464
} else {
65-
builder.equal(transform, literal)
65+
PredicateBuilder.and(builder.isNotNull(transform), builder.equal(transform, literal))
6666
}
6767
case _ =>
6868
throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
3030

3131
import testImplicits._
3232

33+
test("Paimon Source: EQUAL_NULL_SAFE") {
34+
withTempDir {
35+
_ =>
36+
{
37+
val TableSnapshotState(_, _, _, _, _) = prepareTableAndGetLocation(0, hasPk = true)
38+
spark.sql("INSERT INTO T VALUES (1, CAST(null as string)), (2, CAST(null as string))")
39+
val currentResult = () => spark.sql("SELECT * FROM T WHERE !(b <=> 'v_1')")
40+
checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null)))
41+
spark.sql("INSERT INTO T VALUES (3, 'v_1'), (4, CAST(null as string))")
42+
checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null), Row(4, null)))
43+
val valueDF = spark.sql("SELECT * FROM T WHERE b <=> 'v_1'")
44+
checkAnswer(valueDF, Seq(Row(3, "v_1")))
45+
val nullDF = spark.sql("SELECT * FROM T WHERE b <=> null")
46+
checkAnswer(nullDF, Seq(Row(1, null), Row(2, null), Row(4, null)))
47+
}
48+
}
49+
}
50+
3351
test("Paimon Source: default scan mode") {
3452
withTempDir {
3553
checkpointDir =>

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
220220
test("V2Filter: EqualNullSafe") {
221221
var filter = "int_col <=> 1"
222222
var actual = converter.convert(v2Filter(filter)).get
223-
assert(actual.equals(builder.equal(3, 1)))
223+
assert(actual.equals(PredicateBuilder.and(builder.isNotNull(3), builder.equal(3, 1))))
224224
checkAnswer(sql(s"SELECT int_col from test_tbl WHERE $filter ORDER BY int_col"), Seq(Row(1)))
225225
assert(scanFilesCount(filter) == 1)
226226

0 commit comments

Comments
 (0)