Skip to content

Commit c35953c

Browse files
committed
Structured concurrency for Completable/Listenable futures
BREAKING BEHAVIOR CHANGE: * kotlinx-coroutines-jdk8 and -guava integration modules future { ... } builders now honor structured concurrency in the same way as all other builders -- a failure inside the child (builder) code now cancels parent coroutine. Note that is does not affect non-structured (typical) usage like GlobalScope.future { ... } MINOR BEHAVIOR CHANGE: * Exception in installed CancellableCoroutine.invokeOnCancellation handler does not cancel the parent job, but is considered to be an uncaught exception, so it goes to CoroutineExceptionHandler. Internal changes: * JobSupport.cancelsParents=true is now a default, since there are only a fewer exceptions for builder that throw their exception from block * JobSupport.handleJobException has additional "handled" parameter to distinguish cases when parent did/did-not handle it. * handleCoroutineException logic is updated. It never cancels parent, since parent cancellation is taken care of by structured concurrency. * handleCoroutineException is always invoked with current coroutine's context (as opposed to parent) Fixes #1007
1 parent 08f8214 commit c35953c

File tree

27 files changed

+162
-100
lines changed

27 files changed

+162
-100
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,7 @@ public final class kotlinx/coroutines/CoroutineExceptionHandler$Key : kotlin/cor
173173

174174
public final class kotlinx/coroutines/CoroutineExceptionHandlerKt {
175175
public static final fun CoroutineExceptionHandler (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/CoroutineExceptionHandler;
176-
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;Lkotlinx/coroutines/Job;)V
177-
public static synthetic fun handleCoroutineException$default (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;Lkotlinx/coroutines/Job;ILjava/lang/Object;)V
178-
public static final fun handleExceptionViaHandler (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
176+
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
179177
}
180178

181179
public final class kotlinx/coroutines/CoroutineName : kotlin/coroutines/AbstractCoroutineContextElement {
@@ -371,7 +369,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
371369
protected fun getHandlesException ()Z
372370
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
373371
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
374-
protected fun handleJobException (Ljava/lang/Throwable;)V
372+
protected fun handleJobException (Ljava/lang/Throwable;Z)V
375373
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
376374
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
377375
public fun isActive ()Z

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,8 @@ public fun <T> CoroutineScope.future(
4747

4848
private class ListenableFutureCoroutine<T>(
4949
context: CoroutineContext,
50-
private val completion: SettableFuture<T>
50+
private val future: SettableFuture<T>
5151
) : AbstractCoroutine<T>(context), FutureCallback<T> {
52-
5352
/*
5453
* We register coroutine as callback to the future this coroutine completes.
5554
* But when future is cancelled externally, we'd like to cancel coroutine,
@@ -66,12 +65,13 @@ private class ListenableFutureCoroutine<T>(
6665
}
6766

6867
override fun onCompleted(value: T) {
69-
completion.set(value)
68+
future.set(value)
7069
}
7170

72-
override fun onCompletedExceptionally(exception: Throwable) {
73-
if (!completion.setException(exception)) {
74-
handleCoroutineException(parentContext, exception, this)
71+
override fun handleJobException(exception: Throwable, handled: Boolean) {
72+
if (!future.setException(exception) && !handled) {
73+
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
74+
handleCoroutineException(context, exception)
7575
}
7676
}
7777
}

integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,24 @@ class ListenableFutureTest : TestBase() {
258258
}
259259

260260
@Test
261-
fun testChildException() = runTest {
261+
fun testStructuredException() = runTest(
262+
expected = { it is TestException } // exception propagates to parent with structured concurrency
263+
) {
264+
val result = future<Int>(Dispatchers.Unconfined) {
265+
throw TestException("FAIL")
266+
}
267+
result.checkFutureException<TestException>()
268+
}
269+
270+
@Test
271+
fun testChildException() = runTest(
272+
expected = { it is TestException } // exception propagates to parent with structured concurrency
273+
) {
262274
val result = future(Dispatchers.Unconfined) {
263275
// child crashes
264276
launch { throw TestException("FAIL") }
265277
42
266278
}
267-
268279
result.checkFutureException<TestException>()
269280
}
270281

@@ -295,7 +306,26 @@ class ListenableFutureTest : TestBase() {
295306
throw TestException()
296307
}
297308
}
309+
result.cancel(true)
310+
finish(3)
311+
}
298312

313+
@Test
314+
fun testUnhandledExceptionOnExternalCancellation() = runTest(
315+
unhandled = listOf(
316+
{ it -> it is TestException } // exception is unhandled because there is no parent
317+
)
318+
) {
319+
expect(1)
320+
// No parent here (NonCancellable), so nowhere to propagate exception
321+
val result = future(NonCancellable + Dispatchers.Unconfined) {
322+
try {
323+
delay(Long.MAX_VALUE)
324+
} finally {
325+
expect(2)
326+
throw TestException() // this exception cannot be handled
327+
}
328+
}
299329
result.cancel(true)
300330
finish(3)
301331
}

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,20 @@ public fun <T> CoroutineScope.future(
4646

4747
private class CompletableFutureCoroutine<T>(
4848
context: CoroutineContext,
49-
private val completion: CompletableFuture<T>
49+
private val future: CompletableFuture<T>
5050
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
51-
5251
override fun accept(value: T?, exception: Throwable?) {
5352
cancel()
5453
}
5554

5655
override fun onCompleted(value: T) {
57-
completion.complete(value)
56+
future.complete(value)
5857
}
5958

60-
override fun onCompletedExceptionally(exception: Throwable) {
61-
if (!completion.completeExceptionally(exception)) {
62-
handleCoroutineException(parentContext, exception, this)
59+
override fun handleJobException(exception: Throwable, handled: Boolean) {
60+
if (!future.completeExceptionally(exception) && !handled) {
61+
// prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
62+
handleCoroutineException(context, exception)
6363
}
6464
}
6565
}

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -368,29 +368,39 @@ class FutureTest : TestBase() {
368368
}
369369

370370
@Test
371-
fun testChildException() = runTest {
371+
fun testStructuredException() = runTest(
372+
expected = { it is TestException } // exception propagates to parent with structured concurrency
373+
) {
374+
val result = future<Int>(Dispatchers.Unconfined) {
375+
throw TestException("FAIL")
376+
}
377+
result.checkFutureException<TestException>()
378+
}
379+
380+
@Test
381+
fun testChildException() = runTest(
382+
expected = { it is TestException } // exception propagates to parent with structured concurrency
383+
) {
372384
val result = future(Dispatchers.Unconfined) {
373385
// child crashes
374386
launch { throw TestException("FAIL") }
375387
42
376388
}
377-
378389
result.checkFutureException<TestException>()
379390
}
380391

381392
@Test
382-
fun testExceptionAggregation() = runTest {
393+
fun testExceptionAggregation() = runTest(
394+
expected = { it is TestException } // exception propagates to parent with structured concurrency
395+
) {
383396
val result = future(Dispatchers.Unconfined) {
384397
// child crashes
385398
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
386399
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
387400
throw TestException()
388401
}
389-
390-
expect(1)
391402
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
392-
yield()
393-
finish(2) // we are not cancelled
403+
finish(1)
394404
}
395405

396406
@Test
@@ -409,7 +419,9 @@ class FutureTest : TestBase() {
409419
}
410420

411421
@Test
412-
fun testExceptionOnExternalCompletion() = runTest(expected = {it is TestException}) {
422+
fun testExceptionOnExternalCompletion() = runTest(
423+
expected = { it is TestException } // exception propagates to parent with structured concurrency
424+
) {
413425
expect(1)
414426
val result = future(Dispatchers.Unconfined) {
415427
try {
@@ -419,7 +431,26 @@ class FutureTest : TestBase() {
419431
throw TestException()
420432
}
421433
}
434+
result.complete(Unit)
435+
finish(3)
436+
}
422437

438+
@Test
439+
fun testUnhandledExceptionOnExternalCompletion() = runTest(
440+
unhandled = listOf(
441+
{ it -> it is TestException } // exception is unhandled because there is no parent
442+
)
443+
) {
444+
expect(1)
445+
// No parent here (NonCancellable), so nowhere to propagate exception
446+
val result = future(NonCancellable + Dispatchers.Unconfined) {
447+
try {
448+
delay(Long.MAX_VALUE)
449+
} finally {
450+
expect(2)
451+
throw TestException() // this exception cannot be handled
452+
}
453+
}
423454
result.complete(Unit)
424455
finish(3)
425456
}

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public abstract class AbstractCoroutine<in T>(
117117
}
118118

119119
internal final override fun handleOnCompletionException(exception: Throwable) {
120-
handleCoroutineException(parentContext, exception, this)
120+
handleCoroutineException(context, exception)
121121
}
122122

123123
internal override fun nameString(): String {

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ private open class DeferredCoroutine<T>(
9494
parentContext: CoroutineContext,
9595
active: Boolean
9696
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
97-
override val cancelsParent: Boolean get() = true
9897
override fun getCompleted(): T = getCompletedInternal() as T
9998
override suspend fun await(): T = awaitInternal() as T
10099
override val onAwait: SelectClause1<T> get() = this
@@ -169,8 +168,9 @@ private open class StandaloneCoroutine(
169168
parentContext: CoroutineContext,
170169
active: Boolean
171170
) : AbstractCoroutine<Unit>(parentContext, active) {
172-
override val cancelsParent: Boolean get() = true
173-
override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
171+
override fun handleJobException(exception: Throwable, handled: Boolean) {
172+
if (!handled) handleCoroutineException(context, exception)
173+
}
174174
}
175175

176176
private class LazyStandaloneCoroutine(

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ internal open class CancellableContinuationImpl<in T>(
121121
try {
122122
block()
123123
} catch (ex: Throwable) {
124+
// Handler should never fail, if it does -- it is an unhandled exception
124125
handleCoroutineException(
125126
context,
126127
CompletionHandlerException("Exception in cancellation handler for $this", ex)

kotlinx-coroutines-core/common/src/CompletableDeferred.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ private class CompletableDeferredImpl<T>(
6666
parent: Job?
6767
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
6868
init { initParentJobInternal(parent) }
69-
override val cancelsParent: Boolean get() = true
7069
override val onCancelComplete get() = true
7170
override fun getCompleted(): T = getCompletedInternal() as T
7271
override suspend fun await(): T = awaitInternal() as T

kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,16 @@ import kotlin.coroutines.*
99
internal expect fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable)
1010

1111
/**
12-
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
12+
* Helper function for coroutine builder implementations to handle uncaught and unexpected exceptions in coroutines,
13+
* that could not be otherwise handled in a normal way through structured concurrency, saving them to a future, and
14+
* cannot be rethrown. This is a last resort handler to prevent lost exceptions.
1315
*
14-
* It tries to handle uncaught exception in the following way:
15-
* If current exception is [CancellationException], it's ignored: [CancellationException] is a normal way to cancel
16-
* coroutine.
17-
*
18-
* If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
19-
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
20-
* Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
21-
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
16+
* If there is [CoroutineExceptionHandler] in the context, then it is used. If it throws an exception during handling
17+
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and
18+
* [Thread.uncaughtExceptionHandler] are invoked.
2219
*/
2320
@InternalCoroutinesApi
24-
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
25-
// Ignore CancellationException (they are normal ways to terminate a coroutine)
26-
if (exception is CancellationException) return // nothing to do
27-
// Try propagate exception to parent
28-
val job = context[Job]
29-
@Suppress("DEPRECATION")
30-
if (job !== null && job !== caller && job.cancel(exception)) return // handle by parent
31-
// otherwise -- use exception handlers
32-
handleExceptionViaHandler(context, exception)
33-
}
34-
35-
/**
36-
* @suppress This is an internal API and it is subject to change.
37-
*/
38-
@InternalCoroutinesApi
39-
public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
21+
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
4022
// Invoke exception handler from the context if present
4123
try {
4224
context[CoroutineExceptionHandler]?.let {
@@ -47,7 +29,6 @@ public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throw
4729
handleCoroutineExceptionImpl(context, handlerException(exception, t))
4830
return
4931
}
50-
5132
// If handler is not present in the context or exception was thrown, fallback to the global handler
5233
handleCoroutineExceptionImpl(context, exception)
5334
}

0 commit comments

Comments
 (0)