Skip to content

Commit 7247c32

Browse files
test: deflake flow control test (#2259)
Change-Id: I222078817739f8190faefffe405bd01af9c96df9 Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) - [ ] Rollback plan is reviewed and LGTMed - [ ] All new data plane features have a completed end to end testing plan Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent eea4eb0 commit 7247c32

File tree

1 file changed

+13
-11
lines changed

1 file changed

+13
-11
lines changed

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallableTest.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import java.util.HashMap;
3737
import java.util.List;
3838
import java.util.Map;
39+
import java.util.concurrent.CompletableFuture;
3940
import java.util.concurrent.Future;
4041
import java.util.concurrent.TimeUnit;
41-
import java.util.concurrent.atomic.AtomicBoolean;
4242
import org.junit.After;
4343
import org.junit.Before;
4444
import org.junit.Rule;
@@ -264,6 +264,10 @@ public ApiFuture<List<MutateRowsResponse>> futureCall(
264264
try {
265265
Thread.sleep(Integer.valueOf(latencyHeader.get(0)));
266266
} catch (InterruptedException e) {
267+
Thread.currentThread().interrupt();
268+
return ApiFutures.immediateFailedFuture(
269+
new IllegalStateException(
270+
"Interrupted while sleeping as requested: " + latencyHeader, e));
267271
}
268272
if (Integer.valueOf(latencyHeader.get(0)) == DEADLINE_EXCEEDED_LATENCY) {
269273
return ApiFutures.immediateFailedFuture(
@@ -277,32 +281,30 @@ public ApiFuture<List<MutateRowsResponse>> futureCall(
277281

278282
private void createFlowControlEvent(final FlowController flowController) throws Exception {
279283
flowController.reserve(INITIAL_ELEMENT, 0);
280-
final AtomicBoolean threadStarted = new AtomicBoolean(false);
284+
CompletableFuture<Void> threadStarted = new CompletableFuture<>();
285+
CompletableFuture<Void> threadReservedOne = new CompletableFuture<>();
281286
Thread t =
282287
new Thread(
283288
new Runnable() {
284289
@Override
285290
public void run() {
291+
threadStarted.complete(null);
286292
try {
287-
threadStarted.set(true);
288293
flowController.reserve(1, 0);
294+
threadReservedOne.complete(null);
289295
} catch (Exception e) {
296+
threadReservedOne.completeExceptionally(e);
290297
}
291298
}
292299
});
293300
t.start();
294-
// Wait 5 seconds for the thread to start, and 50 milliseconds after it's started to make sure
301+
// Wait 50 milliseconds after the thread has started to make sure
295302
// flowController.reserve(1, 0) is blocked and creates a throttling event. It should never take
296303
// so long.
297-
for (int i = 0; i < 1000; i++) {
298-
if (threadStarted.get()) {
299-
break;
300-
}
301-
Thread.sleep(5);
302-
}
304+
threadStarted.get();
303305
Thread.sleep(50);
304306
flowController.release(INITIAL_ELEMENT, 0);
305-
t.join();
307+
threadReservedOne.get();
306308
flowController.release(1, 0);
307309

308310
assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent()).isNotNull();

0 commit comments

Comments
 (0)