
Commit 8dff711

revans2 authored and tgravescs committed
[SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar
## What changes were proposed in this pull request?

This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to apache#24795.

## How was this patch tested?

I did some manual tests and ran/updated the automated tests. I also did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.

Closes apache#25008 from revans2/columnar-remove-batch-scan.

Authored-by: Robert (Bobby) Evans <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
1 parent 19bcce1 · commit 8dff711

17 files changed: +142 -335 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 2 additions & 8 deletions
@@ -53,8 +53,8 @@ class ColumnarRule {
  * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
  * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
  *
- * The implementation is based off of similar implementations in [[ColumnarBatchScan]],
- * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * The implementation is based off of similar implementations in
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan)
@@ -96,9 +96,6 @@ case class ColumnarToRowExec(child: SparkPlan)
   /**
    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
    * This is called once per [[ColumnVector]] in the batch.
-   *
-   * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it
-   * at some point.
    */
   private def genCodeColumnVector(
       ctx: CodegenContext,
@@ -130,9+127,6 @@ case class ColumnarToRowExec(child: SparkPlan)
    * Produce code to process the input iterator as [[ColumnarBatch]]es.
    * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in
    * each batch.
-   *
-   * This code came almost completely unchanged from [[ColumnarBatchScan]] and will
-   * hopefully replace it at some point.
   */
  override protected def doProduce(ctx: CodegenContext): String = {
    // PhysicalRDD always just has one input
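With this change, ColumnarToRowExec is the single owner of the batch-to-row logic that ColumnarBatchScan used to carry. As a rough illustration of what the generated code does, here is a minimal, interpreted sketch; the helper name `batchesToRows` is made up, and the real operator generates Java code and updates SQL metrics rather than using an iterator per row.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Interpreted equivalent of the columnar-to-row transition: each batch is
// exposed as a sequence of InternalRows via ColumnarBatch.rowIterator().
def batchesToRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] = {
  batches.mapPartitions { batchIter =>
    batchIter.flatMap(batch => batch.rowIterator().asScala)
  }
}
```

The codegen path in doProduce yields the same result but reads each [[ColumnVector]] directly, which is why genCodeColumnVector is kept.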

sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala

Lines changed: 0 additions & 167 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 34 additions & 26 deletions
@@ -37,10 +37,11 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat =>
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode {
   val relation: BaseRelation
   val tableIdentifier: Option[TableIdentifier]
 
@@ -69,6 +70,12 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   private def redact(text: String): String = {
     Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text)
   }
+
+  /**
+   * The data being read in. This is to provide input to the tests in a way compatible with
+   * [[InputRDDCodegen]] which all implementations used to extend.
+   */
+  def inputRDDs(): Seq[RDD[InternalRow]]
 }
 
 /** Physical plan node for scanning data from a relation. */
@@ -141,11 +148,11 @@ case class FileSourceScanExec(
     optionalBucketSet: Option[BitSet],
     dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec with ColumnarBatchScan {
+  extends DataSourceScanExec {
 
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsBatch: Boolean = {
+  override lazy val supportsColumnar: Boolean = {
     relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
@@ -275,7 +282,7 @@ case class FileSourceScanExec(
     Map(
       "Format" -> relation.fileFormat.toString,
       "ReadSchema" -> requiredSchema.catalogString,
-      "Batched" -> supportsBatch.toString,
+      "Batched" -> supportsColumnar.toString,
      "PartitionFilters" -> seqToString(partitionFilters),
      "PushedFilters" -> seqToString(pushedDownFilters),
      "DataFilters" -> seqToString(dataFilters),
@@ -302,7 +309,7 @@ case class FileSourceScanExec(
     withSelectedBucketsCount
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  lazy val inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -334,29 +341,30 @@ case class FileSourceScanExec(
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      // in the case of fallback, this batched scan should never fail because of:
-      // 1) only primitive types are supported
-      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-
-      if (needsUnsafeRowConversion) {
-        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
-          val proj = UnsafeProjection.create(schema)
-          proj.initialize(index)
-          iter.map( r => {
-            numOutputRows += 1
-            proj(r)
-          })
-        }
-      } else {
-        inputRDD.map { r =>
-          numOutputRows += 1
-          r
-        }
+    val numOutputRows = longMetric("numOutputRows")
+
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val proj = UnsafeProjection.create(schema)
+        proj.initialize(index)
+        iter.map( r => {
+          numOutputRows += 1
+          proj(r)
+        })
       }
+    } else {
+      inputRDD.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
+  }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
+      numOutputRows += batch.numRows()
+      batch
     }
   }
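The net effect is that FileSourceScanExec no longer mixes in ColumnarBatchScan: it advertises supportsColumnar and implements both doExecute and doExecuteColumnar, and the columnar rules insert a ColumnarToRowExec when a row-based parent needs rows. Below is a minimal sketch of that contract, assuming the Spark 3.0 SparkPlan API; the class and member names are illustrative only and omit things a real plan node must implement (output, metrics, canonicalization, and so on).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: the shape of a leaf scan that can serve either rows or
// ColumnarBatches, mirroring what FileSourceScanExec does above.
abstract class ExampleColumnarScanExec extends LeafExecNode {

  // The reader output; when batching is enabled the elements are really
  // ColumnarBatches carried through an RDD[InternalRow], as with inputRDD above.
  def inputRDD: RDD[InternalRow]

  // Advertise columnar output so the planner can choose doExecuteColumnar.
  override def supportsColumnar: Boolean = true

  // Row-based path, used when supportsColumnar is false.
  override protected def doExecute(): RDD[InternalRow] = inputRDD

  // Columnar path; a ColumnarToRowExec is inserted above this node whenever
  // the parent operator cannot consume batches.
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    inputRDD.asInstanceOf[RDD[ColumnarBatch]]
}
```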

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 1 addition & 5 deletions
@@ -709,11 +709,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
         s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
-      child match {
-        // The fallback solution of batch file source scan still uses WholeStageCodegenExec
-        case f: FileSourceScanExec if f.supportsBatch => // do nothing
-        case _ => return child.execute()
-      }
+      return child.execute()
     }
 
     val references = ctx.references.toArray

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 2 additions & 0 deletions
@@ -84,6 +84,8 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReduceNumShufflePartitions(conf),
+    ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
+      session.sessionState.columnarRules),
     CollapseCodegenStages(conf)
   )
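ApplyColumnarRulesAndInsertTransitions is the rule that runs any registered ColumnarRules and inserts the row/batch transitions, so adding it here makes adaptive query stages go through the same columnar planning as the normal path. For context on where those ColumnarRules come from, a plugin can register one through SparkSessionExtensions.injectColumnar, the extension point added by the earlier SPARK-27396 work. This is a hedged usage sketch, not part of this commit; the extension and rule names below are made up.

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// A do-nothing rule; a real one would replace nodes with columnar-capable
// implementations (nodes whose supportsColumnar is true).
case class ExamplePreTransitionRule(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

// Registered via spark.sql.extensions=com.example.ExampleColumnarExtension
class ExampleColumnarExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar { session =>
      new ColumnarRule {
        // Runs before the transitions are inserted, so replaced nodes get
        // ColumnarToRowExec / RowToColumnarExec added around them as needed.
        override def preColumnarTransitions: Rule[SparkPlan] =
          ExamplePreTransitionRule(session)
      }
    }
  }
}
```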
