Skip to content

Commit 7bd2c50

Browse files
Jonathan Cornazelizarov
authored andcommitted
Add conversion of CompletionStage to Deferred
Fixes #262
1 parent 562c1bb commit 7bd2c50

File tree

2 files changed

+90
-2
lines changed
  • integration/kotlinx-coroutines-jdk8/src
    • main/kotlin/kotlinx/coroutines/experimental/future
    • test/kotlin/kotlinx/coroutines/experimental/future

2 files changed

+90
-2
lines changed

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kotlinx.coroutines.experimental.*
2020
import java.util.concurrent.CompletableFuture
2121
import java.util.concurrent.CompletionStage
2222
import java.util.concurrent.ExecutionException
23+
import java.util.concurrent.Future
2324
import java.util.function.BiConsumer
2425
import kotlin.coroutines.experimental.Continuation
2526
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -114,6 +115,34 @@ public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont:
114115
whenComplete(ContinuationConsumer(cont))
115116
}
116117

118+
/**
119+
* Converts this future to an instance of [Deferred].
120+
*/
121+
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
122+
123+
// Fast path if already completed
124+
if (this is Future<*> && isDone()){
125+
return try {
126+
@Suppress("UNCHECKED_CAST")
127+
CompletableDeferred(get() as T)
128+
} catch (t: Throwable) {
129+
CompletableDeferred<T>().also { it.completeExceptionally(t) }
130+
}
131+
}
132+
133+
val result = CompletableDeferred<T>()
134+
135+
whenComplete { value, exception ->
136+
if (exception == null) {
137+
result.complete(value)
138+
} else {
139+
result.completeExceptionally(exception)
140+
}
141+
}
142+
143+
return result
144+
}
145+
117146
/**
118147
* Awaits for completion of the future without blocking a thread.
119148
*

integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture
2525
import java.util.concurrent.CompletionStage
2626
import java.util.concurrent.ExecutionException
2727
import java.util.concurrent.atomic.AtomicInteger
28+
import java.util.concurrent.locks.ReentrantLock
29+
import kotlin.concurrent.withLock
2830
import kotlin.coroutines.experimental.CoroutineContext
2931

3032
class FutureTest : TestBase() {
@@ -240,13 +242,13 @@ class FutureTest : TestBase() {
240242
assertEquals("Part before first suspension must be wrapped", 1, depth.get())
241243
val result =
242244
CompletableFuture.supplyAsync {
243-
while (depth.get() > 0) ;
245+
while (depth.get() > 0);
244246
assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
245247
"OK"
246248
}.await()
247249
assertEquals("Part after first suspension should be wrapped", 1, depth.get())
248250
CompletableFuture.supplyAsync {
249-
while (depth.get() > 0) ;
251+
while (depth.get() > 0);
250252
assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
251253
"ignored"
252254
}.await()
@@ -255,6 +257,63 @@ class FutureTest : TestBase() {
255257
assertThat(future.get(), IsEqual("OK"))
256258
}
257259

260+
@Test
261+
fun testCompletableFutureStageAsDeferred() = runBlocking {
262+
val lock = ReentrantLock().apply { lock() }
263+
264+
val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
265+
lock.withLock { 42 }
266+
}.asDeferred()
267+
268+
assertFalse(deferred.isCompleted)
269+
lock.unlock()
270+
271+
assertEquals(42, deferred.await())
272+
assertTrue(deferred.isCompleted)
273+
}
274+
275+
@Test
276+
fun testCompletedFutureAsDeferred() = runBlocking {
277+
val deferred: Deferred<Int> = CompletableFuture.completedFuture(42).asDeferred()
278+
assertEquals(42, deferred.await())
279+
}
280+
281+
@Test
282+
fun testFailedFutureAsDeferred() = runBlocking {
283+
val future = CompletableFuture<Int>().apply { completeExceptionally(Exception("something went wrong")) }
284+
val deferred = future.asDeferred()
285+
286+
assertTrue(deferred.isCompletedExceptionally)
287+
assertEquals("something went wrong", deferred.getCompletionExceptionOrNull()!!.cause!!.message)
288+
289+
try {
290+
deferred.await()
291+
fail("deferred.await() should throw an exception")
292+
} catch (e: Exception) {
293+
assertEquals("something went wrong", e.cause!!.message)
294+
}
295+
}
296+
297+
@Test
298+
fun testCompletableFutureWithExceptionAsDeferred() = runBlocking {
299+
val lock = ReentrantLock().apply { lock() }
300+
301+
val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
302+
lock.withLock { throw Exception("something went wrong") }
303+
}.asDeferred()
304+
305+
assertFalse(deferred.isCompleted)
306+
lock.unlock()
307+
308+
try {
309+
deferred.await()
310+
fail("deferred.await() should throw an exception")
311+
} catch (e: Exception) {
312+
assertTrue(deferred.isCompletedExceptionally)
313+
assertEquals("something went wrong", e.cause!!.message)
314+
}
315+
}
316+
258317
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
259318
override fun dispatch(context: CoroutineContext, block: Runnable) {
260319
wrapper {

0 commit comments

Comments
 (0)