Skip to content

Commit 29affbb

Browse files
committed
Streamlined Job APIs that support cancelling state:
Job.isCancelledOrCompleted and Job.invokeOnCancellation are removed, the later replaced with invokeOnCompletion(handler, onCancelling=true)
1 parent 4518104 commit 29affbb

File tree

11 files changed

+97
-89
lines changed

11 files changed

+97
-89
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,23 @@ import kotlin.coroutines.experimental.suspendCoroutine
4141
* Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
4242
* invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state.
4343
*
44-
* A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted], so
45-
* [invokeOnCancellation] and [invokeOnCompletion] have the same effect.
44+
* A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted].
4645
*
4746
* Invocation of [resume] or [resumeWithException] in _resumed_ state produces [IllegalStateException]
4847
* but is ignored in _cancelled_ state.
48+
*
49+
* ```
50+
* +-----------+ resume +---------+
51+
* | Active | ----------> | Resumed |
52+
* +-----------+ +---------+
53+
* |
54+
* | cancel
55+
* V
56+
* +-----------+
57+
* | Cancelled |
58+
* +-----------+
59+
*
60+
* ```
4961
*/
5062
public interface CancellableContinuation<in T> : Continuation<T>, Job {
5163
/**

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ internal class DispatchTask<in T>(
115115
val job = if (cancellable) context[Job] else null
116116
withCoroutineContext(context) {
117117
when {
118-
job != null && job.isCancelledOrCompleted -> continuation.resumeWithException(job.getCompletionException())
118+
job != null && !job.isActive -> continuation.resumeWithException(job.getCompletionException())
119119
exception -> continuation.resumeWithException(value as Throwable)
120120
else -> continuation.resume(value as T)
121121
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlin.coroutines.experimental.CoroutineContext
2424
*/
2525
public interface CoroutineScope {
2626
/**
27-
* Returns `true` when this coroutine is still active (has not completed yet).
27+
* Returns `true` when this coroutine is still active (has not completed and was not cancelled yet).
2828
*
2929
* Check this property in long-running computation loops to support cancellation:
3030
* ```

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,6 @@ public interface Job : CoroutineContext.Element {
115115
*/
116116
public val isCancelled: Boolean
117117

118-
/**
119-
* Returns `true` when this job is either [isCancelled] or [isCompleted].
120-
*/
121-
public val isCancelledOrCompleted: Boolean
122-
123118
/**
124119
* Returns the exception that signals the completion of this job -- it returns the original
125120
* [cancel] cause or an instance of [CancellationException] if this job had completed
@@ -171,35 +166,40 @@ public interface Job : CoroutineContext.Element {
171166
// ------------ low-level state-notification ------------
172167

173168
/**
174-
* Registers handler that is **synchronously** invoked on cancellation or completion of this job.
175-
* When job is already in _cancelling_ state or is complete for any reason, then the handler
176-
* is immediately invoked with a job's cancellation cause or `null`. Otherwise, handler will be
177-
* invoked once when this job is [cancelled][cancel] or becomes complete.
178-
*
179-
* Unlike [invokeOnCompletion], here the handler is immediately invoked on invocation of [cancel]
180-
* even if the corresponding coroutine has not finished its execution yet.
169+
* Registers handler that is **synchronously** invoked once on completion of this job.
170+
* When job is already complete, then the handler is immediately invoked
171+
* with a job's cancellation cause or `null`. Otherwise, handler will be invoked once when this
172+
* job is complete.
181173
*
182174
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
183175
* registration of this handler and release its memory if its invocation is no longer needed.
184176
* There is no need to dispose the handler after completion of this job. The references to
185177
* all the handlers are released when this job completes.
186178
*
179+
* Note, that the handler is not invoked on invocation of [cancel] when
180+
* job becomes _cancelling_, but only when the corresponding coroutine had finished execution
181+
* of its code and became _cancelled_. There is an overloaded version of this function
182+
* with `onCancelling` parameter to receive notification on _cancelling_ state.
183+
*
187184
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
188185
* and allows for implementation of suspending functions that wait on the Job's state.
189186
* This function should not be used in general application code.
190-
* Implementations of `CompletionHandler` must be fast and _lock-free_
187+
* Implementations of `CompletionHandler` must be fast and _lock-free_.
191188
*/
192-
public fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle
189+
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
193190

194191
/**
195-
* Registers handler that is **synchronously** invoked on completion of this job.
192+
* Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
196193
* When job is already complete, then the handler is immediately invoked
197194
* with a job's cancellation cause or `null`. Otherwise, handler will be invoked once when this
198-
* job is complete.
195+
* job is cancelled or complete.
199196
*
200-
* Unlike [invokeOnCancellation], here the handler is not invoked on invocation of [cancel] when
201-
* job becomes _cancelling_, but only when the corresponding coroutine had finished execution
202-
* of its code and became _cancelled_.
197+
* Invocation of this handler on a transition to a transient _cancelling_ state
198+
* is controlled by [onCancelling] boolean parameter.
199+
* The handler is invoked on invocation of [cancel] when
200+
* job becomes _cancelling_ when [onCancelling] parameters is set to `true`. However,
201+
* when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
202+
* (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
203203
*
204204
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
205205
* registration of this handler and release its memory if its invocation is no longer needed.
@@ -209,9 +209,9 @@ public interface Job : CoroutineContext.Element {
209209
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
210210
* and allows for implementation of suspending functions that wait on the Job's state.
211211
* This function should not be used in general application code.
212-
* Implementations of `CompletionHandler` must be fast and _lock-free_
212+
* Implementations of `CompletionHandler` must be fast and _lock-free_.
213213
*/
214-
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
214+
public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
215215

216216
// ------------ unstable internal API ------------
217217

@@ -272,12 +272,12 @@ public interface DisposableHandle : Job.Registration {
272272
}
273273

274274
/**
275-
* Handler for [Job.invokeOnCompletion] and [Job.invokeOnCancellation].
275+
* Handler for [Job.invokeOnCompletion].
276276
*
277277
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
278278
* and allows for implementation of suspending functions that wait on the Job's state.
279279
* This type should not be used in general application code.
280-
* Implementations of `CompletionHandler` must be fast and _lock-free_
280+
* Implementations of `CompletionHandler` must be fast and _lock-free_.
281281
*/
282282
public typealias CompletionHandler = (Throwable?) -> Unit
283283

@@ -441,13 +441,19 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
441441
parentHandle = NonDisposableHandle
442442
return
443443
}
444+
parent.start() // make sure the parent is started
444445
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
445-
val newRegistration = parent.invokeOnCancellation(ParentOnCancellation(parent, this))
446+
val newRegistration = parent.invokeOnCompletion(ParentOnCancellation(parent), onCancelling = true)
446447
parentHandle = newRegistration
447448
// now check our state _after_ registering (see updateState order of actions)
448449
if (isCompleted) newRegistration.dispose()
449450
}
450451

452+
private inner class ParentOnCancellation(parent: Job) : JobCancellationNode<Job>(parent) {
453+
override fun invokeOnce(reason: Throwable?) { onParentCancellation(reason) }
454+
override fun toString(): String = "ParentOnCancellation[${this@JobSupport}]"
455+
}
456+
451457
/**
452458
* Invoked at most once on parent completion.
453459
* @suppress **This is unstable API and it is subject to change.**
@@ -490,11 +496,6 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
490496
return state is Cancelled || state is Cancelling
491497
}
492498

493-
public final override val isCancelledOrCompleted: Boolean get() {
494-
val state = this.state
495-
return state !is Incomplete || state is Cancelling
496-
}
497-
498499
// ------------ state update ------------
499500

500501
/**
@@ -637,20 +638,20 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
637638
}
638639
}
639640

640-
public final override fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle =
641-
installHandler(handler, onCancellation = hasCancellingState)
642-
643641
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
644-
installHandler(handler, onCancellation = false)
642+
installHandler(handler, onCancelling = false)
643+
644+
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
645+
installHandler(handler, onCancelling = onCancelling && hasCancellingState)
645646

646-
private fun installHandler(handler: CompletionHandler, onCancellation: Boolean): DisposableHandle {
647+
private fun installHandler(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
647648
var nodeCache: JobNode<*>? = null
648649
lockFreeLoopOnState { state ->
649650
when (state) {
650651
is Empty -> { // EMPTY_X state -- no completion handlers
651652
if (state.isActive) {
652653
// try move to SINGLE state
653-
val node = nodeCache ?: makeNode(handler, onCancellation).also { nodeCache = it }
654+
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
654655
if (STATE.compareAndSet(this, state, node)) return node
655656
} else
656657
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
@@ -659,15 +660,15 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
659660
promoteSingleToNodeList(state)
660661
}
661662
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
662-
val node = nodeCache ?: makeNode(handler, onCancellation).also { nodeCache = it }
663+
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
663664
if (addLastAtomic(state, state, node)) return node
664665
}
665666
is Cancelling -> { // CANCELLING -- has a list of completion handlers
666-
if (onCancellation) { // installing cancellation handler on job that is being cancelled
667+
if (onCancelling) { // installing cancellation handler on job that is being cancelled
667668
handler((state as? CompletedExceptionally)?.exception)
668669
return NonDisposableHandle
669670
}
670-
val node = nodeCache ?: makeNode(handler, onCancellation).also { nodeCache = it }
671+
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
671672
if (addLastAtomic(state, state.list, node)) return node
672673
}
673674
else -> { // is inactive
@@ -678,8 +679,8 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
678679
}
679680
}
680681

681-
private fun makeNode(handler: CompletionHandler, onCancellation: Boolean): JobNode<*> =
682-
if (onCancellation)
682+
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> =
683+
if (onCancelling)
683684
(handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
684685
?: InvokeOnCancellation(this, handler)
685686
else
@@ -816,14 +817,15 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
816817

817818
/**
818819
* Override to process any exceptions that were encountered while invoking completion handlers
819-
* installed via [invokeOnCancellation] or [invokeOnCompletion].
820+
* installed via [invokeOnCompletion].
820821
*/
821822
protected open fun handleException(exception: Throwable) {
822823
throw exception
823824
}
824825

825826
/**
826-
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCancellation].
827+
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
828+
* `onCancelling` set to `true`.
827829
*/
828830
protected open fun onCancellation() {}
829831

@@ -912,14 +914,6 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
912914
) : CompletedExceptionally(cause)
913915

914916

915-
private class ParentOnCancellation(
916-
parentJob: Job,
917-
private val subordinateJob: JobSupport
918-
) : JobCancellationNode<Job>(parentJob) {
919-
override fun invokeOnce(reason: Throwable?) { subordinateJob.onParentCancellation(reason) }
920-
override fun toString(): String = "ParentOnCancellation[$subordinateJob]"
921-
}
922-
923917
/*
924918
* =================================================================================================
925919
* This is ready-to-use implementation for Deferred interface.

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
4141
/** Always returns `false`. */
4242
override val isCancelled: Boolean get() = false
4343

44-
/** Always returns `false`. */
45-
override val isCancelledOrCompleted: Boolean get() = false
46-
4744
/** Always returns `false`. */
4845
override fun start(): Boolean = false
4946

@@ -64,10 +61,10 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
6461
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
6562

6663
/** Always returns [NonDisposableHandle]. */
67-
override fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
64+
override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
6865

6966
/** Always returns [NonDisposableHandle]. */
70-
override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
67+
override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle = NonDisposableHandle
7168

7269
/** Always returns `false`. */
7370
override fun cancel(cause: Throwable?): Boolean = false

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
3030
suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
3131
val context = cont.context
3232
val job = context[Job]
33-
if (job != null && job.isCancelledOrCompleted) throw job.getCompletionException()
33+
if (job != null && !job.isActive) throw job.getCompletionException()
3434
if (cont !is DispatchedContinuation<Unit>) return@sc Unit
3535
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
3636
cont.dispatchYield(Unit)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,15 +284,20 @@ internal class SelectBuilderImpl<in R>(
284284

285285
private fun initCancellability() {
286286
val parent = context[Job] ?: return
287-
val newRegistration = parent.invokeOnCancellation { cause ->
288-
if (trySelect(null))
289-
resumeSelectCancellableWithException(cause ?: CancellationException("Select was cancelled"))
290-
}
287+
val newRegistration = parent.invokeOnCompletion(SelectOnCancellation(parent), onCancelling = true)
291288
parentHandle = newRegistration
292289
// now check our state _after_ registering
293290
if (isSelected) newRegistration.dispose()
294291
}
295292

293+
private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
294+
override fun invokeOnce(reason: Throwable?) {
295+
if (trySelect(null))
296+
resumeSelectCancellableWithException(reason ?: CancellationException("Select was cancelled"))
297+
}
298+
override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
299+
}
300+
296301
private val state: Any? get() {
297302
while (true) { // lock-free helping loop
298303
val state = _state

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class JobDisposeTest: TestBase() {
5151
threads += testThread("creator") {
5252
while (!done) {
5353
val job = TestJob()
54-
val handle = job.invokeOnCancellation { /* nothing */ }
54+
val handle = job.invokeOnCompletion({ /* nothing */ }, onCancelling = true)
5555
this.job = job // post job to cancelling thread
5656
this.handle = handle // post handle to concurrent disposer thread
5757
handle.dispose() // dispose of handle from this thread (concurrently with other disposer)

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private class PublisherCoroutine<T>(
113113
// assert: mutex.isLocked()
114114
private fun doLockedNext(elem: T) {
115115
// check if already closed for send
116-
if (isCancelledOrCompleted) {
116+
if (!isActive) {
117117
doLockedSignalCompleted()
118118
throw sendException()
119119
}
@@ -141,14 +141,14 @@ private class PublisherCoroutine<T>(
141141
}
142142
}
143143
/*
144-
There is no sense to check for `isCompleted` before doing `unlock`, because completion might
145-
happen after this check and before `unlock` (see `afterCompleted` that does not do anything
144+
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
145+
happen after this check and before `unlock` (see `onCancellation` that does not do anything
146146
if it fails to acquire the lock that we are still holding).
147-
We have to recheck `isCompleted` after `unlock` anyway.
147+
We have to recheck `isActive` after `unlock` anyway.
148148
*/
149149
mutex.unlock()
150-
// recheck isCancelledOrCompleted
151-
if (isCancelledOrCompleted && mutex.tryLock())
150+
// recheck isActive
151+
if (!isActive && mutex.tryLock())
152152
doLockedSignalCompleted()
153153
}
154154

@@ -188,8 +188,8 @@ private class PublisherCoroutine<T>(
188188
// unlock the mutex when we don't have back-pressure anymore
189189
if (cur == 0L) {
190190
mutex.unlock()
191-
// recheck isCancelledOrCompleted
192-
if (isCancelledOrCompleted && mutex.tryLock())
191+
// recheck isActive
192+
if (!isActive && mutex.tryLock())
193193
doLockedSignalCompleted()
194194
}
195195
return
@@ -200,7 +200,7 @@ private class PublisherCoroutine<T>(
200200
override fun onCancellation() {
201201
while (true) { // lock-free loop for nRequested
202202
val cur = nRequested
203-
if (cur == SIGNALLED) return // some other thread holding lock already signalled completion
203+
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
204204
check(cur >= 0) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
205205
if (!N_REQUESTED.compareAndSet(this, cur, CLOSED)) continue // retry on failed CAS
206206
// Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure

0 commit comments

Comments
 (0)