Skip to content

Commit 3aba905

Browse files
authored
Fix await/asDeferred for MinimalState implementations (#2457)
Fixes #2456
1 parent cc3c54c commit 3aba905

File tree

2 files changed

+96
-14
lines changed

2 files changed

+96
-14
lines changed

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,19 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) {
105105
}
106106

107107
/**
108-
* Converts this completion stage to an instance of [Deferred].
109-
* When this completion stage is an instance of [Future], then it is cancelled when
110-
* the resulting deferred is cancelled.
108+
* Converts this [CompletionStage] to an instance of [Deferred].
109+
*
110+
* The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
111+
* is cancelled when the resulting deferred is cancelled.
111112
*/
113+
@Suppress("DeferredIsResult")
112114
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
115+
val future = toCompletableFuture() // retrieve the future
113116
// Fast path if already completed
114-
if (this is Future<*> && isDone()){
117+
if (future.isDone) {
115118
return try {
116119
@Suppress("UNCHECKED_CAST")
117-
CompletableDeferred(get() as T)
120+
CompletableDeferred(future.get() as T)
118121
} catch (e: Throwable) {
119122
// unwrap original cause from ExecutionException
120123
val original = (e as? ExecutionException)?.cause ?: e
@@ -132,25 +135,28 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
132135
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
133136
}
134137
}
135-
if (this is Future<*>) result.cancelFutureOnCompletion(this)
138+
result.cancelFutureOnCompletion(future)
136139
return result
137140
}
138141

139142
/**
140-
* Awaits for completion of the completion stage without blocking a thread.
143+
* Awaits for completion of [CompletionStage] without blocking a thread.
141144
*
142145
* This suspending function is cancellable.
143146
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
144147
* stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
145-
* This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture].
146-
* If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead.
148+
*
149+
* This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that
150+
* corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
151+
* is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead.
147152
*/
148153
public suspend fun <T> CompletionStage<T>.await(): T {
154+
val future = toCompletableFuture() // retrieve the future
149155
// fast path when CompletableFuture is already done (does not suspend)
150-
if (this is Future<*> && isDone()) {
156+
if (future.isDone) {
151157
try {
152-
@Suppress("UNCHECKED_CAST")
153-
return get() as T
158+
@Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
159+
return future.get() as T
154160
} catch (e: ExecutionException) {
155161
throw e.cause ?: e // unwrap original cause from ExecutionException
156162
}
@@ -160,8 +166,7 @@ public suspend fun <T> CompletionStage<T>.await(): T {
160166
val consumer = ContinuationConsumer(cont)
161167
whenComplete(consumer)
162168
cont.invokeOnCancellation {
163-
// mayInterruptIfRunning is not used
164-
(this as? CompletableFuture<T>)?.cancel(false)
169+
future.cancel(false)
165170
consumer.cont = null // shall clear reference to continuation to aid GC
166171
}
167172
}

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,81 @@ class FutureTest : TestBase() {
490490
}
491491
}
492492
}
493+
494+
/**
495+
* https://github.com/Kotlin/kotlinx.coroutines/issues/2456
496+
*/
497+
@Test
498+
fun testCompletedStageAwait() = runTest {
499+
val stage = CompletableFuture.completedStage("OK")
500+
assertEquals("OK", stage.await())
501+
}
502+
503+
/**
504+
* https://github.com/Kotlin/kotlinx.coroutines/issues/2456
505+
*/
506+
@Test
507+
fun testCompletedStageAsDeferredAwait() = runTest {
508+
val stage = CompletableFuture.completedStage("OK")
509+
val deferred = stage.asDeferred()
510+
assertEquals("OK", deferred.await())
511+
}
512+
513+
@Test
514+
fun testCompletedStateThenApplyAwait() = runTest {
515+
expect(1)
516+
val cf = CompletableFuture<String>()
517+
launch {
518+
expect(3)
519+
cf.complete("O")
520+
}
521+
expect(2)
522+
val stage = cf.thenApply { it + "K" }
523+
assertEquals("OK", stage.await())
524+
finish(4)
525+
}
526+
527+
@Test
528+
fun testCompletedStateThenApplyAwaitCancel() = runTest {
529+
expect(1)
530+
val cf = CompletableFuture<String>()
531+
launch {
532+
expect(3)
533+
cf.cancel(false)
534+
}
535+
expect(2)
536+
val stage = cf.thenApply { it + "K" }
537+
assertFailsWith<CancellationException> { stage.await() }
538+
finish(4)
539+
}
540+
541+
@Test
542+
fun testCompletedStateThenApplyAsDeferredAwait() = runTest {
543+
expect(1)
544+
val cf = CompletableFuture<String>()
545+
launch {
546+
expect(3)
547+
cf.complete("O")
548+
}
549+
expect(2)
550+
val stage = cf.thenApply { it + "K" }
551+
val deferred = stage.asDeferred()
552+
assertEquals("OK", deferred.await())
553+
finish(4)
554+
}
555+
556+
@Test
557+
fun testCompletedStateThenApplyAsDeferredAwaitCancel() = runTest {
558+
expect(1)
559+
val cf = CompletableFuture<String>()
560+
expect(2)
561+
val stage = cf.thenApply { it + "K" }
562+
val deferred = stage.asDeferred()
563+
launch {
564+
expect(3)
565+
deferred.cancel() // cancel the deferred!
566+
}
567+
assertFailsWith<CancellationException> { stage.await() }
568+
finish(4)
569+
}
493570
}

0 commit comments

Comments
 (0)