
Commit 7beb375

squito authored and Marcelo Vanzin committed
[SPARK-22861][SQL] SQLAppStatusListener handles multi-job executions.

When one execution has multiple jobs, we need to append to the set of stages, not replace it on every job start.

Added a unit test and ran the existing tests on Jenkins.

Author: Imran Rashid <[email protected]>

Closes #20047 from squito/SPARK-22861.
1 parent fe65361 · commit 7beb375

File tree: 2 files changed (+44, -1 lines)

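Before the diffs, a minimal standalone Scala sketch of the behavior being fixed (illustrative only, not Spark code; the object and variable names here are invented): overwriting a var Set[Int] on every job start keeps only the last job's stages, while appending with ++= accumulates stage ids across all jobs of one execution.

object StageSetDemo {
  def main(args: Array[String]): Unit = {
    var replaced = Set.empty[Int] // old behavior: overwritten on each job start
    var appended = Set.empty[Int] // fixed behavior: accumulated across jobs

    // Two jobs in one SQL execution, each contributing two stage ids.
    for (jobStages <- Seq(Set(0, 1), Set(2, 3))) {
      replaced = jobStages   // mirrors: exec.stages = event.stageIds.toSet
      appended ++= jobStages // mirrors: exec.stages ++= event.stageIds.toSet
    }

    println(replaced) // Set(2, 3) -- job 0's stages were dropped
    println(appended) // Set(0, 1, 2, 3)
  }
}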

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
Lines changed: 1 addition & 1 deletion

@@ -87,7 +87,7 @@ class SQLAppStatusListener(
     }

     exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
-    exec.stages = event.stageIds.toSet
+    exec.stages ++= event.stageIds.toSet
     update(exec)
   }

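A note on the one-line fix: assuming the execution's stages field is a var holding an immutable Set (as exec.stages is in this listener), ++= desugars to reassignment with the union, so stage ids recorded by earlier jobs are retained. A minimal sketch:

var stages: Set[Int] = Set(0, 1)  // stage ids recorded by job 0
stages ++= Set(2, 3)              // desugars to: stages = stages ++ Set(2, 3)
assert(stages == Set(0, 1, 2, 3)) // union, not replacement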
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
Lines changed: 43 additions & 0 deletions

@@ -383,6 +383,49 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }

+  test("handle one execution with multiple jobs") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    var stageId = 0
+    def twoStageJob(jobId: Int): Unit = {
+      val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0) }
+      stageId += 2
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        stageInfos = stages,
+        createProperties(executionId)))
+      stages.foreach { s =>
+        listener.onStageSubmitted(SparkListenerStageSubmitted(s))
+        listener.onStageCompleted(SparkListenerStageCompleted(s))
+      }
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+      ))
+    }
+    // submit two jobs with the same executionId
+    twoStageJob(0)
+    twoStageJob(1)
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(statusStore.execution(0), completed = 0 to 1)
+    assert(statusStore.execution(0).get.stages === (0 to 3).toSet)
+  }
+
   test("SPARK-11126: no memory leak when running non SQL jobs") {
     val listener = spark.sharedState.statusStore.listener.get
     // At the beginning of this test case, there should be no live data in the listener.
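To make the final assertion concrete (this restates the test above, no new behavior): each twoStageJob call registers two fresh stage ids, so after jobs 0 and 1 the execution should have accumulated stage ids 0 through 3; under the old replace-on-job-start behavior it would have held only job 1's stages.

// The union the fixed listener should report after the two jobs above:
val expected = Set(0, 1) ++ Set(2, 3) // job 0's stages ++ job 1's stages
assert(expected == (0 to 3).toSet)    // Set(2, 3) alone would fail this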
