Skip to content

Commit d82b3a9

Browse files
committed
Job join/await waits for coroutine code completion,
"Cancelling" state is introduced for Job/Deferred objects.
1 parent 2b12d58 commit d82b3a9

File tree

27 files changed

+679
-309
lines changed

27 files changed

+679
-309
lines changed

integration/kotlinx-coroutines-guava/src/main/kotlin/kotlinx/coroutines/experimental/guava/ListenableFuture.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private class DeferredListenableFuture<T>(
9595
* Awaits for completion of the future without blocking a thread.
9696
*
9797
* This suspending function is cancellable.
98-
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
98+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
9999
* stops waiting for the future and immediately resumes with [CancellationException].
100100
*
101101
* Note, that `ListenableFuture` does not support removal of installed listeners, so on cancellation of this wait

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont:
102102
* Awaits for completion of the future without blocking a thread.
103103
*
104104
* This suspending function is cancellable.
105-
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
105+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
106106
* stops waiting for the future and immediately resumes with [CancellationException].
107107
*
108108
* Note, that `CompletableFuture` does not support prompt removal of installed listeners, so on cancellation of this wait

integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit
2828
/**
2929
* Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
3030
* This suspending function is cancellable.
31-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
31+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
3232
* *closes the underlying channel* and immediately resumes with [CancellationException].
3333
*/
3434
suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLock> { cont ->
@@ -39,7 +39,7 @@ suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLo
3939
/**
4040
* Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
4141
* This suspending function is cancellable.
42-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
42+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
4343
* *closes the underlying channel* and immediately resumes with [CancellationException].
4444
*/
4545
suspend fun AsynchronousFileChannel.aLock(
@@ -54,7 +54,7 @@ suspend fun AsynchronousFileChannel.aLock(
5454
/**
5555
* Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes.
5656
* This suspending function is cancellable.
57-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
57+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
5858
* *closes the underlying channel* and immediately resumes with [CancellationException].
5959
*/
6060
suspend fun AsynchronousFileChannel.aRead(
@@ -68,7 +68,7 @@ suspend fun AsynchronousFileChannel.aRead(
6868
/**
6969
* Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes.
7070
* This suspending function is cancellable.
71-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
71+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
7272
* *closes the underlying channel* and immediately resumes with [CancellationException].
7373
*/
7474
suspend fun AsynchronousFileChannel.aWrite(
@@ -82,7 +82,7 @@ suspend fun AsynchronousFileChannel.aWrite(
8282
/**
8383
* Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes.
8484
* This suspending function is cancellable.
85-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
85+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
8686
* *closes the underlying channel* and immediately resumes with [CancellationException].
8787
*/
8888
suspend fun AsynchronousServerSocketChannel.aAccept() = suspendCancellableCoroutine<AsynchronousSocketChannel> { cont ->
@@ -93,7 +93,7 @@ suspend fun AsynchronousServerSocketChannel.aAccept() = suspendCancellableCorout
9393
/**
9494
* Performs [AsynchronousSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes.
9595
* This suspending function is cancellable.
96-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
96+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
9797
* *closes the underlying channel* and immediately resumes with [CancellationException].
9898
*/
9999
suspend fun AsynchronousSocketChannel.aConnect(
@@ -106,7 +106,7 @@ suspend fun AsynchronousSocketChannel.aConnect(
106106
/**
107107
* Performs [AsynchronousSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes.
108108
* This suspending function is cancellable.
109-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
109+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
110110
* *closes the underlying channel* and immediately resumes with [CancellationException].
111111
*/
112112
suspend fun AsynchronousSocketChannel.aRead(
@@ -121,7 +121,7 @@ suspend fun AsynchronousSocketChannel.aRead(
121121
/**
122122
* Performs [AsynchronousSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes.
123123
* This suspending function is cancellable.
124-
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
124+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
125125
* *closes the underlying channel* and immediately resumes with [CancellationException].
126126
*/
127127
suspend fun AsynchronousSocketChannel.aWrite(

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ internal abstract class AbstractContinuation<in T>(
5555
}
5656

5757
protected fun trySuspend(): Boolean {
58-
while (true) { // lock-free loop
58+
while (true) { // lock-free loop on decision
5959
val decision = this.decision // volatile read
6060
when (decision) {
6161
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return true
@@ -66,7 +66,7 @@ internal abstract class AbstractContinuation<in T>(
6666
}
6767

6868
protected fun tryResume(): Boolean {
69-
while (true) { // lock-free loop
69+
while (true) { // lock-free loop on decision
7070
val decision = this.decision // volatile read
7171
when (decision) {
7272
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return true
@@ -79,8 +79,7 @@ internal abstract class AbstractContinuation<in T>(
7979
override fun resume(value: T) = resumeImpl(value, resumeMode)
8080

8181
protected fun resumeImpl(value: T, resumeMode: Int) {
82-
while (true) { // lock-free loop on state
83-
val state = this.state // atomic read
82+
lockFreeLoopOnState { state ->
8483
when (state) {
8584
is Incomplete -> if (updateState(state, value, resumeMode)) return
8685
is Cancelled -> return // ignore resumes on cancelled continuation
@@ -92,8 +91,7 @@ internal abstract class AbstractContinuation<in T>(
9291
override fun resumeWithException(exception: Throwable) = resumeWithExceptionImpl(exception, resumeMode)
9392

9493
protected fun resumeWithExceptionImpl(exception: Throwable, resumeMode: Int) {
95-
while (true) { // lock-free loop on state
96-
val state = this.state // atomic read
94+
lockFreeLoopOnState { state ->
9795
when (state) {
9896
is Incomplete -> {
9997
if (updateState(state, CompletedExceptionally(exception), resumeMode)) return
@@ -108,7 +106,7 @@ internal abstract class AbstractContinuation<in T>(
108106
}
109107
}
110108

111-
override fun handleCompletionException(closeException: Throwable) {
112-
handleCoroutineException(context, closeException)
109+
override fun handleException(exception: Throwable) {
110+
handleCoroutineException(context, exception)
113111
}
114112
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.coroutines.experimental.Continuation
20+
import kotlin.coroutines.experimental.CoroutineContext
21+
22+
/**
23+
* Abstract class to simplify writing of coroutine completion objects that
24+
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
25+
* It stores the result of continuation in the state of the job.
26+
*
27+
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
28+
* @suppress **This is unstable API and it is subject to change.**
29+
*/
30+
public abstract class AbstractCoroutine<in T>(
31+
private val parentContext: CoroutineContext,
32+
active: Boolean
33+
) : JobSupport(active), Continuation<T>, CoroutineScope {
34+
@Suppress("LeakingThis")
35+
public final override val context: CoroutineContext = parentContext + this
36+
37+
final override val hasCancellingState: Boolean get() = true
38+
39+
final override fun resume(value: T) {
40+
lockFreeLoopOnState { state ->
41+
when (state) {
42+
is Incomplete -> if (updateState(state, value, MODE_ATOMIC_DEFAULT)) return
43+
is Cancelled -> return // ignore resumes on cancelled continuation
44+
else -> error("Already resumed, but got value $value")
45+
}
46+
}
47+
}
48+
49+
final override fun resumeWithException(exception: Throwable) {
50+
lockFreeLoopOnState { state ->
51+
when (state) {
52+
is Incomplete -> {
53+
if (updateState(state, CompletedExceptionally(exception), MODE_ATOMIC_DEFAULT)) return
54+
}
55+
is Cancelled -> {
56+
// ignore resumes on cancelled continuation, but handle exception if a different one is here
57+
if (exception != state.exception) handleCoroutineException(context, exception)
58+
return
59+
}
60+
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
61+
}
62+
}
63+
}
64+
65+
final override fun handleException(exception: Throwable) {
66+
handleCoroutineException(parentContext, exception)
67+
}
68+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ import kotlin.coroutines.experimental.suspendCoroutine
2727

2828
/**
2929
* Cancellable continuation. Its job is _completed_ when it is resumed or cancelled.
30-
* When [cancel] function is explicitly invoked, this continuation resumes with [CancellationException] or
30+
* When [cancel] function is explicitly invoked, this continuation immediately resumes with [CancellationException] or
3131
* with the specified cancel cause.
3232
*
33-
* Cancellable continuation has three states:
33+
* Cancellable continuation has three states (as subset of [Job] states):
3434
*
3535
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
3636
* | ----------------------------------- | ---------- | ------------- | ------------- |
@@ -41,6 +41,9 @@ import kotlin.coroutines.experimental.suspendCoroutine
4141
* Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
4242
* invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state.
4343
*
44+
* A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted], so
45+
* [invokeOnCancellation] and [invokeOnCompletion] have the same effect.
46+
*
4447
* Invocation of [resume] or [resumeWithException] in _resumed_ state produces [IllegalStateException]
4548
* but is ignored in _cancelled_ state.
4649
*/
@@ -50,7 +53,7 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
5053
*
5154
* It implies that [isActive] is `false` and [isCompleted] is `true`.
5255
*/
53-
public val isCancelled: Boolean
56+
public override val isCancelled: Boolean
5457

5558
/**
5659
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
@@ -111,7 +114,7 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
111114

112115
/**
113116
* Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
114-
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
117+
* the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
115118
*
116119
* If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
117120
* cancellable until [CancellableContinuation.initCancellability] is invoked.

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ public class CompletableDeferred<T> : JobSupport(true), Deferred<T> {
3838
* Repeated invocations of this function have no effect and always produce `false`.
3939
*/
4040
public fun complete(value: T): Boolean {
41-
while (true) { // lock-free loop on state
42-
val state = this.state // atomic read
41+
lockFreeLoopOnState { state ->
4342
when (state) {
4443
is Incomplete -> {
4544
// actually, we don't care about the mode here at all, so just use a default
@@ -58,8 +57,7 @@ public class CompletableDeferred<T> : JobSupport(true), Deferred<T> {
5857
* Repeated invocations of this function have no effect and always produce `false`.
5958
*/
6059
public fun completeExceptionally(exception: Throwable): Boolean {
61-
while (true) { // lock-free loop on state
62-
val state = this.state // atomic read
60+
lockFreeLoopOnState { state ->
6361
when (state) {
6462
is Incomplete -> {
6563
// actually, we don't care about the mode here at all, so just use a default

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ internal class DispatchTask<in T>(
113113
override fun run() {
114114
val job = if (cancellable) dispatched.context[Job] else null
115115
when {
116-
job != null && job.isCompleted ->
116+
job != null && job.isCancelledOrCompleted ->
117117
dispatched.resumeUndispatchedWithException(job.getCompletionException())
118118
exception -> dispatched.resumeUndispatchedWithException(value as Throwable)
119119
else -> dispatched.resumeUndispatched(value as T)
@@ -181,7 +181,7 @@ internal class DispatchedContinuation<in T>(
181181
val context = continuation.context
182182
dispatcher.dispatch(context, Runnable {
183183
withCoroutineContext(context) {
184-
if (job != null && job.isCompleted)
184+
if (job != null && job.isCancelledOrCompleted)
185185
continuation.resumeWithException(job.getCompletionException())
186186
else
187187
continuation.resume(value)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlin.coroutines.experimental.Continuation
2019
import kotlin.coroutines.experimental.CoroutineContext
2120

2221
/**
@@ -42,53 +41,4 @@ public interface CoroutineScope {
4241
* Returns the context of this coroutine.
4342
*/
4443
public val context: CoroutineContext
45-
}
46-
47-
/**
48-
* Abstract class to simplify writing of coroutine completion objects that
49-
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
50-
* It stores the result of continuation in the state of the job.
51-
*
52-
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
53-
* @suppress **This is unstable API and it is subject to change.**
54-
*/
55-
public abstract class AbstractCoroutine<in T>(
56-
private val parentContext: CoroutineContext,
57-
active: Boolean
58-
) : JobSupport(active), Continuation<T>, CoroutineScope {
59-
@Suppress("LeakingThis")
60-
public final override val context: CoroutineContext = parentContext + this
61-
62-
final override fun resume(value: T) {
63-
while (true) { // lock-free loop on state
64-
val state = this.state // atomic read
65-
when (state) {
66-
is Incomplete -> if (updateState(state, value, MODE_ATOMIC_DEFAULT)) return
67-
is Cancelled -> return // ignore resumes on cancelled continuation
68-
else -> error("Already resumed, but got value $value")
69-
}
70-
}
71-
}
72-
73-
final override fun resumeWithException(exception: Throwable) {
74-
while (true) { // lock-free loop on state
75-
val state = this.state // atomic read
76-
when (state) {
77-
is Incomplete -> {
78-
if (updateState(state, CompletedExceptionally(exception), MODE_ATOMIC_DEFAULT)) return
79-
}
80-
is Cancelled -> {
81-
// ignore resumes on cancelled continuation, but handle exception if a different one is here
82-
if (exception != state.exception) handleCoroutineException(context, exception)
83-
return
84-
}
85-
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
86-
}
87-
}
88-
}
89-
90-
final override fun handleCompletionException(closeException: Throwable) {
91-
handleCoroutineException(parentContext, closeException)
92-
}
93-
}
94-
44+
}

0 commit comments

Comments
 (0)