Skip to content

Commit 1b3bdf8

Browse files
committed
switch back
1 parent eef16fe commit 1b3bdf8

File tree

1 file changed

+5
-8
lines changed

1 file changed

+5
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 5 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -333,30 +333,27 @@ case class OneRowRelationExec() extends LeafExecNode
333333

334334
override val output: Seq[Attribute] = Nil
335335

336-
private val rdd: RDD[UnsafeRow] = {
337-
val proj = UnsafeProjection.create(schema)
338-
val emptyRow = proj(InternalRow.empty)
339-
session.sparkContext.parallelize(Seq(emptyRow), 1)
340-
}
336+
private val rdd: RDD[InternalRow] = session.sparkContext.parallelize(Seq(InternalRow.empty), 1)
341337

342338
override lazy val metrics = Map(
343339
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
344340

345341
protected override def doExecute(): RDD[InternalRow] = {
346342
val numOutputRows = longMetric("numOutputRows")
347343
rdd.mapPartitionsWithIndexInternal { (_, iter) =>
344+
val proj = UnsafeProjection.create(schema)
348345
iter.map { r =>
349346
numOutputRows += 1
350-
r
347+
proj(r)
351348
}
352349
}
353350
}
354351

355352
override def simpleString(maxFields: Int): String = s"$nodeName[]"
356353

357-
override def inputRDD: RDD[InternalRow] = rdd.asInstanceOf[RDD[InternalRow]]
354+
override def inputRDD: RDD[InternalRow] = rdd
358355

359-
override protected val createUnsafeProjection: Boolean = false
356+
override protected val createUnsafeProjection: Boolean = true
360357

361358
override protected def doCanonicalize(): SparkPlan = {
362359
super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()

0 commit comments

Comments (0)