diff --git a/firebase-firestore/CHANGELOG.md b/firebase-firestore/CHANGELOG.md index 375c5d3d447..b65d54d2333 100644 --- a/firebase-firestore/CHANGELOG.md +++ b/firebase-firestore/CHANGELOG.md @@ -8,6 +8,8 @@ [#7376](//github.com/firebase/firebase-android-sdk/issues/7376) - [changed] Improve query performance via internal memoization of calculated document data. [#7370](//github.com/firebase/firebase-android-sdk/issues/7370) +- [changed] Replace deprecated AsyncTask-based executor with mordern Kotlin dispatchers. + [#NNNN](//github.com/firebase/firebase-android-sdk/issues/NNNN) # 26.0.0 diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/Firestore.kt b/firebase-firestore/src/main/java/com/google/firebase/firestore/Firestore.kt index 9f5027b5e29..c5caebbe0b6 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/Firestore.kt +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/Firestore.kt @@ -21,8 +21,7 @@ import com.google.firebase.Firebase import com.google.firebase.FirebaseApp import com.google.firebase.components.Component import com.google.firebase.components.ComponentRegistrar -import com.google.firebase.firestore.* -import com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR +import com.google.firebase.firestore.util.Executors.newSequentialExecutor import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking @@ -233,7 +232,9 @@ fun DocumentReference.snapshots( ): Flow { return callbackFlow { val registration = - addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception -> + addSnapshotListener(newSequentialExecutor("DocRef.snapshots"), metadataChanges) { + snapshot, + exception -> if (exception != null) { cancel(message = "Error getting DocumentReference snapshot", cause = exception) } else if (snapshot != null) { @@ -257,7 +258,9 @@ fun Query.snapshots( ): Flow { return callbackFlow { val registration = - addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception -> + addSnapshotListener(newSequentialExecutor("Query.snapshots"), metadataChanges) { + snapshot, + exception -> if (exception != null) { cancel(message = "Error getting Query snapshot", cause = exception) } else if (snapshot != null) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java index 4cb9f9347c3..1499dcb48c3 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java @@ -237,7 +237,7 @@ private void initChannelTask() { // the AsyncQueue. this.channelTask = Tasks.call( - Executors.BACKGROUND_EXECUTOR, + Executors.SHORT_WORKLOAD_EXECUTOR, () -> { ManagedChannel channel = initChannel(context, databaseInfo); asyncQueue.enqueueAndForget(() -> onConnectivityStateChange(channel)); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt index 14f38b6c260..a35ad69d109 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt @@ -15,10 +15,7 @@ */ package com.google.firebase.firestore.util -import java.util.concurrent.Executor import java.util.concurrent.Semaphore -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.asExecutor /** * Manages CPU-bound work on background threads to enable parallel processing. @@ -41,7 +38,7 @@ internal class BackgroundQueue { check(submittingState is State.Submitting) { "submit() may not be called after drain()" } submittingState.taskCount++ - executor.execute { + Executors.CPU_WORKLOAD_EXECUTOR.execute { try { runnable.run() } finally { @@ -70,17 +67,4 @@ internal class BackgroundQueue { } object Draining : State } - - companion object { - - /** - * The maximum amount of parallelism shared by all instances of this class. - * - * This is equal to the number of processor cores available, or 2, whichever is larger. - */ - val maxParallelism = Runtime.getRuntime().availableProcessors().coerceAtLeast(2) - - private val executor: Executor = - Dispatchers.IO.limitedParallelism(maxParallelism, "firestore.BackgroundQueue").asExecutor() - } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java index 13a16951f61..b2d64c3b500 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java @@ -14,20 +14,23 @@ package com.google.firebase.firestore.util; -import android.os.AsyncTask; +import static kotlinx.coroutines.ExecutorsKt.asExecutor; + import com.google.android.gms.tasks.TaskExecutors; import java.util.concurrent.Executor; +import kotlinx.coroutines.Dispatchers; /** Helper class for executors. */ public final class Executors { /** - * The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR. - * - *

The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well - * below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR - * allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR. + * The number of physical CPU cores available for multithreaded execution, or 2, whichever is + * larger. + *

+ * CPU-bound tasks should never use more than this number of concurrent threads as doing so will + * almost certainly reduce throughput due to the overhead of context switching. */ - private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4; + public static final int HARDWARE_CONCURRENCY = + Math.max(2, Runtime.getRuntime().availableProcessors()); /** * The default executor for user visible callbacks. It is an executor scheduling callbacks on @@ -38,10 +41,46 @@ public final class Executors { /** An executor that executes the provided runnable immediately on the current thread. */ public static final Executor DIRECT_EXECUTOR = Runnable::run; - /** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */ - public static final Executor BACKGROUND_EXECUTOR = - new ThrottledForwardingExecutor( - ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR); + /** + * An executor suitable for short tasks that perform little or no blocking. + */ + public static final Executor SHORT_WORKLOAD_EXECUTOR = + asExecutor( + Dispatchers.getIO() + .limitedParallelism(HARDWARE_CONCURRENCY, "firestore.SHORT_WORKLOAD_EXECUTOR")); + + /** + * An executor suitable for IO-bound workloads. New threads are usually created to satisfy demand, + * and, therefore, tasks do not usually wait in a queue for execution. + */ + public static final Executor IO_WORKLOAD_EXECUTOR = asExecutor(Dispatchers.getIO()); + + /** + * An executor suitable for CPU-bound workloads. No more tasks than available CPU cores will + * execute concurrently, while other tasks line up and wait for a thread to become available, and + * are scheduled in an arbitrary order. + */ + public static final Executor CPU_WORKLOAD_EXECUTOR = + asExecutor( + Dispatchers.getIO() + .limitedParallelism(HARDWARE_CONCURRENCY, "firestore.CPU_WORKLOAD_EXECUTOR")); + + /** + * Creates and returns a new {@link Executor} that executes tasks sequentially. + *

+ * The implementation guarantees that tasks are executed sequentially and that a happens-before + * relation is established between them. This means that tasks run by this executor do _not_ need + * to synchronize access to shared resources, such as using "synchronized" blocks or "volatile" + * variables. See `kotlinx.coroutines.limitedParallelism` for full details. + *

+ * Note that there is no guarantee that tasks will all run on the _same thread_. + * + * @param name a brief name to assign to the executor, for debugging purposes. + * @return the newly-created executor. + */ + public static Executor newSequentialExecutor(String name) { + return asExecutor(Dispatchers.getIO().limitedParallelism(1, "firestore.seq." + name)); + } private Executors() { // Private constructor to prevent initialization diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java index adaee0dcf87..a7ff5dd3a36 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java @@ -16,7 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.firebase.firestore.testutil.TestUtil.map; -import static com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR; +import static com.google.firebase.firestore.util.Executors.SHORT_WORKLOAD_EXECUTOR; import static org.mockito.Mockito.verify; import androidx.annotation.NonNull; @@ -78,9 +78,9 @@ public static String getResourcePrefixValue(DatabaseId databaseId) { private static Task waitFor(Task task) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); - task.addOnSuccessListener(BACKGROUND_EXECUTOR, t -> countDownLatch.countDown()); - task.addOnFailureListener(BACKGROUND_EXECUTOR, e -> countDownLatch.countDown()); - task.addOnCanceledListener(BACKGROUND_EXECUTOR, () -> countDownLatch.countDown()); + task.addOnSuccessListener(SHORT_WORKLOAD_EXECUTOR, t -> countDownLatch.countDown()); + task.addOnFailureListener(SHORT_WORKLOAD_EXECUTOR, e -> countDownLatch.countDown()); + task.addOnCanceledListener(SHORT_WORKLOAD_EXECUTOR, () -> countDownLatch.countDown()); countDownLatch.await(15, TimeUnit.SECONDS); return task; }