diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index d48d149728..f28351c476 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -254,6 +254,8 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+  val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
+    createExecEnabledConfig("localTableScan", defaultValue = false)
 
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 537d0d7748..041ec7c173 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -149,6 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
 | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
 | `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
+| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false |
 | `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
 | `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
 | `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 47e21b24c2..4f41f33414 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -534,6 +534,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             s
           }
 
+        case op: LocalTableScanExec =>
+          if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
+            QueryPlanSerde
+              .operator2Proto(op)
+              .map { nativeOp =>
+                val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
+                CometScanWrapper(nativeOp, cometOp)
+              }
+              .getOrElse(op)
+          } else {
+            withInfo(op, "LocalTableScan is not enabled")
+          }
+
         case op =>
           op match {
             case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 570c07cb09..d6579b69d3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -55,6 +55,7 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
 import org.apache.comet.serde.Types.{DataType => ProtoDataType}
 import org.apache.comet.serde.Types.DataType._
 import org.apache.comet.serde.literals.CometLiteral
+import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
 import org.apache.comet.shims.CometExprShim
 
 /**
@@ -66,7 +67,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
    * Mapping of Spark operator class to Comet operator handler.
    */
   private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
-    Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort)
+    Map(
+      classOf[ProjectExec] -> CometProject,
+      classOf[SortExec] -> CometSort,
+      classOf[LocalTableScanExec] -> CometLocalTableScan)
 
   private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[ArrayAppend] -> CometArrayAppend,
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
new file mode 100644
index 0000000000..e3e8538cf6
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde.operator
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.execution.LocalTableScanExec
+
+import org.apache.comet.{CometConf, ConfigEntry}
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
+
+  override def convert(
+      op: LocalTableScanExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[Operator] = {
+    val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
+    val scanBuilder = OperatorOuterClass.Scan
+      .newBuilder()
+      .setSource(op.getClass.getSimpleName)
+      .addAllFields(scanTypes.asJava)
+      .setArrowFfiSafe(false)
+    Some(builder.setScan(scanBuilder).build())
+  }
+}
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
similarity index 94%
rename from spark/src/main/scala/org/apache/comet/serde/CometProject.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
index 651aa8fefd..4ba02945d6 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.ProjectExec
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
similarity index 95%
rename from spark/src/main/scala/org/apache/comet/serde/CometSort.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
index 4a1063458c..39a1c55656 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
new file mode 100644
index 0000000000..611d367f35
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.common.base.Objects
+
+import org.apache.comet.CometConf
+
+case class CometLocalTableScanExec(
+    originalPlan: LocalTableScanExec,
+    @transient rows: Seq[InternalRow],
+    override val output: Seq[Attribute])
+    extends CometExec
+    with LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (rows.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      rows.map(r => proj(r).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    if (rows.isEmpty) {
+      sparkContext.emptyRDD
+    } else {
+      val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
+      sparkContext.parallelize(unsafeRows, numSlices)
+    }
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numOutputRows")
+    val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
+    val timeZoneId = conf.sessionLocalTimeZone
+    rdd.mapPartitionsInternal { sparkBatches =>
+      val context = TaskContext.get()
+      val batches = CometArrowConverters.rowToArrowBatchIter(
+        sparkBatches,
+        originalPlan.schema,
+        maxRecordsPerBatch,
+        timeZoneId,
+        context)
+      createMetricsIterator(batches, numInputRows)
+    }
+  }
+
+  override protected def stringArgs: Iterator[Any] = {
+    if (rows.isEmpty) {
+      Iterator("<empty>", output)
+    } else {
+      Iterator(output)
+    }
+  }
+
+  override def supportsColumnar: Boolean = true
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometLocalTableScanExec =>
+        this.originalPlan == other.originalPlan &&
+        this.schema == other.schema &&
+        this.output == other.output
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
+}
+
+object CometLocalTableScanExec {
+
+  private def createMetricsIterator(
+      it: Iterator[ColumnarBatch],
+      numInputRows: SQLMetric): Iterator[ColumnarBatch] = {
+    new Iterator[ColumnarBatch] {
+      override def hasNext: Boolean = it.hasNext
+
+      override def next(): ColumnarBatch = {
+        val batch = it.next()
+        numInputRows.add(batch.numRows())
+        batch
+      }
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index de6892638a..d7a743eb2d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -380,7 +380,7 @@ abstract class CometNativeExec extends CometExec {
             _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
             _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
             _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
-            _: CometSparkToColumnarExec =>
+            _: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
           func(plan)
         case _: CometPlan =>
           // Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 56174c7fc0..e9fe3859a6 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2277,6 +2277,30 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("LocalTableScanExec spark fallback") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") {
+      val df = Seq.range(0, 10).toDF("id")
+      checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled")
+    }
+  }
+
+  test("LocalTableScanExec with filter") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("LocalTableScanExec with groupBy") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = (Seq.range(0, 10) ++ Seq.range(0, 20))
+        .toDF("id")
+        .groupBy(col("id"))
+        .agg(count("*"))
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
 }
 
 case class BucketedTableTestSpec(
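
For context, a minimal way to exercise the new localTableScan path is sketched below. This is illustrative only and not part of the patch: it assumes a local Spark session with the Comet plugin jar on the classpath, registered via spark.plugins; only the spark.comet.exec.localTableScan.enabled key comes from this diff, everything else is an assumption.

// Illustrative sketch only (not part of this change). Assumes the Comet jar is on the
// classpath; the plugin class name is taken from the Comet getting-started docs.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.localTableScan.enabled", "true")
  .getOrCreate()

import spark.implicits._

// Seq.toDF is planned as LocalTableScanExec; with the flag enabled, CometExecRule
// wraps it in CometLocalTableScanExec so the downstream filter can run natively.
val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
df.explain()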