Skip to content

Commit 7301774

Browse files
fix: Implement BulkWriter shutdown logic (#2202)
* Implement BulkWriter shutdown logic * chore: generate libraries at Wed Jul 23 23:21:35 UTC 2025 * Fix * Fix comment --------- Co-authored-by: cloud-java-bot <[email protected]>
1 parent b53338a commit 7301774

File tree

2 files changed

+43
-28
lines changed

2 files changed

+43
-28
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ enum OperationType {
160160
// don't want to block a gax/grpc executor while running user error and success callbacks.
161161
private final ScheduledExecutorService bulkWriterExecutor;
162162

163+
/**
164+
* The BulkWriter will shutdown executor when closed and all writes are done. This is important to
165+
* prevent leaking threads.
166+
*/
167+
boolean autoShutdownBulkWriterExecutor;
168+
163169
/** The maximum number of writes that can be in a single batch. */
164170
private int maxBatchSize = MAX_BATCH_SIZE;
165171

@@ -230,7 +236,7 @@ enum OperationType {
230236
@GuardedBy("lock")
231237
private Executor errorExecutor;
232238

233-
Context traceContext;
239+
private final Context traceContext;
234240

235241
/**
236242
* Used to track when writes are enqueued. The user handler executors cannot be changed after a
@@ -241,10 +247,13 @@ enum OperationType {
241247

242248
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
243249
this.firestore = firestore;
244-
this.bulkWriterExecutor =
245-
options.getExecutor() != null
246-
? options.getExecutor()
247-
: Executors.newSingleThreadScheduledExecutor();
250+
if (options.getExecutor() != null) {
251+
this.bulkWriterExecutor = options.getExecutor();
252+
this.autoShutdownBulkWriterExecutor = false;
253+
} else {
254+
this.bulkWriterExecutor = Executors.newSingleThreadScheduledExecutor();
255+
this.autoShutdownBulkWriterExecutor = true;
256+
}
248257
this.successExecutor = MoreExecutors.directExecutor();
249258
this.errorExecutor = MoreExecutors.directExecutor();
250259
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
@@ -707,7 +716,7 @@ private ApiFuture<Void> flushLocked() {
707716
lastFlushOperation = lastOperation;
708717
scheduleCurrentBatchLocked();
709718
}
710-
return lastOperation;
719+
return lastFlushOperation;
711720
}
712721

713722
/**
@@ -722,10 +731,16 @@ private ApiFuture<Void> flushLocked() {
722731
public void close() throws InterruptedException, ExecutionException {
723732
ApiFuture<Void> flushFuture;
724733
synchronized (lock) {
725-
flushFuture = flushLocked();
726-
closed = true;
734+
if (!closed) {
735+
flushLocked();
736+
closed = true;
737+
}
738+
flushFuture = lastFlushOperation;
727739
}
728740
flushFuture.get();
741+
if (autoShutdownBulkWriterExecutor) {
742+
bulkWriterExecutor.shutdown();
743+
}
729744
}
730745

731746
/**

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.google.cloud.firestore.LocalFirestoreHelper.update;
2525
import static org.junit.Assert.assertArrayEquals;
2626
import static org.junit.Assert.assertEquals;
27+
import static org.junit.Assert.assertFalse;
2728
import static org.junit.Assert.assertTrue;
2829
import static org.junit.Assert.fail;
2930
import static org.mockito.Mockito.*;
@@ -97,24 +98,13 @@ public class BulkWriterTest {
9798

9899
@Rule public Timeout timeout = new Timeout(2, TimeUnit.SECONDS);
99100

100-
@Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class);
101-
102101
private ScheduledExecutorService testExecutor;
103102

104-
/** Executor that executes delayed tasks without delay. */
105-
private final ScheduledExecutorService immediateExecutor =
106-
new ScheduledThreadPoolExecutor(1) {
107-
@Override
108-
@Nonnull
109-
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
110-
return super.schedule(command, 0, TimeUnit.MILLISECONDS);
111-
}
112-
};
113-
114103
@Spy
115104
private final FirestoreImpl firestoreMock =
116105
new FirestoreImpl(
117-
FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc);
106+
FirestoreOptions.newBuilder().setProjectId("test-project").build(),
107+
Mockito.mock(FirestoreRpc.class));
118108

119109
@Captor private ArgumentCaptor<BatchWriteRequest> batchWriteCapture;
120110

@@ -155,7 +145,6 @@ public static ApiFuture<BatchWriteResponse> mergeResponses(
155145

156146
@Before
157147
public void before() {
158-
lenient().doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
159148
testExecutor = Executors.newSingleThreadScheduledExecutor();
160149

161150
timeoutExecutor =
@@ -169,23 +158,27 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
169158

170159
bulkWriter =
171160
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
161+
bulkWriter.autoShutdownBulkWriterExecutor = true;
172162
doc1 = firestoreMock.document("coll/doc1");
173163
doc2 = firestoreMock.document("coll/doc2");
174164
}
175165

176166
@After
177167
public void after() throws InterruptedException {
168+
shutdownScheduledExecutorService(testExecutor);
178169
shutdownScheduledExecutorService(timeoutExecutor);
179170
}
180171

181172
void shutdownScheduledExecutorService(ScheduledExecutorService executorService)
182173
throws InterruptedException {
174+
executorService.shutdown();
183175
// Wait for the executor to finish after each test.
184176
//
185177
// This ensures the executor service is shut down properly within the given timeout, and thereby
186178
// avoids potential hangs caused by lingering threads. Note that if a given thread is terminated
187179
// because of the timeout, the associated test will fail, which is what we want.
188180
executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
181+
assertTrue(executorService.isTerminated());
189182
}
190183

191184
@Test
@@ -369,12 +362,19 @@ public void cannotCallMethodsAfterClose() throws Exception {
369362
} catch (Exception e) {
370363
assertEquals(expected, e.getMessage());
371364
}
372-
try {
373-
bulkWriter.close();
374-
fail("close() should have failed");
375-
} catch (Exception e) {
376-
assertEquals(expected, e.getMessage());
377-
}
365+
// Close is idempotent and can be called multiple time.
366+
bulkWriter.close();
367+
}
368+
369+
@Test
370+
public void closeWillShutdownExecutor() throws Exception {
371+
// We ONLY shutdown executor when the executor was created within the BulkWriter.
372+
// To simulate this, we set the autoShutdownBulkWriterExecutor field to true.
373+
bulkWriter.autoShutdownBulkWriterExecutor = true;
374+
375+
assertFalse(timeoutExecutor.isShutdown());
376+
bulkWriter.close();
377+
assertTrue(timeoutExecutor.isShutdown());
378378
}
379379

380380
@Test

0 commit comments

Comments
 (0)