Skip to content

Commit 31cf1ca

Browse files
committed
build: Add spark-4.1 profile and shims
1 parent 1ec3563 commit 31cf1ca

22 files changed

+951
-0
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ jobs:
9898
java_version: "17"
9999
maven_opts: "-Pspark-4.0"
100100
scan_impl: "native_comet"
101+
102+
- name: "Spark 4.1, JDK 17"
103+
java_version: "17"
104+
# skip building tests till iceberg-spark-runtime-4.1_2.13 is released
105+
maven_opts: "-Pspark-4.1 -Dmaven.test.skip=true"
106+
scan_impl: "native_comet"
101107
suite:
102108
- name: "fuzz"
103109
value: |
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.internal.types.StringTypeWithCollation
23+
import org.apache.spark.sql.types.DataType
24+
25+
trait CometTypeShim {
26+
def isStringCollationType(dt: DataType): Boolean = dt.isInstanceOf[StringTypeWithCollation]
27+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.paths.SparkPath
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.execution.datasources.PartitionedFile
25+
26+
object ShimBatchReader {
27+
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
28+
PartitionedFile(
29+
partitionValues,
30+
SparkPath.fromUrlString(file),
31+
-1, // -1 means we read the entire file
32+
-1,
33+
Array.empty[String],
34+
0,
35+
0)
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
trait ShimCometConf {
23+
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
24+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
23+
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
24+
import org.apache.spark.sql.types.StructType
25+
26+
object ShimFileFormat {
27+
// A name for a temporary column that holds row indexes computed by the file format reader
28+
// until they can be placed in the _metadata struct.
29+
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
30+
31+
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
32+
ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
33+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
object ShimTaskMetrics {
26+
27+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
28+
taskMetrics._externalAccums.lastOption
29+
}

pom.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,33 @@ under the License.
651651
</properties>
652652
</profile>
653653

654+
<profile>
655+
<!-- FIXME: this is WIP. Tests may fail https://github.com/apache/datafusion-comet/issues/551 -->
656+
<id>spark-4.1</id>
657+
<properties>
658+
<!-- Use Scala 2.13 by default -->
659+
<scala.version>2.13.17</scala.version>
660+
<scala.binary.version>2.13</scala.binary.version>
661+
<spark.version>4.1.0</spark.version>
662+
<spark.version.short>4.1</spark.version.short>
663+
<parquet.version>1.16.0</parquet.version>
664+
<semanticdb.version>4.13.9</semanticdb.version>
665+
<slf4j.version>2.0.17</slf4j.version>
666+
<shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
667+
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
668+
<!-- Use jdk17 by default -->
669+
<java.version>17</java.version>
670+
<maven.compiler.source>${java.version}</maven.compiler.source>
671+
<maven.compiler.target>${java.version}</maven.compiler.target>
672+
</properties>
673+
<repositories>
674+
<repository>
675+
<id>apache-staging</id>
676+
<url>https://repository.apache.org/content/repositories/orgapachespark-1506/</url>
677+
</repository>
678+
</repositories>
679+
</profile>
680+
654681
<profile>
655682
<id>scala-2.12</id>
656683
</profile>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
24+
import org.apache.spark.sql.internal.SQLConf
25+
import org.apache.spark.sql.internal.types.StringTypeWithCollation
26+
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType}
27+
28+
import org.apache.comet.CometSparkSessionExtensions.withInfo
29+
import org.apache.comet.expressions.{CometCast, CometEvalMode}
30+
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
31+
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
32+
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
33+
34+
/**
35+
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
36+
*/
37+
trait CometExprShim extends CommonStringExprs {
38+
protected def evalMode(c: Cast): CometEvalMode.Value =
39+
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
40+
41+
protected def binaryOutputStyle: BinaryOutputStyle = {
42+
SQLConf.get
43+
.getConf(SQLConf.BINARY_OUTPUT_STYLE)
44+
.map(SQLConf.BinaryOutputStyle.withName) match {
45+
case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8
46+
case Some(SQLConf.BinaryOutputStyle.BASIC) => BinaryOutputStyle.BASIC
47+
case Some(SQLConf.BinaryOutputStyle.BASE64) => BinaryOutputStyle.BASE64
48+
case Some(SQLConf.BinaryOutputStyle.HEX) => BinaryOutputStyle.HEX
49+
case _ => BinaryOutputStyle.HEX_DISCRETE
50+
}
51+
}
52+
53+
def versionSpecificExprToProtoInternal(
54+
expr: Expression,
55+
inputs: Seq[Attribute],
56+
binding: Boolean): Option[Expr] = {
57+
expr match {
58+
case s: StaticInvoke
59+
if s.staticObject == classOf[StringDecode] &&
60+
s.dataType.isInstanceOf[StringType] &&
61+
s.functionName == "decode" &&
62+
s.arguments.size == 4 &&
63+
s.inputTypes == Seq(
64+
BinaryType,
65+
StringTypeWithCollation(supportsTrimCollation = true),
66+
BooleanType,
67+
BooleanType) =>
68+
val Seq(bin, charset, _, _) = s.arguments
69+
stringDecode(expr, charset, bin, inputs, binding)
70+
71+
case expr @ ToPrettyString(child, timeZoneId) =>
72+
val castSupported = CometCast.isSupported(
73+
child.dataType,
74+
DataTypes.StringType,
75+
timeZoneId,
76+
CometEvalMode.TRY)
77+
78+
val isCastSupported = castSupported match {
79+
case Compatible(_) => true
80+
case Incompatible(_) => true
81+
case _ => false
82+
}
83+
84+
if (isCastSupported) {
85+
exprToProtoInternal(child, inputs, binding) match {
86+
case Some(p) =>
87+
val toPrettyString = ExprOuterClass.ToPrettyString
88+
.newBuilder()
89+
.setChild(p)
90+
.setTimezone(timeZoneId.getOrElse("UTC"))
91+
.setBinaryOutputStyle(binaryOutputStyle)
92+
.build()
93+
Some(
94+
ExprOuterClass.Expr
95+
.newBuilder()
96+
.setToPrettyString(toPrettyString)
97+
.build())
98+
case _ =>
99+
withInfo(expr, child)
100+
None
101+
}
102+
} else {
103+
None
104+
}
105+
106+
case _ => None
107+
}
108+
}
109+
}
110+
111+
object CometEvalModeUtil {
112+
def fromSparkEvalMode(evalMode: EvalMode.Value): CometEvalMode.Value = evalMode match {
113+
case EvalMode.LEGACY => CometEvalMode.LEGACY
114+
case EvalMode.TRY => CometEvalMode.TRY
115+
case EvalMode.ANSI => CometEvalMode.ANSI
116+
}
117+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.SparkContext
23+
import org.apache.spark.network.util.JavaUtils
24+
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
25+
import org.apache.spark.sql.internal.SQLConf
26+
27+
import org.apache.comet.shims.ShimCometBroadcastExchangeExec.SPARK_MAX_BROADCAST_TABLE_SIZE
28+
29+
trait ShimCometBroadcastExchangeExec {
30+
31+
def setJobGroupOrTag(sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = {
32+
// Setup a job tag here so later it may get cancelled by tag if necessary.
33+
sc.addJobTag(broadcastExchange.jobTag)
34+
sc.setInterruptOnCancel(true)
35+
}
36+
37+
def cancelJobGroup(sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = {
38+
sc.cancelJobsWithTag(broadcastExchange.jobTag)
39+
}
40+
41+
def maxBroadcastTableBytes(conf: SQLConf): Long = {
42+
JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB"))
43+
}
44+
45+
}
46+
47+
object ShimCometBroadcastExchangeExec {
48+
val SPARK_MAX_BROADCAST_TABLE_SIZE = "spark.sql.maxBroadcastTableSize"
49+
}

0 commit comments

Comments
 (0)