Skip to content

Commit 5ac42e2

Browse files
changgyoopark-dbHyukjinKwon
authored andcommitted
[SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures
### What changes were proposed in this pull request? Fix sporadic Spark Connect test failures. 1. SPARK-50534: VerifyEvents.this.listener.executeHolder was not declared "volatile", causing the thread to repeatedly read potentially outdated value. The data structure is only used by the test suite. 2. SPARK-50535: org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions is susceptible to system time synchronization (e.g., NTP), leaving stale sessions. invalidateAllSessions is only used by test suites. ### Why are the changes needed? Fix sporadic test failures. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Repeatedly ran testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite and org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49253 from changgyoopark-db/SPARK-50534. Authored-by: changgyoopark-db <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 827d2a0 commit 5ac42e2

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,10 @@ class SparkConnectSessionManager extends Logging {
289289
* Used for testing
290290
*/
291291
private[connect] def invalidateAllSessions(): Unit = {
292-
periodicMaintenance(defaultInactiveTimeoutMs = 0L, ignoreCustomTimeout = true)
293-
assert(sessionStore.isEmpty)
292+
sessionStore.forEach((key, sessionHolder) => {
293+
removeSessionHolder(key)
294+
shutdownSessionHolder(sessionHolder)
295+
})
294296
closedSessionsCache.invalidateAll()
295297
}
296298

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,8 @@ class SparkConnectServiceSuite
919919
}
920920
class MockSparkListener() extends SparkListener {
921921
val semaphoreStarted = new Semaphore(0)
922-
var executeHolder = Option.empty[ExecuteHolder]
922+
// Accessed by multiple threads in parallel.
923+
@volatile var executeHolder = Option.empty[ExecuteHolder]
923924
override def onOtherEvent(event: SparkListenerEvent): Unit = {
924925
event match {
925926
case e: SparkListenerConnectOperationStarted =>

0 commit comments

Comments
 (0)