
Commit f04a85e

build: Add spark-4.1 profile and shims
1 parent fd53edb commit f04a85e

File tree

8 files changed: +60 −13 lines changed


.github/workflows/pr_build_linux.yml

Lines changed: 5 additions & 0 deletions

@@ -98,6 +98,11 @@ jobs:
           java_version: "17"
           maven_opts: "-Pspark-4.0"
           scan_impl: "native_comet"
+
+        - name: "Spark 4.1, JDK 17"
+          java_version: "17"
+          maven_opts: "-Pspark-4.1"
+          scan_impl: "native_comet"
       suite:
         - name: "fuzz"
           value: |
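
The new entry mirrors the existing Spark 4.0 / JDK 17 matrix entry but passes -Pspark-4.1 through maven_opts, so the Linux PR build also compiles and runs the test suites against the new profile with the native_comet scan implementation.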

pom.xml

Lines changed: 27 additions & 0 deletions

@@ -651,6 +651,33 @@ under the License.
       </properties>
     </profile>

+    <profile>
+      <!-- FIXME: this is WIP. Tests may fail https://github.com/apache/datafusion-comet/issues/551 -->
+      <id>spark-4.1</id>
+      <properties>
+        <!-- Use Scala 2.13 by default -->
+        <scala.version>2.13.17</scala.version>
+        <scala.binary.version>2.13</scala.binary.version>
+        <spark.version>4.1.0</spark.version>
+        <spark.version.short>4.1</spark.version.short>
+        <parquet.version>1.16.0</parquet.version>
+        <semanticdb.version>4.13.9</semanticdb.version>
+        <slf4j.version>2.0.17</slf4j.version>
+        <shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
+        <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
+        <!-- Use jdk17 by default -->
+        <java.version>17</java.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+      </properties>
+      <repositories>
+        <repository>
+          <id>apache-staging</id>
+          <url>https://repository.apache.org/content/repositories/orgapachespark-1506/</url>
+        </repository>
+      </repositories>
+    </profile>
+
     <profile>
       <id>scala-2.12</id>
     </profile>
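
The profile pins Spark 4.1.0 with Scala 2.13 and JDK 17, points shims.majorVerSrc at a new spark-4.1 shim source tree, and adds the apache-staging repository, presumably because Spark 4.1.0 artifacts were still in release-candidate staging rather than on Maven Central when this was written. Builds opt in the same way as with the other version profiles, e.g. mvn clean install -Pspark-4.1, which is the flag the new CI matrix entry above passes via maven_opts.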

spark/pom.xml

Lines changed: 13 additions & 0 deletions

@@ -256,6 +256,19 @@ under the License.
         </dependency>
       </dependencies>
     </profile>
+
+    <profile>
+      <id>spark-4.1</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <!-- TODO: Upgrade after iceberg-spark-runtime-4.1_2.13 release -->
+          <artifactId>iceberg-spark-runtime-4.0_${scala.binary.version}</artifactId>
+          <version>1.10.0</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>

   <build>

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java

Lines changed: 3 additions & 2 deletions

@@ -172,7 +172,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
               .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
               .getPartitionLengths();
       mapStatus =
-          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
       return;
     }
     final long openStartTime = System.nanoTime();

@@ -261,7 +261,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

       // TODO: We probably can move checksum generation here when concatenating partition files
       partitionLengths = writePartitionedData(mapOutputWriter);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+      mapStatus =
+          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java

Lines changed: 2 additions & 1 deletion

@@ -288,7 +288,8 @@ void closeAndWriteOutput() throws IOException {
         }
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+    mapStatus =
+        MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
   }

   @VisibleForTesting
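
In both shuffle writers the MapStatus$.MODULE$.apply call now passes a fourth argument, a constant 0, presumably to match an additional parameter on MapStatus.apply in Spark 4.1; what that parameter represents is not visible in this diff.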

spark/src/main/scala/org/apache/comet/serde/aggregates.scala

Lines changed: 6 additions & 6 deletions

@@ -22,15 +22,15 @@ package org.apache.comet.serde
 import scala.jdk.CollectionConverters._

 import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, CovPopulation, CovSample, Covariance, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType, LongType, ShortType, StringType}

 import org.apache.comet.CometConf
 import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProto, serializeDataType}
-import org.apache.comet.shims.CometEvalModeUtil
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+import org.apache.comet.shims.CometExprShim

 object CometMin extends CometAggregateExpressionSerde[Min] {

@@ -211,10 +211,10 @@ object CometAverage extends CometAggregateExpressionSerde[Average] {
   }
 }

-object CometSum extends CometAggregateExpressionSerde[Sum] {
+object CometSum extends CometAggregateExpressionSerde[Sum] with CometExprShim {

   override def getSupportLevel(sum: Sum): SupportLevel = {
-    sum.evalMode match {
+    sparkEvalMode(sum) match {
       case EvalMode.ANSI if !sum.dataType.isInstanceOf[DecimalType] =>
         Incompatible(Some("ANSI mode for non decimal inputs is not supported"))
       case EvalMode.TRY if !sum.dataType.isInstanceOf[DecimalType] =>

@@ -243,7 +243,7 @@ object CometSum extends CometAggregateExpressionSerde[Sum] {
     val builder = ExprOuterClass.Sum.newBuilder()
     builder.setChild(childExpr.get)
     builder.setDatatype(dataType.get)
-    builder.setEvalMode(evalModeToProto(CometEvalModeUtil.fromSparkEvalMode(sum.evalMode)))
+    builder.setFailOnError(sparkEvalMode(sum) == EvalMode.ANSI)

     Some(
       ExprOuterClass.AggExpr
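
CometSum now mixes in CometExprShim and reads the eval mode through a sparkEvalMode helper instead of touching sum.evalMode directly, and the serde switches from the evalModeToProto / CometEvalModeUtil path to a plain failOnError flag derived from ANSI mode. The per-version shim sources are not part of this diff; as a rough, hypothetical sketch only, a pre-4.1 version of the helper could be as small as the following (the trait body and its delegation to sum.evalMode are assumptions, not the actual shim code):

    package org.apache.comet.shims

    import org.apache.spark.sql.catalyst.expressions.EvalMode
    import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

    // Hypothetical sketch: abstracts how the LEGACY/ANSI/TRY eval mode is read
    // from Sum so that version-specific API differences stay out of the shared
    // serde code in aggregates.scala.
    trait CometExprShim {
      def sparkEvalMode(sum: Sum): EvalMode.Value = sum.evalMode
    }

The spark-4.1 shim tree referenced by shims.majorVerSrc would then provide whatever Spark 4.1 requires instead.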

spark/src/test/scala/org/apache/spark/sql/CometTPCDSQueryTestSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@ import java.nio.file.{Files, Paths}
 import scala.jdk.CollectionConverters._

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile}
+import org.apache.spark.sql.catalyst.util.{resourceToString, stringToFile}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.TestSparkSession

@@ -118,7 +118,7 @@ class CometTPCDSQueryTestSuite extends QueryTest with TPCDSBase with CometSQLQue

     // Read back the golden file.
     val (expectedSchema, expectedOutput) = {
-      val goldenOutput = fileToString(goldenFile)
+      val goldenOutput = Files.readString(goldenFile.toPath)
       val segments = goldenOutput.split("-- !query.*\n")

       // query has 3 segments, plus the header
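
Both TPC suites stop importing the catalyst fileToString helper and read the golden files with java.nio.file.Files.readString instead, presumably because the helper is no longer usable against Spark 4.1; java.nio.file.Files is already imported in these suites, as the hunk header above shows. The same change appears in CometTPCHQuerySuite below. A standalone illustration of the replacement, with a hypothetical path:

    import java.io.File
    import java.nio.file.Files

    // Files.readString (JDK 11+) loads the whole file as a UTF-8 string,
    // which is what fileToString was used for in these suites.
    val goldenFile = new File("/path/to/golden-file.sql.out") // hypothetical path
    val goldenOutput: String = Files.readString(goldenFile.toPath)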

spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala

Lines changed: 2 additions & 2 deletions

@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile}
+import org.apache.spark.sql.catalyst.util.{resourceToString, stringToFile}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.TestSparkSession

@@ -162,7 +162,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery

     // Read back the golden file.
     val (expectedSchema, expectedOutput) = {
-      val goldenOutput = fileToString(goldenFile)
+      val goldenOutput = Files.readString(goldenFile.toPath)
       val segments = goldenOutput.split("-- !query.*\n")

       // query has 3 segments, plus the header
