Commit d3a1d95
[SPARK-22786][SQL] only use AppStatusPlugin in history server
## What changes were proposed in this pull request?

In #19681 we introduced a new interface called `AppStatusPlugin` to register listeners and set up the UI for both the live UI and the history UI. However, that interface is overkill for the live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we registered the `SQLListener` and set up the SQL tab only when a `SparkSession` was first created, which indicates that users are going to use SQL functions. But after #19681, we register the SQL listener during `SparkContext` creation. The same applies to streaming.

This PR keeps the previous behavior and uses the new interface only for the history server. To reflect this change, the new interface is renamed to `AppHistoryServerPlugin` (as the diff below shows).

This PR also refines the tests for the SQL listener.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #19981 from cloud-fan/listener.
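For context, with this change the live SQL UI is wired up on the SQL side when a `SparkSession` is first created, rather than unconditionally during `SparkContext` creation. That session-side code is not part of this excerpt; a rough, hedged sketch of the restored wiring, using names taken from the diff below (note these `SparkContext` members are `private[spark]`, so this is schematic, not a drop-in implementation):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.status.ElementTrackingStore

// Hedged sketch of the live-side registration this PR restores; the actual
// SparkSession/SharedState change is not shown in this excerpt.
def setupLiveSqlUI(sc: SparkContext): Unit = {
  val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(sc.conf, kvStore, live = true)
  sc.listenerBus.addToStatusQueue(listener)
  // Only now, when SQL is actually in use, install the SQL tab on the live UI.
  sc.ui.foreach(ui => new SQLTab(new SQLAppStatusStore(kvStore, Some(listener)), ui))
}
```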
1 parent 8a0ed5a commit d3a1d95

File tree

16 files changed: +256, -319 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 10 deletions

@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
+import org.apache.spark.status.AppStatusStore
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -416,7 +416,8 @@ class SparkContext(config: SparkConf) extends Logging {

     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
+    _statusStore = AppStatusStore.createLiveStore(conf)
+    listenerBus.addToStatusQueue(_statusStore.listener.get)

     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -445,14 +446,9 @@ class SparkContext(config: SparkConf) extends Logging {
       // For tests, do not enable the UI
       None
     }
-    _ui.foreach { ui =>
-      // Load any plugins that might want to modify the UI.
-      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
-
-      // Bind the UI before starting the task scheduler to communicate
-      // the bound port to the cluster manager properly
-      ui.bind()
-    }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())

     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
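Note that `_statusStore.listener.get` is safe here: as the `AppStatusStore.scala` change below shows, `createLiveStore` always constructs an `AppStatusListener` and returns the store with `listener = Some(listener)`.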

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 7 additions & 7 deletions

@@ -44,7 +44,6 @@ import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
-import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       (new InMemoryStore(), true)
     }

+    val plugins = ServiceLoader.load(
+      classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
     val trackingStore = new ElementTrackingStore(kvstore, conf)
     if (needReplay) {
       val replayBus = new ReplayListenerBus()
       val listener = new AppStatusListener(trackingStore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(listener)
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
-      }
+      for {
+        plugin <- plugins
+        listener <- plugin.createListeners(conf, trackingStore)
+      } replayBus.addListener(listener)
       try {
         val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
         replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
@@ -353,9 +355,7 @@
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    AppStatusPlugin.loadPlugins().foreach { plugin =>
-      plugin.setupUI(ui)
-    }
+    plugins.foreach(_.setupUI(ui))

     val loadedUI = LoadedAppUI(ui)
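The history server now discovers plugins through the standard `java.util.ServiceLoader` mechanism: every implementation of `AppHistoryServerPlugin` whose fully qualified class name is listed in a `META-INF/services/org.apache.spark.status.AppHistoryServerPlugin` resource on the classpath gets instantiated (the SQL module registers exactly such a file below). A minimal, self-contained sketch of the pattern, with hypothetical names that are not Spark code:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Hypothetical trait for illustration only; not part of this commit.
trait MyPlugin {
  def name: String
}

object PluginDiscovery {
  // ServiceLoader reads META-INF/services/<fully qualified trait name>, one
  // implementation class name per line, and instantiates each one via its
  // no-arg constructor.
  def loadAll(loader: ClassLoader): Seq[MyPlugin] =
    ServiceLoader.load(classOf[MyPlugin], loader).asScala.toSeq
}
```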

core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.SparkUI
+
+/**
+ * An interface for creating history listeners (to replay event logs) defined in other modules
+ * like SQL, and to set up the plugin's UI when rebuilding the history UI.
+ */
+private[spark] trait AppHistoryServerPlugin {
+  /**
+   * Creates listeners to replay the event logs.
+   */
+  def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener]
+
+  /**
+   * Sets up the UI of this plugin to rebuild the history UI.
+   */
+  def setupUI(ui: SparkUI): Unit
+}
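The concrete SQL implementation registered below (`SQLHistoryServerPlugin`) is not included in this excerpt. Based on the trait above and the SQL listener changes further down, a plausible sketch, with the `SQLAppStatusStore` and `SQLTab` constructor shapes as assumptions:

```scala
package org.apache.spark.sql.execution.ui

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Sketch only: the real SQLHistoryServerPlugin body is not shown in this
// commit excerpt.
class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    // A replay-mode listener (live = false) that only writes final state.
    Seq(new SQLAppStatusListener(conf, store, live = false))
  }

  override def setupUI(ui: SparkUI): Unit = {
    // Attach the SQL tab so the rebuilt history UI can render SQL executions.
    new SQLTab(new SQLAppStatusStore(ui.store.store), ui)
  }
}
```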

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ private[spark] class AppStatusListener(

   import config._

-  private var sparkVersion = SPARK_VERSION
+  private val sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
   private var coresPerTask: Int = 1

core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala

Lines changed: 0 additions & 71 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 4 additions & 13 deletions

@@ -17,24 +17,22 @@

 package org.apache.spark.status

-import java.io.File
-import java.util.{Arrays, List => JList}
+import java.util.{List => JList}

 import scala.collection.JavaConverters._

 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
-    listener: Option[AppStatusListener] = None) {
+    val listener: Option[AppStatusListener] = None) {

   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -346,17 +344,10 @@ private[spark] object AppStatusStore {

   /**
    * Create an in-memory store for a live application.
-   *
-   * @param conf Configuration.
-   * @param addListenerFn Function to register a listener with a bus.
    */
-  def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
+  def createLiveStore(conf: SparkConf): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true)
-    addListenerFn(listener)
-    AppStatusPlugin.loadPlugins().foreach { p =>
-      p.setupListeners(conf, store, addListenerFn, true)
-    }
     new AppStatusStore(store, listener = Some(listener))
   }
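With `listener` promoted to a public `val` and plugin loading removed, callers of `createLiveStore` now own the registration step themselves. A minimal sketch of the new contract, mirroring the `SparkContext` change above (`listenerBus` stands in for whatever bus the caller uses):

```scala
// Caller-side contract after this change: createLiveStore no longer registers
// the listener anywhere, so the caller wires it to its own bus explicitly.
val statusStore = AppStatusStore.createLiveStore(conf)
statusStore.listener.foreach(listenerBus.addToStatusQueue)
```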

core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala

Lines changed: 9 additions & 11 deletions

@@ -22,15 +22,13 @@ import javax.servlet.http.HttpServletRequest

 import scala.xml.Node

-import org.mockito.Matchers.anyString
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui.jobs.{StagePage, StagesTab}
-import org.apache.spark.util.Utils

 class StagePageSuite extends SparkFunSuite with LocalSparkContext {

@@ -55,20 +53,20 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
    * This also runs a dummy stage to populate the page with useful content.
    */
   private def renderStagePage(conf: SparkConf): Seq[Node] = {
-    val bus = new ReplayListenerBus()
-    val store = AppStatusStore.createLiveStore(conf, l => bus.addListener(l))
+    val statusStore = AppStatusStore.createLiveStore(conf)
+    val listener = statusStore.listener.get

     try {
       val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
-      when(tab.store).thenReturn(store)
+      when(tab.store).thenReturn(statusStore)

       val request = mock(classOf[HttpServletRequest])
       when(tab.conf).thenReturn(conf)
       when(tab.appName).thenReturn("testing")
       when(tab.headerTabs).thenReturn(Seq.empty)
       when(request.getParameter("id")).thenReturn("0")
       when(request.getParameter("attempt")).thenReturn("0")
-      val page = new StagePage(tab, store)
+      val page = new StagePage(tab, statusStore)

       // Simulate a stage in job progress listener
       val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
@@ -77,17 +75,17 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         taskId =>
           val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY,
             false)
-          bus.postToAll(SparkListenerStageSubmitted(stageInfo))
-          bus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+          listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+          listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
           taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
           val taskMetrics = TaskMetrics.empty
           taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
-          bus.postToAll(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
+          listener.onTaskEnd(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
       }
-      bus.postToAll(SparkListenerStageCompleted(stageInfo))
+      listener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
       page.render(request)
     } finally {
-      store.close()
+      statusStore.close()
     }
   }
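A note on the test change: because `createLiveStore` no longer accepts a bus-registration callback, the suite now grabs the store's own listener and invokes its handler methods (`onStageSubmitted`, `onTaskStart`, and so on) directly, which exercises the same code path synchronously and keeps the rendered page deterministic.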

sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin

sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin

Lines changed: 0 additions & 1 deletion
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 7 additions & 16 deletions

@@ -30,15 +30,11 @@ import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 import org.apache.spark.status.config._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.kvstore.KVStore

-private[sql] class SQLAppStatusListener(
+class SQLAppStatusListener(
     conf: SparkConf,
     kvstore: ElementTrackingStore,
-    live: Boolean,
-    ui: Option[SparkUI] = None)
-  extends SparkListener with Logging {
+    live: Boolean) extends SparkListener with Logging {

   // How often to flush intermediate state of a live execution to the store. When replaying logs,
   // never flush (only do the very last write).
@@ -50,7 +46,10 @@ private[sql] class SQLAppStatusListener(
   private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
   private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()

-  private var uiInitialized = false
+  // Returns true if this listener has no live data. Exposed for tests only.
+  private[sql] def noLiveData(): Boolean = {
+    liveExecutions.isEmpty && stageMetrics.isEmpty
+  }

   kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count =>
     cleanupExecutions(count)
@@ -230,14 +229,6 @@
   }

   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    // Install the SQL tab in a live app if it hasn't been initialized yet.
-    if (!uiInitialized) {
-      ui.foreach { _ui =>
-        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
-      }
-      uiInitialized = true
-    }
-
     val SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) = event
@@ -389,7 +380,7 @@ private class LiveStageMetrics(
     val accumulatorIds: Array[Long],
     val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])

-private[sql] class LiveTaskMetrics(
+private class LiveTaskMetrics(
     val ids: Array[Long],
     val values: Array[Long],
     val succeeded: Boolean)
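The `noLiveData()` hook added above is what the refined SQL listener tests (not shown in this excerpt) can use to assert cleanup. A hypothetical sketch:

```scala
// Hypothetical test sketch, not part of this diff: after driving the listener
// with a complete execution, its live bookkeeping should be empty.
test("SQL listener drains live data once executions complete") {
  val store = new ElementTrackingStore(new InMemoryStore(), conf)
  val listener = new SQLAppStatusListener(conf, store, live = true)
  // ... post SparkListenerSQLExecutionStart / SparkListenerSQLExecutionEnd
  // and the corresponding job/stage events to `listener` here ...
  assert(listener.noLiveData())
}
```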
