Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit ff4b61e

Browse files
authored
batching: fix permit leak (#567)
Previously flow control was released only if the RPC returns successfully. This caused us to leak permits if RPCs fail. This commit makes us unconditionally release permits.
1 parent 7f216a9 commit ff4b61e

File tree

3 files changed

+130
-44
lines changed

3 files changed

+130
-44
lines changed

gax/src/main/java/com/google/api/gax/batching/AccumulatingBatchReceiver.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,20 @@
3030
package com.google.api.gax.batching;
3131

3232
import com.google.api.core.ApiFuture;
33-
import com.google.api.core.ApiFutures;
3433
import com.google.api.core.BetaApi;
3534
import java.util.ArrayList;
3635
import java.util.List;
36+
import java.util.concurrent.ConcurrentLinkedQueue;
3737

38-
/** A simple ThresholdBatchReceiver that just accumulates batches. Not thread-safe. */
38+
/** A simple ThresholdBatchReceiver that just accumulates batches. */
3939
@BetaApi("The surface for batching is not stable yet and may change in the future.")
4040
public final class AccumulatingBatchReceiver<T> implements ThresholdBatchReceiver<T> {
41-
private final List<T> batches = new ArrayList<>();
41+
private final ConcurrentLinkedQueue<T> batches = new ConcurrentLinkedQueue<>();
42+
private final ApiFuture<?> retFuture;
43+
44+
public AccumulatingBatchReceiver(ApiFuture<?> retFuture) {
45+
this.retFuture = retFuture;
46+
}
4247

4348
@Override
4449
public void validateBatch(T message) {
@@ -48,11 +53,14 @@ public void validateBatch(T message) {
4853
@Override
4954
public ApiFuture<?> processBatch(T batch) {
5055
batches.add(batch);
51-
return ApiFutures.<Void>immediateFuture(null);
56+
return retFuture;
5257
}
5358

54-
/** Returns the accumulated batches. */
59+
/**
60+
* Returns the accumulated batches. If called concurrently with {@code processBatch}, the new
61+
* batch may or may not be returned.
62+
*/
5563
public List<T> getBatches() {
56-
return batches;
64+
return new ArrayList<>(batches);
5765
}
5866
}

gax/src/main/java/com/google/api/gax/batching/ThresholdBatcher.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333

3434
import com.google.api.core.ApiFunction;
3535
import com.google.api.core.ApiFuture;
36+
import com.google.api.core.ApiFutureCallback;
3637
import com.google.api.core.ApiFutures;
3738
import com.google.api.core.BetaApi;
39+
import com.google.api.core.SettableApiFuture;
3840
import com.google.api.gax.batching.FlowController.FlowControlException;
3941
import com.google.common.annotations.VisibleForTesting;
4042
import com.google.common.base.Preconditions;
@@ -216,10 +218,35 @@ public ApiFuture<Void> pushCurrentBatch() {
216218
final E batch = removeBatch();
217219
if (batch == null) {
218220
return ApiFutures.immediateFuture(null);
219-
} else {
220-
return ApiFutures.transform(
221-
receiver.processBatch(batch), new ReleaseResourcesFunction<>(batch), directExecutor());
222221
}
222+
223+
final SettableApiFuture<Void> retFuture = SettableApiFuture.create();
224+
225+
// It is tempting to use transform to both release and get ApiFuture<Void>.
226+
// This is incorrect because we also need to release on failure.
227+
//
228+
// It is also tempting to transform to get ApiFuture<Void> and addListener
229+
// separately to release. This probably works as most users expect,
230+
// but makes this class hard to test: retFuture.get() returning
231+
// won't guarantee that flow control has been released.
232+
ApiFutures.addCallback(
233+
receiver.processBatch(batch),
234+
new ApiFutureCallback<Object>() {
235+
@Override
236+
public void onSuccess(Object obj) {
237+
flowController.release(batch);
238+
retFuture.set(null);
239+
}
240+
241+
@Override
242+
public void onFailure(Throwable t) {
243+
flowController.release(batch);
244+
retFuture.setException(t);
245+
}
246+
},
247+
directExecutor());
248+
249+
return retFuture;
223250
}
224251

225252
private E removeBatch() {

gax/src/test/java/com/google/api/gax/batching/ThresholdBatcherTest.java

Lines changed: 86 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,18 @@
2929
*/
3030
package com.google.api.gax.batching;
3131

32+
import static com.google.common.truth.Truth.assertThat;
33+
34+
import com.google.api.core.ApiFutures;
3235
import com.google.api.gax.batching.FlowController.FlowControlException;
3336
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
34-
import com.google.common.truth.Truth;
3537
import java.util.ArrayList;
3638
import java.util.Arrays;
3739
import java.util.List;
40+
import java.util.concurrent.ExecutionException;
3841
import java.util.concurrent.ScheduledExecutorService;
3942
import java.util.concurrent.ScheduledThreadPoolExecutor;
43+
import org.junit.Assert;
4044
import org.junit.Rule;
4145
import org.junit.Test;
4246
import org.junit.rules.ExpectedException;
@@ -142,21 +146,23 @@ private static ThresholdBatcher.Builder<SimpleBatch> createSimpleBatcherBuidler(
142146

143147
@Test
144148
public void testAdd() throws Exception {
145-
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
149+
AccumulatingBatchReceiver<SimpleBatch> receiver =
150+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
146151
ThresholdBatcher<SimpleBatch> batcher = createSimpleBatcherBuidler(receiver).build();
147152
batcher.add(SimpleBatch.fromInteger(14));
148-
Truth.assertThat(batcher.isEmpty()).isFalse();
149-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(0);
153+
assertThat(batcher.isEmpty()).isFalse();
154+
assertThat(receiver.getBatches()).hasSize(0);
150155

151156
batcher.pushCurrentBatch().get();
152-
Truth.assertThat(batcher.isEmpty()).isTrue();
153-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
154-
Truth.assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14));
157+
assertThat(batcher.isEmpty()).isTrue();
158+
assertThat(receiver.getBatches()).hasSize(1);
159+
assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14));
155160
}
156161

157162
@Test
158163
public void testBatching() throws Exception {
159-
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
164+
AccumulatingBatchReceiver<SimpleBatch> receiver =
165+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
160166
ThresholdBatcher<SimpleBatch> batcher =
161167
createSimpleBatcherBuidler(receiver)
162168
.setThresholds(BatchingThresholds.<SimpleBatch>create(2))
@@ -166,13 +172,13 @@ public void testBatching() throws Exception {
166172
batcher.add(SimpleBatch.fromInteger(5));
167173
// Give time for the executor to push the batch
168174
Thread.sleep(100);
169-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
175+
assertThat(receiver.getBatches()).hasSize(1);
170176

171177
batcher.add(SimpleBatch.fromInteger(7));
172178
batcher.add(SimpleBatch.fromInteger(9));
173179
// Give time for the executor to push the batch
174180
Thread.sleep(100);
175-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(2);
181+
assertThat(receiver.getBatches()).hasSize(2);
176182

177183
batcher.add(SimpleBatch.fromInteger(11));
178184

@@ -184,20 +190,21 @@ public void testBatching() throws Exception {
184190
for (SimpleBatch batch : receiver.getBatches()) {
185191
actual.add(batch.getIntegers());
186192
}
187-
Truth.assertThat(actual).isEqualTo(expected);
193+
assertThat(actual).isEqualTo(expected);
188194
}
189195

190196
@Test
191197
public void testBatchingWithDelay() throws Exception {
192-
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
198+
AccumulatingBatchReceiver<SimpleBatch> receiver =
199+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
193200
ThresholdBatcher<SimpleBatch> batcher =
194201
createSimpleBatcherBuidler(receiver).setMaxDelay(Duration.ofMillis(100)).build();
195202

196203
batcher.add(SimpleBatch.fromInteger(3));
197204
batcher.add(SimpleBatch.fromInteger(5));
198205
// Give time for the delay to trigger and push the batch
199206
Thread.sleep(500);
200-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
207+
assertThat(receiver.getBatches()).hasSize(1);
201208

202209
batcher.add(SimpleBatch.fromInteger(11));
203210

@@ -208,7 +215,7 @@ public void testBatchingWithDelay() throws Exception {
208215
for (SimpleBatch batch : receiver.getBatches()) {
209216
actual.add(batch.getIntegers());
210217
}
211-
Truth.assertThat(actual).isEqualTo(expected);
218+
assertThat(actual).isEqualTo(expected);
212219
}
213220

214221
@Test
@@ -218,35 +225,37 @@ public void testExceptionWithNullFlowController() {
218225
.setThresholds(BatchingThresholds.<SimpleBatch>create(100))
219226
.setExecutor(EXECUTOR)
220227
.setMaxDelay(Duration.ofMillis(10000))
221-
.setReceiver(new AccumulatingBatchReceiver<SimpleBatch>())
228+
.setReceiver(
229+
new AccumulatingBatchReceiver<SimpleBatch>(ApiFutures.<Void>immediateFuture(null)))
222230
.setBatchMerger(new SimpleBatchMerger())
223231
.build();
224232
}
225233

226234
@Test
227235
public void testBatchingWithFlowControl() throws Exception {
228-
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
236+
AccumulatingBatchReceiver<SimpleBatch> receiver =
237+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
229238
ThresholdBatcher<SimpleBatch> batcher =
230239
createSimpleBatcherBuidler(receiver)
231240
.setThresholds(BatchingThresholds.<SimpleBatch>create(2))
232241
.setFlowController(
233242
getTrackedIntegerBatchingFlowController(2L, null, LimitExceededBehavior.Block))
234243
.build();
235244

236-
Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
237-
Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
238-
Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
239-
Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
245+
assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
246+
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
247+
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
248+
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
240249

241250
batcher.add(SimpleBatch.fromInteger(3));
242251
batcher.add(SimpleBatch.fromInteger(5));
243252
batcher.add(
244253
SimpleBatch.fromInteger(7)); // We expect to block here until the first batch is handled
245-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
254+
assertThat(receiver.getBatches()).hasSize(1);
246255
batcher.add(SimpleBatch.fromInteger(9));
247256
batcher.add(
248257
SimpleBatch.fromInteger(11)); // We expect to block here until the second batch is handled
249-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(2);
258+
assertThat(receiver.getBatches()).hasSize(2);
250259

251260
batcher.pushCurrentBatch().get();
252261

@@ -256,17 +265,18 @@ public void testBatchingWithFlowControl() throws Exception {
256265
for (SimpleBatch batch : receiver.getBatches()) {
257266
actual.add(batch.getIntegers());
258267
}
259-
Truth.assertThat(actual).isEqualTo(expected);
268+
assertThat(actual).isEqualTo(expected);
260269

261-
Truth.assertThat(trackedFlowController.getElementsReserved())
270+
assertThat(trackedFlowController.getElementsReserved())
262271
.isEqualTo(trackedFlowController.getElementsReleased());
263-
Truth.assertThat(trackedFlowController.getBytesReserved())
272+
assertThat(trackedFlowController.getBytesReserved())
264273
.isEqualTo(trackedFlowController.getBytesReleased());
265274
}
266275

267276
@Test
268277
public void testBatchingFlowControlExceptionRecovery() throws Exception {
269-
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
278+
AccumulatingBatchReceiver<SimpleBatch> receiver =
279+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
270280
ThresholdBatcher<SimpleBatch> batcher =
271281
createSimpleBatcherBuidler(receiver)
272282
.setThresholds(BatchingThresholds.<SimpleBatch>create(4))
@@ -275,21 +285,21 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception {
275285
3L, null, LimitExceededBehavior.ThrowException))
276286
.build();
277287

278-
Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
279-
Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
280-
Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
281-
Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
288+
assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
289+
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
290+
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
291+
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
282292

283293
batcher.add(SimpleBatch.fromInteger(3));
284294
batcher.add(SimpleBatch.fromInteger(5));
285295
batcher.add(SimpleBatch.fromInteger(7));
286296
try {
287297
batcher.add(SimpleBatch.fromInteger(9));
288-
Truth.assertWithMessage("Failing: expected exception").that(false).isTrue();
298+
Assert.fail("expected exception");
289299
} catch (FlowControlException e) {
290300
}
291301
batcher.pushCurrentBatch().get();
292-
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
302+
assertThat(receiver.getBatches()).hasSize(1);
293303
batcher.add(SimpleBatch.fromInteger(11));
294304
batcher.add(SimpleBatch.fromInteger(13));
295305
batcher.pushCurrentBatch().get();
@@ -299,11 +309,52 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception {
299309
for (SimpleBatch batch : receiver.getBatches()) {
300310
actual.add(batch.getIntegers());
301311
}
302-
Truth.assertThat(actual).isEqualTo(expected);
312+
assertThat(actual).isEqualTo(expected);
313+
314+
assertThat(trackedFlowController.getElementsReserved())
315+
.isEqualTo(trackedFlowController.getElementsReleased());
316+
assertThat(trackedFlowController.getBytesReserved())
317+
.isEqualTo(trackedFlowController.getBytesReleased());
318+
}
319+
320+
@Test
321+
public void testBatchingFailedRPC() throws Exception {
322+
Exception ex = new IllegalStateException("does nothing, unsuccessfully");
323+
AccumulatingBatchReceiver<SimpleBatch> receiver =
324+
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFailedFuture(ex));
325+
ThresholdBatcher<SimpleBatch> batcher =
326+
createSimpleBatcherBuidler(receiver)
327+
.setThresholds(BatchingThresholds.<SimpleBatch>create(4))
328+
.setFlowController(
329+
getTrackedIntegerBatchingFlowController(
330+
3L, null, LimitExceededBehavior.ThrowException))
331+
.build();
332+
333+
assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
334+
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
335+
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
336+
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
337+
338+
batcher.add(SimpleBatch.fromInteger(3));
339+
try {
340+
batcher.pushCurrentBatch().get();
341+
Assert.fail("expected exception");
342+
} catch (Exception e) {
343+
assertThat(e).isInstanceOf(ExecutionException.class);
344+
assertThat(e).hasCauseThat().isSameAs(ex);
345+
}
346+
assertThat(receiver.getBatches()).hasSize(1);
347+
348+
List<List<Integer>> expected = Arrays.asList(Arrays.asList(3));
349+
List<List<Integer>> actual = new ArrayList<>();
350+
for (SimpleBatch batch : receiver.getBatches()) {
351+
actual.add(batch.getIntegers());
352+
}
353+
assertThat(actual).isEqualTo(expected);
303354

304-
Truth.assertThat(trackedFlowController.getElementsReserved())
355+
assertThat(trackedFlowController.getElementsReserved())
305356
.isEqualTo(trackedFlowController.getElementsReleased());
306-
Truth.assertThat(trackedFlowController.getBytesReserved())
357+
assertThat(trackedFlowController.getBytesReserved())
307358
.isEqualTo(trackedFlowController.getBytesReleased());
308359
}
309360
}

0 commit comments

Comments
 (0)