Changes from 9 commits
2 changes: 2 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -254,6 +254,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("localTableScan", defaultValue = false)

val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
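For context (not part of this diff): a minimal sketch of turning the new, default-off flag on from user code. It assumes Comet is already installed; the plugin class and app name here are illustrative assumptions, not taken from this PR.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; the spark.plugins value assumes the standard Comet plugin class.
val spark = SparkSession
  .builder()
  .appName("local-table-scan-example")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.localTableScan.enabled", "true")
  .getOrCreate()
```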
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -149,6 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | true |
Review comment (Member) — Suggested change:
- | `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | true |
+ | `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false |

Reply (Contributor, author): Thanks, fixed

| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
87 changes: 43 additions & 44 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -78,63 +78,49 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
/**
* Tries to transform a Spark physical plan into a Comet plan.
*
* This rule traverses bottom-up from the original Spark plan and for each plan node, there
* are a few cases to consider:
* This rule traverses bottom-up from the original Spark plan and for each plan node, there are
* a few cases to consider:
*
* 1. The child(ren) of the current node `p` cannot be converted to native
* In this case, we'll simply return the original Spark plan, since Comet native
* execution cannot start from an arbitrary Spark operator (unless it is special node
* such as scan or sink such as shuffle exchange, union etc., which are wrapped by
* `CometScanWrapper` and `CometSinkPlaceHolder` respectively).
* 1. The child(ren) of the current node `p` cannot be converted to native In this case, we'll
* simply return the original Spark plan, since Comet native execution cannot start from an
* arbitrary Spark operator (unless it is special node such as scan or sink such as shuffle
* exchange, union etc., which are wrapped by `CometScanWrapper` and `CometSinkPlaceHolder`
* respectively).
*
* 2. The child(ren) of the current node `p` can be converted to native
* There are two sub-cases for this scenario: 1) This node `p` can also be converted to
* native. In this case, we'll create a new native Comet operator for `p` and connect it with
* its previously converted child(ren); 2) This node `p` cannot be converted to native. In
* this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still
* be native Comet operators.
* 2. The child(ren) of the current node `p` can be converted to native There are two sub-cases
* for this scenario: 1) This node `p` can also be converted to native. In this case, we'll
* create a new native Comet operator for `p` and connect it with its previously converted
* child(ren); 2) This node `p` cannot be converted to native. In this case, similar to 1)
* above, we simply return `p` as it is. Its child(ren) would still be native Comet operators.
*
* After this rule finishes, we'll do another pass on the final plan to convert all adjacent
* Comet native operators into a single native execution block. Please see where
* `convertBlock` is called below.
* Comet native operators into a single native execution block. Please see where `convertBlock`
* is called below.
*
* Here are a few examples:
*
* Scan ======> CometScan
* | |
* Filter CometFilter
* | |
* HashAggregate CometHashAggregate
* | |
* Exchange CometExchange
* | |
* HashAggregate CometHashAggregate
* | |
* UnsupportedOperator UnsupportedOperator
* Scan ======> CometScan
* \| | Filter CometFilter
* \| | HashAggregate CometHashAggregate
* \| | Exchange CometExchange
* \| | HashAggregate CometHashAggregate
* \| | UnsupportedOperator UnsupportedOperator
Review comment (Member): Are these formatting changes intentional?

Reply (Contributor, author): Sorry, reversed.

*
* Native execution doesn't necessarily have to start from `CometScan`:
*
* Scan =======> CometScan
* | |
* UnsupportedOperator UnsupportedOperator
* | |
* HashAggregate HashAggregate
* | |
* Exchange CometExchange
* | |
* HashAggregate CometHashAggregate
* | |
* UnsupportedOperator UnsupportedOperator
* Scan =======> CometScan
* \| | UnsupportedOperator UnsupportedOperator
* \| | HashAggregate HashAggregate
* \| | Exchange CometExchange
* \| | HashAggregate CometHashAggregate
* \| | UnsupportedOperator UnsupportedOperator
*
* A sink can also be Comet operators other than `CometExchange`, for instance `CometUnion`:
*
* Scan Scan =======> CometScan CometScan
* | | | |
* Filter Filter CometFilter CometFilter
* | | | |
* Union CometUnion
* | |
* Project CometProject
* Scan Scan =======> CometScan CometScan
* \| | | | Filter Filter CometFilter CometFilter
* \| | | | Union CometUnion
* \| | Project CometProject
*/
// spotless:on
private def transform(plan: SparkPlan): SparkPlan = {
@@ -534,6 +520,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
s
}

case op: LocalTableScanExec =>
if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
QueryPlanSerde
.operator2Proto(op)
.map { nativeOp =>
val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
CometScanWrapper(nativeOp, cometOp)
}
.getOrElse(op)
} else {
withInfo(op, "LocalTableScan is not enabled")
}

case op =>
op match {
case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
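For orientation only (simplified signatures, not Comet's actual code): the scaladoc earlier in this file describes a bottom-up rewrite, and the new `LocalTableScanExec` case above follows the same convert-or-fall-back pattern. A minimal sketch of that pattern:

```scala
import org.apache.spark.sql.execution.SparkPlan

// Children are rewritten first (transformUp), then each node is either swapped for a
// Comet operator or kept as the original Spark operator when conversion is not possible.
def transformSketch(plan: SparkPlan)(toComet: SparkPlan => Option[SparkPlan]): SparkPlan =
  plan.transformUp { case p => toComet(p).getOrElse(p) }
```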
@@ -55,6 +55,7 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
import org.apache.comet.serde.Types.DataType._
import org.apache.comet.serde.literals.CometLiteral
import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
import org.apache.comet.shims.CometExprShim

/**
@@ -66,7 +67,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
* Mapping of Spark operator class to Comet operator handler.
*/
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort)
Map(
classOf[ProjectExec] -> CometProject,
classOf[SortExec] -> CometSort,
classOf[LocalTableScanExec] -> CometLocalTableScan)

private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[ArrayAppend] -> CometArrayAppend,
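The `opSerdeMap` registration above keys handlers by Spark plan class. A hypothetical lookup helper (assumed name, not part of this PR) would be a plain class-based dispatch inside `QueryPlanSerde`:

```scala
// Hypothetical helper inside QueryPlanSerde: find the registered Comet serde,
// if any, for a given physical plan node.
private def serdeFor(plan: org.apache.spark.sql.execution.SparkPlan): Option[CometOperatorSerde[_]] =
  opSerdeMap.get(plan.getClass)
```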
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.execution.LocalTableScanExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.serializeDataType

object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)

override def convert(
op: LocalTableScanExec,
builder: Operator.Builder,
childOp: Operator*): Option[Operator] = {
val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
val scanBuilder = OperatorOuterClass.Scan
.newBuilder()
.setSource(op.getClass.getSimpleName)
.addAllFields(scanTypes.asJava)
.setArrowFfiSafe(false)
Some(builder.setScan(scanBuilder).build())
}
}
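A hedged usage sketch of the serde above (the helper name is hypothetical; in the PR itself the call goes through `QueryPlanSerde.operator2Proto`, as shown in `CometExecRule`):

```scala
import org.apache.spark.sql.execution.LocalTableScanExec

import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.operator.CometLocalTableScan

// Hypothetical helper: drive CometLocalTableScan.convert directly to build the
// protobuf Scan operator for a LocalTableScanExec node (no child operators needed).
def toScanProto(op: LocalTableScanExec): Option[Operator] =
  CometLocalTableScan.convert(op, Operator.newBuilder())
```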
@@ -17,14 +17,15 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.execution.ProjectExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}

@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.base.Objects

import org.apache.comet.CometConf

case class CometLocalTableScanExec(
originalPlan: LocalTableScanExec,
@transient rows: Seq[InternalRow],
override val output: Seq[Attribute])
extends CometExec
with LeafExecNode {

override lazy val metrics: Map[String, SQLMetric] = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@transient private lazy val unsafeRows: Array[InternalRow] = {
if (rows.isEmpty) {
Array.empty
} else {
val proj = UnsafeProjection.create(output, output)
rows.map(r => proj(r).copy()).toArray
}
}

@transient private lazy val rdd: RDD[InternalRow] = {
if (rows.isEmpty) {
sparkContext.emptyRDD
} else {
val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
sparkContext.parallelize(unsafeRows, numSlices)
}
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numOutputRows")
val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
val timeZoneId = conf.sessionLocalTimeZone
rdd.mapPartitionsInternal { sparkBatches =>
val context = TaskContext.get()
val batches = CometArrowConverters.rowToArrowBatchIter(
sparkBatches,
originalPlan.schema,
maxRecordsPerBatch,
timeZoneId,
context)
createMetricsIterator(batches, numInputRows)
}
}

override protected def stringArgs: Iterator[Any] = {
if (rows.isEmpty) {
Iterator("<empty>", output)
} else {
Iterator(output)
}
}

override def supportsColumnar: Boolean = true

override def equals(obj: Any): Boolean = {
obj match {
case other: CometLocalTableScanExec =>
this.originalPlan == other.originalPlan &&
this.schema == other.schema &&
this.output == other.output
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
}

object CometLocalTableScanExec {

private def createMetricsIterator(
it: Iterator[ColumnarBatch],
numInputRows: SQLMetric): Iterator[ColumnarBatch] = {
new Iterator[ColumnarBatch] {
override def hasNext: Boolean = it.hasNext

override def next(): ColumnarBatch = {
val batch = it.next()
numInputRows.add(batch.numRows())
batch
}
}
}
}
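A hedged end-to-end sketch of when this operator appears (spark-shell style; assumes Comet is enabled and `spark.comet.exec.localTableScan.enabled=true`): a small in-memory DataFrame plans as `LocalTableScanExec`, which `CometExecRule` replaces with this Arrow-producing scan.

```scala
// Illustrative only; relies on a `spark` session with Comet enabled being in scope.
import spark.implicits._

val df = Seq(1, 2, 3).toDF("id").filter($"id" > 1)
df.collect()
// df.queryExecution.executedPlan should now contain CometLocalTableScanExec instead of
// a row-based LocalTableScan (the exact plan shape depends on what else Comet converts).
```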
@@ -380,7 +380,7 @@ abstract class CometNativeExec extends CometExec {
_: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
_: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
_: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
_: CometSparkToColumnarExec =>
_: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
func(plan)
case _: CometPlan =>
// Other Comet operators, continue to traverse the tree.
24 changes: 24 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2277,6 +2277,30 @@ class CometExecSuite extends CometTestBase {
}
}

test("LocalTableScanExec spark fallback") {
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") {
val df = Seq.range(0, 10).toDF("id")
checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled")
}
}

test("LocalTableScanExec with filter") {
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
checkSparkAnswerAndOperator(df)
}
}

test("LocalTableScanExec with groupBy") {
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
val df = (Seq.range(0, 10) ++ Seq.range(0, 20))
.toDF("id")
.groupBy(col("id"))
.agg(count("*"))
checkSparkAnswerAndOperator(df)
}
}

}

case class BucketedTableTestSpec(