Skip to content

Commit 447b88a

Browse files
committed
Job.attachChild is deprecated as error-prone API;
Fixed StackOverflow when waiting for a lot of completed children, that did not remove their handles from the parent.
1 parent 5610e1d commit 447b88a

File tree

4 files changed

+69
-29
lines changed

4 files changed

+69
-29
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import kotlin.coroutines.experimental.CoroutineContext
4848
* _cancelling_ state immediately. A simple implementation of deferred -- [CompletableDeferred],
4949
* that is not backed by a coroutine, does not have a _cancelling_ state, but becomes _cancelled_
5050
* on [cancel] immediately. Coroutines, on the other hand, become _cancelled_ only when they finish
51-
* executing their code and after all their [children][attachChild] complete.
51+
* executing their code and after all their children complete.
5252
*
5353
* ```
5454
* wait children
@@ -71,7 +71,7 @@ import kotlin.coroutines.experimental.CoroutineContext
7171
* or the cancellation cause inside the coroutine.
7272
*
7373
* A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
74-
* cancelled or completes. Parent waits for all its [children][attachChild] to complete in _completing_ or
74+
* cancelled or completes. Parent waits for all its children to complete in _completing_ or
7575
* _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
7676
* deferred is still active, while internally it is waiting for its children.
7777
*

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
6060
*
6161
* A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
6262
* _cancelling_ state immediately. Job that is not backed by a coroutine and does not have
63-
* [children][attachChild] becomes _cancelled_ on [cancel] immediately.
63+
* children becomes _cancelled_ on [cancel] immediately.
6464
* Otherwise, job becomes _cancelled_ when it finishes executing its code and
6565
* when all its children [complete][isCompleted].
6666
*
@@ -83,7 +83,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8383
* the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
8484
*
8585
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
86-
* Parent job waits for all its [children][attachChild] to complete in _completing_ or _cancelling_ state.
86+
* Parent job waits for all its children to complete in _completing_ or _cancelling_ state.
8787
* _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
8888
* while internally it is waiting for its children.
8989
*
@@ -117,15 +117,15 @@ public interface Job : CoroutineContext.Element {
117117

118118
/**
119119
* Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
120-
* The job that is waiting for its [children][attachChild] to complete is still considered to be active if it
120+
* The job that is waiting for its children to complete is still considered to be active if it
121121
* was not cancelled.
122122
*/
123123
public val isActive: Boolean
124124

125125
/**
126126
* Returns `true` when this job has completed for any reason. A job that was cancelled and has
127127
* finished its execution is also considered complete. Job becomes complete only after
128-
* all its [children][attachChild] complete.
128+
* all its children complete.
129129
*/
130130
public val isCompleted: Boolean
131131

@@ -190,17 +190,20 @@ public interface Job : CoroutineContext.Element {
190190
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
191191
* complete in _completing_ or _cancelling_ state.
192192
*
193-
* A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
194-
* to its parent on its own completion.
193+
* **A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
194+
* to its parent on its own completion.**
195195
*
196196
* Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
197197
* lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
198198
* They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
199+
*
200+
* @suppress This is an internal API. This method is too error prone for public API.
199201
*/
202+
@Deprecated(message = "Start child coroutine with 'parent' parameter", level = DeprecationLevel.WARNING)
200203
public fun attachChild(child: Job): DisposableHandle
201204

202205
/**
203-
* Cancels all [children][attachChild] jobs of this coroutine with the given [cause]. Unlike [cancel],
206+
* Cancels all children jobs of this coroutine with the given [cause]. Unlike [cancel],
204207
* the state of this job itself is not affected.
205208
*/
206209
public fun cancelChildren(cause: Throwable? = null)
@@ -212,7 +215,7 @@ public interface Job : CoroutineContext.Element {
212215
* when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
213216
* This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
214217
*
215-
* Note, that the job becomes complete only when all its [children][attachChild] are complete.
218+
* Note, that the job becomes complete only when all its children are complete.
216219
*
217220
* This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
218221
* If the [Job] of the invoking coroutine is cancelled or completed when this
@@ -274,6 +277,12 @@ public interface Job : CoroutineContext.Element {
274277

275278
// ------------ unstable internal API ------------
276279

280+
/**
281+
* @return `true` when Job was not complete and handler was installed, `false` otherwise (and handler is not invoked).
282+
* @suppress **This is unstable API and it is subject to change.**
283+
*/
284+
public fun invokeOnCompletionIfNotComplete(handler: CompletionHandler): Boolean
285+
277286
/**
278287
* @suppress **Error**: Operator '+' on two Job objects is meaningless.
279288
* Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
@@ -568,7 +577,10 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
568577
val handle = parent.attachChild(this)
569578
parentHandle = handle
570579
// now check our state _after_ registering (see updateState order of actions)
571-
if (isCompleted) handle.dispose()
580+
if (isCompleted) {
581+
handle.dispose()
582+
parentHandle = NonDisposableHandle // release it just in case, to aid GC
583+
}
572584
}
573585

574586
// ------------ state query ------------
@@ -647,7 +659,10 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
647659
require(update !is Incomplete) // only incomplete -> completed transition is allowed
648660
if (!_state.compareAndSet(expect, update)) return false
649661
// Unregister from parent job
650-
parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
662+
parentHandle?.let {
663+
it.dispose() // volatile read parentHandle _after_ state was updated
664+
parentHandle = NonDisposableHandle // release it just in case, to aid GC
665+
}
651666
return true // continues in completeUpdateState
652667
}
653668

@@ -763,16 +778,19 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
763778

764779
@Suppress("OverridingDeprecatedMember")
765780
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
766-
installHandler(handler, onCancelling = false)
781+
invokeOnCompletion(false, handler)
767782

768783
@Suppress("OverridingDeprecatedMember")
769784
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
770-
installHandler(handler, onCancelling = onCancelling && hasCancellingState)
785+
invokeOnCompletion(onCancelling, handler)
771786

772787
public final override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle =
773-
installHandler(handler, onCancelling = onCancelling && hasCancellingState)
788+
installHandler(handler, onCancelling = onCancelling && hasCancellingState, invokeNow = true) ?: NonDisposableHandle
789+
790+
override fun invokeOnCompletionIfNotComplete(handler: CompletionHandler): Boolean =
791+
installHandler(handler, false, invokeNow = false) != null
774792

775-
private fun installHandler(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
793+
private fun installHandler(handler: CompletionHandler, onCancelling: Boolean, invokeNow: Boolean): DisposableHandle? {
776794
var nodeCache: JobNode<*>? = null
777795
loopOnState { state ->
778796
when (state) {
@@ -791,16 +809,16 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
791809
} else {
792810
if (state is Finishing && state.cancelled != null && onCancelling) {
793811
// installing cancellation handler on job that is being cancelled
794-
handler((state as? CompletedExceptionally)?.exception)
795-
return NonDisposableHandle
812+
if (invokeNow) handler(state.cancelled.cause)
813+
return null
796814
}
797815
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
798816
if (addLastAtomic(state, list, node)) return node
799817
}
800818
}
801819
else -> { // is complete
802-
handler((state as? CompletedExceptionally)?.exception)
803-
return NonDisposableHandle
820+
if (invokeNow) handler((state as? CompletedExceptionally)?.cause)
821+
return null
804822
}
805823
}
806824
}
@@ -988,8 +1006,10 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
9881006
// switch to completing state
9891007
val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
9901008
if (_state.compareAndSet(state, completing)) {
991-
waitForChild(child, proposedUpdate)
992-
return false
1009+
if (tryWaitForChild(child, proposedUpdate))
1010+
return false
1011+
if (updateState(completing, proposedUpdate, MODE_ATOMIC_DEFAULT))
1012+
return true
9931013
}
9941014
}
9951015
}
@@ -1005,20 +1025,24 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
10051025
private fun firstChild(state: Incomplete) =
10061026
state as? Child ?: state.list?.nextChild()
10071027

1008-
private fun waitForChild(child: Child, proposedUpdate: Any?) {
1009-
child.childJob.invokeOnCompletion(handler = ChildCompletion(this, child, proposedUpdate))
1028+
// return false when there is no more incomplete children to wait
1029+
private tailrec fun tryWaitForChild(child: Child, proposedUpdate: Any?): Boolean {
1030+
if (child.childJob.invokeOnCompletionIfNotComplete(ChildCompletion(this, child, proposedUpdate)))
1031+
return true
1032+
val nextChild = child.nextChild() ?: return false
1033+
return tryWaitForChild(nextChild, proposedUpdate)
10101034
}
10111035

10121036
internal fun continueCompleting(lastChild: Child, proposedUpdate: Any?) {
10131037
loopOnState { state ->
10141038
if (state !is Finishing)
10151039
throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
10161040
// figure out if we need to wait for next child
1017-
val waitChild = lastChild.nextChild() ?: // or else no more children
1018-
if (updateState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return else return@loopOnState
1019-
// wait for next child
1020-
waitForChild(waitChild, proposedUpdate)
1021-
return
1041+
val waitChild = lastChild.nextChild()
1042+
// try wait for next child
1043+
if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
1044+
// no more children to wait -- try update state
1045+
if (updateState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
10221046
}
10231047
}
10241048

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
6666
/** Always returns [NonDisposableHandle]. */
6767
override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle = NonDisposableHandle
6868

69+
/** @suppress **This is unstable API and it is subject to change. */
70+
override fun invokeOnCompletionIfNotComplete(handler: CompletionHandler): Boolean = false
71+
6972
/** Always returns `false`. */
7073
override fun cancel(cause: Throwable?): Boolean = false
7174

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,5 +346,18 @@ class CoroutinesTest : TestBase() {
346346
d.await()
347347
}
348348

349+
@Test
350+
fun testCancelManyCompletedAttachedChildren() = runTest {
351+
val parent = launch(coroutineContext) { /* do nothing */ }
352+
val n = 10_000 * stressTestMultiplier
353+
repeat(n) {
354+
// create a child that already completed
355+
val child = launch(coroutineContext, CoroutineStart.UNDISPATCHED) { /* do nothing */ }
356+
// attach it manually
357+
parent.attachChild(child)
358+
}
359+
parent.cancelAndJoin() // cancel parent, make sure no stack overflow
360+
}
361+
349362
private fun throwIOException() { throw IOException() }
350363
}

0 commit comments

Comments
 (0)