Skip to content

Commit eb4f9be

Browse files
committed
Use fast path in CompletionStage.await() and make it cancellable;
Make CompletionStage.asDeferred cancel original future on deferred cancellation if possible.
1 parent 19c1f2e commit eb4f9be

File tree

3 files changed

+25
-40
lines changed
  • integration/kotlinx-coroutines-jdk8

3 files changed

+25
-40
lines changed

integration/kotlinx-coroutines-jdk8/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ Extension functions:
1212

1313
| **Name** | **Description**
1414
| -------- | ---------------
15-
| [CompletionStage.await][java.util.concurrent.CompletionStage.await] | Awaits for completion of the completion stage (non-cancellable)
16-
| [CompletableFuture.await][java.util.concurrent.CompletableFuture.await] | Awaits for completion of the future (cancellable)
15+
| [CompletionStage.await][java.util.concurrent.CompletionStage.await] | Awaits for completion of the completion stage
16+
| [CompletionStage.asDeferred][java.util.concurrent.CompletionStage.asDeferred] | Converts completion stage to an instance of [Deferred]
1717
| [Deferred.asCompletableFuture][kotlinx.coroutines.experimental.Deferred.asCompletableFuture] | Converts a deferred value to the future
1818

1919
## Example
@@ -52,11 +52,11 @@ Integration with JDK8 [`CompletableFuture`][java.util.concurrent.CompletableFutu
5252
<!--- MODULE kotlinx-coroutines-core -->
5353
<!--- INDEX kotlinx.coroutines.experimental -->
5454
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
55+
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
5556
<!--- MODULE kotlinx-coroutines-jdk8 -->
5657
<!--- INDEX kotlinx.coroutines.experimental.future -->
57-
[java.util.concurrent.CompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completable-future/index.html
5858
[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/future.html
5959
[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completion-stage/await.html
60-
[java.util.concurrent.CompletableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completable-future/await.html
60+
[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completion-stage/as-deferred.html
6161
[kotlinx.coroutines.experimental.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/kotlinx.coroutines.experimental.-deferred/as-completable-future.html
6262
<!--- END -->

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,9 @@ public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
106106
}
107107

108108
/**
109-
* Awaits for completion of the completion stage without blocking a thread.
110-
*
111-
* This suspending function is not cancellable, because there is no way to cancel a `CompletionStage`.
112-
* Use `CompletableFuture.await()` for cancellable wait.
113-
*/
114-
public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont: Continuation<T> ->
115-
whenComplete(ContinuationConsumer(cont))
116-
}
117-
118-
/**
119-
* Converts this future to an instance of [Deferred].
109+
* Converts this completion stage to an instance of [Deferred].
110+
* When this completion stage is an instance of [Future], then it is cancelled when
111+
* the resulting deferred is cancelled.
120112
*/
121113
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
122114
// Fast path if already completed
@@ -138,26 +130,37 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
138130
result.completeExceptionally(exception)
139131
}
140132
}
133+
if (this is Future<*>) result.cancelFutureOnCompletion(this)
141134
return result
142135
}
143136

144137
/**
145138
* Awaits for completion of the future without blocking a thread.
146139
*
140+
* @suppress **Deprecated**: For binary compatibility only
141+
*/
142+
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
143+
public suspend fun <T> CompletableFuture<T>.await(): T =
144+
(this as CompletionStage<T>).await()
145+
146+
/**
147+
* Awaits for completion of the completion stage without blocking a thread.
148+
*
147149
* This suspending function is cancellable.
148150
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
149-
* stops waiting for the future and immediately resumes with [CancellationException].
151+
* stops waiting for the completion stage and immediately resumes with [CancellationException].
150152
*
151-
* Note, that `CompletableFuture` does not support prompt removal of installed listeners, so on cancellation of this wait
152-
* a few small objects will remain in the `CompletableFuture` stack of completion actions until the future completes.
153+
* Note, that `CompletionStage` implementation does not support prompt removal of installed listeners, so on cancellation of this wait
154+
* a few small objects will remain in the `CompletionStage` stack of completion actions until it completes itself.
153155
* However, the care is taken to clear the reference to the waiting coroutine itself, so that its memory can be
154-
* released even if the future never completes.
156+
* released even if the completion stage never completes.
155157
*/
156-
public suspend fun <T> CompletableFuture<T>.await(): T {
158+
public suspend fun <T> CompletionStage<T>.await(): T {
157159
// fast path when CompletableFuture is already done (does not suspend)
158-
if (isDone) {
160+
if (this is Future<*> && isDone()) {
159161
try {
160-
return get()
162+
@Suppress("UNCHECKED")
163+
return get() as T
161164
} catch (e: ExecutionException) {
162165
throw e.cause ?: e // unwrap original cause from ExecutionException
163166
}

integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -212,24 +212,6 @@ class FutureTest : TestBase() {
212212
finish(6)
213213
}
214214

215-
@Test
216-
fun testNonCancellableAwaitCompletionStage() = runBlocking {
217-
expect(1)
218-
val completable = CompletableFuture<String>()
219-
val toAwait: CompletionStage<String> = completable
220-
val job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
221-
expect(2)
222-
assertThat(toAwait.await(), IsEqual("OK")) // suspends
223-
expect(5)
224-
}
225-
expect(3)
226-
job.cancel() // cancel the job
227-
completable.complete("OK") // ok, because await on completion stage is not cancellable
228-
expect(4) // job processing of was scheduled, not executed yet
229-
yield() // yield main thread to job
230-
finish(6)
231-
}
232-
233215
@Test
234216
fun testContinuationWrapped() {
235217
val depth = AtomicInteger()

0 commit comments

Comments
 (0)