
Commit f34dee1

andygrove and claude committed
chore: upgrade Spark 4.x support from 4.0.1 to 4.1.0
Update the spark-4.0 profile to use Spark 4.1.0 and add shims for API changes:

- Sum.evalMode -> Sum.evalContext.evalMode (SPARK-53968)
- BinaryOutputStyle config now returns enum directly
- MapStatus.apply() requires new checksumVal parameter (SPARK-51756)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent a6cfadb commit f34dee1
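
The approach, condensed: call sites go through a small shim object, and the build compiles exactly one profile-specific implementation of it. A minimal sketch for the Sum change (the body below matches the new Spark 4.1 shim file later in this diff; the pre-4.1 sibling and the profile wiring are described only in the comment):

package org.apache.comet.shims

import org.apache.spark.sql.catalyst.expressions.EvalMode
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

// Compiled only when building against Spark 4.1, where the eval mode
// moved behind an eval context (SPARK-53968). The pre-4.1 source tree
// holds a sibling object whose body is simply `sum.evalMode`, so shared
// code can call getSumEvalMode without knowing the Spark version.
object CometAggShim {
  def getSumEvalMode(sum: Sum): EvalMode.Value = sum.evalContext.evalMode
}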

File tree: 12 files changed (+129, -16 lines)

.github/workflows/spark_sql_test.yml

Lines changed: 2 additions & 2 deletions
@@ -50,7 +50,7 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-24.04]
-        spark-version: [{short: '3.4', full: '3.4.3', java: 11}, {short: '3.5', full: '3.5.7', java: 11}, {short: '4.0', full: '4.0.1', java: 17}]
+        spark-version: [{short: '3.4', full: '3.4.3', java: 11}, {short: '3.5', full: '3.5.7', java: 11}, {short: '4.0', full: '4.1.0', java: 17}]
         module:
           - {name: "catalyst", args1: "catalyst/test", args2: ""}
           - {name: "sql_core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
@@ -61,7 +61,7 @@ jobs:
           - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
         # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
         exclude:
-          - spark-version: {short: '4.0', full: '4.0.1', java: 17}
+          - spark-version: {short: '4.0', full: '4.1.0', java: 17}
            module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
       fail-fast: false
     name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}

dev/release/create-tarball.sh

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ echo "---------------------------------------------------------"


 # create <tarball> containing the files in git at $release_hash
-# the files in the tarball are prefixed with {version} (e.g. 4.0.1)
+# the files in the tarball are prefixed with {version} (e.g. 4.1.0)
 mkdir -p ${distdir}
 (cd "${DEV_RELEASE_TOP_DIR}" && git archive ${release_hash} --prefix ${release}/ | gzip > ${tarball})


docs/source/user-guide/latest/installation.md

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ use only and should not be used in production yet.

 | Spark Version | Java Version | Scala Version | Comet Tests in CI | Spark SQL Tests in CI |
 | ------------- | ------------ | ------------- | ----------------- | --------------------- |
-| 4.0.1         | 17           | 2.13          | Yes               | Yes                   |
+| 4.1.0         | 17           | 2.13          | Yes               | Yes                   |

 Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by
 Cloud Service Providers.

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -637,7 +637,7 @@ under the License.
     <!-- Use Scala 2.13 by default -->
     <scala.version>2.13.16</scala.version>
     <scala.binary.version>2.13</scala.binary.version>
-    <spark.version>4.0.1</spark.version>
+    <spark.version>4.1.0</spark.version>
     <spark.version.short>4.0</spark.version.short>
     <parquet.version>1.15.2</parquet.version>
     <semanticdb.version>4.13.6</semanticdb.version>

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java

Lines changed: 4 additions & 3 deletions
@@ -41,7 +41,6 @@
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
 import org.apache.spark.scheduler.MapStatus;
-import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.shuffle.ShuffleWriter;
@@ -54,6 +53,7 @@
 import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait;
 import org.apache.spark.shuffle.sort.CometShuffleExternalSorter;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.comet.shims.ShimMapStatus$;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.FileSegment;
@@ -172,7 +172,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
               .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
               .getPartitionLengths();
       mapStatus =
-          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+          ShimMapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
       return;
     }
     final long openStartTime = System.nanoTime();
@@ -261,7 +261,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

       // TODO: We probably can move checksum generation here when concatenating partition files
       partitionLengths = writePartitionedData(mapOutputWriter);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+      mapStatus =
+          ShimMapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java

Lines changed: 3 additions & 2 deletions
@@ -50,7 +50,6 @@
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
 import org.apache.spark.network.util.LimitedInputStream;
 import org.apache.spark.scheduler.MapStatus;
-import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.BaseShuffleHandle;
@@ -67,6 +66,7 @@
 import org.apache.spark.shuffle.sort.SortShuffleManager;
 import org.apache.spark.shuffle.sort.UnsafeShuffleWriter;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.comet.shims.ShimMapStatus$;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.TimeTrackingOutputStream;
@@ -288,7 +288,8 @@ void closeAndWriteOutput() throws IOException {
         }
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+    mapStatus =
+        ShimMapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
   }

   @VisibleForTesting

spark/src/main/scala/org/apache/comet/serde/aggregates.scala

Lines changed: 4 additions & 3 deletions
@@ -30,7 +30,7 @@ import org.apache.comet.CometConf
 import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProto, serializeDataType}
-import org.apache.comet.shims.CometEvalModeUtil
+import org.apache.comet.shims.{CometAggShim, CometEvalModeUtil}

 object CometMin extends CometAggregateExpressionSerde[Min] {

@@ -214,7 +214,7 @@ object CometAverage extends CometAggregateExpressionSerde[Average] {
 object CometSum extends CometAggregateExpressionSerde[Sum] {

   override def getSupportLevel(sum: Sum): SupportLevel = {
-    sum.evalMode match {
+    CometAggShim.getSumEvalMode(sum) match {
       case EvalMode.ANSI if !sum.dataType.isInstanceOf[DecimalType] =>
         Incompatible(Some("ANSI mode for non decimal inputs is not supported"))
       case EvalMode.TRY if !sum.dataType.isInstanceOf[DecimalType] =>
@@ -243,7 +243,8 @@ object CometSum extends CometAggregateExpressionSerde[Sum] {
     val builder = ExprOuterClass.Sum.newBuilder()
     builder.setChild(childExpr.get)
     builder.setDatatype(dataType.get)
-    builder.setEvalMode(evalModeToProto(CometEvalModeUtil.fromSparkEvalMode(sum.evalMode)))
+    val evalMode = CometEvalModeUtil.fromSparkEvalMode(CometAggShim.getSumEvalMode(sum))
+    builder.setEvalMode(evalModeToProto(evalMode))

     Some(
       ExprOuterClass.AggExpr
CometAggShim.scala (new file, pre-Spark 4.1 shims; full path not shown)

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.shims
+
+import org.apache.spark.sql.catalyst.expressions.EvalMode
+import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
+
+object CometAggShim {
+  def getSumEvalMode(sum: Sum): EvalMode.Value = sum.evalMode
+}
ShimMapStatus.scala (new file, pre-Spark 4.1 shims; full path not shown)

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.shims
+
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage.BlockManagerId
+
+object ShimMapStatus {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus = {
+    MapStatus(loc, uncompressedSizes, mapTaskId)
+  }
+}
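
The Spark 4.1 counterpart of this shim is among the 12 changed files but is not shown above. Per the commit message, MapStatus.apply in Spark 4.1 requires a new checksumVal parameter (SPARK-51756). A hedged sketch of what that variant plausibly looks like; the parameter's position, type, and the sentinel passed here are assumptions, not verified against Spark 4.1 sources:

package org.apache.spark.sql.comet.shims

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

object ShimMapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus = {
    // Assumption: Spark 4.1's MapStatus.apply takes a trailing checksum
    // value (SPARK-51756); -1L stands in here for "no checksum computed".
    MapStatus(loc, uncompressedSizes, mapTaskId, -1L)
  }
}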
CometAggShim.scala (new file, Spark 4.1 shims; full path not shown)

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.shims
+
+import org.apache.spark.sql.catalyst.expressions.EvalMode
+import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
+
+object CometAggShim {
+  def getSumEvalMode(sum: Sum): EvalMode.Value = sum.evalContext.evalMode
+}
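
Both CometAggShim variants expose the same signature, so shared serde code compiles unchanged against either Spark version. An illustrative call site, condensed from the aggregates.scala change above (ShimUsageExample and isAnsiSum are hypothetical names, not part of the commit):

import org.apache.spark.sql.catalyst.expressions.EvalMode
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

import org.apache.comet.shims.CometAggShim

object ShimUsageExample {
  // Resolves against whichever CometAggShim the active Spark profile
  // compiled, hiding the sum.evalMode vs sum.evalContext.evalMode split.
  def isAnsiSum(sum: Sum): Boolean =
    CometAggShim.getSumEvalMode(sum) == EvalMode.ANSI
}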
