Skip to content

Commit a36ab94

Browse files
committed
Failed parent Job cancels all children jobs, then waits them
Also improved debug output a little bit
1 parent a9fb479 commit a36ab94

File tree

5 files changed

+132
-45
lines changed

5 files changed

+132
-45
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,5 +242,4 @@ private class BlockingCoroutine<T>(
242242
(state as? CompletedExceptionally)?.let { throw it.exception }
243243
return state as T
244244
}
245-
246245
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,23 @@ internal class DispatchTask<in T>(
113113
) : Runnable {
114114
@Suppress("UNCHECKED_CAST")
115115
override fun run() {
116-
val context = continuation.context
117-
val job = if (cancellable) context[Job] else null
118-
withCoroutineContext(context) {
119-
when {
120-
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
121-
exception -> continuation.resumeWithException(value as Throwable)
122-
else -> continuation.resume(value as T)
116+
try {
117+
val context = continuation.context
118+
val job = if (cancellable) context[Job] else null
119+
withCoroutineContext(context) {
120+
when {
121+
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
122+
exception -> continuation.resumeWithException(value as Throwable)
123+
else -> continuation.resume(value as T)
124+
}
123125
}
126+
} catch (e: Throwable) {
127+
throw RuntimeException("Unexpected exception running $this", e)
124128
}
125129
}
126130

127131
override fun toString(): String =
128-
"DispatchTask[$value, cancellable=$cancellable, ${continuation.toDebugString()}]"
132+
"DispatchTask[${continuation.toDebugString()}, cancellable=$cancellable, value=${value.toSafeString()}]"
129133
}
130134

131135
internal class DispatchedContinuation<in T>(
@@ -186,16 +190,8 @@ internal class DispatchedContinuation<in T>(
186190
dispatcher.dispatch(context, DispatchTask(continuation, value,false, true))
187191
}
188192

189-
override fun toString(): String = "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
190-
}
191-
192-
// **KLUDGE**: there is no reason to include continuation into debug string until the following ticket is resolved:
193-
// KT-18986 Debug-friendly toString implementation for CoroutineImpl
194-
// (the current string representation of continuation is useless and uses buggy reflection internals)
195-
// So, this function is a replacement that extract a usable information from continuation -> its class name, at least
196-
internal fun Continuation<*>.toDebugString(): String = when (this) {
197-
is DispatchedContinuation -> toString()
198-
else -> this::class.java.name
193+
override fun toString(): String =
194+
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
199195
}
200196

201197
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.coroutines.experimental.Continuation
20+
21+
// internal debugging tools
22+
23+
internal val Any.hexAddress: String
24+
get() = Integer.toHexString(System.identityHashCode(this))
25+
26+
internal fun Any?.toSafeString(): String =
27+
try { toString() }
28+
catch (e: Throwable) { "toString() failed with $e" }
29+
30+
// **KLUDGE**: there is no reason to include continuation into debug string until the following ticket is resolved:
31+
// KT-18986 Debug-friendly toString implementation for CoroutineImpl
32+
// (the current string representation of continuation is useless and uses buggy reflection internals)
33+
// So, this function is a replacement that extract a usable information from continuation -> its class name, at least
34+
internal fun Continuation<*>.toDebugString(): String = when (this) {
35+
is DispatchedContinuation -> toString()
36+
else -> "${this::class.java.name}@$hexAddress"
37+
}
38+

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
5252
* | _Completing_ (optional transient state) | `true` | `false` | `false` |
5353
* | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
5454
* | _Cancelled_ (final state) | `false` | `true` | `true` |
55-
* | _Completed normally_ (final state) | `false` | `true` | `false` |
55+
* | _Completed_ (final state) | `false` | `true` | `false` |
5656
*
5757
* Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
5858
* that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
@@ -68,8 +68,8 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
6868
* wait children
6969
* +-----+ start +--------+ complete +-------------+ finish +-----------+
7070
* | New | ---------------> | Active | -----------> | Completing | -------> | Completed |
71-
* +-----+ +--------+ +-------------+ | normally |
72-
* | | | +-----------+
71+
* +-----+ +--------+ +-------------+ +-----------+
72+
* | | |
7373
* | cancel | cancel | cancel
7474
* V V |
7575
* +-----------+ finish +------------+ |
@@ -82,7 +82,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8282
* A job is active while the coroutine is working and job's cancellation aborts the coroutine when
8383
* the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
8484
*
85-
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes.
85+
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
8686
* Parent job waits for all its [children][attachChild] to complete in _completing_ or _cancelling_ state.
8787
* _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
8888
* while internally it is waiting for its children.
@@ -185,9 +185,10 @@ public interface Job : CoroutineContext.Element {
185185
* returns a handle that should be used to detach it.
186186
*
187187
* A parent-child relation has the following effect:
188-
* * Cancellation of parent with [cancel] immediately cancels all its children with the same cause.
188+
* * Cancellation of parent with [cancel] or its exceptional completion (failure)
189+
* immediately cancels all its children.
189190
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
190-
* complete first in _completing_ or _cancelling_ state.
191+
* complete in _completing_ or _cancelling_ state.
191192
*
192193
* A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
193194
* to its parent on its own completion.
@@ -988,30 +989,38 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
988989
throw IllegalStateException("Job $this is already complete, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
989990
if (state is Finishing && state.completing)
990991
throw IllegalStateException("Job $this is already completing, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
991-
val waitChild: Child = firstChild(state) ?: // or else complete immediately
992+
val child: Child = firstChild(state) ?: // or else complete immediately w/o children
992993
if (updateState(state, proposedUpdate, mode)) return true else return@loopOnState
993-
// switch to completing state
994+
// must promote to list to correct operate on child lists
994995
if (state is JobNode<*>) {
995-
// must promote to list to make completing & retry
996996
promoteSingleToNodeList(state)
997-
} else {
998-
val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
999-
if (_state.compareAndSet(state, completing)) {
1000-
waitForChild(waitChild, proposedUpdate)
1001-
return false
1002-
}
997+
return@loopOnState // retry
998+
}
999+
// cancel all children in list on exceptional completion
1000+
if (proposedUpdate is CompletedExceptionally)
1001+
child.cancelChildrenInternal(proposedUpdate.exception)
1002+
// switch to completing state
1003+
val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
1004+
if (_state.compareAndSet(state, completing)) {
1005+
waitForChild(child, proposedUpdate)
1006+
return false
10031007
}
10041008
}
10051009
}
10061010

1011+
private tailrec fun Child.cancelChildrenInternal(cause: Throwable) {
1012+
childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
1013+
nextChild()?.cancelChildrenInternal(cause)
1014+
}
1015+
10071016
private val Any?.exceptionOrNull: Throwable?
10081017
get() = (this as? CompletedExceptionally)?.exception
10091018

10101019
private fun firstChild(state: Incomplete) =
10111020
state as? Child ?: state.list?.nextChild()
10121021

1013-
private fun waitForChild(waitChild: Child, proposedUpdate: Any?) {
1014-
waitChild.child.invokeOnCompletion(handler = ChildCompletion(this, waitChild, proposedUpdate))
1022+
private fun waitForChild(child: Child, proposedUpdate: Any?) {
1023+
child.childJob.invokeOnCompletion(handler = ChildCompletion(this, child, proposedUpdate))
10151024
}
10161025

10171026
internal fun continueCompleting(lastChild: Child, proposedUpdate: Any?) {
@@ -1044,13 +1053,13 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
10441053
public override fun cancelChildren(cause: Throwable?) {
10451054
val state = this.state
10461055
when (state) {
1047-
is Child -> state.child.cancel(cause)
1056+
is Child -> state.childJob.cancel(cause)
10481057
is Incomplete -> state.list?.cancelChildrenList(cause)
10491058
}
10501059
}
10511060

10521061
private fun NodeList.cancelChildrenList(cause: Throwable?) {
1053-
forEach<Child> { it.child.cancel(cause) }
1062+
forEach<Child> { it.childJob.cancel(cause) }
10541063
}
10551064

10561065
/**
@@ -1079,7 +1088,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
10791088

10801089
// for nicer debugging
10811090
override final fun toString(): String =
1082-
"${nameString()}{${stateString()}}@${Integer.toHexString(System.identityHashCode(this))}"
1091+
"${nameString()}{${stateString()}}@$hexAddress"
10831092

10841093
/**
10851094
* @suppress **This is unstable API and it is subject to change.**
@@ -1393,22 +1402,22 @@ private class InvokeOnCancellation(
13931402

13941403
internal class Child(
13951404
parent: JobSupport,
1396-
val child: Job
1405+
@JvmField val childJob: Job
13971406
) : JobCancellationNode<JobSupport>(parent) {
13981407
override fun invoke(reason: Throwable?) {
13991408
// Always materialize the actual instance of parent's completion exception and cancel child with it
1400-
child.cancel(job.getCancellationException())
1409+
childJob.cancel(job.getCancellationException())
14011410
}
1402-
override fun toString(): String = "Child[$child]"
1411+
override fun toString(): String = "Child[$childJob]"
14031412
}
14041413

14051414
private class ChildCompletion(
14061415
private val parent: JobSupport,
1407-
private val waitChild: Child,
1416+
private val child: Child,
14081417
private val proposedUpdate: Any?
1409-
) : JobNode<Job>(waitChild.child) {
1418+
) : JobNode<Job>(child.childJob) {
14101419
override fun invoke(reason: Throwable?) {
1411-
parent.continueCompleting(waitChild, proposedUpdate)
1420+
parent.continueCompleting(child, proposedUpdate)
14121421
}
14131422
}
14141423

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,49 @@ class CoroutinesTest : TestBase() {
266266
}
267267
expectUnreached()
268268
}
269+
270+
@Test
271+
fun testParentCrashCancelsChildren() = runTest(
272+
unhandled = listOf({ it -> it is IOException })
273+
) {
274+
expect(1)
275+
val parent = launch(coroutineContext + Job()) {
276+
expect(4)
277+
throw IOException("Crashed")
278+
}
279+
launch(coroutineContext + parent, CoroutineStart.UNDISPATCHED) {
280+
expect(2)
281+
try {
282+
yield() // to test
283+
} finally {
284+
expect(5)
285+
run(NonCancellable) { yield() } // to test
286+
expect(7)
287+
}
288+
expectUnreached() // will get cancelled, because parent crashes
289+
}
290+
expect(3)
291+
yield() // to parent
292+
expect(6)
293+
parent.join() // make sure crashed parent still waits for its child
294+
finish(8)
295+
}
296+
297+
@Test
298+
fun testYieldInFinally() = runTest(
299+
expected = { it is IOException }
300+
) {
301+
expect(1)
302+
try {
303+
expect(2)
304+
throwIOException()
305+
} finally {
306+
expect(3)
307+
yield()
308+
finish(4)
309+
}
310+
expectUnreached()
311+
}
312+
313+
private fun throwIOException() { throw IOException() }
269314
}

0 commit comments

Comments
 (0)