Commit 32fa0b8

ueshin authored and cloud-fan committed
[SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.
## What changes were proposed in this pull request?

As mentioned at apache#18680 (comment), when we have more `ColumnVector` implementations, it might (or might not) have huge performance implications, because it might disable inlining or force virtual dispatches.

On the read path, one of the major code paths is the one generated by `ColumnarBatchScan`. Currently it refers to the `ColumnVector` interface, so the penalty grows as more implementations are loaded, but we can know the concrete type from the usage site, e.g. the vectorized Parquet reader uses `OnHeapColumnVector`. We can use the concrete type directly in the generated code to avoid the penalty.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes apache#18989 from ueshin/issues/SPARK-21781.
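To make the effect concrete, here is a minimal, self-contained sketch of the per-column assignment that `ColumnarBatchScan` emits before and after this patch. It reproduces only the shape of the generated Java; `ColumnAssignSketch` and its methods are illustrative names, not Spark APIs.

```scala
// Minimal sketch, not Spark's actual codegen: it only mirrors the shape of
// the per-column assignment string emitted by ColumnarBatchScan.
object ColumnAssignSketch {
  // Before: the mutable field is typed as the ColumnVector interface and no
  // cast is emitted, so every later access is an interface (virtual) call.
  def before(name: String, batch: String, i: Int): String =
    s"$name = $batch.column($i);"

  // After: the field uses the concrete class reported by the file format and
  // a single up-front cast is emitted, giving the JIT a monomorphic call site.
  def after(name: String, clz: String, batch: String, i: Int): String =
    s"$name = ($clz) $batch.column($i);"

  def main(args: Array[String]): Unit = {
    println(before("colInstance0", "batch", 0))
    println(after("colInstance0",
      "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector", "batch", 0))
  }
}
```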
1 parent c7270a4 commit 32fa0b8

File tree: 4 files changed (+32, -5 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala

Lines changed: 9 additions & 5 deletions
```diff
@@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   val inMemoryTableScan: InMemoryTableScanExec = null
 
+  def vectorTypes: Option[Seq[String]] = None
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
@@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val scanTimeTotalNs = ctx.freshName("scanTime")
     ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
 
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+    val columnVectorClzs = vectorTypes.getOrElse(
+      Seq.fill(colVars.size)(classOf[ColumnVector].getName))
+    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
+      case ((name, columnVectorClz), i) =>
+        ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+        s"$name = ($columnVectorClz) $batch.column($i);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
```
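The new hook degrades gracefully: when `vectorTypes` is `None`, the generated cast targets the `ColumnVector` interface, which matches the old behavior. Below is a hedged, Spark-free sketch of the contract; `VectorTypeAware` and `MyOnHeapScan` are hypothetical names, only the method shape mirrors the patch.

```scala
// Stand-in for the new ColumnarBatchScan hook (hypothetical trait name).
trait VectorTypeAware {
  // None means "unknown": codegen falls back to the ColumnVector interface.
  def vectorTypes: Option[Seq[String]] = None
}

// A scan that knows all of its columns are on-heap vectors can pin the
// concrete class name for each one.
class MyOnHeapScan(nColumns: Int) extends VectorTypeAware {
  override def vectorTypes: Option[Seq[String]] =
    Some(Seq.fill(nColumns)(
      "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"))
}

object VectorTypeAwareDemo {
  def main(args: Array[String]): Unit =
    println(new MyOnHeapScan(3).vectorTypes) // Some(List(<OnHeapColumnVector> x3))
}
```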

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -174,6 +174,11 @@ case class FileSourceScanExec(
    false
  }
 
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema)
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
```
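The `FileSourceScanExec` side is pure delegation: the scan node does not pick vector classes itself, it asks its `FileFormat`. A minimal sketch of that wiring, with hypothetical stub types standing in for the Spark classes:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical stub mirroring FileFormat.vectorTypes.
trait FileFormatStub {
  def vectorTypes(
      requiredSchema: StructType,
      partitionSchema: StructType): Option[Seq[String]]
}

// Hypothetical stub mirroring FileSourceScanExec: delegation only, no logic.
class FileScanStub(
    format: FileFormatStub,
    requiredSchema: StructType,
    partitionSchema: StructType) {
  def vectorTypes: Option[Seq[String]] =
    format.vectorTypes(requiredSchema, partitionSchema)
}
```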

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -64,6 +64,16 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Returns concrete column vector class names for each column to be used in a columnar batch
+   * if this format supports returning columnar batch.
+   */
+  def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    None
+  }
+
   /**
    * Returns whether a file with `path` could be splitted or not.
    */
```
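Third-party formats get the same knob. A hedged sketch of a format that (hypothetically) fills its batches with off-heap vectors follows; `MyOffHeapFormat` is made up, while the override signature, `ParquetFileFormat`, and `OffHeapColumnVector` are real classes of this era.

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.StructType

// Hypothetical format: assumes its reader really does put OffHeapColumnVector
// instances into every batch it returns.
class MyOffHeapFormat extends ParquetFileFormat {
  override def vectorTypes(
      requiredSchema: StructType,
      partitionSchema: StructType): Option[Seq[String]] = {
    // One entry per batch column: data columns first, then partition columns.
    Some(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
      classOf[OffHeapColumnVector].getName))
  }
}
```

The invariant to keep in mind: the reported class names must match what the reader actually produces at runtime, because the generated code now casts unconditionally and a mismatch would surface as a `ClassCastException`.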

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._
4747
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
4848
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
4949
import org.apache.spark.sql.execution.datasources._
50+
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
5051
import org.apache.spark.sql.internal.SQLConf
5152
import org.apache.spark.sql.sources._
5253
import org.apache.spark.sql.types._
@@ -272,6 +273,13 @@ class ParquetFileFormat
272273
schema.forall(_.dataType.isInstanceOf[AtomicType])
273274
}
274275

276+
override def vectorTypes(
277+
requiredSchema: StructType,
278+
partitionSchema: StructType): Option[Seq[String]] = {
279+
Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
280+
classOf[OnHeapColumnVector].getName))
281+
}
282+
275283
override def isSplitable(
276284
sparkSession: SparkSession,
277285
options: Map[String, String],
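The count covers both the data columns and the partition columns because the vectorized Parquet reader appends partition values as extra vectors at the end of each batch. A small sketch with made-up schemas:

```scala
import org.apache.spark.sql.types._

object VectorTypeCountDemo {
  def main(args: Array[String]): Unit = {
    val requiredSchema = StructType(Seq(
      StructField("id", LongType), StructField("name", StringType)))
    val partitionSchema = StructType(Seq(StructField("date", StringType)))
    val clz = "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"
    // Mirrors ParquetFileFormat.vectorTypes: one class name per batch column.
    val types = Seq.fill(
      requiredSchema.fields.length + partitionSchema.fields.length)(clz)
    println(types.size) // 3: two data columns plus one partition column
  }
}
```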
