Commit 4d3ffe5

Merge branch 'main' into iceberg-rust

2 parents: 7c0a99b + 82df9e7

452 files changed: +36,868 additions, -40,435 deletions

Note: GitHub hides some content by default in large commits, which is why two of the new files below appear without file paths.

docs/source/user-guide/latest/configs.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -215,6 +215,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.BitwiseNot.enabled` | Enable Comet acceleration for `BitwiseNot` | true |
 | `spark.comet.expression.BitwiseOr.enabled` | Enable Comet acceleration for `BitwiseOr` | true |
 | `spark.comet.expression.BitwiseXor.enabled` | Enable Comet acceleration for `BitwiseXor` | true |
+| `spark.comet.expression.BloomFilterMightContain.enabled` | Enable Comet acceleration for `BloomFilterMightContain` | true |
 | `spark.comet.expression.CaseWhen.enabled` | Enable Comet acceleration for `CaseWhen` | true |
 | `spark.comet.expression.Cast.enabled` | Enable Comet acceleration for `Cast` | true |
 | `spark.comet.expression.Ceil.enabled` | Enable Comet acceleration for `Ceil` | true |
@@ -259,6 +260,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true |
 | `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true |
 | `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
+| `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
 | `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
 | `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |
 | `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true |
@@ -268,6 +270,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.Log10.enabled` | Enable Comet acceleration for `Log10` | true |
 | `spark.comet.expression.Log2.enabled` | Enable Comet acceleration for `Log2` | true |
 | `spark.comet.expression.Lower.enabled` | Enable Comet acceleration for `Lower` | true |
+| `spark.comet.expression.MakeDecimal.enabled` | Enable Comet acceleration for `MakeDecimal` | true |
 | `spark.comet.expression.MapEntries.enabled` | Enable Comet acceleration for `MapEntries` | true |
 | `spark.comet.expression.MapFromArrays.enabled` | Enable Comet acceleration for `MapFromArrays` | true |
 | `spark.comet.expression.MapKeys.enabled` | Enable Comet acceleration for `MapKeys` | true |
@@ -290,6 +293,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.Remainder.enabled` | Enable Comet acceleration for `Remainder` | true |
 | `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true |
 | `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true |
+| `spark.comet.expression.ScalarSubquery.enabled` | Enable Comet acceleration for `ScalarSubquery` | true |
 | `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true |
 | `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true |
 | `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true |
@@ -321,6 +325,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true |
 | `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
 | `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
+| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
 | `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
 | `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |
 | `spark.comet.expression.WeekOfYear.enabled` | Enable Comet acceleration for `WeekOfYear` | true |
```
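All of these flags follow the pattern `spark.comet.expression.<Name>.enabled`, default to `true`, and can be flipped at runtime to force a fallback to Spark for a single expression. A minimal sketch of toggling one of the newly added flags (assumes an existing Comet-enabled `SparkSession` named `spark`; the `sales` table is hypothetical):

```scala
// Sketch: disable Comet's handling of ScalarSubquery for this session,
// e.g. to compare query plans while debugging. Assumes `spark` is a
// SparkSession running with the Comet plugin; `sales` is a hypothetical table.
spark.conf.set("spark.comet.expression.ScalarSubquery.enabled", "false")
spark
  .sql("SELECT * FROM sales WHERE amount > (SELECT avg(amount) FROM sales)")
  .explain()

// Restore the default afterwards.
spark.conf.set("spark.comet.expression.ScalarSubquery.enabled", "true")
```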

docs/source/user-guide/latest/expressions.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -125,6 +125,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
 | BRound | `bround` | Yes | |
 | Ceil | `ceil` | Yes | |
 | Cos | `cos` | Yes | |
+| Cosh | `cosh` | Yes | |
 | Cot | `cot` | Yes | |
 | Divide | `/` | Yes | |
 | Exp | `exp` | Yes | |
@@ -144,9 +145,11 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
 | Round | `round` | Yes | |
 | Signum | `signum` | Yes | |
 | Sin | `sin` | Yes | |
+| Sinh | `sinh` | Yes | |
 | Sqrt | `sqrt` | Yes | |
 | Subtract | `-` | Yes | |
 | Tan | `tan` | Yes | |
+| Tanh | `tanh` | Yes | |
 | TryAdd | `try_add` | Yes | Only integer inputs are supported |
 | TryDivide | `try_div` | Yes | Only integer inputs are supported |
 | TryMultiply | `try_mul` | Yes | Only integer inputs are supported |
```
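The three new rows are Spark's hyperbolic functions, now marked Spark-compatible. A quick sketch exercising them through the DataFrame API (assumes a Comet-enabled session named `spark`):

```scala
// Sketch: the newly supported hyperbolic functions. With Comet enabled,
// these expressions should now execute natively instead of falling back.
import org.apache.spark.sql.functions.{col, cosh, sinh, tanh}

val df = spark.range(5).select(
  sinh(col("id")).as("sinh"),
  cosh(col("id")).as("cosh"),
  tanh(col("id")).as("tanh"))
df.show()
```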

fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -98,6 +98,7 @@ object Meta {
     createUnaryNumericFunction("atan"),
     createFunctionWithInputTypes("atan2", Seq(SparkNumericType, SparkNumericType)),
     createUnaryNumericFunction("cos"),
+    createUnaryNumericFunction("cosh"),
     createUnaryNumericFunction("exp"),
     createUnaryNumericFunction("expm1"),
     createFunctionWithInputTypes("log", Seq(SparkNumericType, SparkNumericType)),
@@ -112,8 +113,10 @@
       FunctionSignature(Seq(SparkNumericType, SparkIntType)))),
     createUnaryNumericFunction("signum"),
     createUnaryNumericFunction("sin"),
+    createUnaryNumericFunction("sinh"),
     createUnaryNumericFunction("sqrt"),
     createUnaryNumericFunction("tan"),
+    createUnaryNumericFunction("tanh"),
     createUnaryNumericFunction("cot"),
     createUnaryNumericFunction("ceil"),
     createUnaryNumericFunction("floor"),
```
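`createUnaryNumericFunction` is defined elsewhere in `Meta.scala` and is not part of this hunk; judging from the neighbouring `createFunctionWithInputTypes` calls, it is presumably shorthand along these lines (a guess for orientation, not the actual definition):

```scala
// Hypothetical sketch, inferred from the surrounding calls: registers a
// fuzz-test function signature with a single numeric argument.
def createUnaryNumericFunction(name: String) =
  createFunctionWithInputTypes(name, Seq(SparkNumericType))
```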
(New file; file path hidden in the large-commit view. Adds object CometBloomFilterMightContain to package org.apache.comet.serde.)

Lines changed: 52 additions & 0 deletions

```diff
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BloomFilterMightContain}
+
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
+
+object CometBloomFilterMightContain extends CometExpressionSerde[BloomFilterMightContain] {
+
+  override def convert(
+      expr: BloomFilterMightContain,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+
+    val bloomFilter = expr.left
+    val value = expr.right
+    val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs, binding)
+    val valueExpr = exprToProtoInternal(value, inputs, binding)
+    if (bloomFilterExpr.isDefined && valueExpr.isDefined) {
+      val builder = ExprOuterClass.BloomFilterMightContain.newBuilder()
+      builder.setBloomFilter(bloomFilterExpr.get)
+      builder.setValue(valueExpr.get)
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setBloomFilterMightContain(builder)
+          .build())
+    } else {
+      withInfo(expr, bloomFilter, value)
+      None
+    }
+  }
+}
```
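This new object carries over, unchanged, the `BloomFilterMightContain` case removed from `QueryPlanSerde.scala` below; routing it through `CometExpressionSerde` is also what gives it the per-expression flag added in `configs.md` above. For reference, a sketch of a query shape that can plan this expression (table names are hypothetical):

```scala
// Sketch: Spark's runtime bloom-filter join pruning injects a
// BloomFilterMightContain (might_contain) test on the probe side of a join.
// Table names are hypothetical; on tiny test data the optimizer's size
// thresholds may keep the rewrite from firing.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
spark
  .sql("""SELECT f.*
         |FROM fact f
         |JOIN dim d ON f.key = d.key
         |WHERE d.flag = true""".stripMargin)
  .explain()
```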
(New file; file path hidden in the large-commit view. Adds object CometScalarSubquery to package org.apache.comet.serde.)

Lines changed: 51 additions & 0 deletions

```diff
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.ScalarSubquery
+
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
+
+object CometScalarSubquery extends CometExpressionSerde[ScalarSubquery] {
+  override def convert(
+      expr: ScalarSubquery,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    if (supportedDataType(expr.dataType)) {
+      val dataType = serializeDataType(expr.dataType)
+      if (dataType.isEmpty) {
+        withInfo(expr, s"Failed to serialize datatype ${expr.dataType} for scalar subquery")
+        return None
+      }
+
+      val builder = ExprOuterClass.Subquery
+        .newBuilder()
+        .setId(expr.exprId.id)
+        .setDatatype(dataType.get)
+      Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
+    } else {
+      withInfo(expr, s"Unsupported data type: ${expr.dataType}")
+      None
+    }
+
+  }
+}
```
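Note what is (and is not) serialized here: only the subquery's `exprId` and result data type cross to the native side, not the subquery plan itself, presumably because Spark computes the scalar value once and the native operator can look it up by id at execution time. A sketch of a query that plans a `ScalarSubquery` (the `employees` table is hypothetical):

```scala
// Sketch: the inner aggregate becomes a ScalarSubquery expression in the
// outer filter; CometScalarSubquery serializes just its id and data type.
// `employees` is a hypothetical table.
spark
  .sql("""SELECT name, salary
         |FROM employees
         |WHERE salary > (SELECT avg(salary) FROM employees)""".stripMargin)
  .explain()
```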

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 12 additions & 80 deletions
```diff
@@ -25,10 +25,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
-import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
-import org.apache.spark.sql.comet._
-import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.comet.DecimalPrecision
+import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -93,6 +91,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Atan2] -> CometAtan2,
     classOf[Ceil] -> CometCeil,
     classOf[Cos] -> CometScalarFunction("cos"),
+    classOf[Cosh] -> CometScalarFunction("cosh"),
     classOf[Divide] -> CometDivide,
     classOf[Exp] -> CometScalarFunction("exp"),
     classOf[Expm1] -> CometScalarFunction("expm1"),
@@ -111,9 +110,11 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Round] -> CometRound,
     classOf[Signum] -> CometScalarFunction("signum"),
     classOf[Sin] -> CometScalarFunction("sin"),
+    classOf[Sinh] -> CometScalarFunction("sinh"),
     classOf[Sqrt] -> CometScalarFunction("sqrt"),
     classOf[Subtract] -> CometSubtract,
     classOf[Tan] -> CometScalarFunction("tan"),
+    classOf[Tanh] -> CometScalarFunction("tanh"),
     classOf[Cot] -> CometScalarFunction("cot"),
     classOf[UnaryMinus] -> CometUnaryMinus,
     classOf[Unhex] -> CometUnhex,
@@ -153,7 +154,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Lower] -> CometLower,
     classOf[OctetLength] -> CometScalarFunction("octet_length"),
     classOf[RegExpReplace] -> CometRegExpReplace,
-    classOf[Reverse] -> CometScalarFunction("reverse"),
+    classOf[Reverse] -> CometReverse,
     classOf[RLike] -> CometRLike,
     classOf[StartsWith] -> CometScalarFunction("starts_with"),
     classOf[StringInstr] -> CometScalarFunction("instr"),
@@ -203,21 +204,20 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
   private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     // TODO PromotePrecision
-    // TODO KnownFloatingPointNormalized
-    // TODO ScalarSubquery
-    // TODO UnscaledValue
-    // TODO MakeDecimal
-    // TODO BloomFilterMightContain
-    // TODO RegExpReplace
     classOf[Alias] -> CometAlias,
     classOf[AttributeReference] -> CometAttributeReference,
+    classOf[BloomFilterMightContain] -> CometBloomFilterMightContain,
     classOf[CheckOverflow] -> CometCheckOverflow,
     classOf[Coalesce] -> CometCoalesce,
+    classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized,
     classOf[Literal] -> CometLiteral,
+    classOf[MakeDecimal] -> CometMakeDecimal,
     classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
+    classOf[ScalarSubquery] -> CometScalarSubquery,
     classOf[SparkPartitionID] -> CometSparkPartitionId,
     classOf[SortOrder] -> CometSortOrder,
-    classOf[StaticInvoke] -> CometStaticInvoke)
+    classOf[StaticInvoke] -> CometStaticInvoke,
+    classOf[UnscaledValue] -> CometUnscaledValue)
 
   /**
    * Mapping of Spark expression class to Comet expression handler.
@@ -541,74 +541,6 @@
         // `PromotePrecision` is just a wrapper, don't need to serialize it.
         exprToProtoInternal(child, inputs, binding)
 
-      case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
-        val dataType = serializeDataType(expr.dataType)
-        if (dataType.isEmpty) {
-          withInfo(expr, s"Unsupported datatype ${expr.dataType}")
-          return None
-        }
-        val ex = exprToProtoInternal(expr, inputs, binding)
-        ex.map { child =>
-          val builder = ExprOuterClass.NormalizeNaNAndZero
-            .newBuilder()
-            .setChild(child)
-            .setDatatype(dataType.get)
-          ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
-        }
-
-      case s @ execution.ScalarSubquery(_, _) =>
-        if (supportedDataType(s.dataType)) {
-          val dataType = serializeDataType(s.dataType)
-          if (dataType.isEmpty) {
-            withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
-            return None
-          }
-
-          val builder = ExprOuterClass.Subquery
-            .newBuilder()
-            .setId(s.exprId.id)
-            .setDatatype(dataType.get)
-          Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
-        } else {
-          withInfo(s, s"Unsupported data type: ${s.dataType}")
-          None
-        }
-
-      case UnscaledValue(child) =>
-        val childExpr = exprToProtoInternal(child, inputs, binding)
-        val optExpr =
-          scalarFunctionExprToProtoWithReturnType("unscaled_value", LongType, false, childExpr)
-        optExprWithInfo(optExpr, expr, child)
-
-      case MakeDecimal(child, precision, scale, true) =>
-        val childExpr = exprToProtoInternal(child, inputs, binding)
-        val optExpr = scalarFunctionExprToProtoWithReturnType(
-          "make_decimal",
-          DecimalType(precision, scale),
-          false,
-          childExpr)
-        optExprWithInfo(optExpr, expr, child)
-
-      case b @ BloomFilterMightContain(_, _) =>
-        val bloomFilter = b.left
-        val value = b.right
-        val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs, binding)
-        val valueExpr = exprToProtoInternal(value, inputs, binding)
-        if (bloomFilterExpr.isDefined && valueExpr.isDefined) {
-          val builder = ExprOuterClass.BloomFilterMightContain.newBuilder()
-          builder.setBloomFilter(bloomFilterExpr.get)
-          builder.setValue(valueExpr.get)
-          Some(
-            ExprOuterClass.Expr
-              .newBuilder()
-              .setBloomFilterMightContain(builder)
-              .build())
-        } else {
-          withInfo(expr, bloomFilter, value)
-          None
-        }
-      case r @ Reverse(child) if child.dataType.isInstanceOf[ArrayType] =>
-        convert(r, CometArrayReverse)
       case expr =>
         QueryPlanSerde.exprSerdeMap.get(expr.getClass) match {
          case Some(handler) =>
```
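The net effect of this diff is that five hand-written match cases (`KnownFloatingPointNormalized`, `ScalarSubquery`, `UnscaledValue`, `MakeDecimal`, `BloomFilterMightContain`), plus the array special case for `Reverse`, become entries in the class-keyed serde maps. A simplified sketch of the dispatch shape this converges on (names taken from the diff, surrounding plumbing assumed):

```scala
// Simplified sketch, not the actual Comet definitions: each Spark expression
// class maps to one handler object that can serialize it to protobuf.
trait CometExpressionSerde[T <: Expression] {
  // Assumed default; handlers such as CometReverse (below) override this.
  def getSupportLevel(expr: T): SupportLevel = Compatible()
  def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr]
}

// exprToProtoInternal's fall-through case then reduces to a single map
// lookup, as visible at the end of the hunk above:
//
//   QueryPlanSerde.exprSerdeMap.get(expr.getClass) match {
//     case Some(handler) => ... handler.convert(expr, inputs, binding)
//     case None => withInfo(expr, "unsupported expression"); None
//   }
```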
(New file; file path hidden in the large-commit view. Adds object CometReverse to package org.apache.comet.serde.)

Lines changed: 44 additions & 0 deletions

```diff
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Reverse}
+import org.apache.spark.sql.types.ArrayType
+
+import org.apache.comet.serde.ExprOuterClass.Expr
+
+object CometReverse extends CometScalarFunction[Reverse]("reverse") {
+
+  override def getSupportLevel(expr: Reverse): SupportLevel = {
+    if (expr.child.dataType.isInstanceOf[ArrayType]) {
+      CometArrayReverse.getSupportLevel(expr)
+    } else {
+      Compatible()
+    }
+  }
+
+  override def convert(expr: Reverse, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
+    if (expr.child.dataType.isInstanceOf[ArrayType]) {
+      CometArrayReverse.convert(expr, inputs, binding)
+    } else {
+      super.convert(expr, inputs, binding)
+    }
+  }
+}
```
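`Reverse` is overloaded in Spark: on strings it reverses characters, on arrays it reverses elements. The array form used to be a special case in `QueryPlanSerde`; `CometReverse` now folds both paths into one handler, delegating arrays to `CometArrayReverse` and everything else to the native `reverse` scalar function. A sketch of both forms (assumes a Comet-enabled session named `spark`):

```scala
// Sketch: both forms of reverse() that CometReverse routes.
import org.apache.spark.sql.functions.{array, col, reverse}

val df = spark.range(3).select(
  reverse(col("id").cast("string")).as("reversed_string"), // string path
  reverse(array(col("id"), col("id") + 1)).as("reversed_array")) // array path
df.show()
```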
