Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions firebase-firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
[#7376](//github.com/firebase/firebase-android-sdk/issues/7376)
- [changed] Improve query performance via internal memoization of calculated document data.
[#7370](//github.com/firebase/firebase-android-sdk/issues/7370)
- [changed] Replace deprecated AsyncTask-based executor with mordern Kotlin dispatchers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's a typo here ('mordern'). Also, please remember to replace the [#NNNN] placeholder on the next line with the actual issue or pull request number.

Suggested change
- [changed] Replace deprecated AsyncTask-based executor with mordern Kotlin dispatchers.
- [changed] Replace deprecated AsyncTask-based executor with modern Kotlin dispatchers.

[#NNNN](//github.com/firebase/firebase-android-sdk/issues/NNNN)

# 26.0.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import com.google.firebase.Firebase
import com.google.firebase.FirebaseApp
import com.google.firebase.components.Component
import com.google.firebase.components.ComponentRegistrar
import com.google.firebase.firestore.*
import com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR
import com.google.firebase.firestore.util.Executors.newSequentialExecutor
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
Expand Down Expand Up @@ -233,7 +232,9 @@ fun DocumentReference.snapshots(
): Flow<DocumentSnapshot> {
return callbackFlow {
val registration =
addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception ->
addSnapshotListener(newSequentialExecutor("DocRef.snapshots"), metadataChanges) {
snapshot,
exception ->
if (exception != null) {
cancel(message = "Error getting DocumentReference snapshot", cause = exception)
} else if (snapshot != null) {
Expand All @@ -257,7 +258,9 @@ fun Query.snapshots(
): Flow<QuerySnapshot> {
return callbackFlow {
val registration =
addSnapshotListener(BACKGROUND_EXECUTOR, metadataChanges) { snapshot, exception ->
addSnapshotListener(newSequentialExecutor("Query.snapshots"), metadataChanges) {
snapshot,
exception ->
if (exception != null) {
cancel(message = "Error getting Query snapshot", cause = exception)
} else if (snapshot != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void initChannelTask() {
// the AsyncQueue.
this.channelTask =
Tasks.call(
Executors.BACKGROUND_EXECUTOR,
Executors.SHORT_WORKLOAD_EXECUTOR,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The initChannel() method performs blocking I/O, such as ProviderInstaller.installIfNeeded(context). According to the Javadoc in Executors.java, IO_WORKLOAD_EXECUTOR is more suitable for I/O-bound workloads than SHORT_WORKLOAD_EXECUTOR, which is intended for short tasks with little to no blocking. Using IO_WORKLOAD_EXECUTOR would be more semantically correct and robust here.

Suggested change
Executors.SHORT_WORKLOAD_EXECUTOR,
Executors.IO_WORKLOAD_EXECUTOR,

() -> {
ManagedChannel channel = initChannel(context, databaseInfo);
asyncQueue.enqueueAndForget(() -> onConnectivityStateChange(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package com.google.firebase.firestore.util

import java.util.concurrent.Executor
import java.util.concurrent.Semaphore
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor

/**
* Manages CPU-bound work on background threads to enable parallel processing.
Expand All @@ -41,7 +38,7 @@ internal class BackgroundQueue {
check(submittingState is State.Submitting) { "submit() may not be called after drain()" }

submittingState.taskCount++
executor.execute {
Executors.CPU_WORKLOAD_EXECUTOR.execute {
try {
runnable.run()
} finally {
Expand Down Expand Up @@ -70,17 +67,4 @@ internal class BackgroundQueue {
}
object Draining : State
}

companion object {

/**
* The maximum amount of parallelism shared by all instances of this class.
*
* This is equal to the number of processor cores available, or 2, whichever is larger.
*/
val maxParallelism = Runtime.getRuntime().availableProcessors().coerceAtLeast(2)

private val executor: Executor =
Dispatchers.IO.limitedParallelism(maxParallelism, "firestore.BackgroundQueue").asExecutor()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@

package com.google.firebase.firestore.util;

import android.os.AsyncTask;
import static kotlinx.coroutines.ExecutorsKt.asExecutor;

import com.google.android.gms.tasks.TaskExecutors;
import java.util.concurrent.Executor;
import kotlinx.coroutines.Dispatchers;

/** Helper class for executors. */
public final class Executors {
/**
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
*
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well
* below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR
* allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR.
* The number of physical CPU cores available for multithreaded execution, or 2, whichever is
* larger.
* <p>
* CPU-bound tasks should never use more than this number of concurrent threads as doing so will
* almost certainly reduce throughput due to the overhead of context switching.
*/
private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;
public static final int HARDWARE_CONCURRENCY =
Math.max(2, Runtime.getRuntime().availableProcessors());

/**
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
Expand All @@ -38,10 +41,46 @@ public final class Executors {
/** An executor that executes the provided runnable immediately on the current thread. */
public static final Executor DIRECT_EXECUTOR = Runnable::run;

/** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */
public static final Executor BACKGROUND_EXECUTOR =
new ThrottledForwardingExecutor(
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);
/**
* An executor suitable for short tasks that perform little or no blocking.
*/
public static final Executor SHORT_WORKLOAD_EXECUTOR =
asExecutor(
Dispatchers.getIO()
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.SHORT_WORKLOAD_EXECUTOR"));

/**
* An executor suitable for IO-bound workloads. New threads are usually created to satisfy demand,
* and, therefore, tasks do not usually wait in a queue for execution.
*/
public static final Executor IO_WORKLOAD_EXECUTOR = asExecutor(Dispatchers.getIO());

/**
* An executor suitable for CPU-bound workloads. No more tasks than available CPU cores will
* execute concurrently, while other tasks line up and wait for a thread to become available, and
* are scheduled in an arbitrary order.
*/
public static final Executor CPU_WORKLOAD_EXECUTOR =
asExecutor(
Dispatchers.getIO()
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.CPU_WORKLOAD_EXECUTOR"));
Comment on lines +63 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are a couple of issues with this executor definition:

  1. CPU_WORKLOAD_EXECUTOR and SHORT_WORKLOAD_EXECUTOR have identical implementations (Dispatchers.getIO().limitedParallelism(...)), which is confusing given their different names and Javadocs.
  2. For CPU-bound tasks, it's idiomatic to use Dispatchers.getDefault(). Using Dispatchers.getIO() is generally for I/O-bound tasks.

I suggest changing CPU_WORKLOAD_EXECUTOR to use Dispatchers.getDefault() to align with its intended purpose and distinguish it from SHORT_WORKLOAD_EXECUTOR.

Suggested change
public static final Executor CPU_WORKLOAD_EXECUTOR =
asExecutor(
Dispatchers.getIO()
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.CPU_WORKLOAD_EXECUTOR"));
public static final Executor CPU_WORKLOAD_EXECUTOR =
asExecutor(
Dispatchers.getDefault()
.limitedParallelism(HARDWARE_CONCURRENCY, "firestore.CPU_WORKLOAD_EXECUTOR"));


/**
* Creates and returns a new {@link Executor} that executes tasks sequentially.
* <p>
* The implementation guarantees that tasks are executed sequentially and that a happens-before
* relation is established between them. This means that tasks run by this executor do _not_ need
* to synchronize access to shared resources, such as using "synchronized" blocks or "volatile"
* variables. See `kotlinx.coroutines.limitedParallelism` for full details.
* <p>
* Note that there is no guarantee that tasks will all run on the _same thread_.
*
* @param name a brief name to assign to the executor, for debugging purposes.
* @return the newly-created executor.
*/
public static Executor newSequentialExecutor(String name) {
return asExecutor(Dispatchers.getIO().limitedParallelism(1, "firestore.seq." + name));
}

private Executors() {
// Private constructor to prevent initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import static com.google.common.truth.Truth.assertThat;
import static com.google.firebase.firestore.testutil.TestUtil.map;
import static com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR;
import static com.google.firebase.firestore.util.Executors.SHORT_WORKLOAD_EXECUTOR;
import static org.mockito.Mockito.verify;

import androidx.annotation.NonNull;
Expand Down Expand Up @@ -78,9 +78,9 @@ public static String getResourcePrefixValue(DatabaseId databaseId) {

private static <T> Task<T> waitFor(Task<T> task) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
task.addOnSuccessListener(BACKGROUND_EXECUTOR, t -> countDownLatch.countDown());
task.addOnFailureListener(BACKGROUND_EXECUTOR, e -> countDownLatch.countDown());
task.addOnCanceledListener(BACKGROUND_EXECUTOR, () -> countDownLatch.countDown());
task.addOnSuccessListener(SHORT_WORKLOAD_EXECUTOR, t -> countDownLatch.countDown());
task.addOnFailureListener(SHORT_WORKLOAD_EXECUTOR, e -> countDownLatch.countDown());
task.addOnCanceledListener(SHORT_WORKLOAD_EXECUTOR, () -> countDownLatch.countDown());
countDownLatch.await(15, TimeUnit.SECONDS);
return task;
}
Expand Down
Loading