
Commit 9c21238

anchovYu authored and cloud-fan committed
[SPARK-46021][CORE] Support cancel future jobs belonging to a job group
### What changes were proposed in this pull request?

This PR adds a new `SparkContext` API, `cancelJobGroupAndFutureJobs(jobGroup)`. It not only cancels the group's active jobs; jobs submitted to this job group afterwards are also cancelled and never run. Internally, it uses a limited-size FIFO set (default size: 1000, controlled by the config `spark.scheduler.numCancelledJobGroupsToTrack`) to record all job groups cancelled with this new API.

This PR also adds a new error class, `SPARK_JOB_CANCELLED`, without changing the error message at all, on the assumption that this is a fundamental error whose message some downstream workloads may rely on parsing.

### Why are the changes needed?

Improvements.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43926 from anchovYu/SPARK-46021.

Authored-by: Xinyi Yu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4c36ca3 commit 9c21238

File tree: 10 files changed, +197 −17 lines


common/utils/src/main/resources/error/README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -1347,7 +1347,7 @@ The following SQLSTATEs are collated from:
 |XX001 |XX |Internal Error |001 |data_corrupted |PostgreSQL |N |PostgreSQL Redshift |
 |XX002 |XX |Internal Error |002 |index_corrupted |PostgreSQL |N |PostgreSQL Redshift |
 |XXKD0 |XX |Internal Error |KD0 |Analysis - Bad plan |Databricks |N |Databricks |
-|XXKDA |XX |Internal Error |KAS |Scheduler (Aether Scheduler) |Databricks |N |Databricks |
+|XXKDA |XX |Internal Error |KAS |Scheduler |Databricks |N |Databricks |
 |XXKDS |XX |Internal Error |KDS |Delta Storage |Databricks |N |Databricks |
 |XXKUC |XX |Internal Error |KUC |Catalog Service (Unity Catalog) |Databricks |N |Databricks |
 |XXKST |XX |Internal Error |KST |Streaming |Databricks |N |Databricks |
```

common/utils/src/main/resources/error/error-classes.json

Lines changed: 6 additions & 0 deletions
```diff
@@ -2961,6 +2961,12 @@
     ],
     "sqlState" : "42601"
   },
+  "SPARK_JOB_CANCELLED" : {
+    "message" : [
+      "Job <jobId> cancelled <reason>"
+    ],
+    "sqlState" : "XXKDA"
+  },
   "SPECIFY_BUCKETING_IS_NOT_ALLOWED" : {
     "message" : [
       "A CREATE TABLE without explicit column list cannot specify bucketing information.",
```

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -2608,6 +2608,17 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelJobGroup(groupId)
   }
 
+  /**
+   * Cancel active jobs for the specified group, as well as the future jobs in this job group.
+   * Note: the maximum number of job groups that can be tracked is set by
+   * 'spark.scheduler.numCancelledJobGroupsToTrack'. Once the limit is reached and a new job group
+   * is to be added, the oldest job group tracked will be discarded.
+   */
+  def cancelJobGroupAndFutureJobs(groupId: String): Unit = {
+    assertNotStopped()
+    dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true)
+  }
+
   /**
    * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
    *
```
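For context, a minimal usage sketch of the new API (the job group name and the job itself are illustrative; assumes an already-constructed `SparkContext` named `sc`):

```scala
import scala.concurrent.{ExecutionContext, Future}
implicit val ec: ExecutionContext = ExecutionContext.global

// Run a long job under a job group on another thread.
val job = Future {
  sc.setJobGroup("nightly-etl", "nightly ETL pipeline")
  sc.parallelize(1 to 1000000).map(_ * 2).count()
}

// Cancels the active job above; any job submitted later under
// "nightly-etl" fails fast with SPARK_JOB_CANCELLED instead of running.
sc.cancelJobGroupAndFutureJobs("nightly-etl")
```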

core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -221,6 +221,18 @@ private[spark] object SparkCoreErrors {
     new NoSuchElementException(id)
   }
 
+  def sparkJobCancelled(jobId: Int, reason: String, e: Exception): SparkException = {
+    new SparkException(
+      errorClass = "SPARK_JOB_CANCELLED",
+      messageParameters = Map("jobId" -> jobId.toString, "reason" -> reason),
+      cause = e
+    )
+  }
+
+  def sparkJobCancelledAsPartOfJobGroupError(jobId: Int, jobGroupId: String): SparkException = {
+    sparkJobCancelled(jobId, s"part of cancelled job group $jobGroupId", null)
+  }
+
   def barrierStageWithRDDChainPatternError(): Throwable = {
     new BarrierJobUnsupportedRDDChainException
   }
```
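Given the `SPARK_JOB_CANCELLED` message template above, these helpers produce errors along these lines (a sketch; `SparkCoreErrors` is `private[spark]`, and the exact rendered message also carries the prefix added by Spark's error framework):

```scala
// Hypothetical invocation, mirroring what the DAGScheduler does when a job
// is submitted under a group cancelled via cancelJobGroupAndFutureJobs.
val e = SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(3, "nightly-etl")
// e.getErrorClass    => "SPARK_JOB_CANCELLED"
// message parameters => jobId: "3", reason: "part of cancelled job group nightly-etl"
```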

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -1728,6 +1728,17 @@ package object config {
     .checkValue(v => v > 0, "The max failures should be a positive value.")
     .createWithDefault(40)
 
+  private[spark] val NUM_CANCELLED_JOB_GROUPS_TO_TRACK =
+    ConfigBuilder("spark.scheduler.numCancelledJobGroupsToTrack")
+      .doc("The maximum number of tracked job groups that are cancelled with " +
+        "`cancelJobGroupAndFutureJobs`. If this maximum number is hit, the oldest job group " +
+        "will no longer be tracked, and future jobs belonging to this job group will not " +
+        "be cancelled.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(v => v > 0, "The size of the set should be a positive value.")
+      .createWithDefault(1000)
+
   private[spark] val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
     ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
       .internal()
```
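A sketch of overriding the default of 1000, assuming a locally constructed context (app name and value are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Track only the 100 most recently cancelled job groups.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("cancel-future-jobs-demo")
  .set("spark.scheduler.numCancelledJobGroupsToTrack", "100")
val sc = new SparkContext(conf)
```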

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 56 additions & 13 deletions
```diff
@@ -169,6 +169,12 @@ private[spark] class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
+  // Job groups that are cancelled with `cancelFutureJobs` as true, with at most
+  // `NUM_CANCELLED_JOB_GROUPS_TO_TRACK` stored. On a new job submission, if its job group is in
+  // this set, the job will be immediately cancelled.
+  private[scheduler] val cancelledJobGroups =
+    new LimitedSizeFIFOSet[String](sc.getConf.get(config.NUM_CANCELLED_JOB_GROUPS_TO_TRACK))
+
   /**
    * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
    * and its values are arrays indexed by partition numbers. Each array value is the set of
@@ -1081,10 +1087,11 @@
 
   /**
    * Cancel all jobs in the given job group ID.
+   * @param cancelFutureJobs if true, future submitted jobs in this job group will be cancelled
    */
-  def cancelJobGroup(groupId: String): Unit = {
-    logInfo("Asked to cancel job group " + groupId)
-    eventProcessLoop.post(JobGroupCancelled(groupId))
+  def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit = {
+    logInfo(s"Asked to cancel job group $groupId with cancelFutureJobs=$cancelFutureJobs")
+    eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs))
   }
 
   /**
@@ -1180,7 +1187,16 @@
     jobsThatUseStage.find(jobIdToActiveJob.contains)
   }
 
-  private[scheduler] def handleJobGroupCancelled(groupId: String): Unit = {
+  private[scheduler] def handleJobGroupCancelled(
+      groupId: String,
+      cancelFutureJobs: Boolean): Unit = {
+    // If cancelFutureJobs is true, store the cancelled job group id into internal states.
+    // When a job belonging to this job group is submitted, skip running it.
+    if (cancelFutureJobs) {
+      logInfo(s"Add job group $groupId into cancelled job groups")
+      cancelledJobGroups.add(groupId)
+    }
+
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter { activeJob =>
@@ -1264,14 +1280,24 @@
     listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
   }
 
-  private[scheduler] def handleJobSubmitted(jobId: Int,
+  private[scheduler] def handleJobSubmitted(
+      jobId: Int,
       finalRDD: RDD[_],
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
       callSite: CallSite,
       listener: JobListener,
       artifacts: JobArtifactSet,
       properties: Properties): Unit = {
+    // If this job belongs to a cancelled job group, skip running it
+    val jobGroupIdOpt = Option(properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) {
+      listener.jobFailed(
+        SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(jobId, jobGroupIdOpt.get))
+      logInfo(s"Skip running a job that belongs to the cancelled job group ${jobGroupIdOpt.get}.")
+      return
+    }
+
     var finalStage: ResultStage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
@@ -2727,7 +2753,9 @@
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(
-        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
+        job = jobIdToActiveJob(jobId),
+        error = SparkCoreErrors.sparkJobCancelled(jobId, reason.getOrElse(""), null)
+      )
     }
   }
 
@@ -2787,7 +2815,10 @@
       failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
       updateStageInfoForPushBasedShuffle(failedStage)
       for (job <- dependentJobs) {
-        failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)
+        failJobAndIndependentStages(
+          job,
+          new SparkException(s"Job aborted due to stage failure: $reason", cause = exception.orNull)
+        )
       }
       if (dependentJobs.isEmpty) {
         logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -2845,13 +2876,11 @@
   /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
   private def failJobAndIndependentStages(
       job: ActiveJob,
-      failureReason: String,
-      exception: Option[Throwable] = None): Unit = {
-    if (cancelRunningIndependentStages(job, failureReason)) {
+      error: SparkException): Unit = {
+    if (cancelRunningIndependentStages(job, error.getMessage)) {
       // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
       // against the state. Otherwise we have a *little* bit of flakiness in the tests.
       cleanupStateForJobAndIndependentStages(job)
-      val error = new SparkException(failureReason, exception.orNull)
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
@@ -3010,8 +3039,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case JobCancelled(jobId, reason) =>
       dagScheduler.handleJobCancellation(jobId, reason)
 
-    case JobGroupCancelled(groupId) =>
-      dagScheduler.handleJobGroupCancelled(groupId)
+    case JobGroupCancelled(groupId, cancelFutureJobs) =>
+      dagScheduler.handleJobGroupCancelled(groupId, cancelFutureJobs)
 
     case JobTagCancelled(tag) =>
       dagScheduler.handleJobTagCancelled(tag)
@@ -3092,3 +3121,17 @@ private[spark] object DAGScheduler {
   // as more failure events come in
   val RESUBMIT_TIMEOUT = 200
 }
+
+/**
+ * A NOT thread-safe set that only keeps the last `capacity` elements added to it.
+ */
+private[scheduler] class LimitedSizeFIFOSet[T](val capacity: Int) {
+  private val set = scala.collection.mutable.LinkedHashSet[T]()
+  def add(t: T): Unit = {
+    set += t
+    if (set.size > capacity) {
+      set -= set.head
+    }
+  }
+  def contains(t: T): Boolean = set.contains(t)
+}
```
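The eviction behavior of this helper can be seen in isolation; a minimal standalone sketch (capacity and element values are illustrative, since the real class is `private[scheduler]`):

```scala
import scala.collection.mutable

// Standalone copy of the helper above, for illustration only.
class LimitedSizeFIFOSet[T](val capacity: Int) {
  private val set = mutable.LinkedHashSet[T]()
  def add(t: T): Unit = {
    set += t
    if (set.size > capacity) {
      set -= set.head // evict the element inserted earliest
    }
  }
  def contains(t: T): Boolean = set.contains(t)
}

val groups = new LimitedSizeFIFOSet[String](capacity = 2)
groups.add("g0"); groups.add("g1"); groups.add("g2")
assert(!groups.contains("g0")) // oldest group evicted once capacity is exceeded
assert(groups.contains("g1") && groups.contains("g2"))
```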

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -63,7 +63,10 @@ private[scheduler] case class JobCancelled(
     reason: Option[String])
   extends DAGSchedulerEvent
 
-private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
+private[scheduler] case class JobGroupCancelled(
+    groupId: String,
+    cancelFutureJobs: Boolean = false)
+  extends DAGSchedulerEvent
 
 private[scheduler] case class JobTagCancelled(tagName: String) extends DAGSchedulerEvent
 
```
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 78 additions & 0 deletions
```diff
@@ -153,6 +153,84 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(jobB.get() === 100)
   }
 
+  test("if cancel job group and future jobs, skip running jobs in the same job group") {
+    sc = new SparkContext("local[2]", "test")
+
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        sem.release()
+      }
+    })
+
+    // run a job, cancel the job group and its future jobs
+    val jobGroupName = "job-group"
+    val job = Future {
+      sc.setJobGroup(jobGroupName, "")
+      sc.parallelize(1 to 1000).map { i => Thread.sleep(100); i }.count()
+    }
+    // block until job starts
+    sem.acquire(1)
+    // cancel the job group and future jobs
+    sc.cancelJobGroupAndFutureJobs(jobGroupName)
+    ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e: SparkException =>
+      checkError(
+        exception = e,
+        errorClass = "SPARK_JOB_CANCELLED",
+        sqlState = "XXKDA",
+        parameters = scala.collection.immutable.Map(
+          "jobId" -> "0",
+          "reason" -> s"part of cancelled job group $jobGroupName")
+      )
+    }
+
+    // job in the same job group will not run
+    checkError(
+      exception = intercept[SparkException] {
+        sc.setJobGroup(jobGroupName, "")
+        sc.parallelize(1 to 100).count()
+      },
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map(
+        "jobId" -> "1",
+        "reason" -> s"part of cancelled job group $jobGroupName")
+    )
+
+    // job in a different job group should run
+    sc.setJobGroup("another-job-group", "")
+    assert(sc.parallelize(1 to 100).count() == 100)
+  }
+
+  test("only keeps limited number of cancelled job groups") {
+    val conf = new SparkConf()
+      .set(NUM_CANCELLED_JOB_GROUPS_TO_TRACK, 5)
+    sc = new SparkContext("local[2]", "test", conf)
+    val setSize = sc.getConf.get(NUM_CANCELLED_JOB_GROUPS_TO_TRACK)
+    // call cancelJobGroup with cancelFutureJobs = true on (setSize + 1) job groups, the first one
+    // should have been evicted from the cancelledJobGroups set
+    (0 to setSize).foreach { idx =>
+      val sem = new Semaphore(0)
+      sc.addSparkListener(new SparkListener {
+        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+          sem.release()
+        }
+      })
+      val job = Future {
+        sc.setJobGroup(s"job-group-$idx", "")
+        sc.parallelize(1 to 1000).map { i => Thread.sleep(100); i }.count()
+      }
+      sem.acquire(1)
+      sc.cancelJobGroupAndFutureJobs(s"job-group-$idx")
+      ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e: SparkException =>
+        assert(e.getErrorClass == "SPARK_JOB_CANCELLED")
+      }
+    }
+    // submit a job with the 0 job group that was evicted from cancelledJobGroups set, it should run
+    sc.setJobGroup("job-group-0", "")
+    assert(sc.parallelize(1 to 100).count() == 100)
+  }
+
   test("job tags") {
     sc = new SparkContext("local[2]", "test")
```
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 12 additions & 2 deletions
```diff
@@ -724,7 +724,12 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(numResults === 0)
     cancel(jobId)
     assert(failureReason.isDefined)
-    assert(failureReason.get.getMessage() === "Job 0 cancelled ")
+    checkError(
+      exception = failureReason.get.asInstanceOf[SparkException],
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map("jobId" -> "0", "reason" -> "")
+    )
   }
 
   test("run trivial job") {
@@ -841,7 +846,12 @@
     val rdd = new MyRDD(sc, 1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
-    assert(failure.getMessage === s"Job $jobId cancelled ")
+    checkError(
+      exception = failure.asInstanceOf[SparkException],
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map("jobId" -> jobId.toString, "reason" -> "")
+    )
     assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
```

docs/sql-error-conditions.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1850,6 +1850,12 @@ The seed expression `<seedExpr>` of the expression `<exprWithSeed>` must be fold
 
 sortBy must be used together with bucketBy.
 
+### SPARK_JOB_CANCELLED
+
+[SQLSTATE: XXKDA](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Job `<jobId>` cancelled `<reason>`
+
 ### SPECIFY_BUCKETING_IS_NOT_ALLOWED
 
 [SQLSTATE: 42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
```
