
Commit 327d25f

mpetruska authored and HyukjinKwon committed
[SPARK-22572][SPARK SHELL] spark-shell does not re-initialize on :replay
## What changes were proposed in this pull request?

Ticket: [SPARK-22572](https://issues.apache.org/jira/browse/SPARK-22572)

`spark-shell` did not re-run its Spark initialization on `:replay`, so replayed sessions lost `spark`, `sc`, and the standard imports. This change re-initializes Spark before the replay (a sketch of the pattern follows below).

## How was this patch tested?

Added a new test case to `org.apache.spark.repl.ReplSuite`.

Author: Mark Petruska <[email protected]>

Closes #19791 from mpetruska/SPARK-22572.
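The core of the change is the same in both `SparkILoop` variants: the Spark initialization commands move out of `initializeSpark()` into a reusable `initializationCommands: Seq[String]`, and `replay()` is overridden to run them again before the recorded session is replayed. The sketch below is a minimal, self-contained illustration of that pattern only; the `Repl` class and its `history`/`processLine` members are hypothetical stand-ins, not the real ILoop machinery.

```scala
// Minimal sketch of the pattern this patch applies (hypothetical Repl class,
// NOT the actual SparkILoop code): keep the init commands in one Seq so they
// can run both at startup and again before a :replay.
class Repl {
  private var history: List[String] = Nil // user commands recorded for :replay

  val initializationCommands: Seq[String] = Seq(
    "val spark = <create or reuse the SparkSession>",
    "import spark.implicits._"
  )

  def processLine(line: String): Unit = {
    println(s"executing: $line")
    history = line :: history
  }

  // Run the init commands, then drop them from the recorded history
  // (the real patch uses ILoop's savingReplayStack for this).
  def initializeSpark(): Unit = {
    val saved = history
    initializationCommands.foreach(processLine)
    history = saved
  }

  // The fix: :replay re-initializes first, so replayed lines that reference
  // `spark` or `sc` no longer fail with "not found".
  def replay(): Unit = {
    val recorded = history.reverse
    history = Nil
    initializeSpark()
    recorded.foreach(processLine)
  }
}
```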
1 parent 572af50 commit 327d25f

3 files changed: +96 −63 lines


repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 43 additions & 32 deletions
@@ -35,40 +35,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
+  val initializationCommands: Seq[String] = Seq(
+    """
+    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+        org.apache.spark.repl.Main.sparkSession
+      } else {
+        org.apache.spark.repl.Main.createSparkSession()
+      }
+    @transient val sc = {
+      val _sc = spark.sparkContext
+      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+        if (proxyUrl != null) {
+          println(
+            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+        } else {
+          println(s"Spark Context Web UI is available at Spark Master Public URL")
+        }
+      } else {
+        _sc.uiWebUrl.foreach {
+          webUrl => println(s"Spark context Web UI available at ${webUrl}")
+        }
+      }
+      println("Spark context available as 'sc' " +
+        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+      println("Spark session available as 'spark'.")
+      _sc
+    }
+    """,
+    "import org.apache.spark.SparkContext._",
+    "import spark.implicits._",
+    "import spark.sql",
+    "import org.apache.spark.sql.functions._"
+  )
+
   def initializeSpark() {
     intp.beQuietDuring {
-      processLine("""
-        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-            org.apache.spark.repl.Main.sparkSession
-          } else {
-            org.apache.spark.repl.Main.createSparkSession()
-          }
-        @transient val sc = {
-          val _sc = spark.sparkContext
-          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-            if (proxyUrl != null) {
-              println(
-                s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-            } else {
-              println(s"Spark Context Web UI is available at Spark Master Public URL")
-            }
-          } else {
-            _sc.uiWebUrl.foreach {
-              webUrl => println(s"Spark context Web UI available at ${webUrl}")
-            }
-          }
-          println("Spark context available as 'sc' " +
-            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          println("Spark session available as 'spark'.")
-          _sc
-        }
-        """)
-      processLine("import org.apache.spark.SparkContext._")
-      processLine("import spark.implicits._")
-      processLine("import spark.sql")
-      processLine("import org.apache.spark.sql.functions._")
-      replayCommandStack = Nil // remove above commands from session history.
+      savingReplayStack { // remove the commands from session history.
+        initializationCommands.foreach(processLine)
+      }
     }
   }
 
@@ -107,6 +112,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     initializeSpark()
     echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
   }
+
+  override def replay(): Unit = {
+    initializeSpark()
+    super.replay()
+  }
+
 }
 
 object SparkILoop {

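One piece worth calling out: `savingReplayStack` is not introduced by this patch; it is inherited from the Scala REPL's `ILoop`. As far as I can tell it simply saves and restores the replay command stack around a block, roughly like the self-contained sketch below, which is why the initialization commands executed inside it never end up in the `:replay` history.

```scala
// Rough, self-contained sketch of what ILoop.savingReplayStack does
// (paraphrased, not copied from the Scala REPL sources): run `body`,
// then restore the stack so commands recorded inside it are forgotten.
object ReplayStackSketch {
  var replayCommandStack: List[String] = Nil

  def savingReplayStack[T](body: => T): T = {
    val saved = replayCommandStack
    try body
    finally replayCommandStack = saved
  }
}
```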
repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 43 additions & 31 deletions
@@ -32,39 +32,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
+  val initializationCommands: Seq[String] = Seq(
+    """
+    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+        org.apache.spark.repl.Main.sparkSession
+      } else {
+        org.apache.spark.repl.Main.createSparkSession()
+      }
+    @transient val sc = {
+      val _sc = spark.sparkContext
+      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+        if (proxyUrl != null) {
+          println(
+            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+        } else {
+          println(s"Spark Context Web UI is available at Spark Master Public URL")
+        }
+      } else {
+        _sc.uiWebUrl.foreach {
+          webUrl => println(s"Spark context Web UI available at ${webUrl}")
+        }
+      }
+      println("Spark context available as 'sc' " +
+        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+      println("Spark session available as 'spark'.")
+      _sc
+    }
+    """,
+    "import org.apache.spark.SparkContext._",
+    "import spark.implicits._",
+    "import spark.sql",
+    "import org.apache.spark.sql.functions._"
+  )
+
   def initializeSpark() {
     intp.beQuietDuring {
-      command("""
-        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-            org.apache.spark.repl.Main.sparkSession
-          } else {
-            org.apache.spark.repl.Main.createSparkSession()
-          }
-        @transient val sc = {
-          val _sc = spark.sparkContext
-          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-            if (proxyUrl != null) {
-              println(
-                s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-            } else {
-              println(s"Spark Context Web UI is available at Spark Master Public URL")
-            }
-          } else {
-            _sc.uiWebUrl.foreach {
-              webUrl => println(s"Spark context Web UI available at ${webUrl}")
-            }
-          }
-          println("Spark context available as 'sc' " +
-            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          println("Spark session available as 'spark'.")
-          _sc
-        }
-        """)
-      command("import org.apache.spark.SparkContext._")
-      command("import spark.implicits._")
-      command("import spark.sql")
-      command("import org.apache.spark.sql.functions._")
+      savingReplayStack { // remove the commands from session history.
+        initializationCommands.foreach(command)
+      }
     }
   }
 
@@ -103,6 +109,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     initializeSpark()
     echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
  }
+
+  override def replay(): Unit = {
+    initializeSpark()
+    super.replay()
+  }
+
 }
 
 object SparkILoop {

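The scala-2.12 change mirrors the scala-2.11 one line for line; the only substantive difference is that this `SparkILoop` feeds the commands through `command(...)` instead of `processLine(...)`, matching the interpreter entry point it already used before the patch.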
repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -217,4 +217,14 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test(":replay should work correctly") {
+    val output = runInterpreter("local",
+      """
+        |sc
+        |:replay
+      """.stripMargin)
+    assertDoesNotContain("error: not found: value sc", output)
+  }
+
 }

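The new test drives a local REPL through `runInterpreter`, issues a line that uses `sc`, then `:replay`, and asserts that the replay does not produce `error: not found: value sc`. A hypothetical spark-shell session showing the pre-fix symptom that assertion targets is sketched below; the messages and values are illustrative, not captured output.

```scala
// Hypothetical spark-shell transcript (illustrative only):
//
//   scala> sc.parallelize(1 to 3).count()
//   res0: Long = 3
//
//   scala> :replay
//   Replaying: sc.parallelize(1 to 3).count()
//   <console>:12: error: not found: value sc     // before this patch
//
// With this patch, initializeSpark() runs again before the replay, so the
// replayed line resolves `sc` successfully.
```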