Skip to content

Commit 4d626de

Browse files
committed
Job.cancel and CompletableDeferred.complete support cancelling/completing
states and properly wait for their children to complete on join/await. Fixes #199
1 parent b699143 commit 4d626de

File tree

17 files changed

+196
-164
lines changed

17 files changed

+196
-164
lines changed

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/CommonCompletableDeferredTest.kt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,47 @@ class CommonCompletableDeferredTest : TestBase() {
174174
finish(7)
175175
}
176176

177+
@Test
178+
fun testCancelAndAwaitParentWaitChildren() = runTest {
179+
expect(1)
180+
val parent = CompletableDeferred<String>()
181+
launch(coroutineContext, start = CoroutineStart.UNDISPATCHED, parent = parent) {
182+
expect(2)
183+
try {
184+
yield() // will get cancelled
185+
} finally {
186+
expect(5)
187+
}
188+
}
189+
expect(3)
190+
parent.cancel()
191+
expect(4)
192+
try {
193+
parent.await()
194+
} catch (e: CancellationException) {
195+
finish(6)
196+
}
197+
}
198+
199+
@Test
200+
fun testCompleteAndAwaitParentWaitChildren() = runTest {
201+
expect(1)
202+
val parent = CompletableDeferred<String>()
203+
launch(coroutineContext, start = CoroutineStart.UNDISPATCHED, parent = parent) {
204+
expect(2)
205+
try {
206+
yield() // will get cancelled
207+
} finally {
208+
expect(5)
209+
}
210+
}
211+
expect(3)
212+
parent.complete("OK")
213+
expect(4)
214+
assertEquals("OK", parent.await())
215+
finish(6)
216+
}
217+
177218
private inline fun <reified T: Throwable> assertThrows(block: () -> Unit) {
178219
try {
179220
block()

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/CommonJobTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,23 @@ class CommonJobTest : TestBase() {
149149
job.cancel()
150150
assertEquals(0, fireCount)
151151
}
152+
153+
@Test
154+
fun testCancelAndJoinParentWaitChildren() = runTest {
155+
expect(1)
156+
val parent = Job()
157+
launch(coroutineContext, start = CoroutineStart.UNDISPATCHED, parent = parent) {
158+
expect(2)
159+
try {
160+
yield() // will get cancelled
161+
} finally {
162+
expect(5)
163+
}
164+
}
165+
expect(3)
166+
parent.cancel()
167+
expect(4)
168+
parent.join()
169+
finish(6)
170+
}
152171
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,14 @@ public abstract class AbstractCoroutine<in T>(
3838
public final override val context: CoroutineContext = parentContext + this
3939
public final override val coroutineContext: CoroutineContext get() = context
4040

41-
// all coroutines are cancelled through an intermediate cancelling state
42-
final override val hasCancellingState: Boolean get() = true
43-
4441
protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
4542

4643
final override fun resume(value: T) {
47-
makeCompleting(value, defaultResumeMode)
44+
makeCompletingOnce(value, defaultResumeMode)
4845
}
4946

5047
final override fun resumeWithException(exception: Throwable) {
51-
makeCompleting(CompletedExceptionally(exception), defaultResumeMode)
48+
makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode)
5249
}
5350

5451
final override fun handleException(exception: Throwable) {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,7 @@ private class RunCompletion<in T>(
206206
override val context: CoroutineContext,
207207
delegate: Continuation<T>,
208208
resumeMode: Int
209-
) : AbstractContinuation<T>(delegate, resumeMode) {
210-
override val hasCancellingState: Boolean
211-
get() = true
212-
}
209+
) : AbstractContinuation<T>(delegate, resumeMode)
213210

214211
private class BlockingCoroutine<T>(
215212
parentContext: CoroutineContext,

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ internal class CancellableContinuationImpl<in T>(
225225
initParentJob(delegate.context[Job])
226226
}
227227

228+
override val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLED
229+
228230
override fun tryResume(value: T, idempotent: Any?): Any? {
229231
while (true) { // lock-free loop on state
230232
val state = this.state // atomic read

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt

Lines changed: 8 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,9 @@ import kotlinx.coroutines.experimental.selects.SelectClause1
2222
* A [Deferred] that can be completed via public functions
2323
* [complete], [completeExceptionally], and [cancel].
2424
*
25-
* Completion functions return `false` when this deferred value is already complete.
25+
* Completion functions return `false` when this deferred value is already complete or completing.
2626
*
27-
* A completable deferred value has the following states:
28-
*
29-
* | **State** | [isActive] | [isCompleted] | [isCompletedExceptionally] | [isCancelled] |
30-
* | ------------------------- | ---------- | ------------- | -------------------------- | ------------- |
31-
* | _Active_ (initial state) | `true` | `false` | `false` | `false` |
32-
* | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
33-
* | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
34-
* | _Failed_ (final state) | `false` | `true` | `true` | `false` |
35-
*
36-
* A an instance of completable deferred can be created by `CompletableDeferred()` function in _active_ state.
37-
*
38-
* ```
39-
* +--------+ complete +-----------+
40-
* | Active | ---------+-> | Resolved |
41-
* +--------+ | |(completed)|
42-
* | | +-----------+
43-
* | cancel |
44-
* V | +-----------+
45-
* +-----------+ +-> | Failed |
46-
* | Cancelled | |(completed)|
47-
* |(completed)| +-----------+
48-
* +-----------+
49-
* ```
27+
* An instance of completable deferred can be created by `CompletableDeferred()` function in _active_ state.
5028
*
5129
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
5230
* be safely invoked from concurrent coroutines without external synchronization.
@@ -95,34 +73,16 @@ private class CompletableDeferredImpl<T>(
9573
parent: Job?
9674
) : JobSupport(true), CompletableDeferred<T> {
9775
init { initParentJob(parent) }
76+
override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
77+
9878
override fun getCompleted(): T = getCompletedInternal() as T
9979
override suspend fun await(): T = awaitInternal() as T
10080
override val onAwait: SelectClause1<T>
10181
get() = this as SelectClause1<T>
10282

103-
override fun complete(value: T): Boolean {
104-
loopOnState { state ->
105-
when (state) {
106-
is Incomplete -> {
107-
// actually, we don't care about the mode here at all, so just use a default
108-
if (updateState(state, value, mode = MODE_ATOMIC_DEFAULT))
109-
return true
110-
}
111-
else -> return false
112-
}
113-
}
114-
}
83+
override fun complete(value: T): Boolean =
84+
makeCompleting(value)
11585

116-
override fun completeExceptionally(exception: Throwable): Boolean {
117-
loopOnState { state ->
118-
when (state) {
119-
is Incomplete -> {
120-
// actually, we don't care about the mode here at all, so just use a default
121-
if (updateState(state, CompletedExceptionally(exception), mode = MODE_ATOMIC_DEFAULT))
122-
return true
123-
}
124-
else -> return false
125-
}
126-
}
127-
}
86+
override fun completeExceptionally(exception: Throwable): Boolean =
87+
makeCompleting(CompletedExceptionally(exception))
12888
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ import kotlin.coroutines.experimental.CoroutineContext
4545
* Such a deferred can be be made _active_ by invoking [start], [join], or [await].
4646
*
4747
* A deferred can be _cancelled_ at any time with [cancel] function that forces it to transition to
48-
* _cancelling_ state immediately. A simple implementation of deferred -- [CompletableDeferred],
49-
* that is not backed by a coroutine, does not have a _cancelling_ state, but becomes _cancelled_
50-
* on [cancel] immediately. Coroutines, on the other hand, become _cancelled_ only when they finish
51-
* executing their code and after all their [children] complete.
48+
* _cancelling_ state immediately. Deferred that is not backed by a coroutine (see [CompletableDeferred]) and does not have
49+
* [children] becomes _cancelled_ on [cancel] immediately.
50+
* Otherwise, deferred becomes _cancelled_ when it finishes executing its code and
51+
* when all its children [complete][isCompleted].
5252
*
5353
* ```
5454
* wait children

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
6060
* [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
6161
*
6262
* A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
63-
* _cancelling_ state immediately. Job that is not backed by a coroutine and does not have
63+
* _cancelling_ state immediately. Job that is not backed by a coroutine (see `Job()` function) and does not have
6464
* [children] becomes _cancelled_ on [cancel] immediately.
6565
* Otherwise, job becomes _cancelled_ when it finishes executing its code and
6666
* when all its children [complete][isCompleted].
@@ -841,7 +841,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
841841
promoteSingleToNodeList(state as JobNode<*>)
842842
} else {
843843
if (state is Finishing && state.cancelled != null && onCancelling) {
844-
check(hasCancellingState) // cannot be in this state unless were support cancelling state
844+
check(onCancelMode != ON_CANCEL_MAKE_CANCELLED) // cannot be in this state unless were support cancelling state
845845
// installing cancellation handler on job that is being cancelled
846846
if (invokeImmediately) handler(state.cancelled.cause)
847847
return NonDisposableHandle
@@ -858,14 +858,15 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
858858
}
859859
}
860860

861-
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> =
862-
if (onCancelling && hasCancellingState)
861+
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
862+
val hasCancellingState = onCancelMode != ON_CANCEL_MAKE_CANCELLED
863+
return if (onCancelling && hasCancellingState)
863864
(handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
864865
?: InvokeOnCancellation(this, handler)
865866
else
866867
(handler as? JobNode<*>)?.also { require(it.job === this && (!hasCancellingState || it !is JobCancellationNode)) }
867868
?: InvokeOnCompletion(this, handler)
868-
869+
}
869870

870871
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
871872
list.addLastIf(node) { this.state === expect }
@@ -948,12 +949,14 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
948949
}
949950
}
950951

951-
protected open val hasCancellingState: Boolean get() = false
952+
protected open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
952953

953-
public override fun cancel(cause: Throwable?): Boolean =
954-
if (hasCancellingState)
955-
makeCancelling(cause) else
956-
makeCancelled(cause)
954+
public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
955+
ON_CANCEL_MAKE_CANCELLED -> makeCancelled(cause)
956+
ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
957+
ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
958+
else -> error("Invalid onCancelMode $onCancelMode")
959+
}
957960

958961
// we will be dispatching coroutine to process its cancellation exception, so there is no need for
959962
// an extra check for Job status in MODE_CANCELLABLE
@@ -1013,6 +1016,15 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
10131016
return true
10141017
}
10151018

1019+
private fun makeCompletingOnCancel(cause: Throwable?): Boolean =
1020+
makeCompleting(Cancelled(this, cause))
1021+
1022+
internal fun makeCompleting(proposedUpdate: Any?): Boolean =
1023+
when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
1024+
COMPLETING_ALREADY_COMPLETING -> false
1025+
else -> true
1026+
}
1027+
10161028
/**
10171029
* Returns:
10181030
* * `true` if state was updated to completed/cancelled;
@@ -1021,30 +1033,38 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
10211033
* @throws IllegalStateException if job is already complete or completing
10221034
* @suppress **This is unstable API and it is subject to change.**
10231035
*/
1024-
internal fun makeCompleting(proposedUpdate: Any?, mode: Int): Boolean {
1036+
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
1037+
when (makeCompletingInternal(proposedUpdate, mode)) {
1038+
COMPLETING_COMPLETED -> true
1039+
COMPLETING_WAITING_CHILDREN -> false
1040+
else -> throw IllegalStateException("Job $this is already complete or completing, " +
1041+
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
1042+
}
1043+
1044+
private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
10251045
loopOnState { state ->
10261046
if (state !is Incomplete)
1027-
throw IllegalStateException("Job $this is already complete, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
1047+
return COMPLETING_ALREADY_COMPLETING
10281048
if (state is Finishing && state.completing)
1029-
throw IllegalStateException("Job $this is already completing, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
1049+
return COMPLETING_ALREADY_COMPLETING
10301050
val child: Child = firstChild(state) ?: // or else complete immediately w/o children
1031-
if (updateState(state, proposedUpdate, mode)) return true else return@loopOnState
1051+
if (updateState(state, proposedUpdate, mode)) return COMPLETING_COMPLETED else return@loopOnState
10321052
// must promote to list to correct operate on child lists
10331053
if (state is JobNode<*>) {
10341054
promoteSingleToNodeList(state)
10351055
return@loopOnState // retry
10361056
}
10371057
// cancel all children in list on exceptional completion
1038-
if (proposedUpdate is CompletedExceptionally
1039-
)
1058+
if (proposedUpdate is CompletedExceptionally)
10401059
child.cancelChildrenInternal(proposedUpdate.exception)
10411060
// switch to completing state
1042-
val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
1061+
val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
1062+
val completing = Finishing(state.list!!, cancelled, true)
10431063
if (_state.compareAndSet(state, completing)) {
10441064
if (tryWaitForChild(child, proposedUpdate))
1045-
return false
1046-
if (updateState(completing, proposedUpdate, MODE_ATOMIC_DEFAULT))
1047-
return true
1065+
return COMPLETING_WAITING_CHILDREN
1066+
if (updateState(completing, proposedUpdate, mode = MODE_ATOMIC_DEFAULT))
1067+
return COMPLETING_COMPLETED
10481068
}
10491069
}
10501070
}
@@ -1251,8 +1271,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
12511271
cont.disposeOnCompletion(invokeOnCompletion {
12521272
val state = this.state
12531273
check(state !is Incomplete)
1254-
if (state is CompletedExceptionally
1255-
)
1274+
if (state is CompletedExceptionally)
12561275
cont.resumeWithException(state.exception)
12571276
else
12581277
cont.resume(state)
@@ -1267,8 +1286,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
12671286
if (state !is Incomplete) {
12681287
// already complete -- select result
12691288
if (select.trySelect(null)) {
1270-
if (state is CompletedExceptionally
1271-
)
1289+
if (state is CompletedExceptionally)
12721290
select.resumeSelectCancellableWithException(state.exception)
12731291
else
12741292
block.startCoroutineUndispatched(state, select.completion)
@@ -1286,14 +1304,21 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
12861304
internal fun <R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (Any?) -> R) {
12871305
val state = this.state
12881306
// Note: await is non-atomic (can be cancelled while dispatched)
1289-
if (state is CompletedExceptionally
1290-
)
1307+
if (state is CompletedExceptionally)
12911308
select.resumeSelectCancellableWithException(state.exception)
12921309
else
12931310
block.startCoroutineCancellable(state, select.completion)
12941311
}
12951312
}
12961313

1314+
internal const val ON_CANCEL_MAKE_CANCELLED = 0
1315+
internal const val ON_CANCEL_MAKE_CANCELLING = 1
1316+
internal const val ON_CANCEL_MAKE_COMPLETING = 2
1317+
1318+
private const val COMPLETING_ALREADY_COMPLETING = 0
1319+
private const val COMPLETING_COMPLETED = 1
1320+
private const val COMPLETING_WAITING_CHILDREN = 2
1321+
12971322
private const val RETRY = -1
12981323
private const val FALSE = 0
12991324
private const val TRUE = 1
@@ -1310,6 +1335,7 @@ private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
13101335

13111336
private class JobImpl(parent: Job? = null) : JobSupport(true) {
13121337
init { initParentJob(parent) }
1338+
override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
13131339
}
13141340

13151341
// -------- invokeOnCompletion nodes

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private fun <U, T: U> setupTimeout(
8888
}
8989
return when {
9090
result == COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
91-
coroutine.makeCompleting(result, MODE_IGNORE) -> {
91+
coroutine.makeCompletingOnce(result, MODE_IGNORE) -> {
9292
if (result is CompletedExceptionally) throw result.exception else result
9393
}
9494
else -> COROUTINE_SUSPENDED

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,5 @@ class JobDisposeTest: TestBase() {
9090
exception?.let { throw it }
9191
}
9292

93-
class TestJob : JobSupport(active = true) {
94-
// The bug was triggering only with cancelling state
95-
override val hasCancellingState: Boolean
96-
get() = true
97-
}
93+
class TestJob : JobSupport(active = true)
9894
}

0 commit comments

Comments
 (0)