Skip to content

Commit 2bac00f

Browse files
committed
Re-implement jdk8 and Guava future builders on top o AbstractCoroutine in order to properly handle parallel decomposition
* Get rid of cancel(cause) * Improve test coverage for cancellation and exception handling Fixes #751
1 parent fb6ba0d commit 2bac00f

File tree

4 files changed

+151
-24
lines changed

4 files changed

+151
-24
lines changed

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guava
77
import com.google.common.util.concurrent.*
88
import kotlinx.coroutines.*
99
import java.util.concurrent.*
10+
import java.util.concurrent.CancellationException
1011
import kotlin.coroutines.*
1112

1213
/**
@@ -37,23 +38,40 @@ public fun <T> CoroutineScope.future(
3738
): ListenableFuture<T> {
3839
require(!start.isLazy) { "$start start is not supported" }
3940
val newContext = newCoroutineContext(context)
40-
val job = Job(newContext[Job])
41-
val future = ListenableFutureCoroutine<T>(newContext + job)
42-
job.cancelFutureOnCompletion(future)
43-
start(block, receiver = future, completion = future) // use the specified start strategy
41+
val future = SettableFuture.create<T>()
42+
val coroutine = ListenableFutureCoroutine(newContext, future)
43+
future.addCallback(coroutine, MoreExecutors.directExecutor())
44+
coroutine.start(start, coroutine, block)
4445
return future
4546
}
4647

4748
private class ListenableFutureCoroutine<T>(
48-
override val context: CoroutineContext
49-
) : AbstractFuture<T>(), Continuation<T>, CoroutineScope {
50-
override val coroutineContext: CoroutineContext get() = context
51-
override fun resumeWith(result: Result<T>) {
52-
result
53-
.onSuccess { set(it) }
54-
.onFailure { setException(it) }
49+
context: CoroutineContext,
50+
private val completion: SettableFuture<T>
51+
) : AbstractCoroutine<T>(context), FutureCallback<T> {
52+
53+
/*
54+
* We register coroutine as callback to the future this coroutine completes.
55+
* But when future is cancelled externally, we'd like to cancel coroutine,
56+
* so we register on failure handler for this purpose
57+
*/
58+
override fun onSuccess(result: T?) {
59+
// Do nothing
60+
}
61+
62+
override fun onFailure(t: Throwable) {
63+
if (t is CancellationException) {
64+
cancel()
65+
}
66+
}
67+
68+
override fun onCompleted(value: T) {
69+
completion.set(value)
70+
}
71+
72+
override fun onCompletedExceptionally(exception: Throwable) {
73+
completion.setException(exception)
5574
}
56-
override fun interruptTask() { context[Job]!!.cancel() }
5775
}
5876

5977
/**

integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.junit.Assert.*
1313
import org.junit.Test
1414
import java.io.*
1515
import java.util.concurrent.*
16+
import kotlin.reflect.*
1617
import kotlin.test.assertFailsWith
1718

1819
class ListenableFutureTest : TestBase() {
@@ -257,6 +258,57 @@ class ListenableFutureTest : TestBase() {
257258
}
258259
}
259260

261+
@Test
262+
fun testChildException() = runTest {
263+
val result = future(Dispatchers.Unconfined) {
264+
// child crashes
265+
launch { throw TestException("FAIL") }
266+
42
267+
}
268+
269+
result.checkFutureException<TestException>()
270+
}
271+
272+
@Test
273+
fun testExceptionAggregation() = runTest {
274+
val result = future(Dispatchers.Unconfined) {
275+
// child crashes
276+
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
277+
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
278+
throw TestException()
279+
}
280+
281+
expect(1)
282+
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
283+
yield()
284+
finish(2) // we are not cancelled
285+
}
286+
287+
@Test
288+
fun testExternalCancellation() = runTest {
289+
val future = future(Dispatchers.Unconfined) {
290+
try {
291+
delay(Long.MAX_VALUE)
292+
} finally {
293+
expect(2)
294+
}
295+
}
296+
297+
yield()
298+
expect(1)
299+
future.cancel(true)
300+
finish(3)
301+
}
302+
303+
private inline fun <reified T: Throwable> ListenableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
304+
val e = assertFailsWith<ExecutionException> { get() }
305+
val cause = e.cause!!
306+
assertTrue(cause is T)
307+
for ((index, clazz) in suppressed.withIndex()) {
308+
assertTrue(clazz.isInstance(cause.suppressed[index]))
309+
}
310+
}
311+
260312
private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> {
261313
val latch = CountDownLatch(1)
262314
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,28 @@ public fun <T> CoroutineScope.future(
3737
) : CompletableFuture<T> {
3838
require(!start.isLazy) { "$start start is not supported" }
3939
val newContext = this.newCoroutineContext(context)
40-
val job = Job(newContext[Job])
41-
val future = CompletableFutureCoroutine<T>(newContext + job)
42-
job.cancelFutureOnCompletion(future)
43-
future.whenComplete { _, exception -> job.cancel(exception) }
44-
start(block, receiver = future, completion = future) // use the specified start strategy
40+
val future = CompletableFuture<T>()
41+
val coroutine = CompletableFutureCoroutine(newContext, future)
42+
future.whenComplete(coroutine) // Cancel coroutine if future was completed externally
43+
coroutine.start(start, coroutine, block)
4544
return future
4645
}
4746

4847
private class CompletableFutureCoroutine<T>(
49-
override val context: CoroutineContext
50-
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
51-
override val coroutineContext: CoroutineContext get() = context
52-
override fun resumeWith(result: Result<T>) {
53-
result
54-
.onSuccess { complete(it) }
55-
.onFailure { completeExceptionally(it) }
48+
context: CoroutineContext,
49+
private val completion: CompletableFuture<T>
50+
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
51+
52+
override fun accept(value: T?, exception: Throwable?) {
53+
cancel()
54+
}
55+
56+
override fun onCompleted(value: T) {
57+
completion.complete(value)
58+
}
59+
60+
override fun onCompletedExceptionally(exception: Throwable) {
61+
completion.completeExceptionally(exception)
5662
}
5763
}
5864

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import java.util.concurrent.locks.*
1515
import java.util.function.*
1616
import kotlin.concurrent.*
1717
import kotlin.coroutines.*
18+
import kotlin.reflect.*
1819
import kotlin.test.assertFailsWith
1920

2021
class FutureTest : TestBase() {
@@ -369,6 +370,56 @@ class FutureTest : TestBase() {
369370
return future
370371
}
371372

373+
@Test
374+
fun testChildException() = runTest {
375+
val result = future(Dispatchers.Unconfined) {
376+
// child crashes
377+
launch { throw TestException("FAIL") }
378+
42
379+
}
380+
381+
result.checkFutureException<TestException>()
382+
}
383+
384+
@Test
385+
fun testExceptionAggregation() = runTest {
386+
val result = future(Dispatchers.Unconfined) {
387+
// child crashes
388+
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
389+
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
390+
throw TestException()
391+
}
392+
393+
expect(1)
394+
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
395+
yield()
396+
finish(2) // we are not cancelled
397+
}
398+
399+
@Test
400+
fun testExternalCompletion() = runTest {
401+
expect(1)
402+
val result = future(Dispatchers.Unconfined) {
403+
try {
404+
delay(Long.MAX_VALUE)
405+
} finally {
406+
expect(2)
407+
}
408+
}
409+
410+
result.complete(Unit)
411+
finish(3)
412+
}
413+
414+
private inline fun <reified T: Throwable> CompletableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
415+
val e = assertFailsWith<ExecutionException> { get() }
416+
val cause = e.cause!!
417+
assertTrue(cause is T)
418+
for ((index, clazz) in suppressed.withIndex()) {
419+
assertTrue(clazz.isInstance(cause.suppressed[index]))
420+
}
421+
}
422+
372423
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
373424
override fun dispatch(context: CoroutineContext, block: Runnable) {
374425
wrapper {

0 commit comments

Comments
 (0)