Skip to content

Commit 06f57aa

Browse files
qwwdfsadelizarov
authored andcommitted
JobSupport cancellation rework:
Change cancel contract: if it returns true, then Job is responsible for handling given exception Introduce handleJobException as the way to handle "final" exception on job completion Do not report intermediate exceptions to exception handler Propagate exceptions up to the Job which is responsible for exception handling Change handleCoroutineException contract, so custom exception handlers can't break coroutines machinery Unwrap JobCancellationException to provide exception transparency over whole call chain Fixes #333, #452 and #451
1 parent 3bda22c commit 06f57aa

33 files changed

+1256
-135
lines changed

common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public abstract class AbstractCoroutine<in T>(
110110
makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode)
111111
}
112112

113-
internal final override fun handleException(exception: Throwable) {
113+
internal final override fun handleOnCompletionException(exception: Throwable) {
114114
handleCoroutineException(parentContext, exception)
115115
}
116116

common/kotlinx-coroutines-core-common/src/Builders.common.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,15 @@ private open class StandaloneCoroutine(
157157
active: Boolean
158158
) : AbstractCoroutine<Unit>(parentContext, active) {
159159
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
160+
161+
override fun handleJobException(exception: Throwable) {
162+
handleCoroutineException(parentContext, exception)
163+
}
164+
160165
override fun onFinishingInternal(update: Any?) {
161-
// note the use of the parent's job context below!
162-
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause)
166+
if (update is CompletedExceptionally && update.cause !is CancellationException) {
167+
parentContext[Job]?.cancel(update.cause)
168+
}
163169
}
164170
}
165171

common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,36 @@ internal expect fun handleCoroutineExceptionImpl(context: CoroutineContext, exce
1212
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
1313
*
1414
* It tries to handle uncaught exception in the following way:
15-
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
16-
* * Otherwise, if exception is [CancellationException] then it is ignored
17-
* (because that is the supposed mechanism to cancel the running coroutine)
18-
* * Otherwise:
19-
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
20-
* * all instances of [CoroutineExceptionHandler] found via [ServiceLoader] are invoked;
21-
* * current thread's [Thread.uncaughtExceptionHandler] is invoked.
15+
* If current exception is [CancellationException], it's ignored: [CancellationException] is a normal way to cancel
16+
* coroutine.
17+
*
18+
* If there is a [Job] in the context, then [Job.cancel] is invoked.
19+
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
20+
* Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used.
21+
* Otherwise all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
2222
*/
2323
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
2424
// if exception handling fails, make sure the original exception is not lost
2525
try {
26+
27+
// Ignore CancellationException (they are normal ways to terminate a coroutine)
28+
if (exception is CancellationException) {
29+
return
30+
}
31+
32+
// If parent is successfully cancelled, we're done, it is now its responsibility to handle the exception
33+
val parent = context[Job]
34+
if (parent != null && parent.cancel(exception)) {
35+
return
36+
}
37+
38+
// If not, invoke exception handler from the context
2639
context[CoroutineExceptionHandler]?.let {
2740
it.handleException(context, exception)
2841
return
2942
}
30-
// ignore CancellationException (they are normal means to terminate a coroutine)
31-
if (exception is CancellationException) return
32-
// try cancel job in the context
33-
context[Job]?.cancel(exception)
34-
// platform-specific
43+
44+
// If handler is not present in the context, fallback to the global handler
3545
handleCoroutineExceptionImpl(context, exception)
3646
} catch (handlerException: Throwable) {
3747
// simply rethrow if handler threw the original exception

common/kotlinx-coroutines-core-common/src/Job.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,12 @@ public interface Job : CoroutineContext.Element {
149149
public fun start(): Boolean
150150

151151
/**
152-
* Cancels this job with an optional cancellation [cause]. The result is `true` if this job was
153-
* cancelled as a result of this invocation and `false` otherwise
154-
* (if it was already _completed_ or if it is [NonCancellable]).
155-
* Repeated invocations of this function have no effect and always produce `false`.
152+
* Cancels this job with an optional cancellation [cause].
153+
* The result is `true` if this job was either cancelled as a result of this invocation
154+
* or it's being cancelled and given [cause] was successfully received by the job and will be properly handled, `false` otherwise.
155+
*
156+
* If this method returned `false`, then caller is responsible for handling [cause].
157+
* If job is already completed, method returns `false`
156158
*
157159
* When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
158160
* at the corresponding original cancellation site and passed into this method to aid in debugging by providing

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 158 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,107 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
155155
* If this method succeeds, state of this job will never be changed again
156156
*/
157157
private fun tryFinalizeState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
158+
/*
159+
* If job is in 'cancelling' state and we're finalizing job state, we start intricate dance:
160+
* 1) Synchronize on state to avoid races with concurrent
161+
* mutations (e.g. when new child is added)
162+
* 2) After synchronization check we're still in the expected state
163+
* 3) Aggregate final exception under the same lock which protects exceptions
164+
* collection
165+
* 4) Pass it upstream
166+
*/
167+
if (expect is Finishing && expect.cancelled != null) {
168+
val finalException = synchronized(expect) {
169+
if (_state.value !== expect) {
170+
return false
171+
}
172+
173+
if (proposedUpdate is CompletedExceptionally) {
174+
expect.addLocked(proposedUpdate.cause)
175+
}
176+
177+
/*
178+
* Note that new exceptions cannot be added concurrently: state is guarded by lock
179+
* and storage is sealed in the end, so all new exceptions will be reported separately
180+
*/
181+
buildException(expect).also { expect.seal() }
182+
}
183+
184+
val update = Cancelled(this, finalException ?: expect.cancelled.cause)
185+
handleJobException(update.cause)
186+
// This CAS never fails: we're in the state when no jobs can be attached, because state is already sealed
187+
if (!tryFinalizeState(expect, update)) {
188+
val error = AssertionError("Unexpected state: ${_state.value}, expected: $expect, update: $update")
189+
handleOnCompletionException(error)
190+
throw error
191+
}
192+
193+
completeStateFinalization(expect, update, mode)
194+
return true
195+
}
196+
158197
val update = coerceProposedUpdate(expect, proposedUpdate)
159198
if (!tryFinalizeState(expect, update)) return false
160199
completeStateFinalization(expect, update, mode)
161200
return true
162201
}
163202

203+
private fun buildException(state: Finishing): Throwable? {
204+
val cancelled = state.cancelled!!
205+
val suppressed = state.exceptions
206+
207+
/*
208+
* This is a place where we step on our API limitation:
209+
* We can't distinguish internal JobCancellationException from our parent
210+
* from external cancellation, thus we ought to collect all exceptions.
211+
*
212+
* But it has negative consequences: same exception can be added as suppressed more than once.
213+
* Consider concurrent parent-child relationship:
214+
* 1) Child throws E1 and parent throws E2
215+
* 2) Parent goes to "Cancelling(E1)" and cancels child with E1
216+
* 3) Child goes to "Cancelling(E1)", but throws an exception E2
217+
* 4) When child throws, it notifies parent that he is cancelling, adding its exception to parent list of exceptions
218+
* (again, parent don't know whether it's child exception or external exception)
219+
* 5) Child builds final exception: E1 with suppressed E2, reports it to parent
220+
* 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
221+
* It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
222+
*
223+
* Note that it's only happening when both parent and child throw exception simultaneously
224+
*/
225+
var rootCause = cancelled.cause
226+
if (rootCause is JobCancellationException) {
227+
val cause = unwrap(rootCause)
228+
rootCause = if (cause !== null) {
229+
cause
230+
} else {
231+
suppressed.firstOrNull { unwrap(it) != null } ?: return rootCause
232+
}
233+
}
234+
235+
val seenExceptions = HashSet<Throwable>() // TODO it should be identity set
236+
suppressed.forEach {
237+
val unwrapped = unwrap(it)
238+
if (unwrapped !== null && unwrapped !== rootCause) {
239+
if (seenExceptions.add(unwrapped)) {
240+
rootCause.addSuppressedThrowable(unwrapped)
241+
}
242+
}
243+
}
244+
245+
return rootCause
246+
}
247+
248+
private tailrec fun unwrap(exception: Throwable): Throwable? {
249+
if (exception is JobCancellationException) {
250+
val cause = exception.cause
251+
if (cause !== null) return unwrap(cause)
252+
return null
253+
254+
} else {
255+
return exception
256+
}
257+
}
258+
164259
/**
165260
* Tries to update [_state] of this job to the final state and, if
166261
* succeeds, disposes parent handle (de-attaching child from parent)
@@ -200,7 +295,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
200295
try {
201296
expect.invoke(cause)
202297
} catch (ex: Throwable) {
203-
handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
298+
handleOnCompletionException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
204299
}
205300
} else {
206301
expect.list?.notifyCompletion(cause)
@@ -225,8 +320,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
225320
if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
226321
val exception = proposedUpdate.cause
227322
if (cancelled.cause == exception) return cancelled // that is the cancelled we need already!
228-
// todo: We need to rework this logic to keep original cancellation cause in the state and suppress other exceptions
229-
// that could have occurred while coroutine is being cancelled.
323+
// That could have occurred while coroutine is being cancelled.
230324
// Do not spam with JCE in suppressed exceptions
231325
if (cancelled.cause !is JobCancellationException) {
232326
exception.addSuppressedThrowable(cancelled.cause)
@@ -251,7 +345,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
251345
}
252346
}
253347
}
254-
exception?.let { handleException(it) }
348+
exception?.let { handleOnCompletionException(it) }
255349
}
256350

257351
public final override fun start(): Boolean {
@@ -509,7 +603,15 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
509603
}
510604
}
511605
is Finishing -> { // Completing/Cancelling the job, may cancel
512-
if (state.cancelled != null) return false // already cancelling
606+
if (state.cancelled != null) {
607+
if (cause == null) {
608+
return true
609+
}
610+
611+
// We either successfully added an exception or caller should handle it itself
612+
return cause.let { state.addException(it) }
613+
}
614+
513615
if (tryMakeCancelling(state, state.list, cause)) return true
514616
}
515617
else -> { // is inactive
@@ -666,20 +768,24 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
666768
* installed via [invokeOnCompletion].
667769
* @suppress **This is unstable API and it is subject to change.**
668770
*/
669-
internal open fun handleException(exception: Throwable) {
771+
internal open fun handleOnCompletionException(exception: Throwable) {
670772
throw exception
671773
}
672774

673775
/**
674-
* This function is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
675-
* `onCancelling` set to `true`.
776+
* This function is invoked once when job is cancelled or is completed.
777+
* It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
778+
*
676779
* @param exceptionally not null when the the job was cancelled or completed exceptionally,
677780
* null when it has completed normally.
678-
* @suppress **This is unstable API and it is subject to change.**
781+
* @suppress **This is unstable API and it is subject to change.*
679782
*/
680-
internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {}
783+
internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
784+
// TODO rename to "onCancelling"
785+
}
681786

682787
/**
788+
* Whether job has [onFinishingInternal] handler for given [update]
683789
* @suppress **This is unstable API and it is subject to change.**
684790
*/
685791
internal open fun hasOnFinishingHandler(update: Any?) = false
@@ -689,6 +795,12 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
689795
*/
690796
internal open fun onFinishingInternal(update: Any?) {}
691797

798+
/**
799+
* Method which is invoked once Job becomes `Cancelled`. It's guaranteed that at the moment
800+
* of invocation the job and all its children are complete
801+
*/
802+
internal open fun handleJobException(exception: Throwable) {}
803+
692804
/**
693805
* Override for post-completion actions that need to do something with the state.
694806
* @param mode completion mode.
@@ -726,6 +838,36 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
726838
@JvmField val completing: Boolean /* true when completing */
727839
) : Incomplete {
728840
override val isActive: Boolean get() = cancelled == null
841+
val exceptions: List<Throwable> get() = _exceptionsHolder as List<Throwable>
842+
843+
// TODO optimize
844+
private var _exceptionsHolder: Any? = if (cancelled == null) null else ArrayList<Throwable>(2)
845+
846+
fun addException(exception: Throwable): Boolean {
847+
synchronized(this) {
848+
return if (_exceptionsHolder == null) {
849+
false
850+
} else {
851+
@Suppress("UNCHECKED_CAST")
852+
(_exceptionsHolder as MutableList<Throwable>).add(exception)
853+
true
854+
}
855+
}
856+
}
857+
858+
fun addLocked(exception: Throwable) {
859+
(_exceptionsHolder as MutableList<Throwable>).add(exception)
860+
}
861+
862+
/**
863+
* Seals current state. After [seal] call all consecutive calls to [addException]
864+
* return `false` forcing callers to handle pending exception by themselves.
865+
* This call should be guarded by `synchronized(finishingState)`
866+
*/
867+
fun seal() {
868+
_exceptionsHolder = null
869+
}
870+
729871
}
730872

731873
private val Incomplete.isCancelling: Boolean
@@ -854,6 +996,12 @@ private class Empty(override val isActive: Boolean) : Incomplete {
854996
internal class JobImpl(parent: Job? = null) : JobSupport(true) {
855997
init { initParentJobInternal(parent) }
856998
override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
999+
1000+
override fun cancel(cause: Throwable?): Boolean {
1001+
// JobImpl can't handle an exception, thus always returns false
1002+
super.cancel(cause)
1003+
return false
1004+
}
8571005
}
8581006

8591007
// -------- invokeOnCompletion nodes

common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlin.coroutines.experimental.*
88
import kotlin.test.*
99

1010
class AbstractCoroutineTest : TestBase() {
11+
1112
@Test
1213
fun testNotifications() = runTest {
1314
expect(1)
@@ -67,18 +68,21 @@ class AbstractCoroutineTest : TestBase() {
6768
}
6869

6970
override fun onCompletedExceptionally(exception: Throwable) {
70-
assertTrue(exception is TestException1)
71+
assertTrue(exception is TestException0)
7172
expect(9)
7273
}
7374
}
75+
7476
coroutine.invokeOnCompletion(onCancelling = true) {
7577
assertTrue(it is TestException0)
7678
expect(6)
7779
}
80+
7881
coroutine.invokeOnCompletion {
79-
assertTrue(it is TestException1)
82+
assertTrue(it is TestException0)
8083
expect(8)
8184
}
85+
8286
expect(2)
8387
coroutine.start()
8488
expect(4)

common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlin.coroutines.experimental.*
1010
import kotlin.test.*
1111

1212
class AsyncLazyTest : TestBase() {
13+
1314
@Test
1415
fun testSimple() = runTest {
1516
expect(1)
@@ -180,7 +181,7 @@ class AsyncLazyTest : TestBase() {
180181
assertTrue(d.isActive && !d.isCompleted && !d.isCancelled)
181182
assertTrue(d.cancel())
182183
assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // cancelling !
183-
assertTrue(!d.cancel())
184+
assertTrue(d.cancel())
184185
assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // still cancelling
185186
finish(6)
186187
assertTrue(d.await() == 42) // await shall throw CancellationException

0 commit comments

Comments
 (0)