Skip to content

Commit 1018be4

Browse files
ericlgatorsmile
authored and committed
[SPARK-23971] Should not leak Spark sessions across test suites
## What changes were proposed in this pull request? Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and default Spark session. We should attempt to clean these up and detect when this happens to improve the reproducibility of tests. ## How was this patch tested? Existing tests Author: Eric Liang <[email protected]> Closes apache#21058 from ericl/clear-session.
1 parent ab7b961 commit 1018be4

File tree

4 files changed

+47
-9
lines changed

4 files changed

+47
-9
lines changed

mllib/src/test/java/org/apache/spark/SharedSparkSession.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,12 @@ public void setUp() throws IOException {
4242

4343
@After
4444
public void tearDown() {
45-
spark.stop();
46-
spark = null;
45+
try {
46+
spark.stop();
47+
spark = null;
48+
} finally {
49+
SparkSession.clearDefaultSession();
50+
SparkSession.clearActiveSession();
51+
}
4752
}
4853
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ import org.apache.spark.sql.sources.BaseRelation
4444
import org.apache.spark.sql.streaming._
4545
import org.apache.spark.sql.types.{DataType, StructType}
4646
import org.apache.spark.sql.util.ExecutionListenerManager
47-
import org.apache.spark.util.Utils
47+
import org.apache.spark.util.{CallSite, Utils}
4848

4949

5050
/**
@@ -81,6 +81,9 @@ class SparkSession private(
8181
@transient private[sql] val extensions: SparkSessionExtensions)
8282
extends Serializable with Closeable with Logging { self =>
8383

84+
// The call site where this SparkSession was constructed.
85+
private val creationSite: CallSite = Utils.getCallSite()
86+
8487
private[sql] def this(sc: SparkContext) {
8588
this(sc, None, None, new SparkSessionExtensions)
8689
}
@@ -763,7 +766,7 @@ class SparkSession private(
763766

764767

765768
@InterfaceStability.Stable
766-
object SparkSession {
769+
object SparkSession extends Logging {
767770

768771
/**
769772
* Builder for [[SparkSession]].
@@ -1090,4 +1093,20 @@ object SparkSession {
10901093
}
10911094
}
10921095

1096+
private[spark] def cleanupAnyExistingSession(): Unit = {
1097+
val session = getActiveSession.orElse(getDefaultSession)
1098+
if (session.isDefined) {
1099+
logWarning(
1100+
s"""An existing Spark session exists as the active or default session.
1101+
|This probably means another suite leaked it. Attempting to stop it before continuing.
1102+
|This existing Spark session was created at:
1103+
|
1104+
|${session.get.creationSite.longForm}
1105+
|
1106+
""".stripMargin)
1107+
session.get.stop()
1108+
SparkSession.clearActiveSession()
1109+
SparkSession.clearDefaultSession()
1110+
}
1111+
}
10931112
}

sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ class SessionStateSuite extends SparkFunSuite {
4444
if (activeSession != null) {
4545
activeSession.stop()
4646
activeSession = null
47+
SparkSession.clearActiveSession()
48+
SparkSession.clearDefaultSession()
4749
}
4850
super.afterAll()
4951
}

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,7 @@ trait SharedSparkSession
6060
protected implicit def sqlContext: SQLContext = _spark.sqlContext
6161

6262
protected def createSparkSession: TestSparkSession = {
63+
SparkSession.cleanupAnyExistingSession()
6364
new TestSparkSession(sparkConf)
6465
}
6566

@@ -92,11 +93,22 @@ trait SharedSparkSession
9293
* Stop the underlying [[org.apache.spark.SparkContext]], if any.
9394
*/
9495
protected override def afterAll(): Unit = {
95-
super.afterAll()
96-
if (_spark != null) {
97-
_spark.sessionState.catalog.reset()
98-
_spark.stop()
99-
_spark = null
96+
try {
97+
super.afterAll()
98+
} finally {
99+
try {
100+
if (_spark != null) {
101+
try {
102+
_spark.sessionState.catalog.reset()
103+
} finally {
104+
_spark.stop()
105+
_spark = null
106+
}
107+
}
108+
} finally {
109+
SparkSession.clearActiveSession()
110+
SparkSession.clearDefaultSession()
111+
}
100112
}
101113
}
102114

0 commit comments

Comments
 (0)