Skip to content

Commit 51345be

Browse files
authored
Merge branch-25.04 into main [skip ci] (#12545)
Merge branch-25.04 into main. Note: merge this PR using the **Create a merge commit** option.
2 parents b228bab + 9fc8292 commit 51345be

File tree

4 files changed

+86
-33
lines changed

4 files changed

+86
-33
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Change log
2-
Generated on 2025-04-14
2+
Generated on 2025-04-17
33

44
## Release 25.04
55

@@ -30,11 +30,16 @@ Generated on 2025-04-14
3030
### Bugs Fixed
3131
|||
3232
|:---|:---|
33+
|[#12530](https://github.com/NVIDIA/spark-rapids/issues/12530)|[BUG] Outer join result is incorrect when Spark is 3.5.x, join side is outer side, join on struct column and there is null|
3334
|[#12410](https://github.com/NVIDIA/spark-rapids/issues/12410)|[BUG] ThrottlingExecutorSuite: test task metrics failed intermittently|
3435
|[#12435](https://github.com/NVIDIA/spark-rapids/issues/12435)|[BUG] Running integration tests with `PERFILE` results in failed tests|
36+
|[#12360](https://github.com/NVIDIA/spark-rapids/issues/12360)|[BUG] delta_lake_test test_delta_deletion_vector cases failed in databricks 14.3 runtime|
3537
|[#12123](https://github.com/NVIDIA/spark-rapids/issues/12123)|[BUG] delta_lake_update_test.test_delta_update_fallback_with_deletion_vectors failed assertion failed: Could not find RapidsDeltaWriteExec in the GPU plans with spark34Xshims|
38+
|[#12405](https://github.com/NVIDIA/spark-rapids/issues/12405)|[BUG] test_delta_deletion_vector_fallback fails on [databricks] 14.3 CI|
39+
|[#12460](https://github.com/NVIDIA/spark-rapids/issues/12460)|[BUG] Fallback to the CPU when FileSourceScan is reading Deletion Vectors on Databricks 14.3|
3640
|[#12027](https://github.com/NVIDIA/spark-rapids/issues/12027)|[BUG] [DB 14.3] `tightBounds` stat in Delta Lake tables is set incorrectly|
3741
|[#12379](https://github.com/NVIDIA/spark-rapids/issues/12379)|[BUG] test_parse_url_supported fails on [databricks] 14.3|
42+
|[#12428](https://github.com/NVIDIA/spark-rapids/issues/12428)|[BUG] Multiple python udf integration test cases failed in DB 14.3|
3843
|[#12408](https://github.com/NVIDIA/spark-rapids/issues/12408)|[BUG] Job timeout registration pathologically fails in some [databricks] CI_PART1 pipelines|
3944
|[#12413](https://github.com/NVIDIA/spark-rapids/issues/12413)|[BUG] nightly shuffle multi-thread/UCX CI failed possibly out of memory or process/resource limits reached|
4045
|[#12376](https://github.com/NVIDIA/spark-rapids/issues/12376)|[BUG] test_col_size_exceeding_cudf_limit fails on [databricks]|
@@ -82,6 +87,8 @@ Generated on 2025-04-14
8287
### PRs
8388
|||
8489
|:---|:---|
90+
|[#12535](https://github.com/NVIDIA/spark-rapids/pull/12535)|Fix bug when join side is outer side|
91+
|[#12494](https://github.com/NVIDIA/spark-rapids/pull/12494)|Update changelog for v25.04.0 release [skip ci]|
8592
|[#12497](https://github.com/NVIDIA/spark-rapids/pull/12497)|[DOC] update the download page for 2504 release [skip ci]|
8693
|[#12473](https://github.com/NVIDIA/spark-rapids/pull/12473)|Update dependency version JNI, private, hybrid to 25.04.0|
8794
|[#12485](https://github.com/NVIDIA/spark-rapids/pull/12485)|Enable the 14.3 Shim|

integration_tests/src/main/python/join_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,36 @@ def test_hash_join_side_is_build_side_asymmetric(data_gen, join_type, kudo_enabl
337337
}
338338
hash_join_side_is_build_side(data_gen, join_type, confs)
339339

340+
@ignore_order(local=True)
@pytest.mark.parametrize('join_type', all_asymmetric_sized_join_types, ids=idfn)
def test_hash_join_side_is_build_side_basic(join_type):
    """Check outer-join correctness on a struct key when the build side is
    also the outer (row-preserving) side.

    Regression test for https://github.com/NVIDIA/spark-rapids/issues/12530:
    null-keyed rows on the build side must not be dropped, and a null struct
    child must not match a null root struct.
    """
    def _do_join(spark):
        # Rows cover the interesting key shapes: plain struct, null root
        # struct, and struct whose only child is null.
        left_rows = [
            (1, ("Alice",)),
            (2, ("Bob",)),
            (3, None),
            (4, (None,)),
        ]
        right_rows = [
            (11, ("Alice",)),
            (33, None),
            (333, None),
            (44, (None,)),
        ]
        schema = StructType([
            StructField("id", IntegerType()),
            StructField("name", StructType([
                StructField("value", StringType())]))])
        left = spark.createDataFrame(left_rows, schema)
        right = spark.createDataFrame(right_rows, schema)
        # The SHUFFLE_HASH hint goes on the outer side of the join so the
        # build side matches the outer side.
        if join_type == "LeftOuter":
            return left.hint("SHUFFLE_HASH").join(right, "name", join_type) \
                .select(left.id, left.name, right.id, right.name)
        if join_type == "RightOuter":
            return left.join(right.hint("SHUFFLE_HASH"), "name", join_type) \
                .select(left.id, left.name, right.id, right.name)
        raise RuntimeError("Only supports left join and right join")
    assert_gpu_and_cpu_are_equal_collect(_do_join)
369+
340370
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
341371
# After 3.1.0 is the min spark version we can drop this
342372
@ignore_order(local=True)

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,11 @@ object GpuShuffledSizedHashJoinExec {
8686
val flippedCondition = condition.map { c =>
8787
GpuBindReferences.bindGpuReference(c, conditionLeftAttrs ++ conditionRightAttrs)
8888
}
89-
// For join types other than FullOuter and outer joins where the build side matches the
90-
// outer side, we simply set compareNullsEqual as true to adapt struct keys with nullable
91-
// children. Non-nested keys can also be correctly processed with compareNullsEqual = true,
92-
// because we filter all null records from build table before join.
93-
// For full outer and outer joins with build side matching outer side, we need to keep the
94-
// nulls in the build table and thus cannot compare nulls as equal.
95-
// For details, see https://github.com/NVIDIA/spark-rapids/issues/2126.
96-
val treatNullsEqual = joinType match {
97-
case FullOuter => false
98-
case LeftOuter if buildSide == GpuBuildLeft => false
99-
case RightOuter if buildSide == GpuBuildRight => false
100-
case _ => GpuHashJoin.anyNullableStructChild(boundStreamKeys)
101-
}
102-
val needNullFilter = treatNullsEqual && boundStreamKeys.exists(_.nullable)
89+
90+
val treatNullsEqual = GpuHashJoin.compareNullsEqual(joinType, boundStreamKeys)
91+
val needNullFilter = GpuHashJoin.buildSideNeedsNullFilter(
92+
joinType, treatNullsEqual, buildSide, boundBuildKeys)
93+
10394
BoundJoinExprs(boundStreamKeys, streamTypes, streamOutput,
10495
boundBuildKeys, buildTypes, buildOutput,
10596
flippedCondition, conditionLeftAttrs.size, treatNullsEqual, needNullFilter)
@@ -133,16 +124,14 @@ object GpuShuffledSizedHashJoinExec {
133124
val boundCondition = condition.map { c =>
134125
GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput)
135126
}
136-
// For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
137-
// struct keys with nullable children. Non-nested keys can also be correctly processed with
138-
// compareNullsEqual = true, because we filter all null records from build table before join.
139-
// For details, see https://github.com/NVIDIA/spark-rapids/issues/2126.
140-
val compareNullsEqual = (joinType != FullOuter) &&
141-
GpuHashJoin.anyNullableStructChild(boundBuildKeys)
142-
val needNullFilter = compareNullsEqual && boundBuildKeys.exists(_.nullable)
127+
128+
val treatNullsEqual = GpuHashJoin.compareNullsEqual(joinType, boundBuildKeys)
129+
val needNullFilter = GpuHashJoin.buildSideNeedsNullFilter(
130+
joinType, treatNullsEqual, buildSide, boundBuildKeys)
131+
143132
BoundJoinExprs(boundBuildKeys, buildTypes, buildOutput,
144133
boundStreamKeys, streamTypes, streamOutput,
145-
boundCondition, streamOutput.size, compareNullsEqual, needNullFilter)
134+
boundCondition, streamOutput.size, treatNullsEqual, needNullFilter)
146135
}
147136
}
148137

sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,37 @@ object JoinTypeChecks {
103103

104104
object GpuHashJoin {
105105

106+
/**
 * Decides whether the low-level join should treat nulls in the keys as equal.
 *
 * For joins on struct keys, nulls inside the child columns of a struct compare
 * as equal, but two null root structs do not compare as equal. So for join
 * types other than FullOuter, when a struct key has a nullable child we set
 * compareNullsEqual to true; the Spark plan has already filtered out rows whose
 * root struct key is null, so no spurious root-level matches can occur.
 * For details, see https://github.com/NVIDIA/spark-rapids/issues/2126.
 *
 * Non-nested keys are also handled correctly: this returns false for them
 * (no nullable struct child), which preserves the usual join semantics of
 * null != null.
 *
 * @param joinType  the logical join type
 * @param buildKeys bound key expressions to inspect for nullable struct children
 * @return true when nulls should be compared as equal during the join
 */
def compareNullsEqual(
    joinType: JoinType,
    buildKeys: Seq[Expression]): Boolean =
  (joinType != FullOuter) && GpuHashJoin.anyNullableStructChild(buildKeys)
118+
119+
/**
 * Decides whether rows with null keys must be filtered out of the build table
 * before the join.
 *
 * Filtering build-side null keys is the usual companion to treating nulls as
 * equal: once null-keyed rows are removed from the build table, comparing
 * nulls as equal cannot produce spurious matches. However, for FullOuter
 * joins, and for outer joins where the build side is also the outer
 * (row-preserving) side, the null-keyed build rows must appear in the join
 * output, so they MUST NOT be filtered out.
 *
 * @param joinType          the logical join type
 * @param compareNullsEqual result of [[compareNullsEqual]] for this join
 * @param buildSide         which input of the join is the build side
 * @param buildKeys         bound key expressions of the build side
 * @return true when the build table should have null-keyed rows removed
 */
def buildSideNeedsNullFilter(
    joinType: JoinType,
    compareNullsEqual: Boolean,
    buildSide: GpuBuildSide,
    buildKeys: Seq[Expression]): Boolean = {
  // Null filtering is only legal when the build side is not row-preserving.
  val buildSideIsNotPreserved = joinType match {
    case FullOuter => false
    case LeftOuter if buildSide == GpuBuildLeft => false
    case RightOuter if buildSide == GpuBuildRight => false
    case _ => true
  }
  buildSideIsNotPreserved && compareNullsEqual && buildKeys.exists(_.nullable)
}
136+
106137
def tagJoin(
107138
meta: SparkPlanMeta[_],
108139
joinType: JoinType,
@@ -1114,12 +1145,8 @@ trait GpuHashJoin extends GpuJoinExec {
11141145
(rightData, remappedRightOutput)
11151146
}
11161147

1117-
// For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
1118-
// struct keys with nullable children. Non-nested keys can also be correctly processed with
1119-
// compareNullsEqual = true, because we filter all null records from build table before join.
1120-
// For some details, please refer the issue: https://github.com/NVIDIA/spark-rapids/issues/2126
1121-
protected lazy val compareNullsEqual: Boolean = (joinType != FullOuter) &&
1122-
GpuHashJoin.anyNullableStructChild(buildKeys)
1148+
// Delegate to the shared helper so this trait and the sized shuffled hash
// join agree on when nulls compare equal (struct keys with nullable children,
// non-full-outer joins). See
// https://github.com/NVIDIA/spark-rapids/issues/2126.
protected lazy val compareNullsEqual: Boolean =
  GpuHashJoin.compareNullsEqual(joinType, buildKeys)
11231150

11241151
protected lazy val (boundBuildKeys, boundStreamKeys) = {
11251152
val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output)
@@ -1150,11 +1177,11 @@ trait GpuHashJoin extends GpuJoinExec {
11501177
numOutputBatches: GpuMetric,
11511178
opTime: GpuMetric,
11521179
joinTime: GpuMetric): Iterator[ColumnarBatch] = {
1153-
// Filtering nulls on the build side is a workaround for Struct joins with nullable children
1154-
// see https://github.com/NVIDIA/spark-rapids/issues/2126 for more info
1155-
val builtAnyNullable = compareNullsEqual && buildKeys.exists(_.nullable)
11561180

1157-
val nullFiltered = if (builtAnyNullable) {
1181+
val filterOutNull = GpuHashJoin.buildSideNeedsNullFilter(joinType, compareNullsEqual,
1182+
buildSide, buildKeys)
1183+
1184+
val nullFiltered = if (filterOutNull) {
11581185
val sb = closeOnExcept(builtBatch)(
11591186
SpillableColumnarBatch(_, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
11601187
GpuHashJoin.filterNullsWithRetryAndClose(sb, boundBuildKeys)

0 commit comments

Comments
 (0)