Skip to content

Commit 3d0e174

Browse files
committed
[SPARK-21845][SQL] Make codegen fallback of expressions configurable
## What changes were proposed in this pull request? We should make codegen fallback of expressions configurable. So far, it is always on. We might hide it when our codegen has compilation bugs. Thus, we should also disable the codegen fallback when running test cases. ## How was this patch tested? Added test cases Author: gatorsmile <[email protected]> Closes apache#19062 from gatorsmile/fallbackCodegen.
1 parent fba9cc8 commit 3d0e174

File tree

7 files changed

+24
-16
lines changed

7 files changed

+24
-16
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,9 @@ object SQLConf {
551551
.intConf
552552
.createWithDefault(100)
553553

554-
val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
554+
val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
555555
.internal()
556-
.doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
556+
.doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
557557
" fail to compile generated code")
558558
.booleanConf
559559
.createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
10411041

10421042
def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
10431043

1044-
def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
1044+
def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
10451045

10461046
def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
10471047

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
5656

5757
protected def sparkContext = sqlContext.sparkContext
5858

59-
// sqlContext will be null when we are being deserialized on the slaves. In this instance
60-
// the value of subexpressionEliminationEnabled will be set by the deserializer after the
61-
// constructor has run.
62-
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
63-
sqlContext.conf.subexpressionEliminationEnabled
64-
} else {
65-
false
66-
}
59+
// whether we should fallback when hitting compilation errors caused by codegen
60+
private val codeGenFallBack = sqlContext.conf.codegenFallback
61+
62+
protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled
6763

6864
/** Overridden make copy also propagates sqlContext to copied plan. */
6965
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
@@ -370,8 +366,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
370366
try {
371367
GeneratePredicate.generate(expression, inputSchema)
372368
} catch {
373-
case e @ (_: JaninoRuntimeException | _: CompileException)
374-
if sqlContext == null || sqlContext.conf.wholeStageFallback =>
369+
case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
375370
genInterpretedPredicate(expression, inputSchema)
376371
}
377372
}

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
382382
try {
383383
CodeGenerator.compile(cleanedSource)
384384
} catch {
385-
case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
385+
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
386386
// We should already saw the error message
387387
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
388388
return child.execute()

sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
422422
v
423423
}
424424
withSQLConf(
425-
(SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
425+
(SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
426426
(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
427427
val df = spark.range(0, 4, 1, 4).withColumn("c", c)
428428
val rows = df.collect()

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
20112011

20122012
val filter = (0 until N)
20132013
.foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
2014-
df.filter(filter).count
2014+
2015+
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
2016+
df.filter(filter).count()
2017+
}
2018+
2019+
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
2020+
val e = intercept[SparkException] {
2021+
df.filter(filter).count()
2022+
}.getMessage
2023+
assert(e.contains("grows beyond 64 KB"))
2024+
}
20152025
}
20162026

20172027
test("SPARK-20897: cached self-join should not fail") {

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
2424

2525
import org.apache.spark.{DebugFilesystem, SparkConf}
2626
import org.apache.spark.sql.{SparkSession, SQLContext}
27+
import org.apache.spark.sql.internal.SQLConf
2728

2829
/**
2930
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
3435
new SparkConf()
3536
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
3637
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
38+
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
3739
}
3840

3941
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ object TestHive
5151
"TestSQLContext",
5252
new SparkConf()
5353
.set("spark.sql.test", "")
54+
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
5455
.set("spark.sql.hive.metastore.barrierPrefixes",
5556
"org.apache.spark.sql.hive.execution.PairSerDe")
5657
.set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)

0 commit comments

Comments (0)