Skip to content

Commit b9fb575

Browse files
authored
Stabilize CancellableContinuation.resume with onCancellation (#4090)
Additionally, give `onCancellation` extra parameters that help to avoid allocating closures in some cases. Fixes #4088
1 parent fd69663 commit b9fb575

12 files changed

+515
-98
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
3939
public abstract fun isCancelled ()Z
4040
public abstract fun isCompleted ()Z
4141
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
42+
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)V
4243
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
4344
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
4445
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
45-
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
46+
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
4647
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
4748
}
4849

@@ -54,7 +55,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5455
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
5556
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5657
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
57-
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
58+
public final fun callOnCancellation (Lkotlin/jvm/functions/Function3;Ljava/lang/Throwable;Ljava/lang/Object;)V
5859
public fun cancel (Ljava/lang/Throwable;)Z
5960
public fun completeResume (Ljava/lang/Object;)V
6061
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
@@ -70,12 +71,13 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
7071
public fun isCompleted ()Z
7172
protected fun nameString ()Ljava/lang/String;
7273
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
74+
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)V
7375
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
7476
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
7577
public fun resumeWith (Ljava/lang/Object;)V
7678
public fun toString ()Ljava/lang/String;
7779
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
78-
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
80+
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
7981
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
8082
}
8183

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,14 @@ abstract interface <#A: in kotlin/Any?> kotlinx.coroutines.channels/SendChannel
104104
abstract interface <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuation : kotlin.coroutines/Continuation<#A> { // kotlinx.coroutines/CancellableContinuation|null[0]
105105
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuation.resumeUndispatched|[email protected](1:0){}[0]
106106
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuation.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
107+
abstract fun <#A1: #A> resume(#A1, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(0:0;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
108+
abstract fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
107109
abstract fun cancel(kotlin/Throwable? = ...): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.cancel|cancel(kotlin.Throwable?){}[0]
108110
abstract fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuation.completeResume|completeResume(kotlin.Any){}[0]
109111
abstract fun initCancellability() // kotlinx.coroutines/CancellableContinuation.initCancellability|initCancellability(){}[0]
110112
abstract fun invokeOnCancellation(kotlin/Function1<kotlin/Throwable?, kotlin/Unit>) // kotlinx.coroutines/CancellableContinuation.invokeOnCancellation|invokeOnCancellation(kotlin.Function1<kotlin.Throwable?,kotlin.Unit>){}[0]
111113
abstract fun resume(#A, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(1:0;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
112114
abstract fun tryResume(#A, kotlin/Any? = ...): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?){}[0]
113-
abstract fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
114115
abstract fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
115116
abstract val isActive // kotlinx.coroutines/CancellableContinuation.isActive|{}isActive[0]
116117
abstract fun <get-isActive>(): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.isActive.<get-isActive>|<get-isActive>(){}[0]
@@ -774,11 +775,13 @@ open annotation class kotlinx.coroutines/ObsoleteCoroutinesApi : kotlin/Annotati
774775
}
775776
open class <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuationImpl : kotlinx.coroutines.internal/CoroutineStackFrame, kotlinx.coroutines/CancellableContinuation<#A>, kotlinx.coroutines/DispatchedTask<#A>, kotlinx.coroutines/Waiter { // kotlinx.coroutines/CancellableContinuationImpl|null[0]
776777
constructor <init>(kotlin.coroutines/Continuation<#A>, kotlin/Int) // kotlinx.coroutines/CancellableContinuationImpl.<init>|<init>(kotlin.coroutines.Continuation<1:0>;kotlin.Int){}[0]
778+
final fun <#A1: kotlin/Any?> callOnCancellation(kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>, kotlin/Throwable, #A1) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>;kotlin.Throwable;0:0){0§<kotlin.Any?>}[0]
777779
final fun callCancelHandler(kotlinx.coroutines/CancelHandler, kotlin/Throwable?) // kotlinx.coroutines/CancellableContinuationImpl.callCancelHandler|callCancelHandler(kotlinx.coroutines.CancelHandler;kotlin.Throwable?){}[0]
778-
final fun callOnCancellation(kotlin/Function1<kotlin/Throwable, kotlin/Unit>, kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function1<kotlin.Throwable,kotlin.Unit>;kotlin.Throwable){}[0]
779780
final fun getResult(): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.getResult|getResult(){}[0]
780781
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatched|[email protected](1:0){}[0]
781782
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
783+
open fun <#A1: #A> resume(#A1, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuationImpl.resume|resume(0:0;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
784+
open fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
782785
open fun cancel(kotlin/Throwable?): kotlin/Boolean // kotlinx.coroutines/CancellableContinuationImpl.cancel|cancel(kotlin.Throwable?){}[0]
783786
open fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuationImpl.completeResume|completeResume(kotlin.Any){}[0]
784787
open fun getContinuationCancellationCause(kotlinx.coroutines/Job): kotlin/Throwable // kotlinx.coroutines/CancellableContinuationImpl.getContinuationCancellationCause|getContinuationCancellationCause(kotlinx.coroutines.Job){}[0]
@@ -791,7 +794,6 @@ open class <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuationImpl :
791794
open fun resumeWith(kotlin/Result<#A>) // kotlinx.coroutines/CancellableContinuationImpl.resumeWith|resumeWith(kotlin.Result<1:0>){}[0]
792795
open fun toString(): kotlin/String // kotlinx.coroutines/CancellableContinuationImpl.toString|toString(){}[0]
793796
open fun tryResume(#A, kotlin/Any?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?){}[0]
794-
open fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
795797
open fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
796798
open val callerFrame // kotlinx.coroutines/CancellableContinuationImpl.callerFrame|{}callerFrame[0]
797799
open fun <get-callerFrame>(): kotlinx.coroutines.internal/CoroutineStackFrame? // kotlinx.coroutines/CancellableContinuationImpl.callerFrame.<get-callerFrame>|<get-callerFrame>(){}[0]
@@ -903,7 +905,7 @@ sealed interface kotlinx.coroutines.selects/SelectClause { // kotlinx.coroutines
903905
abstract val clauseObject // kotlinx.coroutines.selects/SelectClause.clauseObject|{}clauseObject[0]
904906
abstract fun <get-clauseObject>(): kotlin/Any // kotlinx.coroutines.selects/SelectClause.clauseObject.<get-clauseObject>|<get-clauseObject>(){}[0]
905907
abstract val onCancellationConstructor // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor|{}onCancellationConstructor[0]
906-
abstract fun <get-onCancellationConstructor>(): kotlin/Function3<kotlinx.coroutines.selects/SelectInstance<*>, kotlin/Any?, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>>? // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor.<get-onCancellationConstructor>|<get-onCancellationConstructor>(){}[0]
908+
abstract fun <get-onCancellationConstructor>(): kotlin/Function3<kotlinx.coroutines.selects/SelectInstance<*>, kotlin/Any?, kotlin/Any?, kotlin/Function3<kotlin/Throwable, kotlin/Any?, kotlin.coroutines/CoroutineContext, kotlin/Unit>>? // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor.<get-onCancellationConstructor>|<get-onCancellationConstructor>(){}[0]
907909
abstract val processResFunc // kotlinx.coroutines.selects/SelectClause.processResFunc|{}processResFunc[0]
908910
abstract fun <get-processResFunc>(): kotlin/Function3<kotlin/Any, kotlin/Any?, kotlin/Any?, kotlin/Any?> // kotlinx.coroutines.selects/SelectClause.processResFunc.<get-processResFunc>|<get-processResFunc>(){}[0]
909911
abstract val regFunc // kotlinx.coroutines.selects/SelectClause.regFunc|{}regFunc[0]

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,21 @@ public interface CancellableContinuation<in T> : Continuation<T> {
7474
public fun tryResume(value: T, idempotent: Any? = null): Any?
7575

7676
/**
77-
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
78-
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
79-
* guaranteed can be provided by having a cancellation fallback.
77+
* Same as [tryResume] but with an [onCancellation] handler that is called if and only if the value is not
78+
* delivered to the caller because of the dispatch in the process.
79+
*
80+
* The purpose of this function is to enable atomic delivery guarantees: either resumption succeeded, passing
81+
* the responsibility for [value] to the continuation, or the [onCancellation] block will be invoked,
82+
* allowing one to free the resources in [value].
8083
*
8184
* Implementation note: current implementation always returns RESUME_TOKEN or `null`
8285
*
8386
* @suppress **This is unstable API and it is subject to change.**
8487
*/
8588
@InternalCoroutinesApi
86-
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
89+
public fun <R: T> tryResume(
90+
value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
91+
): Any?
8792

8893
/**
8994
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
@@ -126,7 +131,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
126131
* Otherwise, the handler will be invoked as soon as this continuation is cancelled.
127132
*
128133
* The installed [handler] should not throw any exceptions.
129-
* If it does, they will get caught, wrapped into a [CompletionHandlerException] and
134+
* If it does, they will get caught, wrapped into a `CompletionHandlerException` and
130135
* processed as an uncaught exception in the context of the current coroutine
131136
* (see [CoroutineExceptionHandler]).
132137
*
@@ -168,36 +173,60 @@ public interface CancellableContinuation<in T> : Continuation<T> {
168173
@ExperimentalCoroutinesApi
169174
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
170175

176+
/** @suppress */
177+
@Deprecated(
178+
"Use the overload that also accepts the `value` and the coroutine context in lambda",
179+
level = DeprecationLevel.WARNING,
180+
replaceWith = ReplaceWith("resume(value) { cause, _, _ -> onCancellation(cause) }")
181+
) // warning since 1.9.0, was experimental
182+
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
183+
171184
/**
172-
* Resumes this continuation with the specified `value` and calls the specified `onCancellation`
173-
* handler when either resumed too late (when continuation was already cancelled) or, although resumed
174-
* successfully (before cancellation), the coroutine's job was cancelled before it had a
175-
* chance to run in its dispatcher, so that the suspended function threw an exception
176-
* instead of returning this value.
185+
* Resumes this continuation with the specified [value], calling the specified [onCancellation] if and only if
186+
* the [value] was not successfully used to resume the continuation.
187+
*
188+
* The [value] can be rejected in two cases (in both of which [onCancellation] will be called):
189+
* - Cancellation happened before the handler was resumed;
190+
* - The continuation was resumed successfully (before cancellation), but the coroutine's job was cancelled before
191+
* it had a chance to run in its dispatcher, and so the suspended function threw an exception instead of returning
192+
* this value.
177193
*
178194
* The installed [onCancellation] handler should not throw any exceptions.
179-
* If it does, they will get caught, wrapped into a [CompletionHandlerException] and
195+
* If it does, they will get caught, wrapped into a `CompletionHandlerException`, and
180196
* processed as an uncaught exception in the context of the current coroutine
181197
* (see [CoroutineExceptionHandler]).
182198
*
183-
* This function shall be used when resuming with a resource that must be closed by
184-
* code that called the corresponding suspending function, for example:
199+
* With this version of [resume], it's possible to pass resources that can not simply be left for the garbage
200+
* collector (like file handles, sockets, etc.) and need to be closed explicitly:
185201
*
186202
* ```
187-
* continuation.resume(resource) {
188-
* resource.close()
203+
* continuation.resume(resourceToResumeWith) { _, resourceToClose, _ ->
204+
* resourceToClose.close()
189205
* }
190206
* ```
191207
*
208+
* [onCancellation] accepts three arguments:
209+
*
210+
* - `cause: Throwable` is the exception with which the continuation was cancelled.
211+
* - `value` is exactly the same as the [value] passed to [resume] itself.
212+
* In the example above, `resourceToResumeWith` is exactly the same as `resourceToClose`; in particular,
213+
* one could call `resourceToResumeWith.close()` in the lambda for the same effect.
214+
* The reason to reference `resourceToClose` anyway is to avoid a memory allocation due to the lambda
215+
* capturing the `resourceToResumeWith` reference.
216+
* - `context` is the [context] of this continuation.
217+
* Like with `value`, the reason this is available as a lambda parameter, even though it is always possible to
218+
* call [context] from the lambda instead, is to allow lambdas to capture less of their environment.
219+
*
192220
* A more complete example and further details are given in
193221
* the documentation for the [suspendCancellableCoroutine] function.
194222
*
195223
* **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
196224
* It can be invoked concurrently with the surrounding code.
197225
* There is no guarantee on the execution context of its invocation.
198226
*/
199-
@ExperimentalCoroutinesApi // since 1.2.0
200-
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
227+
public fun <R: T> resume(
228+
value: R, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
229+
)
201230
}
202231

203232
/**
@@ -293,8 +322,10 @@ internal fun <T> CancellableContinuation<T>.invokeOnCancellation(handler: Cancel
293322
* override fun onCompleted(resource: T) {
294323
* // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
295324
* // when the coroutine is cancelled before the caller gets a reference to the resource.
296-
* continuation.resume(resource) {
297-
* resource.close() // Close the resource on cancellation
325+
* continuation.resume(resource) { cause, resourceToClose, context ->
326+
* resourceToClose.close() // Close the resource on cancellation
327+
* // If we used `resource` instead of `resourceToClose`, this lambda would need to allocate a closure,
328+
* // but with `resourceToClose`, the lambda does not capture any of its environment.
298329
* }
299330
* }
300331
* // ...

0 commit comments

Comments
 (0)