Commit 9b91c25

Feat: Add CometLocalTableScanExec operator (#2735)
1 parent 9bfb5c4 commit 9b91c25

10 files changed: +217 −4 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 0 deletions
@@ -251,6 +251,8 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+  val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
+    createExecEnabledConfig("localTableScan", defaultValue = false)
 
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
@@ -149,6 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
 | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
 | `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
+| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false |
 | `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
 | `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
 | `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 13 additions & 0 deletions
@@ -534,6 +534,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             s
           }
 
+        case op: LocalTableScanExec =>
+          if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
+            QueryPlanSerde
+              .operator2Proto(op)
+              .map { nativeOp =>
+                val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
+                CometScanWrapper(nativeOp, cometOp)
+              }
+              .getOrElse(op)
+          } else {
+            withInfo(op, "LocalTableScan is not enabled")
+          }
+
         case op =>
           op match {
             case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
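
For context, LocalTableScanExec is the node Spark plans for DataFrames built from local in-memory collections, so the new branch fires on queries like the sketch below (assuming Comet is loaded and the flag above is enabled; the tests in CometExecSuite use the same shape):

    import spark.implicits._

    // Seq-backed DataFrames plan as LocalTableScanExec; with the flag enabled,
    // the rule rewrites them to CometLocalTableScanExec in a CometScanWrapper.
    val df = Seq.range(0, 10).toDF("id")
    df.explain()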

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 5 additions & 1 deletion
@@ -47,6 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.Types.{DataType => ProtoDataType}
 import org.apache.comet.serde.Types.DataType._
 import org.apache.comet.serde.literals.CometLiteral
+import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
 import org.apache.comet.shims.CometExprShim
 
 /**
@@ -58,7 +59,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
    * Mapping of Spark operator class to Comet operator handler.
    */
   private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
-    Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort)
+    Map(
+      classOf[ProjectExec] -> CometProject,
+      classOf[SortExec] -> CometSort,
+      classOf[LocalTableScanExec] -> CometLocalTableScan)
 
   private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[ArrayAppend] -> CometArrayAppend,
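
Registering the handler in opSerdeMap keeps operator dispatch declarative: a handler is found by the plan node's runtime class instead of another pattern-match arm. An illustrative lookup (not the actual QueryPlanSerde internals):

    // Illustrative only: class-keyed handler lookup over the map above.
    def handlerFor(plan: SparkPlan): Option[CometOperatorSerde[_]] =
      opSerdeMap.get(plan.getClass)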

spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde.operator
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.execution.LocalTableScanExec
+
+import org.apache.comet.{CometConf, ConfigEntry}
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
+
+  override def convert(
+      op: LocalTableScanExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[Operator] = {
+    val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
+    val scanBuilder = OperatorOuterClass.Scan
+      .newBuilder()
+      .setSource(op.getClass.getSimpleName)
+      .addAllFields(scanTypes.asJava)
+      .setArrowFfiSafe(false)
+    Some(builder.setScan(scanBuilder).build())
+  }
+}
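
The object above is the whole CometOperatorSerde contract: enabledConfig ties the handler to its feature flag, and convert either emits a protobuf Operator or returns None to fall back to Spark. A hedged sketch of the same shape for a hypothetical MyLeafExec (illustrative, not part of this commit):

    // Hypothetical handler, mirroring the contract shown above.
    object CometMyLeafScan extends CometOperatorSerde[MyLeafExec] {
      // No feature flag: the handler is always eligible.
      override def enabledConfig: Option[ConfigEntry[Boolean]] = None

      override def convert(
          op: MyLeafExec,
          builder: Operator.Builder,
          childOp: Operator*): Option[Operator] =
        None // None signals fallback to Spark for this node
    }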

spark/src/main/scala/org/apache/comet/serde/CometProject.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala

Lines changed: 2 additions & 1 deletion
@@ -17,14 +17,15 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.execution.ProjectExec
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.exprToProto

spark/src/main/scala/org/apache/comet/serde/CometSort.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}

spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.common.base.Objects
+
+import org.apache.comet.CometConf
+
+case class CometLocalTableScanExec(
+    originalPlan: LocalTableScanExec,
+    @transient rows: Seq[InternalRow],
+    override val output: Seq[Attribute])
+    extends CometExec
+    with LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (rows.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      rows.map(r => proj(r).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    if (rows.isEmpty) {
+      sparkContext.emptyRDD
+    } else {
+      val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
+      sparkContext.parallelize(unsafeRows, numSlices)
+    }
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numOutputRows")
+    val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
+    val timeZoneId = conf.sessionLocalTimeZone
+    rdd.mapPartitionsInternal { sparkBatches =>
+      val context = TaskContext.get()
+      val batches = CometArrowConverters.rowToArrowBatchIter(
+        sparkBatches,
+        originalPlan.schema,
+        maxRecordsPerBatch,
+        timeZoneId,
+        context)
+      createMetricsIterator(batches, numInputRows)
+    }
+  }
+
+  override protected def stringArgs: Iterator[Any] = {
+    if (rows.isEmpty) {
+      Iterator("<empty>", output)
+    } else {
+      Iterator(output)
+    }
+  }
+
+  override def supportsColumnar: Boolean = true
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometLocalTableScanExec =>
+        this.originalPlan == other.originalPlan &&
+          this.schema == other.schema &&
+          this.output == other.output
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
+}
+
+object CometLocalTableScanExec {
+
+  private def createMetricsIterator(
+      it: Iterator[ColumnarBatch],
+      numInputRows: SQLMetric): Iterator[ColumnarBatch] = {
+    new Iterator[ColumnarBatch] {
+      override def hasNext: Boolean = it.hasNext
+
+      override def next(): ColumnarBatch = {
+        val batch = it.next()
+        numInputRows.add(batch.numRows())
+        batch
+      }
+    }
+  }
+}
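
createMetricsIterator is a pass-through decorator: batches flow through unchanged while numOutputRows accumulates. Because Iterator.map is lazy, an equivalent hedged one-liner would be:

    // Sketch: same behavior as createMetricsIterator via a lazy map.
    def countRows(it: Iterator[ColumnarBatch], metric: SQLMetric): Iterator[ColumnarBatch] =
      it.map { batch => metric.add(batch.numRows()); batch }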

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ abstract class CometNativeExec extends CometExec {
380380
_: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
381381
_: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
382382
_: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
383-
_: CometSparkToColumnarExec =>
383+
_: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
384384
func(plan)
385385
case _: CometPlan =>
386386
// Other Comet operators, continue to traverse the tree.

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -2105,6 +2105,30 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("LocalTableScanExec spark fallback") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") {
+      val df = Seq.range(0, 10).toDF("id")
+      checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled")
+    }
+  }
+
+  test("LocalTableScanExec with filter") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("LocalTableScanExec with groupBy") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = (Seq.range(0, 10) ++ Seq.range(0, 20))
+        .toDF("id")
+        .groupBy(col("id"))
+        .agg(count("*"))
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
 }
 
 case class BucketedTableTestSpec(
