Skip to content

Commit c758169

Browse files
committed
Handle job exception via PerformOp
1 parent 563da40 commit c758169

File tree

3 files changed

+29
-13
lines changed

3 files changed

+29
-13
lines changed

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,17 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
252252
*/
253253
private fun tryFinalizeStateActually(expect: Incomplete, update: Any?, mode: Int): Boolean {
254254
require(update !is Incomplete) // only incomplete -> completed transition is allowed
255-
if (!_state.compareAndSet(expect, update)) return false // failed
255+
256+
/*
257+
* We're publishing CompletedExceptionally as OpDescriptor to avoid races with parent:
258+
* Job can't report exception before CAS (as it can fail), but after CAS there is a small window
259+
* where the parent is considering this job (child) completed, though child has not yet reported its exception.
260+
*/
261+
val updateValue = if (update is CompletedExceptionally) HandleExceptionOp(update) else update
262+
if (!_state.compareAndSet(expect, updateValue)) return false // failed
263+
if (updateValue is HandleExceptionOp) {
264+
updateValue.perform(this) // help perform
265+
}
256266
completeStateFinalization(expect, update, mode)
257267
return true
258268
}
@@ -262,28 +272,22 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
262272
* Now the job in THE FINAL state. We need to properly handle the resulting state.
263273
* Order of various invocations here is important.
264274
*
265-
* 1) Standalone coroutines (launch/job) cancel their parent.
266-
*/
267-
if (update is CompletedExceptionally) {
268-
handleJobException(update.cause)
269-
}
270-
/*
271-
* 2) Unregister from parent job.
275+
* 1) Unregister from parent job.
272276
*/
273277
parentHandle?.let {
274278
it.dispose() // volatile read parentHandle _after_ state was updated
275279
parentHandle = NonDisposableHandle // release it just in case, to aid GC
276280
}
277281
val exceptionally = update as? CompletedExceptionally
278282
/*
279-
* 3) Invoke onCancellationInternal: exception handling, resource cancellation etc.
283+
* 2) Invoke onCancellationInternal: exception handling, resource cancellation etc.
280284
* Only notify on cancellation once (expect.isCancelling)
281285
*/
282286
if (!expect.isCancelling) {
283287
onCancellationInternal(exceptionally)
284288
}
285289
/*
286-
* 4) Invoke completion handlers: .join(), callbacks etc.
290+
* 3) Invoke completion handlers: .join(), callbacks etc.
287291
* It's important to invoke them only AFTER exception handling, see #208
288292
*/
289293
val cause = exceptionally?.cause
@@ -297,7 +301,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
297301
expect.list?.notifyCompletion(cause)
298302
}
299303
/*
300-
* 5) Invoke onCompletionInternal: onNext(), timeout deregistration etc.
304+
* 4) Invoke onCompletionInternal: onNext(), timeout deregistration etc.
301305
* It should be last so all callbacks observe consistent state
302306
* of the job which doesn't depend on callback scheduling.
303307
*/
@@ -1013,6 +1017,18 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
10131017
else
10141018
block.startCoroutineCancellable(state as T, select.completion)
10151019
}
1020+
1021+
class HandleExceptionOp(val original: CompletedExceptionally) : OpDescriptor() {
1022+
1023+
override fun perform(affected: Any?): Any? {
1024+
val job = (affected as JobSupport)
1025+
if (job._state.compareAndSet(this, original)) {
1026+
job.handleJobException(original.cause)
1027+
}
1028+
1029+
return null
1030+
}
1031+
}
10161032
}
10171033

10181034
// --------------- helper classes & constants for job implementation

core/kotlinx-coroutines-core/test/WithContextCancellationStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class WithContextCancellationStressTest : TestBase() {
3131
repeat(iterations) {
3232
val barrier = CyclicBarrier(4)
3333
val jobWithContext = async(pool) {
34-
barrier.await()
3534
withContext(wrapperDispatcher(coroutineContext), start = CoroutineStart.ATOMIC) {
35+
barrier.await()
3636
throw IOException()
3737
}
3838
}

core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class JobExceptionsStressTest : TestBase() {
2828
* Owner: launch 3 children, every of it throws an exception, and then call delay()
2929
* Result: one of the exceptions with the rest two as suppressed
3030
*/
31-
repeat(100 * stressTestMultiplier) {
31+
repeat(1000 * stressTestMultiplier) {
3232
val exception = runBlock(executor) {
3333
val barrier = CyclicBarrier(4)
3434
val job = launch(coroutineContext.minusKey(Job)) {

0 commit comments

Comments
 (0)