Skip to content

Commit 627619d

Browse files
authored
fix: BulkWriter flush logic (#1778)
* Fix race condition * Pretty * Optimize
1 parent 7ae2952 commit 627619d

File tree

1 file changed

+27
-24
lines changed
  • google-cloud-firestore/src/main/java/com/google/cloud/firestore

1 file changed

+27
-24
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ enum OperationType {
210210
@GuardedBy("lock")
211211
private ApiFuture<Void> lastOperation = ApiFutures.immediateFuture(null);
212212

213+
/** A pointer to the lastOperation pointer as of last flush operation. */
214+
@GuardedBy("lock")
215+
private ApiFuture<Void> lastFlushOperation = lastOperation;
216+
213217
/** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */
214218
@GuardedBy("lock")
215219
private boolean closed = false;
@@ -699,7 +703,10 @@ public ApiFuture<Void> flush() {
699703

700704
private ApiFuture<Void> flushLocked() {
701705
verifyNotClosedLocked();
702-
scheduleCurrentBatchLocked(/* flush= */ true);
706+
if (!lastOperation.isDone()) {
707+
lastFlushOperation = lastOperation;
708+
scheduleCurrentBatchLocked();
709+
}
703710
return lastOperation;
704711
}
705712

@@ -873,14 +880,8 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback
873880
}
874881
}
875882

876-
/**
877-
* Sends the current batch and resets {@link #bulkCommitBatch}.
878-
*
879-
* @param flush If provided, keeps re-sending operations until no more operations are enqueued.
880-
* This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link
881-
* BulkWriter#close()} call.
882-
*/
883-
private void scheduleCurrentBatchLocked(final boolean flush) {
883+
/** Sends the current batch and resets {@link #bulkCommitBatch}. */
884+
private void scheduleCurrentBatchLocked() {
884885
if (bulkCommitBatch.getMutationsSize() == 0) return;
885886

886887
final BulkCommitBatch pendingBatch = bulkCommitBatch;
@@ -898,15 +899,15 @@ private void scheduleCurrentBatchLocked(final boolean flush) {
898899
bulkWriterExecutor.schedule(
899900
() -> {
900901
synchronized (lock) {
901-
sendBatchLocked(pendingBatch, flush);
902+
sendBatchLocked(pendingBatch);
902903
}
903904
},
904905
backoffMsWithJitter,
905906
TimeUnit.MILLISECONDS);
906907
}
907908

908909
/** Sends the provided batch once the rate limiter does not require any delay. */
909-
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
910+
private void sendBatchLocked(final BulkCommitBatch batch) {
910911
// Send the batch if it does not require any delay, or schedule another attempt after the
911912
// appropriate timeout.
912913
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
@@ -926,15 +927,17 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
926927

927928
try (Scope ignored = span.makeCurrent()) {
928929
ApiFuture<Void> result = batch.bulkCommit();
929-
result.addListener(
930-
() -> {
931-
if (flush) {
932-
synchronized (lock) {
933-
scheduleCurrentBatchLocked(/* flush= */ true);
930+
if (!lastFlushOperation.isDone()) {
931+
result.addListener(
932+
() -> {
933+
if (!lastFlushOperation.isDone()) {
934+
synchronized (lock) {
935+
scheduleCurrentBatchLocked();
936+
}
934937
}
935-
}
936-
},
937-
bulkWriterExecutor);
938+
},
939+
bulkWriterExecutor);
940+
}
938941
span.endAtFuture(result);
939942
metricsContext.recordLatencyAtFuture(MetricType.END_TO_END_LATENCY, result);
940943
} catch (Exception error) {
@@ -948,7 +951,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
948951
bulkWriterExecutor.schedule(
949952
() -> {
950953
synchronized (lock) {
951-
sendBatchLocked(batch, flush);
954+
sendBatchLocked(batch);
952955
}
953956
},
954957
delayMs,
@@ -991,15 +994,15 @@ private void sendOperationLocked(
991994
// that the batch is under the 10MiB limit.
992995
if (op.getBackoffDuration() > 0) {
993996
if (bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE) {
994-
scheduleCurrentBatchLocked(/* flush= */ false);
997+
scheduleCurrentBatchLocked();
995998
}
996999
bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE);
9971000
}
9981001

9991002
if (bulkCommitBatch.has(op.getDocumentReference())) {
10001003
// Create a new batch since the backend doesn't support batches with two writes to the same
10011004
// document.
1002-
scheduleCurrentBatchLocked(/* flush= */ false);
1005+
scheduleCurrentBatchLocked();
10031006
}
10041007

10051008
// Run the operation on the current batch and advance the `lastOperation` pointer. This
@@ -1008,8 +1011,8 @@ private void sendOperationLocked(
10081011
bulkCommitBatch.enqueueOperation(op);
10091012
enqueueOperationOnBatchCallback.apply(bulkCommitBatch);
10101013

1011-
if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) {
1012-
scheduleCurrentBatchLocked(/* flush= */ false);
1014+
if (bulkCommitBatch.getMutationsSize() >= bulkCommitBatch.getMaxBatchSize()) {
1015+
scheduleCurrentBatchLocked();
10131016
}
10141017
}
10151018

0 commit comments

Comments
 (0)