Skip to content

Commit d1fce60

Browse files
authored
Fix flake in StreamingEngineWorkCommitterTest by waiting for responses (#35506)
1 parent 13e5921 commit d1fce60

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.ArrayList;
2929
import java.util.Collections;
3030
import java.util.HashMap;
31-
import java.util.HashSet;
3231
import java.util.List;
3332
import java.util.Map;
3433
import java.util.Optional;
@@ -75,6 +74,17 @@ public class StreamingEngineWorkCommitterTest {
7574
private FakeWindmillServer fakeWindmillServer;
7675
private Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory;
7776

77+
private static void waitForExpectedSetSize(Set<?> s, int expectedSize) {
78+
while (s.size() < expectedSize) {
79+
try {
80+
Thread.sleep(10);
81+
} catch (InterruptedException e) {
82+
throw new RuntimeException(e);
83+
}
84+
}
85+
assertThat(s).hasSize(expectedSize);
86+
}
87+
7888
private static Work createMockWork(long workToken) {
7989
WorkItem workItem =
8090
WorkItem.newBuilder()
@@ -135,7 +145,7 @@ private WorkCommitter createWorkCommitter(Consumer<CompleteCommit> onCommitCompl
135145

136146
@Test
137147
public void testCommit_sendsCommitsToStreamingEngine() {
138-
Set<CompleteCommit> completeCommits = new HashSet<>();
148+
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
139149
workCommitter = createWorkCommitter(completeCommits::add);
140150
List<Commit> commits = new ArrayList<>();
141151
for (int i = 1; i <= 5; i++) {
@@ -155,6 +165,7 @@ public void testCommit_sendsCommitsToStreamingEngine() {
155165

156166
Map<Long, WorkItemCommitRequest> committed =
157167
fakeWindmillServer.waitForAndGetCommits(commits.size());
168+
waitForExpectedSetSize(completeCommits, 5);
158169

159170
for (Commit commit : commits) {
160171
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
@@ -168,7 +179,7 @@ public void testCommit_sendsCommitsToStreamingEngine() {
168179

169180
@Test
170181
public void testCommit_handlesFailedCommits() {
171-
Set<CompleteCommit> completeCommits = new HashSet<>();
182+
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
172183
workCommitter = createWorkCommitter(completeCommits::add);
173184
List<Commit> commits = new ArrayList<>();
174185
for (int i = 1; i <= 10; i++) {
@@ -192,6 +203,7 @@ public void testCommit_handlesFailedCommits() {
192203

193204
Map<Long, WorkItemCommitRequest> committed =
194205
fakeWindmillServer.waitForAndGetCommits(commits.size() / 2);
206+
waitForExpectedSetSize(completeCommits, 10);
195207

196208
for (Commit commit : commits) {
197209
if (commit.work().isFailed()) {
@@ -210,7 +222,7 @@ public void testCommit_handlesFailedCommits() {
210222

211223
@Test
212224
public void testCommit_handlesCompleteCommits_commitStatusNotOK() {
213-
Set<CompleteCommit> completeCommits = new HashSet<>();
225+
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
214226
workCommitter = createWorkCommitter(completeCommits::add);
215227
Map<WorkId, Windmill.CommitStatus> expectedCommitStatus = new HashMap<>();
216228
Random commitStatusSelector = new Random();
@@ -249,6 +261,7 @@ public void testCommit_handlesCompleteCommits_commitStatusNotOK() {
249261

250262
Map<Long, WorkItemCommitRequest> committed =
251263
fakeWindmillServer.waitForAndGetCommits(commits.size());
264+
waitForExpectedSetSize(completeCommits, commits.size());
252265

253266
for (Commit commit : commits) {
254267
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
@@ -257,7 +270,6 @@ public void testCommit_handlesCompleteCommits_commitStatusNotOK() {
257270
assertThat(completeCommits)
258271
.contains(asCompleteCommit(commit, expectedCommitStatus.get(commit.work().id())));
259272
}
260-
assertThat(completeCommits.size()).isEqualTo(commits.size());
261273

262274
workCommitter.stop();
263275
}
@@ -314,7 +326,7 @@ public void shutdown() {}
314326
WindmillStreamPool.create(1, Duration.standardMinutes(1), fakeCommitWorkStream)
315327
::getCloseableStream;
316328

317-
Set<CompleteCommit> completeCommits = new HashSet<>();
329+
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
318330
workCommitter = createWorkCommitter(completeCommits::add);
319331

320332
List<Commit> commits = new ArrayList<>();
@@ -376,6 +388,7 @@ public void testMultipleCommitSendersSingleStream() {
376388
commits.parallelStream().forEach(workCommitter::commit);
377389
Map<Long, WorkItemCommitRequest> committed =
378390
fakeWindmillServer.waitForAndGetCommits(commits.size());
391+
waitForExpectedSetSize(completeCommits, commits.size());
379392

380393
for (Commit commit : commits) {
381394
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());

0 commit comments

Comments
 (0)