|
| 1 | +/* |
| 2 | + * Copyright 2023 The Android Open Source Project |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package androidx.test.core.view |
| 18 | + |
| 19 | +import androidx.concurrent.futures.ResolvableFuture |
| 20 | +import com.google.common.util.concurrent.ListenableFuture |
| 21 | +import java.util.concurrent.Executor |
| 22 | +import java.util.concurrent.TimeUnit |
| 23 | +import kotlin.coroutines.Continuation |
| 24 | +import kotlin.coroutines.CoroutineContext |
| 25 | +import kotlin.coroutines.EmptyCoroutineContext |
| 26 | +import kotlin.coroutines.createCoroutine |
| 27 | +import kotlin.coroutines.resume |
| 28 | +import kotlinx.coroutines.CancellationException |
| 29 | +import kotlinx.coroutines.CoroutineScope |
| 30 | +import kotlinx.coroutines.CoroutineStart |
| 31 | +import kotlinx.coroutines.Deferred |
| 32 | +import kotlinx.coroutines.Dispatchers |
| 33 | +import kotlinx.coroutines.async |
| 34 | + |
| 35 | +/** |
| 36 | + * A utility for launching suspending calls scoped and managed by a returned [ListenableFuture], |
| 37 | + * used for adapting Kotlin suspending APIs to be callable from the Java programming language. |
| 38 | + * |
| 39 | + * TODO(b/336855276): Forked from androidx.concurrent. Remove in favor of just using androidx.concurrent |
| 40 | + * 1.2.0 when available and toolchain compatibility issues have been addressed |
| 41 | + */ |
| 42 | +internal object SuspendToFutureAdapter { |
| 43 | + |
| 44 | + // the CoroutineScope() factory function is not used here as it adds a Job by default; |
| 45 | + // we don't want one as a failed task shouldn't fail a root Job. |
| 46 | + // To make SuspendToFutureAdapter behave as much like a "regular" ListenableFuture-returning |
| 47 | + // task as possible we don't want to hold additional references to child jobs from a global/root |
| 48 | + // scope, hence no SupervisorJob either. |
| 49 | + private val GlobalListenableFutureScope = object : CoroutineScope { |
| 50 | + override val coroutineContext: CoroutineContext = Dispatchers.Main |
| 51 | + } |
| 52 | + private val GlobalListenableFutureAwaitContext = Dispatchers.Unconfined |
| 53 | + |
| 54 | + /** |
| 55 | + * Launch [block] in [context], returning a [ListenableFuture] to manage the launched operation. |
| 56 | + * [block] will run **synchronously** to its first suspend point, behaving as |
| 57 | + * [CoroutineStart.UNDISPATCHED] by default; set [launchUndispatched] to false to override |
| 58 | + * and behave as [CoroutineStart.DEFAULT]. |
| 59 | + * |
| 60 | + * [launchFuture] can be used to write adapters for calling suspending functions from the |
| 61 | + * Java programming language, e.g. |
| 62 | + * |
| 63 | + * ``` |
| 64 | + * @file:JvmName("FancyServices") |
| 65 | + * |
| 66 | + * fun FancyService.requestAsync( |
| 67 | + * args: FancyServiceArgs |
| 68 | + * ): ListenableFuture<FancyResult> = SuspendToFutureAdapter.launchFuture { |
| 69 | + * request(args) |
| 70 | + * } |
| 71 | + * ``` |
| 72 | + * |
| 73 | + * which can be called from Java language source code as follows: |
| 74 | + * ``` |
| 75 | + * final ListenableFuture<FancyResult> result = FancyServices.requestAsync(service, args); |
| 76 | + * ``` |
| 77 | + * |
| 78 | + * If no [kotlinx.coroutines.CoroutineDispatcher] is provided in [context], [Dispatchers.Main] |
| 79 | + * is used as the default. [ListenableFuture.get] should not be called from the main thread |
| 80 | + * prior to the future's completion (whether it was obtained from [SuspendToFutureAdapter] |
| 81 | + * or not) as any operation performed in the process of completing the future may require |
| 82 | + * main thread event processing in order to proceed, leading to potential main thread deadlock. |
| 83 | + * |
| 84 | + * If the operation performed by [block] is known to be safe for potentially reentrant |
| 85 | + * continuation resumption, immediate dispatchers such as [Dispatchers.Unconfined] may be used |
| 86 | + * as part of [context] to avoid additional thread dispatch latency. This should not be used |
| 87 | + * as a means of supporting clients blocking the main thread using [ListenableFuture.get]; |
| 88 | + * this support can be broken by valid internal implementation changes to any transitive |
| 89 | + * dependencies of the operation performed by [block]. |
| 90 | + */ |
| 91 | + @Suppress("AsyncSuffixFuture") |
| 92 | + public fun <T> launchFuture( |
| 93 | + context: CoroutineContext = EmptyCoroutineContext, |
| 94 | + launchUndispatched: Boolean = true, |
| 95 | + block: suspend CoroutineScope.() -> T, |
| 96 | + ): ListenableFuture<T> { |
| 97 | + val resultDeferred = GlobalListenableFutureScope.async( |
| 98 | + context = context, |
| 99 | + start = if (launchUndispatched) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT, |
| 100 | + block = block |
| 101 | + ) |
| 102 | + return DeferredFuture(resultDeferred).also { future -> |
| 103 | + // Deferred.getCompleted is marked experimental, so external libraries can't rely on it. |
| 104 | + // Instead, use await in a raw coroutine that will invoke [resumeWith] when it returns |
| 105 | + // using the Unconfined dispatcher. |
| 106 | + resultDeferred::await.createCoroutine(future).resume(Unit) |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + private class DeferredFuture<T>( |
| 111 | + private val resultDeferred: Deferred<T> |
| 112 | + ) : ListenableFuture<T>, Continuation<T> { |
| 113 | + |
| 114 | + private val delegateFuture = ResolvableFuture.create<T>() |
| 115 | + |
| 116 | + // Implements external cancellation, propagating the cancel request to resultDeferred. |
| 117 | + // delegateFuture will be cancelled if resultDeferred becomes cancelled for |
| 118 | + // internal cancellation. |
| 119 | + override fun cancel(shouldInterrupt: Boolean): Boolean = |
| 120 | + delegateFuture.cancel(shouldInterrupt).also { didCancel -> |
| 121 | + if (didCancel) { |
| 122 | + resultDeferred.cancel() |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + override fun isCancelled(): Boolean = delegateFuture.isCancelled |
| 127 | + |
| 128 | + override fun isDone(): Boolean = delegateFuture.isDone |
| 129 | + |
| 130 | + override fun get(): T = delegateFuture.get() |
| 131 | + |
| 132 | + override fun get(timeout: Long, unit: TimeUnit): T = delegateFuture.get(timeout, unit) |
| 133 | + |
| 134 | + override fun addListener(listener: Runnable, executor: Executor) = |
| 135 | + delegateFuture.addListener(listener, executor) |
| 136 | + |
| 137 | + override val context: CoroutineContext |
| 138 | + get() = GlobalListenableFutureAwaitContext |
| 139 | + |
| 140 | + /** |
| 141 | + * Implementation of [Continuation] that will resume for the raw call to await |
| 142 | + * to resolve the [delegateFuture] |
| 143 | + */ |
| 144 | + override fun resumeWith(result: Result<T>) { |
| 145 | + val unused = result.fold( |
| 146 | + onSuccess = { |
| 147 | + delegateFuture.set(it) |
| 148 | + }, |
| 149 | + onFailure = { |
| 150 | + if (it is CancellationException) { |
| 151 | + delegateFuture.cancel(false) |
| 152 | + } else { |
| 153 | + delegateFuture.setException(it) |
| 154 | + } |
| 155 | + } |
| 156 | + ) |
| 157 | + } |
| 158 | + } |
| 159 | +} |
0 commit comments