Commit b52abdb

Commit message: init
Parent: aa89883

File tree: 4 files changed (+63, -4 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 48 additions & 0 deletions
@@ -319,3 +319,51 @@ case class RDDScanExec(
 
   override def getStream: Option[SparkDataStream] = stream
 }
+
+/**
+ * A special case of RDDScanExec that is used to represent a scan without a `FROM` clause.
+ * For example, 'select version()'.
+ *
+ * We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
+ * `TreeNode`'s general use of reflection.
+ */
+case class OneRowRelationExec() extends LeafExecNode
+  with StreamSourceAwareSparkPlan
+  with InputRDDCodegen {
+
+  override val nodeName: String = s"Scan OneRowRelation"
+
+  override val output: Seq[Attribute] = Nil
+
+  val rdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+      val proj = UnsafeProjection.create(schema)
+      proj.initialize(index)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  override protected def doCanonicalize(): SparkPlan = {
+    super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
+  }
+
+  override def getStream: Option[SparkDataStream] = None
+}
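
A quick way to see the new node in action (a minimal sketch, assuming a spark-shell session with spark in scope, mirroring the commit's test): a query with no FROM clause should now plan to OneRowRelationExec.

    // Sketch only: check that the dedicated leaf node appears in the physical plan.
    import org.apache.spark.sql.execution.OneRowRelationExec

    val df = spark.sql("select version()")  // no FROM clause
    assert(df.queryExecution.executedPlan
      .find(_.isInstanceOf[OneRowRelationExec]).isDefined)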

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 3 deletions
@@ -690,8 +690,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
@@ -1040,7 +1038,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, g.requiredChildOutput, outer,
           g.qualifiedGeneratorOutput, planLater(child)) :: Nil
       case _: logical.OneRowRelation =>
-        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
+        execution.OneRowRelationExec() :: Nil
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
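
The deleted singleRowRdd was the shared one-row RDD behind the old RDDScanExec fallback; OneRowRelationExec now builds that RDD itself. For orientation, a sketch of the Strategy shape this hunk edits (the object name below is made up for illustration; the real match sits inside the existing BasicOperators strategy):

    // Hypothetical standalone strategy showing the same mapping as the diff:
    // the logical OneRowRelation plans to the new dedicated physical node.
    object OneRowRelationOnly extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case _: logical.OneRowRelation => execution.OneRowRelationExec() :: Nil
        case _ => Nil  // defer everything else to the other strategies
      }
    }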

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ trait CodegenSupport extends SparkPlan {
     case _: SortMergeJoinExec => "smj"
     case _: BroadcastNestedLoopJoinExec => "bnlj"
     case _: RDDScanExec => "rdd"
+    case _: OneRowRelationExec => "orr"
     case _: DataSourceScanExec => "scan"
     case _: InMemoryTableScanExec => "memoryScan"
     case _: WholeStageCodegenExec => "wholestagecodegen"
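
This mapping picks the prefix for variable names in whole-stage generated code, so the new node's locals are tagged orr instead of the default derived from the node name. One way to eyeball the result (a sketch, assuming a spark-shell session):

    // Dump the generated code for a FROM-less query and look for
    // "orr"-prefixed variables (debugCodegen comes from the debug package).
    import org.apache.spark.sql.execution.debug._
    spark.sql("select version()").debugCodegen()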

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 13 additions & 1 deletion
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
+import org.apache.spark.sql.execution.{CommandResultExec, OneRowRelationExec, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -4962,6 +4962,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       parameters = Map("plan" -> "'Aggregate [groupingsets(Vector(0), posexplode(array(col)))]")
     )
   }
+
+  Seq(true, false).foreach { codegenEnabled =>
+    test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {
+        val df = spark.sql("select 'test' stringCol")
+        checkAnswer(df, Row("test"))
+        val plan = df.queryExecution.executedPlan
+        val oneRowRelationExists = plan.find(_.isInstanceOf[OneRowRelationExec]).isDefined
+        assert(oneRowRelationExists)
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
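
The new test runs once per whole-stage codegen setting and checks both the query result and that OneRowRelationExec actually shows up in the executed plan. To run just this case from a Spark checkout, the usual ScalaTest filter should work (assumed invocation): build/sbt 'sql/testOnly *SQLQuerySuite -- -z SPARK-52060'.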
