1919
2020package org .apache .comet .rules
2121
22+ import scala .collection .mutable .ListBuffer
23+
2224import org .apache .spark .sql .SparkSession
2325import org .apache .spark .sql .catalyst .expressions .{Divide , DoubleLiteral , EqualNullSafe , EqualTo , Expression , FloatLiteral , GreaterThan , GreaterThanOrEqual , KnownFloatingPointNormalized , LessThan , LessThanOrEqual , NamedExpression , Remainder }
2426import org .apache .spark .sql .catalyst .expressions .aggregate .{Final , Partial }
2527import org .apache .spark .sql .catalyst .optimizer .NormalizeNaNAndZero
2628import org .apache .spark .sql .catalyst .rules .Rule
27- import org .apache .spark .sql .comet .{ CometBroadcastExchangeExec , CometBroadcastHashJoinExec , CometCoalesceExec , CometCollectLimitExec , CometExec , CometExpandExec , CometFilterExec , CometGlobalLimitExec , CometHashAggregateExec , CometHashJoinExec , CometLocalLimitExec , CometNativeExec , CometNativeScanExec , CometPlan , CometProjectExec , CometScanExec , CometScanWrapper , CometSinkPlaceHolder , CometSortExec , CometSortMergeJoinExec , CometSparkToColumnarExec , CometTakeOrderedAndProjectExec , CometUnionExec , CometWindowExec , SerializedPlan }
29+ import org .apache .spark .sql .comet ._
2830import org .apache .spark .sql .comet .execution .shuffle .{CometColumnarShuffle , CometNativeShuffle , CometShuffleExchangeExec }
29- import org .apache .spark .sql .execution .{ CoalesceExec , CollectLimitExec , ExpandExec , FilterExec , GlobalLimitExec , LocalLimitExec , ProjectExec , SortExec , SparkPlan , TakeOrderedAndProjectExec , UnionExec }
31+ import org .apache .spark .sql .execution ._
3032import org .apache .spark .sql .execution .adaptive .{AQEShuffleReadExec , BroadcastQueryStageExec , ShuffleQueryStageExec }
3133import org .apache .spark .sql .execution .aggregate .{BaseAggregateExec , HashAggregateExec , ObjectHashAggregateExec }
3234import org .apache .spark .sql .execution .exchange .{BroadcastExchangeExec , ReusedExchangeExec , ShuffleExchangeExec }
@@ -36,7 +38,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
3638
3739import org .apache .comet .{CometConf , ExtendedExplainInfo }
3840import org .apache .comet .CometConf .COMET_ANSI_MODE_ENABLED
39- import org .apache .comet .CometSparkSessionExtensions .{ createMessage , getCometBroadcastNotEnabledReason , getCometShuffleNotEnabledReason , isANSIEnabled , isCometBroadCastForceEnabled , isCometExecEnabled , isCometJVMShuffleMode , isCometLoaded , isCometNativeShuffleMode , isCometScan , isCometShuffleEnabled , isSpark40Plus , shouldApplySparkToColumnar , withInfo }
41+ import org .apache .comet .CometSparkSessionExtensions ._
4042import org .apache .comet .serde .OperatorOuterClass .Operator
4143import org .apache .comet .serde .QueryPlanSerde
4244
@@ -201,18 +203,34 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
201203 op,
202204 CometGlobalLimitExec (_, op, op.limit, op.child, SerializedPlan (None )))
203205
204- case op : CollectLimitExec
205- if isCometNative(op.child) && CometConf .COMET_EXEC_COLLECT_LIMIT_ENABLED .get(conf)
206- && isCometShuffleEnabled(conf)
207- && op.offset == 0 =>
208- QueryPlanSerde
209- .operator2Proto(op)
210- .map { nativeOp =>
211- val cometOp =
212- CometCollectLimitExec (op, op.limit, op.offset, op.child)
213- CometSinkPlaceHolder (nativeOp, op, cometOp)
206+ case op : CollectLimitExec =>
207+ val fallbackReasons = new ListBuffer [String ]()
208+ if (! CometConf .COMET_EXEC_COLLECT_LIMIT_ENABLED .get(conf)) {
209+ fallbackReasons += s " ${CometConf .COMET_EXEC_COLLECT_LIMIT_ENABLED .key} is false "
210+ }
211+ if (! isCometShuffleEnabled(conf)) {
212+ fallbackReasons += " Comet shuffle is not enabled"
213+ }
214+ if (op.offset != 0 ) {
215+ fallbackReasons += " CollectLimit with non-zero offset is not supported"
216+ }
217+ if (fallbackReasons.nonEmpty) {
218+ withInfos(op, fallbackReasons.toSet)
219+ } else {
220+ if (! isCometNative(op.child)) {
221+ // no reason to report reason if child is not native
222+ op
223+ } else {
224+ QueryPlanSerde
225+ .operator2Proto(op)
226+ .map { nativeOp =>
227+ val cometOp =
228+ CometCollectLimitExec (op, op.limit, op.offset, op.child)
229+ CometSinkPlaceHolder (nativeOp, op, cometOp)
230+ }
231+ .getOrElse(op)
214232 }
215- .getOrElse(op)
233+ }
216234
217235 case op : ExpandExec =>
218236 newPlanWithProto(
0 commit comments