Skip to content

Commit 06cc13f

Browse files
committed
use AsyncQueue instead of calling setTimeout directly
1 parent e5a6e73 commit 06cc13f

File tree

2 files changed

+51
-26
lines changed

2 files changed

+51
-26
lines changed

packages/firestore/src/remote/remote_store.ts

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import { TargetData } from '../local/target_data';
2828
import { MutationResult } from '../model/mutation';
2929
import { MutationBatch, MutationBatchResult } from '../model/mutation_batch';
3030
import { debugAssert, debugCast } from '../util/assert';
31-
import { AsyncQueue } from '../util/async_queue';
31+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
3232
import { ByteString } from '../util/byte_string';
3333
import { FirestoreError } from '../util/error';
3434
import { logDebug } from '../util/log';
@@ -77,6 +77,11 @@ const enum OfflineCause {
7777
Shutdown
7878
}
7979

80+
export interface WritePipelineEntry {
81+
mutationBatch: MutationBatch;
82+
sent: boolean;
83+
}
84+
8085
/**
8186
* RemoteStore - An interface to remotely stored data, basically providing a
8287
* wrapper around the Datastore that is more reliable for the rest of the
@@ -124,11 +129,15 @@ class RemoteStoreImpl implements RemoteStore {
124129
* purely based on order, and so we can just shift() writes from the front of
125130
* the writePipeline as we receive responses.
126131
*/
127-
writePipeline: MutationBatch[] = [];
128-
129-
sentWrites = new WeakSet<MutationBatch>();
132+
writePipeline: WritePipelineEntry[] = [];
130133

131-
writeTimeoutId: ReturnType<typeof setTimeout> | null = null;
134+
/**
135+
* The operation that is enqueued to send writes from the `writePipeline` that
136+
* have not yet been sent. The delay is used to enable batching of writes,
137+
* reducing the overall number of HTTP requests that need to be made to the
138+
* backend.
139+
*/
140+
sendWritesOperation: DelayedOperation<void> | null = null;
132141

133142
/**
134143
* A mapping of watched targets that the client cares about tracking and the
@@ -678,7 +687,7 @@ export async function fillWritePipeline(
678687
let lastBatchIdRetrieved =
679688
remoteStoreImpl.writePipeline.length > 0
680689
? remoteStoreImpl.writePipeline[remoteStoreImpl.writePipeline.length - 1]
681-
.batchId
690+
.mutationBatch.batchId
682691
: BATCHID_UNKNOWN;
683692

684693
while (canAddToWritePipeline(remoteStoreImpl)) {
@@ -736,25 +745,33 @@ function addToWritePipeline(
736745
canAddToWritePipeline(remoteStoreImpl),
737746
'addToWritePipeline called when pipeline is full'
738747
);
739-
remoteStoreImpl.writePipeline.push(batch);
748+
remoteStoreImpl.writePipeline.push({
749+
mutationBatch: batch,
750+
sent: false
751+
});
740752

741-
if (remoteStoreImpl.writeTimeoutId !== null) {
753+
if (remoteStoreImpl.sendWritesOperation !== null) {
742754
return;
743755
}
744756

745-
remoteStoreImpl.writeTimeoutId = setTimeout(() => {
746-
remoteStoreImpl.writeTimeoutId = null;
747-
const writeStream = ensureWriteStream(remoteStoreImpl);
748-
if (writeStream.isOpen() && writeStream.handshakeComplete) {
749-
for (const curBatch of remoteStoreImpl.writePipeline) {
750-
if (remoteStoreImpl.sentWrites.has(curBatch)) {
751-
continue;
757+
remoteStoreImpl.sendWritesOperation =
758+
remoteStoreImpl.asyncQueue.enqueueAfterDelay(
759+
TimerId.WriteStreamSendDelay,
760+
200,
761+
() => {
762+
remoteStoreImpl.sendWritesOperation = null;
763+
const writeStream = ensureWriteStream(remoteStoreImpl);
764+
if (writeStream.isOpen() && writeStream.handshakeComplete) {
765+
for (const pipelineEntry of remoteStoreImpl.writePipeline) {
766+
if (!pipelineEntry.sent) {
767+
writeStream.writeMutations(pipelineEntry.mutationBatch.mutations);
768+
pipelineEntry.sent = true;
769+
}
770+
}
752771
}
753-
writeStream.writeMutations(curBatch.mutations);
754-
remoteStoreImpl.sentWrites.add(curBatch);
772+
return Promise.resolve();
755773
}
756-
}
757-
}, 200);
774+
);
758775
}
759776

760777
function shouldStartWriteStream(remoteStoreImpl: RemoteStoreImpl): boolean {
@@ -784,9 +801,9 @@ async function onWriteHandshakeComplete(
784801
): Promise<void> {
785802
const writeStream = ensureWriteStream(remoteStoreImpl);
786803
// Send the write pipeline now that the stream is established.
787-
for (const batch of remoteStoreImpl.writePipeline) {
788-
writeStream.writeMutations(batch.mutations);
789-
remoteStoreImpl.sentWrites.add(batch);
804+
for (const pipelineEntry of remoteStoreImpl.writePipeline) {
805+
writeStream.writeMutations(pipelineEntry.mutationBatch.mutations);
806+
pipelineEntry.sent = true;
790807
}
791808
}
792809

@@ -801,8 +818,12 @@ async function onMutationResult(
801818
remoteStoreImpl.writePipeline.length > 0,
802819
'Got result for empty write pipeline'
803820
);
804-
const batch = remoteStoreImpl.writePipeline.shift()!;
805-
const success = MutationBatchResult.from(batch, commitVersion, results);
821+
const pipelineEntry = remoteStoreImpl.writePipeline.shift()!;
822+
const success = MutationBatchResult.from(
823+
pipelineEntry.mutationBatch,
824+
commitVersion,
825+
results
826+
);
806827

807828
debugAssert(
808829
!!remoteStoreImpl.remoteSyncer.applySuccessfulWrite,
@@ -853,7 +874,7 @@ async function handleWriteError(
853874
if (isPermanentWriteError(error.code)) {
854875
// This was a permanent error, the request itself was the problem
855876
// so it's not going to succeed if we resend it.
856-
const batch = remoteStoreImpl.writePipeline.shift()!;
877+
const pipelineEntry = remoteStoreImpl.writePipeline.shift()!;
857878

858879
// In this case it's also unlikely that the server itself is melting
859880
// down -- this was just a bad request so inhibit backoff on the next
@@ -865,7 +886,10 @@ async function handleWriteError(
865886
'rejectFailedWrite() not set'
866887
);
867888
await executeWithRecovery(remoteStoreImpl, () =>
868-
remoteStoreImpl.remoteSyncer.rejectFailedWrite!(batch.batchId, error)
889+
remoteStoreImpl.remoteSyncer.rejectFailedWrite!(
890+
pipelineEntry.mutationBatch.batchId,
891+
error
892+
)
869893
);
870894

871895
// It's possible that with the completion of this mutation

packages/firestore/src/util/async_queue.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export const enum TimerId {
4848
ListenStreamConnectionBackoff = 'listen_stream_connection_backoff',
4949
WriteStreamIdle = 'write_stream_idle',
5050
WriteStreamConnectionBackoff = 'write_stream_connection_backoff',
51+
WriteStreamSendDelay = 'write_stream_send_delay',
5152
HealthCheckTimeout = 'health_check_timeout',
5253

5354
/**

0 commit comments

Comments
 (0)