Skip to content

Commit ddf788c

Browse files
authored
chore: Refactor CometExecRule handling of sink operators (#2771)
1 parent 8769b9e commit ddf788c

File tree

266 files changed

+1281
-1491
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

266 files changed

+1281
-1491
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ private class TypedConfigBuilder[T](
914914
}
915915
}
916916

917-
private[comet] abstract class ConfigEntry[T](
917+
abstract class ConfigEntry[T](
918918
val key: String,
919919
val valueConverter: String => T,
920920
val stringConverter: T => String,

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ These settings can be used to determine which parts of the plan are accelerated
282282
| `spark.comet.expression.SparkPartitionID.enabled` | Enable Comet acceleration for `SparkPartitionID` | true |
283283
| `spark.comet.expression.Sqrt.enabled` | Enable Comet acceleration for `Sqrt` | true |
284284
| `spark.comet.expression.StartsWith.enabled` | Enable Comet acceleration for `StartsWith` | true |
285+
| `spark.comet.expression.StaticInvoke.enabled` | Enable Comet acceleration for `StaticInvoke` | true |
285286
| `spark.comet.expression.StringInstr.enabled` | Enable Comet acceleration for `StringInstr` | true |
286287
| `spark.comet.expression.StringLPad.enabled` | Enable Comet acceleration for `StringLPad` | true |
287288
| `spark.comet.expression.StringRPad.enabled` | Enable Comet acceleration for `StringRPad` | true |

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ object CometSparkSessionExtensions extends Logging {
141141
// 1. `COMET_EXEC_SHUFFLE_ENABLED` is true
142142
// 2. `spark.shuffle.manager` is set to `CometShuffleManager`
143143
// 3. Off-heap memory is enabled || Spark/Comet unit testing
144-
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
144+
def isCometShuffleEnabled(conf: SQLConf): Boolean =
145145
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)
146146

147147
def isCometShuffleManagerEnabled(conf: SQLConf): Boolean = {

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 29 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.comet.rules
2121

22-
import scala.collection.mutable.ListBuffer
2322
import scala.jdk.CollectionConverters._
2423

2524
import org.apache.spark.sql.SparkSession
@@ -44,7 +43,7 @@ import org.apache.spark.sql.types._
4443
import org.apache.comet.{CometConf, ExtendedExplainInfo}
4544
import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
4645
import org.apache.comet.CometSparkSessionExtensions._
47-
import org.apache.comet.rules.CometExecRule.cometNativeExecHandlers
46+
import org.apache.comet.rules.CometExecRule.allExecs
4847
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, QueryPlanSerde, Unsupported}
4948
import org.apache.comet.serde.OperatorOuterClass.Operator
5049
import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
@@ -53,23 +52,35 @@ import org.apache.comet.serde.operator._
5352
object CometExecRule {
5453

5554
/**
56-
* Mapping of Spark operator class to Comet operator handler.
55+
* Fully native operators.
5756
*/
58-
val cometNativeExecHandlers: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
57+
val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
5958
Map(
60-
classOf[ProjectExec] -> CometProject,
61-
classOf[FilterExec] -> CometFilter,
62-
classOf[LocalLimitExec] -> CometLocalLimit,
63-
classOf[GlobalLimitExec] -> CometGlobalLimit,
64-
classOf[ExpandExec] -> CometExpand,
65-
classOf[HashAggregateExec] -> CometHashAggregate,
66-
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
67-
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
68-
classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
69-
classOf[SortMergeJoinExec] -> CometSortMergeJoin,
70-
classOf[SortExec] -> CometSort,
71-
classOf[LocalTableScanExec] -> CometLocalTableScan,
72-
classOf[WindowExec] -> CometWindow)
59+
classOf[ProjectExec] -> CometProjectExec,
60+
classOf[FilterExec] -> CometFilterExec,
61+
classOf[LocalLimitExec] -> CometLocalLimitExec,
62+
classOf[GlobalLimitExec] -> CometGlobalLimitExec,
63+
classOf[ExpandExec] -> CometExpandExec,
64+
classOf[HashAggregateExec] -> CometHashAggregateExec,
65+
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec,
66+
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec,
67+
classOf[ShuffledHashJoinExec] -> CometHashJoinExec,
68+
classOf[SortMergeJoinExec] -> CometSortMergeJoinExec,
69+
classOf[SortExec] -> CometSortExec,
70+
classOf[LocalTableScanExec] -> CometLocalTableScanExec,
71+
classOf[WindowExec] -> CometWindowExec)
72+
73+
/**
74+
* Sinks that have a native plan of ScanExec.
75+
*/
76+
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
77+
Map(
78+
classOf[CoalesceExec] -> CometCoalesceExec,
79+
classOf[CollectLimitExec] -> CometCollectLimitExec,
80+
classOf[TakeOrderedAndProjectExec] -> CometTakeOrderedAndProjectExec,
81+
classOf[UnionExec] -> CometUnionExec)
82+
83+
val allExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = nativeExecs ++ sinks
7384

7485
}
7586

@@ -195,91 +206,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
195206
val nativeOp = operator2Proto(cometOp)
196207
CometScanWrapper(nativeOp.get, cometOp)
197208

198-
case op: CollectLimitExec =>
199-
val fallbackReasons = new ListBuffer[String]()
200-
if (!CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)) {
201-
fallbackReasons += s"${CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.key} is false"
202-
}
203-
if (!isCometShuffleEnabled(conf)) {
204-
fallbackReasons += "Comet shuffle is not enabled"
205-
}
206-
if (fallbackReasons.nonEmpty) {
207-
withInfos(op, fallbackReasons.toSet)
208-
} else {
209-
if (!isCometNative(op.child)) {
210-
// no reason to report reason if child is not native
211-
op
212-
} else {
213-
operator2Proto(op)
214-
.map { nativeOp =>
215-
val cometOp =
216-
CometCollectLimitExec(op, op.limit, op.offset, op.child)
217-
CometSinkPlaceHolder(nativeOp, op, cometOp)
218-
}
219-
.getOrElse(op)
220-
}
221-
}
222-
223-
case c @ CoalesceExec(numPartitions, child)
224-
if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
225-
&& isCometNative(child) =>
226-
operator2Proto(c)
227-
.map { nativeOp =>
228-
val cometOp = CometCoalesceExec(c, c.output, numPartitions, child)
229-
CometSinkPlaceHolder(nativeOp, c, cometOp)
230-
}
231-
.getOrElse(c)
232-
233-
case c @ CoalesceExec(_, _) if !CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) =>
234-
withInfo(c, "Coalesce is not enabled")
235-
236-
case op: CoalesceExec if !op.children.forall(isCometNative) =>
237-
op
238-
239-
case s: TakeOrderedAndProjectExec
240-
if isCometNative(s.child) && CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED
241-
.get(conf)
242-
&& isCometShuffleEnabled(conf) &&
243-
CometTakeOrderedAndProjectExec.isSupported(s) =>
244-
operator2Proto(s)
245-
.map { nativeOp =>
246-
val cometOp =
247-
CometTakeOrderedAndProjectExec(
248-
s,
249-
s.output,
250-
s.limit,
251-
s.offset,
252-
s.sortOrder,
253-
s.projectList,
254-
s.child)
255-
CometSinkPlaceHolder(nativeOp, s, cometOp)
256-
}
257-
.getOrElse(s)
258-
259-
case s: TakeOrderedAndProjectExec =>
260-
val info1 = createMessage(
261-
!CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED.get(conf),
262-
"TakeOrderedAndProject is not enabled")
263-
val info2 = createMessage(
264-
!isCometShuffleEnabled(conf),
265-
"TakeOrderedAndProject requires shuffle to be enabled")
266-
withInfo(s, Seq(info1, info2).flatten.mkString(","))
267-
268-
case u: UnionExec
269-
if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) &&
270-
u.children.forall(isCometNative) =>
271-
newPlanWithProto(
272-
u, {
273-
val cometOp = CometUnionExec(u, u.output, u.children)
274-
CometSinkPlaceHolder(_, u, cometOp)
275-
})
276-
277-
case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
278-
withInfo(u, "Union is not enabled")
279-
280-
case op: UnionExec if !op.children.forall(isCometNative) =>
281-
op
282-
283209
// For AQE broadcast stage on a Comet broadcast exchange
284210
case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
285211
newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
@@ -386,8 +312,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
386312
}
387313

388314
case op =>
389-
// check if this is a fully native operator
390-
cometNativeExecHandlers
315+
allExecs
391316
.get(op.getClass)
392317
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]]) match {
393318
case Some(handler) =>
@@ -1005,10 +930,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
1005930
case s if isCometScan(s) => true
1006931
case _: CometSparkToColumnarExec => true
1007932
case _: CometSinkPlaceHolder => true
1008-
case _: CoalesceExec => true
1009-
case _: CollectLimitExec => true
1010-
case _: UnionExec => true
1011-
case _: TakeOrderedAndProjectExec => true
1012933
case _ => false
1013934
}
1014935
}

spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala renamed to spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,14 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde.operator
21-
22-
import scala.jdk.CollectionConverters._
20+
package org.apache.comet.serde
2321

2422
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, NullsFirst, NullsLast, SortOrder}
25-
import org.apache.spark.sql.comet.{CometNativeExec, CometSortExec, SerializedPlan}
26-
import org.apache.spark.sql.execution.SortExec
27-
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
23+
import org.apache.spark.sql.types._
2824

29-
import org.apache.comet.{CometConf, ConfigEntry}
25+
import org.apache.comet.CometConf
3026
import org.apache.comet.CometSparkSessionExtensions.withInfo
31-
import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
32-
import org.apache.comet.serde.OperatorOuterClass.Operator
33-
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
27+
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
3428

3529
object CometSortOrder extends CometExpressionSerde[SortOrder] {
3630

@@ -87,42 +81,3 @@ object CometSortOrder extends CometExpressionSerde[SortOrder] {
8781
}
8882
}
8983
}
90-
91-
object CometSort extends CometOperatorSerde[SortExec] {
92-
93-
override def enabledConfig: Option[ConfigEntry[Boolean]] =
94-
Some(CometConf.COMET_EXEC_SORT_ENABLED)
95-
96-
override def convert(
97-
op: SortExec,
98-
builder: Operator.Builder,
99-
childOp: Operator*): Option[OperatorOuterClass.Operator] = {
100-
if (!supportedSortType(op, op.sortOrder)) {
101-
withInfo(op, "Unsupported data type in sort expressions")
102-
return None
103-
}
104-
105-
val sortOrders = op.sortOrder.map(exprToProto(_, op.child.output))
106-
107-
if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
108-
val sortBuilder = OperatorOuterClass.Sort
109-
.newBuilder()
110-
.addAllSortOrders(sortOrders.map(_.get).asJava)
111-
Some(builder.setSort(sortBuilder).build())
112-
} else {
113-
withInfo(op, "sort order not supported", op.sortOrder: _*)
114-
None
115-
}
116-
}
117-
118-
override def createExec(nativeOp: Operator, op: SortExec): CometNativeExec = {
119-
CometSortExec(
120-
nativeOp,
121-
op,
122-
op.output,
123-
op.outputOrdering,
124-
op.sortOrder,
125-
op.child,
126-
SerializedPlan(None))
127-
}
128-
}

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc}
3939
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
4040
import org.apache.comet.serde.Types.DataType._
4141
import org.apache.comet.serde.literals.CometLiteral
42-
import org.apache.comet.serde.operator._
4342
import org.apache.comet.shims.CometExprShim
4443

4544
/**

0 commit comments

Comments (0)