Skip to content

Commit 528822a

Browse files
authored
chore: Improve expression fallback reporting (#2240)
1 parent d93931f commit 528822a

File tree

7 files changed

+95
-37
lines changed

7 files changed

+95
-37
lines changed

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,8 @@ import scala.collection.mutable.ListBuffer
2525

2626
import org.apache.spark.sql.catalyst.expressions.Cast
2727

28-
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible}
28+
import org.apache.comet.expressions.{CometCast, CometEvalMode}
29+
import org.apache.comet.serde.{Compatible, Incompatible}
2930

3031
/**
3132
* Utility for generating markdown documentation from the configs.

spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Lines changed: 1 addition & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -21,16 +21,7 @@ package org.apache.comet.expressions
2121

2222
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}
2323

24-
sealed trait SupportLevel
25-
26-
/** We support this feature with full compatibility with Spark */
27-
case class Compatible(notes: Option[String] = None) extends SupportLevel
28-
29-
/** We support this feature but results can be different from Spark */
30-
case class Incompatible(notes: Option[String] = None) extends SupportLevel
31-
32-
/** We do not support this feature */
33-
object Unsupported extends SupportLevel
24+
import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported}
3425

3526
object CometCast {
3627

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 45 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -643,14 +643,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
643643
SQLConf.get
644644

645645
def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
646-
handler match {
647-
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
648-
withInfo(
649-
expr,
650-
s"$expr is not fully compatible with Spark. To enable it anyway, set " +
651-
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
646+
handler.getSupportLevel(expr) match {
647+
case Unsupported =>
648+
withInfo(expr, s"$expr is not supported.")
652649
None
653-
case _ =>
650+
case Incompatible(notes) =>
651+
if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
652+
if (notes.isDefined) {
653+
logWarning(
654+
s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
655+
s"but has notes: ${notes.get}")
656+
}
657+
handler.convert(expr, inputs, binding)
658+
} else {
659+
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
660+
withInfo(
661+
expr,
662+
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
663+
s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
664+
s"${CometConf.COMPAT_GUIDE}.")
665+
None
666+
}
667+
case Compatible(notes) =>
668+
if (notes.isDefined) {
669+
logWarning(s"Comet supports $expr but has notes: ${notes.get}")
670+
}
654671
handler.convert(expr, inputs, binding)
655672
}
656673
}
@@ -2349,6 +2366,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
23492366
}
23502367
}
23512368

2369+
sealed trait SupportLevel
2370+
2371+
/** We support this feature with full compatibility with Spark */
2372+
case class Compatible(notes: Option[String] = None) extends SupportLevel
2373+
2374+
/** We support this feature but results can be different from Spark */
2375+
case class Incompatible(notes: Option[String] = None) extends SupportLevel
2376+
2377+
/** We do not support this feature */
2378+
object Unsupported extends SupportLevel
2379+
23522380
/**
23532381
* Trait for providing serialization logic for operators.
23542382
*/
@@ -2386,6 +2414,16 @@ trait CometOperatorSerde[T <: SparkPlan] {
23862414
*/
23872415
trait CometExpressionSerde[T <: Expression] {
23882416

2417+
/**
2418+
* Determine the support level of the expression based on its attributes.
2419+
*
2420+
* @param expr
2421+
* The Spark expression.
2422+
* @return
2423+
* Support level (Compatible, Incompatible, or Unsupported).
2424+
*/
2425+
def getSupportLevel(expr: T): SupportLevel = Compatible(None)
2426+
23892427
/**
23902428
* Convert a Spark expression into a protocol buffer representation that can be passed into
23912429
* native code.
@@ -2436,9 +2474,6 @@ trait CometAggregateExpressionSerde {
24362474
conf: SQLConf): Option[ExprOuterClass.AggExpr]
24372475
}
24382476

2439-
/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
2440-
trait IncompatExpr {}
2441-
24422477
/** Serde for scalar function. */
24432478
case class CometScalarFunction[T <: Expression](name: String) extends CometExpressionSerde[T] {
24442479
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 39 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,10 @@ object CometArrayRemove extends CometExpressionSerde[ArrayRemove] with CometExpr
9393
}
9494
}
9595

96-
object CometArrayAppend extends CometExpressionSerde[ArrayAppend] with IncompatExpr {
96+
object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {
97+
98+
override def getSupportLevel(expr: ArrayAppend): SupportLevel = Incompatible(None)
99+
97100
override def convert(
98101
expr: ArrayAppend,
99102
inputs: Seq[Attribute],
@@ -149,7 +152,10 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] {
149152
}
150153
}
151154

152-
object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with IncompatExpr {
155+
object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] {
156+
157+
override def getSupportLevel(expr: ArrayDistinct): SupportLevel = Incompatible(None)
158+
153159
override def convert(
154160
expr: ArrayDistinct,
155161
inputs: Seq[Attribute],
@@ -162,7 +168,10 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with Incom
162168
}
163169
}
164170

165-
object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] with IncompatExpr {
171+
object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] {
172+
173+
override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None)
174+
166175
override def convert(
167176
expr: ArrayIntersect,
168177
inputs: Seq[Attribute],
@@ -201,7 +210,10 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] {
201210
}
202211
}
203212

204-
object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr {
213+
object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] {
214+
215+
override def getSupportLevel(expr: ArraysOverlap): SupportLevel = Incompatible(None)
216+
205217
override def convert(
206218
expr: ArraysOverlap,
207219
inputs: Seq[Attribute],
@@ -218,7 +230,10 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with Incom
218230
}
219231
}
220232

221-
object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatExpr {
233+
object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] {
234+
235+
override def getSupportLevel(expr: ArrayRepeat): SupportLevel = Incompatible(None)
236+
222237
override def convert(
223238
expr: ArrayRepeat,
224239
inputs: Seq[Attribute],
@@ -232,7 +247,10 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatE
232247
}
233248
}
234249

235-
object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatExpr {
250+
object CometArrayCompact extends CometExpressionSerde[Expression] {
251+
252+
override def getSupportLevel(expr: Expression): SupportLevel = Incompatible(None)
253+
236254
override def convert(
237255
expr: Expression,
238256
inputs: Seq[Attribute],
@@ -252,10 +270,9 @@ object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatE
252270
}
253271
}
254272

255-
object CometArrayExcept
256-
extends CometExpressionSerde[ArrayExcept]
257-
with CometExprShim
258-
with IncompatExpr {
273+
object CometArrayExcept extends CometExpressionSerde[ArrayExcept] with CometExprShim {
274+
275+
override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(None)
259276

260277
@tailrec
261278
def isTypeSupported(dt: DataType): Boolean = {
@@ -292,7 +309,10 @@ object CometArrayExcept
292309
}
293310
}
294311

295-
object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr {
312+
object CometArrayJoin extends CometExpressionSerde[ArrayJoin] {
313+
314+
override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(None)
315+
296316
override def convert(
297317
expr: ArrayJoin,
298318
inputs: Seq[Attribute],
@@ -326,7 +346,10 @@ object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr
326346
}
327347
}
328348

329-
object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatExpr {
349+
object CometArrayInsert extends CometExpressionSerde[ArrayInsert] {
350+
351+
override def getSupportLevel(expr: ArrayInsert): SupportLevel = Incompatible(None)
352+
330353
override def convert(
331354
expr: ArrayInsert,
332355
inputs: Seq[Attribute],
@@ -361,7 +384,10 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatE
361384
}
362385
}
363386

364-
object CometArrayUnion extends CometExpressionSerde[ArrayUnion] with IncompatExpr {
387+
object CometArrayUnion extends CometExpressionSerde[ArrayUnion] {
388+
389+
override def getSupportLevel(expr: ArrayUnion): SupportLevel = Incompatible(None)
390+
365391
override def convert(
366392
expr: ArrayUnion,
367393
inputs: Seq[Attribute],

spark/src/main/scala/org/apache/comet/serde/unixtime.scala

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,10 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
2727

2828
// TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799
2929
// https://github.com/apache/datafusion/issues/16594
30-
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] with IncompatExpr {
30+
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {
31+
32+
override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None)
33+
3134
override def convert(
3235
expr: FromUnixTime,
3336
inputs: Seq[Attribute],

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf
3333
import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
3434

3535
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
36-
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
36+
import org.apache.comet.expressions.{CometCast, CometEvalMode}
37+
import org.apache.comet.serde.Compatible
3738

3839
class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3940

spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,8 @@
2020
package org.apache.spark.sql
2121

2222
import org.apache.comet.CometConf
23-
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
23+
import org.apache.comet.expressions.{CometCast, CometEvalMode}
24+
import org.apache.comet.serde.Compatible
2425
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
2526
import org.apache.commons.io.FileUtils
2627
import org.apache.spark.sql.catalyst.TableIdentifier

0 commit comments

Comments
 (0)