Skip to content

Commit fcabad4

Browse files
committed
firestore: convert AsyncQueue to Kotlin dispatchers
1 parent 9a18f74 commit fcabad4

File tree

6 files changed

+65
-37
lines changed

6 files changed

+65
-37
lines changed

firebase-firestore/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
[#7376](//github.com/firebase/firebase-android-sdk/issues/7376)
99
- [changed] Improve query performance via internal memoization of calculated document data.
1010
[#7370](//github.com/firebase/firebase-android-sdk/issues/7370)
11+
- [changed] Replace deprecated AsyncTask-based executor with mordern Kotlin dispatchers.
12+
[#NNNN](//github.com/firebase/firebase-android-sdk/issues/NNNN)
1113

1214
# 26.0.0
1315

firebase-firestore/src/main/java/com/google/firebase/firestore/Firestore.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import com.google.firebase.Firebase
2121
import com.google.firebase.FirebaseApp
2222
import com.google.firebase.components.Component
2323
import com.google.firebase.components.ComponentRegistrar
24-
import com.google.firebase.firestore.*
25-
import com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR
24+
import com.google.firebase.firestore.util.Executors.newSequentialExecutor
2625
import kotlinx.coroutines.cancel
2726
import kotlinx.coroutines.channels.awaitClose
2827
import kotlinx.coroutines.channels.trySendBlocking
@@ -233,7 +232,9 @@ fun DocumentReference.snapshots(
233232
): Flow<DocumentSnapshot> {
234233
return callbackFlow {
235234
val registration =
236-
addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception ->
235+
addSnapshotListener(newSequentialExecutor("DocRef.snapshots"), metadataChanges) {
236+
snapshot,
237+
exception ->
237238
if (exception != null) {
238239
cancel(message = "Error getting DocumentReference snapshot", cause = exception)
239240
} else if (snapshot != null) {
@@ -257,7 +258,9 @@ fun Query.snapshots(
257258
): Flow<QuerySnapshot> {
258259
return callbackFlow {
259260
val registration =
260-
addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception ->
261+
addSnapshotListener(newSequentialExecutor("Query.snapshots"), metadataChanges) {
262+
snapshot,
263+
exception ->
261264
if (exception != null) {
262265
cancel(message = "Error getting Query snapshot", cause = exception)
263266
} else if (snapshot != null) {

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private void initChannelTask() {
237237
// the AsyncQueue.
238238
this.channelTask =
239239
Tasks.call(
240-
Executors.BACKGROUND_EXECUTOR,
240+
Executors.SHORT_WORKLOAD_EXECUTOR,
241241
() -> {
242242
ManagedChannel channel = initChannel(context, databaseInfo);
243243
asyncQueue.enqueueAndForget(() -> onConnectivityStateChange(channel));

firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package com.google.firebase.firestore.util
1717

18-
import java.util.concurrent.Executor
1918
import java.util.concurrent.Semaphore
20-
import kotlinx.coroutines.Dispatchers
21-
import kotlinx.coroutines.asExecutor
2219

2320
/**
2421
* Manages CPU-bound work on background threads to enable parallel processing.
@@ -41,7 +38,7 @@ internal class BackgroundQueue {
4138
check(submittingState is State.Submitting) { "submit() may not be called after drain()" }
4239

4340
submittingState.taskCount++
44-
executor.execute {
41+
Executors.CPU_WORKLOAD_EXECUTOR.execute {
4542
try {
4643
runnable.run()
4744
} finally {
@@ -70,17 +67,4 @@ internal class BackgroundQueue {
7067
}
7168
object Draining : State
7269
}
73-
74-
companion object {
75-
76-
/**
77-
* The maximum amount of parallelism shared by all instances of this class.
78-
*
79-
* This is equal to the number of processor cores available, or 2, whichever is larger.
80-
*/
81-
val maxParallelism = Runtime.getRuntime().availableProcessors().coerceAtLeast(2)
82-
83-
private val executor: Executor =
84-
Dispatchers.IO.limitedParallelism(maxParallelism, "firestore.BackgroundQueue").asExecutor()
85-
}
8670
}

firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,23 @@
1414

1515
package com.google.firebase.firestore.util;
1616

17-
import android.os.AsyncTask;
17+
import static kotlinx.coroutines.ExecutorsKt.asExecutor;
18+
1819
import com.google.android.gms.tasks.TaskExecutors;
1920
import java.util.concurrent.Executor;
21+
import kotlinx.coroutines.Dispatchers;
2022

2123
/** Helper class for executors. */
2224
public final class Executors {
2325
/**
24-
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
25-
*
26-
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well
27-
* below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR
28-
* allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR.
26+
* The number of physical CPU cores available for multithreaded execution, or 2, whichever is
27+
* larger.
28+
* <p>
29+
* CPU-bound tasks should never use more than this number of concurrent threads as doing so will
30+
* almost certainly reduce throughput due to the overhead of context switching.
2931
*/
30-
private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;
32+
public static final int HARDWARE_CONCURRENCY =
33+
Math.max(2, Runtime.getRuntime().availableProcessors());
3134

3235
/**
3336
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
@@ -38,10 +41,46 @@ public final class Executors {
3841
/** An executor that executes the provided runnable immediately on the current thread. */
3942
public static final Executor DIRECT_EXECUTOR = Runnable::run;
4043

41-
/** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */
42-
public static final Executor BACKGROUND_EXECUTOR =
43-
new ThrottledForwardingExecutor(
44-
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);
44+
/**
45+
* An executor suitable for short tasks that perform little or no blocking.
46+
*/
47+
public static final Executor SHORT_WORKLOAD_EXECUTOR =
48+
asExecutor(
49+
Dispatchers.getIO()
50+
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.SHORT_WORKLOAD_EXECUTOR"));
51+
52+
/**
53+
* An executor suitable for IO-bound workloads. New threads are usually created to satisfy demand,
54+
* and, therefore, tasks do not usually wait in a queue for execution.
55+
*/
56+
public static final Executor IO_WORKLOAD_EXECUTOR = asExecutor(Dispatchers.getIO());
57+
58+
/**
59+
* An executor suitable for CPU-bound workloads. No more tasks than available CPU cores will
60+
* execute concurrently, while other tasks line up and wait for a thread to become available, and
61+
* are scheduled in an arbitrary order.
62+
*/
63+
public static final Executor CPU_WORKLOAD_EXECUTOR =
64+
asExecutor(
65+
Dispatchers.getIO()
66+
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.CPU_WORKLOAD_EXECUTOR"));
67+
68+
/**
69+
* Creates and returns a new {@link Executor} that executes tasks sequentially.
70+
* <p>
71+
* The implementation guarantees that tasks are executed sequentially and that a happens-before
72+
* relation is established between them. This means that tasks run by this executor do _not_ need
73+
* to synchronize access to shared resources, such as using "synchronized" blocks or "volatile"
74+
* variables. See `kotlinx.coroutines.limitedParallelism` for full details.
75+
* <p>
76+
* Note that there is no guarantee that tasks will all run on the _same thread_.
77+
*
78+
* @param name a brief name to assign to the executor, for debugging purposes.
79+
* @return the newly-created executor.
80+
*/
81+
public static Executor newSequentialExecutor(String name) {
82+
return asExecutor(Dispatchers.getIO().limitedParallelism(1, "firestore.seq." + name));
83+
}
4584

4685
private Executors() {
4786
// Private constructor to prevent initialization

firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import static com.google.common.truth.Truth.assertThat;
1818
import static com.google.firebase.firestore.testutil.TestUtil.map;
19-
import static com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR;
19+
import static com.google.firebase.firestore.util.Executors.SHORT_WORKLOAD_EXECUTOR;
2020
import static org.mockito.Mockito.verify;
2121

2222
import androidx.annotation.NonNull;
@@ -78,9 +78,9 @@ public static String getResourcePrefixValue(DatabaseId databaseId) {
7878

7979
private static <T> Task<T> waitFor(Task<T> task) throws InterruptedException {
8080
CountDownLatch countDownLatch = new CountDownLatch(1);
81-
task.addOnSuccessListener(BACKGROUND_EXECUTOR, t -> countDownLatch.countDown());
82-
task.addOnFailureListener(BACKGROUND_EXECUTOR, e -> countDownLatch.countDown());
83-
task.addOnCanceledListener(BACKGROUND_EXECUTOR, () -> countDownLatch.countDown());
81+
task.addOnSuccessListener(SHORT_WORKLOAD_EXECUTOR, t -> countDownLatch.countDown());
82+
task.addOnFailureListener(SHORT_WORKLOAD_EXECUTOR, e -> countDownLatch.countDown());
83+
task.addOnCanceledListener(SHORT_WORKLOAD_EXECUTOR, () -> countDownLatch.countDown());
8484
countDownLatch.await(15, TimeUnit.SECONDS);
8585
return task;
8686
}

0 commit comments

Comments
 (0)