Skip to content

Commit c781454

Browse files
ericlRobert Kruszewski
authored andcommitted
[SPARK-23809][SQL] Active SparkSession should be set by getOrCreate
## What changes were proposed in this pull request? Currently, the active spark session is set inconsistently (e.g., in createDataFrame, prior to query execution). Many places in spark also incorrectly query active session when they should be calling activeSession.getOrElse(defaultSession) and so might get None even if a Spark session exists. The semantics here can be cleaned up if we also set the active session when the default session is set. Related: https://github.com/apache/spark/pull/20926/files ## How was this patch tested? Unit test, existing test. Note that if apache#20926 merges first we should also update the tests there. Author: Eric Liang <[email protected]> Closes apache#20927 from ericl/active-session-cleanup.
1 parent 0bcf7e4 commit c781454

File tree

4 files changed

+35
-1
lines changed

4 files changed

+35
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,8 @@ object SparkSession {
952952

953953
session = new SparkSession(sparkContext, None, None, extensions)
954954
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
955-
defaultSession.set(session)
955+
setDefaultSession(session)
956+
setActiveSession(session)
956957

957958
// Register a successfully instantiated context to the singleton. This should be at the
958959
// end of the class definition so that the singleton is updated only if there is no
@@ -1028,6 +1029,17 @@ object SparkSession {
10281029
*/
10291030
def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
10301031

1032+
/**
1033+
* Returns the currently active SparkSession, otherwise the default one. If there is no default
1034+
* SparkSession, throws an exception.
1035+
*
1036+
* @since 2.4.0
1037+
*/
1038+
def active: SparkSession = {
1039+
getActiveSession.getOrElse(getDefaultSession.getOrElse(
1040+
throw new IllegalStateException("No active or default Spark session found")))
1041+
}
1042+
10311043
////////////////////////////////////////////////////////////////////////////////////////
10321044
// Private methods from now on
10331045
////////////////////////////////////////////////////////////////////////////////////////

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,24 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
5050
assert(SparkSession.builder().getOrCreate() == session)
5151
}
5252

53+
test("sets default and active session") {
54+
assert(SparkSession.getDefaultSession == None)
55+
assert(SparkSession.getActiveSession == None)
56+
val session = SparkSession.builder().master("local").getOrCreate()
57+
assert(SparkSession.getDefaultSession == Some(session))
58+
assert(SparkSession.getActiveSession == Some(session))
59+
}
60+
61+
test("get active or default session") {
62+
val session = SparkSession.builder().master("local").getOrCreate()
63+
assert(SparkSession.active == session)
64+
SparkSession.clearActiveSession()
65+
assert(SparkSession.active == session)
66+
SparkSession.clearDefaultSession()
67+
intercept[IllegalStateException](SparkSession.active)
68+
session.stop()
69+
}
70+
5371
test("config options are propagated to existing SparkSession") {
5472
val session1 = SparkSession.builder().master("local").config("spark-config1", "a").getOrCreate()
5573
assert(session1.conf.get("spark-config1") == "a")

sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ private[spark] class TestSparkSession(sc: SparkContext) extends SparkSession(sc)
3535
}
3636

3737
SparkSession.setDefaultSession(this)
38+
SparkSession.setActiveSession(this)
3839

3940
@transient
4041
override lazy val sessionState: SessionState = {

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ private[hive] class TestHiveSparkSession(
179179
loadTestTables)
180180
}
181181

182+
SparkSession.setDefaultSession(this)
183+
SparkSession.setActiveSession(this)
184+
182185
{ // set the metastore temporary configuration
183186
val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
184187
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",

0 commit comments

Comments
 (0)