Skip to content

Commit 3fa4bec

Browse files
committed
Support CoroutineStart & CoroutineScope in JDK8 future builder
1 parent 7b10c94 commit 3fa4bec

File tree

1 file changed

+51
-22
lines changed
  • integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future

1 file changed

+51
-22
lines changed

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,57 @@ package kotlinx.coroutines.experimental.future
1919
import kotlinx.coroutines.experimental.*
2020
import java.util.concurrent.CompletableFuture
2121
import java.util.concurrent.CompletionStage
22+
import java.util.function.BiConsumer
2223
import kotlin.coroutines.experimental.Continuation
2324
import kotlin.coroutines.experimental.CoroutineContext
24-
import kotlin.coroutines.experimental.startCoroutine
2525
import kotlin.coroutines.experimental.suspendCoroutine
2626

2727
/**
2828
* Starts new coroutine and returns its results an an implementation of [CompletableFuture].
2929
* This coroutine builder uses [CommonPool] context by default and is conceptually similar to [CompletableFuture.supplyAsync].
3030
*
3131
* The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
32-
* If the [context] for the new coroutine is explicitly specified, then it must include [CoroutineDispatcher] element.
32+
* If the [context] for the new coroutine is explicitly specified and does not include a coroutine interceptor,
33+
* then [CoroutineDispatcher] element.
3334
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
3435
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
3536
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
3637
*
38+
* By default, the coroutine is immediately scheduled for execution.
39+
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
40+
* A value of [CoroutineStart.LAZY] is not supported
41+
* (since `ListenableFuture` framework does not provide the corresponding capability) and
42+
* produces [IllegalArgumentException].
43+
*
3744
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
45+
*
46+
* @param context context of the coroutine
47+
* @param start coroutine start option
48+
* @param block the coroutine code
3849
*/
39-
public fun <T> future(context: CoroutineContext = CommonPool, block: suspend () -> T): CompletableFuture<T> {
50+
public fun <T> future(
51+
context: CoroutineContext = CommonPool,
52+
start: CoroutineStart = CoroutineStart.DEFAULT,
53+
block: suspend CoroutineScope.() -> T
54+
): CompletableFuture<T> {
55+
require(!start.isLazy) { "$start start is not supported" }
4056
val newContext = newCoroutineContext(CommonPool + context)
4157
val job = Job(newContext[Job])
4258
val future = CompletableFutureCoroutine<T>(newContext + job)
4359
job.cancelFutureOnCompletion(future)
4460
future.whenComplete { _, exception -> job.cancel(exception) }
45-
block.startCoroutine(future)
61+
start(block, receiver=future, completion=future)
4662
return future
4763
}
4864

65+
private class CompletableFutureCoroutine<T>(
66+
override val context: CoroutineContext
67+
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
68+
override val isActive: Boolean get() = context[Job]!!.isActive
69+
override fun resume(value: T) { complete(value) }
70+
override fun resumeWithException(exception: Throwable) { completeExceptionally(exception) }
71+
}
72+
4973
/**
5074
* Converts this deferred value to the instance of [CompletableFuture].
5175
* The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
@@ -70,20 +94,15 @@ public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
7094
* Use `CompletableFuture.await()` for cancellation support.
7195
*/
7296
public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont: Continuation<T> ->
73-
whenComplete { result, exception ->
74-
if (exception == null) // the stage has been completed normally
75-
cont.resume(result)
76-
else // the stage has completed with an exception
77-
cont.resumeWithException(exception)
78-
}
97+
whenComplete(ContinuationConsumer(cont))
7998
}
8099

81100
/**
82101
* Awaits for completion of the future without blocking a thread.
83102
*
84103
* This suspending function is cancellable.
85104
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
86-
* cancels the `CompletableFuture` and immediately resumes with [CancellationException] .
105+
* stops waiting for the future and immediately resumes with [CancellationException].
87106
*/
88107
public suspend fun <T> CompletableFuture<T>.await(): T {
89108
if (isDone) { // fast path when CompletableFuture is already done (does not suspend)
@@ -99,21 +118,25 @@ public suspend fun <T> CompletableFuture<T>.await(): T {
99118
}
100119
// slow path -- suspend
101120
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
102-
val completionFuture = whenComplete { result, exception ->
103-
if (exception == null) // the future has been completed normally
104-
cont.resume(result)
105-
else // the future has completed with an exception
106-
cont.resumeWithException(exception)
121+
val consumer = ContinuationConsumer(cont)
122+
val completionFuture = whenComplete(consumer)
123+
cont.invokeOnCompletion {
124+
completionFuture.cancel(false) // cancel future
125+
consumer.cont = null // shall clear reference to continuation, because CompletableFuture continues to keep it
107126
}
108-
cont.cancelFutureOnCompletion(completionFuture)
109127
}
110128
}
111129

112-
private class CompletableFutureCoroutine<T>(
113-
override val context: CoroutineContext
114-
) : CompletableFuture<T>(), Continuation<T> {
115-
override fun resume(value: T) { complete(value) }
116-
override fun resumeWithException(exception: Throwable) { completeExceptionally(exception) }
130+
private class ContinuationConsumer<T>(
131+
@JvmField var cont: Continuation<T>?
132+
) : BiConsumer<T?, Throwable?> {
133+
override fun accept(result: T?, exception: Throwable?) {
134+
val cont = this.cont ?: return // atomically read current value unless null, benign data race when it is set to null
135+
if (exception == null) // the future has been completed normally
136+
cont.resume(result as T)
137+
else // the future has completed with an exception
138+
cont.resumeWithException(exception)
139+
}
117140
}
118141

119142
// --------------------------------------- DEPRECATED APIs ---------------------------------------
@@ -128,3 +151,9 @@ private class CompletableFutureCoroutine<T>(
128151
@Deprecated("Renamed to `asCompletableFuture`",
129152
replaceWith = ReplaceWith("asCompletableFuture()"))
130153
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> = asCompletableFuture()
154+
155+
@Deprecated("Use the other version. This one is for binary compatibility only.", level=DeprecationLevel.HIDDEN)
156+
public fun <T> future(
157+
context: CoroutineContext = CommonPool,
158+
block: suspend () -> T
159+
): CompletableFuture<T> = future(context=context) { block() }

0 commit comments

Comments
 (0)