Skip to content

Commit e919ebb

Browse files
fix: Resolve RejectedExecutionException that occurred on BulkWriter close or executor shutdown (#2223)
1 parent 2a9dd7d commit e919ebb

File tree

2 files changed

+36
-6
lines changed

2 files changed

+36
-6
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -661,8 +661,10 @@ private ApiFuture<WriteResult> executeWrite(
661661
ApiFutures.transformAsync(
662662
operation.getFuture(),
663663
result -> {
664-
pendingOpsCount--;
665-
processBufferedOperations();
664+
synchronized (lock) {
665+
pendingOpsCount--;
666+
processBufferedOperations();
667+
}
666668
return ApiFutures.immediateFuture(result);
667669
},
668670
MoreExecutors.directExecutor());
@@ -671,8 +673,10 @@ private ApiFuture<WriteResult> executeWrite(
671673
processedOperationFuture,
672674
ApiException.class,
673675
e -> {
674-
pendingOpsCount--;
675-
processBufferedOperations();
676+
synchronized (lock) {
677+
pendingOpsCount--;
678+
processBufferedOperations();
679+
}
676680
throw e;
677681
},
678682
MoreExecutors.directExecutor());
@@ -951,7 +955,7 @@ private void sendBatchLocked(final BulkCommitBatch batch) {
951955
}
952956
}
953957
},
954-
bulkWriterExecutor);
958+
MoreExecutors.directExecutor());
955959
}
956960
span.endAtFuture(result);
957961
metricsContext.recordLatencyAtFuture(MetricType.END_TO_END_LATENCY, result);

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
158158

159159
bulkWriter =
160160
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
161-
bulkWriter.autoShutdownBulkWriterExecutor = true;
161+
162162
doc1 = firestoreMock.document("coll/doc1");
163163
doc2 = firestoreMock.document("coll/doc2");
164164
}
@@ -1401,4 +1401,30 @@ public void optionsInitialAndMaxRatesAreProperlySet() throws Exception {
14011401
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), Integer.MAX_VALUE);
14021402
assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE);
14031403
}
1404+
1405+
@Test
1406+
public void closeHandlesLargeNumberOfBufferedOps() throws Exception {
1407+
final int numOps = 100;
1408+
1409+
bulkWriter.setMaxPendingOpCount(5);
1410+
bulkWriter.setMaxBatchSize(1);
1411+
bulkWriter.autoShutdownBulkWriterExecutor = true;
1412+
1413+
ResponseStubber responseStubber = new ResponseStubber();
1414+
1415+
for (int i = 0; i < numOps; i += 1) {
1416+
responseStubber.put(
1417+
batchWrite(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc" + i)),
1418+
successResponse(1));
1419+
}
1420+
1421+
responseStubber.initializeStub(batchWriteCapture, firestoreMock);
1422+
1423+
for (int i = 0; i < numOps; ++i) {
1424+
bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP);
1425+
}
1426+
bulkWriter.close();
1427+
responseStubber.verifyAllRequestsSent();
1428+
assertEquals(numOps, responseStubber.actualRequestList.size());
1429+
}
14041430
}

0 commit comments

Comments
 (0)