Skip to content

Commit fb37d9a

Browse files
kazantsev-maksimKazantsev Maksim
andauthored
chore: Refactor static invoke exprs (apache#2671)
* Refactor static invoke exprs * Fix fallback message for rpad/lpad binary * Resolve conflicts --------- Co-authored-by: Kazantsev Maksim <[email protected]>
1 parent 8886c89 commit fb37d9a

File tree

3 files changed

+59
-27
lines changed

3 files changed

+59
-27
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.expressions._
2626
import org.apache.spark.sql.catalyst.expressions.aggregate._
2727
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
2828
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
29-
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
3029
import org.apache.spark.sql.comet._
3130
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
3231
import org.apache.spark.sql.execution
@@ -243,7 +242,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
243242
classOf[Literal] -> CometLiteral,
244243
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
245244
classOf[SparkPartitionID] -> CometSparkPartitionId,
246-
classOf[SortOrder] -> CometSortOrder)
245+
classOf[SortOrder] -> CometSortOrder,
246+
classOf[StaticInvoke] -> CometStaticInvoke)
247247

248248
/**
249249
* Mapping of Spark expression class to Comet expression handler.
@@ -711,30 +711,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
711711
// `PromotePrecision` is just a wrapper, don't need to serialize it.
712712
exprToProtoInternal(child, inputs, binding)
713713

714-
// With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
715-
// char types.
716-
// See https://github.com/apache/spark/pull/38151
717-
case s: StaticInvoke
718-
// classOf gets ther runtime class of T, which lets us compare directly
719-
// Otherwise isInstanceOf[Class[T]] will always evaluate to true for Class[_]
720-
if s.staticObject == classOf[CharVarcharCodegenUtils] &&
721-
s.dataType.isInstanceOf[StringType] &&
722-
s.functionName == "readSidePadding" &&
723-
s.arguments.size == 2 &&
724-
s.propagateNull &&
725-
!s.returnNullable &&
726-
s.isDeterministic =>
727-
val argsExpr = Seq(
728-
exprToProtoInternal(Cast(s.arguments(0), StringType), inputs, binding),
729-
exprToProtoInternal(s.arguments(1), inputs, binding))
730-
731-
if (argsExpr.forall(_.isDefined)) {
732-
scalarFunctionExprToProto("read_side_padding", argsExpr: _*)
733-
} else {
734-
withInfo(expr, s.arguments: _*)
735-
None
736-
}
737-
738714
case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
739715
val dataType = serializeDataType(expr.dataType)
740716
if (dataType.isEmpty) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
24+
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
25+
26+
import org.apache.comet.CometSparkSessionExtensions.withInfo
27+
28+
object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] {
29+
30+
// With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
31+
// char types.
32+
// See https://github.com/apache/spark/pull/38151
33+
private val staticInvokeExpressions
34+
: Map[(String, Class[_]), CometExpressionSerde[StaticInvoke]] =
35+
Map(
36+
("readSidePadding", classOf[CharVarcharCodegenUtils]) -> CometScalarFunction(
37+
"read_side_padding"))
38+
39+
override def convert(
40+
expr: StaticInvoke,
41+
inputs: Seq[Attribute],
42+
binding: Boolean): Option[ExprOuterClass.Expr] = {
43+
staticInvokeExpressions.get((expr.functionName, expr.staticObject)) match {
44+
case Some(handler) =>
45+
handler.convert(expr, inputs, binding)
46+
case None =>
47+
withInfo(
48+
expr,
49+
s"Static invoke expression: ${expr.functionName} is not supported",
50+
expr.children: _*)
51+
None
52+
}
53+
}
54+
}

spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ class CometStringExpressionSuite extends CometTestBase {
139139
} else {
140140
// Comet will fall back to Spark because the plan contains a staticinvoke instruction
141141
// which is not supported
142-
checkSparkAnswerAndFallbackReason(sql, "staticinvoke is not supported")
142+
checkSparkAnswerAndFallbackReason(
143+
sql,
144+
s"Static invoke expression: $expr is not supported")
143145
}
144146
}
145147
}

0 commit comments

Comments
 (0)