
Commit 0c94e48

gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an infinite loop and times out the build. There were multiple issues with the test:

1. The first valid stageId is zero when the test is run alone rather than as part of a suite, so the following code waits until it times out:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synchronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <[email protected]>

Closes apache#20888 from gaborgsomogyi/SPARK-23775.
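As a rough, standalone illustration of the latch-based synchronization described above (this is not the test code itself; the object name and the simulated callback threads are made up for the example), a `CountDownLatch` lets the waiting thread block with a bounded timeout until every callback has fired, instead of polling a shared `@volatile` field:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Minimal sketch: two asynchronous "listener" callbacks signal a latch, and
// the main thread waits for both with a timeout instead of busy-polling a
// shared variable. LatchSketch and the spawned threads are illustrative only.
object LatchSketch {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(2) // expect two callback invocations

    for (_ <- 1 to 2) {
      new Thread(new Runnable {
        override def run(): Unit = {
          // A real listener would do its work here (e.g. cancel a stage),
          // then signal that it has run.
          latch.countDown()
        }
      }).start()
    }

    // Bounded wait: returns false on timeout rather than hanging forever.
    val bothFired = latch.await(20, TimeUnit.SECONDS)
    println(s"both callbacks observed: $bothFired")
  }
}
```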
1 parent a906647 commit 0c94e48

1 file changed: +45, -33 lines


sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala

Lines changed: 45 additions & 33 deletions
```diff
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
   }
 
   test("Cancelling stage in a query with Range.") {
-    val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
+    // Save and restore the value because SparkContext is shared
+    val savedInterruptOnCancel = sparkContext
+      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+    try {
+      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+
+      for (codegen <- Seq(true, false)) {
+        // This countdown latch used to make sure with all the stages cancelStage called in listener
+        val latch = new CountDownLatch(2)
+
+        val listener = new SparkListener {
+          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+            sparkContext.cancelStage(taskStart.stageId)
+            latch.countDown()
+          }
         }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
-      }
-    }
 
-    sparkContext.addSparkListener(listener)
-    for (codegen <- Seq(true, false)) {
-      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
-        val ex = intercept[SparkException] {
-          spark.range(0, 100000000000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+        sparkContext.addSparkListener(listener)
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+          val ex = intercept[SparkException] {
+            sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+              x.synchronized {
+                x.wait()
+              }
+              x
+            }.toDF("id").agg(sum("id")).collect()
+          }
+          ex.getCause() match {
+            case null =>
+              assert(ex.getMessage().contains("cancelled"))
+            case cause: SparkException =>
+              assert(cause.getMessage().contains("cancelled"))
+            case cause: Throwable =>
+              fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+          }
         }
-        ex.getCause() match {
-          case null =>
-            assert(ex.getMessage().contains("cancelled"))
-          case cause: SparkException =>
-            assert(cause.getMessage().contains("cancelled"))
-          case cause: Throwable =>
-            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+        latch.await(20, TimeUnit.SECONDS)
+        eventually(timeout(20.seconds)) {
+          assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
         }
+        sparkContext.removeSparkListener(listener)
       }
-      eventually(timeout(20.seconds)) {
-        assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
-      }
+    } finally {
+      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
+        savedInterruptOnCancel)
     }
-    sparkContext.removeSparkListener(listener)
   }
 
   test("SPARK-20430 Initialize Range parameters in a driver side") {
@@ -204,7 +220,3 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
     }
   }
 }
-
-object DataFrameRangeSuite {
-  @volatile var stageToKill = -1
-}
```
