
Commit 0e42b95

[SPARK-53673][CONNECT][TESTS] Fix a flaky test failure in SparkSessionE2ESuite - interrupt tag caused by the usage of ForkJoinPool
### What changes were proposed in this pull request?

This PR aims to fix one of the issues blocking SPARK-48139. In the problematic test `interrupt tag` in `SparkSessionE2ESuite`, four Futures run on threads in a `ForkJoinPool` and are interrupted through tags. A thread in a `ForkJoinPool` can create a spare thread and make it available in the pool, so any two pool threads can end up in a parent-child relationship. This happens when a thread performs a blocking operation; one example is `ArrayBlockingQueue.take`, which is called in a method provided by [gRPC](https://github.com/grpc/grpc-java/blob/3e993a9f44ff52bd3d5ac59dfa978d8e7d30e28b/stub/src/main/java/io/grpc/stub/ClientCalls.java#L607). Meanwhile, tags are implemented as an [InheritableThreadLocal](https://github.com/apache/spark/blob/f3ac67ee9b3b0ce63b30426f8bec825b20d91dde/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L303). So, if the Futures q1 and q4, or q2 and q3, run on a parent thread and its child, the tags are inherited, which causes the flaky test failure.

You can easily reproduce the issue by inserting a sleep into the problematic test as follows (don't forget to replace `ignore` with `test`).

```
   // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
-  ignore("interrupt tag") {
+  test("interrupt tag") {
     val session = spark
     import session.implicits._
@@ -204,6 +204,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
         spark.clearTags() // clear for the case of thread reuse by another Future
       }
     }(executionContext)
+    Thread.sleep(1000)
     val q4 = Future {
       assert(spark.getTags() == Set())
       spark.addTag("one")
```

Then run the test:

```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt tag"'
```

### Why are the changes needed?

For test stability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran the problematic test after inserting the sleep described above, and it passed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52417 from sarutak/fix-thread-pool-issue.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
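For illustration only (a minimal, Spark-independent sketch, not part of this patch): the snippet below shows the `InheritableThreadLocal` behavior the description relies on. A value set in one thread is copied into any thread that thread creates, which is how tags can leak between Futures once the pool starts spawning spare threads. The object and variable names here are made up for the example.

```scala
object InheritableTagSketch {
  // Hypothetical stand-in for the tag storage; Spark Connect keeps its tags in an
  // InheritableThreadLocal, so a child thread starts with a copy of its parent's value.
  private val tags = new InheritableThreadLocal[Set[String]] {
    override def initialValue(): Set[String] = Set.empty
  }

  def main(args: Array[String]): Unit = {
    tags.set(Set("one"))

    // A thread constructed while "one" is set inherits that value, even though it
    // never called tags.set itself -- the same effect the flaky test hit when one
    // Future's thread ended up being the parent of another Future's thread.
    val child = new Thread(() => println(s"child sees: ${tags.get()}")) // prints Set(one)
    child.start()
    child.join()

    println(s"parent sees: ${tags.get()}") // prints Set(one)
  }
}
```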
1 parent fdcd140 commit 0e42b95

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala

Lines changed: 2 additions & 2 deletions
```
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.connect
 
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
@@ -146,7 +146,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
     val numThreads = 4
-    val fpool = new ForkJoinPool(numThreads)
+    val fpool = Executors.newFixedThreadPool(numThreads)
     val executionContext = ExecutionContext.fromExecutorService(fpool)
 
     val q1 = Future {
```
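As a usage note, here is a minimal standalone sketch of the pattern the patch switches to (not taken from the test itself; `FixedPoolSketch` and the squaring workload are illustrative). `Executors.newFixedThreadPool` creates all of its worker threads through the pool's own factory, so no Future's thread becomes the parent of another Future's thread and no `InheritableThreadLocal` state is copied between them.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FixedPoolSketch {
  def main(args: Array[String]): Unit = {
    // Same shape as the patched test setup: a dedicated four-thread pool
    // wrapped as an ExecutionContext for the Futures.
    val numThreads = 4
    val fpool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(fpool)

    // Four independent Futures, one per thread; unlike a ForkJoinPool under
    // blocking calls, the pool never spawns extra threads on their behalf.
    val results = Future.sequence((1 to numThreads).map(i => Future(i * i)))
    println(Await.result(results, 10.seconds)) // Vector(1, 4, 9, 16)

    ec.shutdown()
  }
}
```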
