Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -75,6 +74,17 @@ public class StreamingEngineWorkCommitterTest {
private FakeWindmillServer fakeWindmillServer;
private Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory;

private static void waitForExpectedSetSize(Set<?> s, int expectedSize) {
while (s.size() < expectedSize) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
assertThat(s).hasSize(expectedSize);
}

private static Work createMockWork(long workToken) {
WorkItem workItem =
WorkItem.newBuilder()
Expand Down Expand Up @@ -135,7 +145,7 @@ private WorkCommitter createWorkCommitter(Consumer<CompleteCommit> onCommitCompl

@Test
public void testCommit_sendsCommitsToStreamingEngine() {
Set<CompleteCommit> completeCommits = new HashSet<>();
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
workCommitter = createWorkCommitter(completeCommits::add);
List<Commit> commits = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
Expand All @@ -155,6 +165,7 @@ public void testCommit_sendsCommitsToStreamingEngine() {

Map<Long, WorkItemCommitRequest> committed =
fakeWindmillServer.waitForAndGetCommits(commits.size());
waitForExpectedSetSize(completeCommits, 5);

for (Commit commit : commits) {
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
Expand All @@ -168,7 +179,7 @@ public void testCommit_sendsCommitsToStreamingEngine() {

@Test
public void testCommit_handlesFailedCommits() {
Set<CompleteCommit> completeCommits = new HashSet<>();
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
workCommitter = createWorkCommitter(completeCommits::add);
List<Commit> commits = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
Expand All @@ -192,6 +203,7 @@ public void testCommit_handlesFailedCommits() {

Map<Long, WorkItemCommitRequest> committed =
fakeWindmillServer.waitForAndGetCommits(commits.size() / 2);
waitForExpectedSetSize(completeCommits, 10);

for (Commit commit : commits) {
if (commit.work().isFailed()) {
Expand All @@ -210,7 +222,7 @@ public void testCommit_handlesFailedCommits() {

@Test
public void testCommit_handlesCompleteCommits_commitStatusNotOK() {
Set<CompleteCommit> completeCommits = new HashSet<>();
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
workCommitter = createWorkCommitter(completeCommits::add);
Map<WorkId, Windmill.CommitStatus> expectedCommitStatus = new HashMap<>();
Random commitStatusSelector = new Random();
Expand Down Expand Up @@ -249,6 +261,7 @@ public void testCommit_handlesCompleteCommits_commitStatusNotOK() {

Map<Long, WorkItemCommitRequest> committed =
fakeWindmillServer.waitForAndGetCommits(commits.size());
waitForExpectedSetSize(completeCommits, commits.size());

for (Commit commit : commits) {
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
Expand All @@ -257,7 +270,6 @@ public void testCommit_handlesCompleteCommits_commitStatusNotOK() {
assertThat(completeCommits)
.contains(asCompleteCommit(commit, expectedCommitStatus.get(commit.work().id())));
}
assertThat(completeCommits.size()).isEqualTo(commits.size());

workCommitter.stop();
}
Expand Down Expand Up @@ -314,7 +326,7 @@ public void shutdown() {}
WindmillStreamPool.create(1, Duration.standardMinutes(1), fakeCommitWorkStream)
::getCloseableStream;

Set<CompleteCommit> completeCommits = new HashSet<>();
Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>());
workCommitter = createWorkCommitter(completeCommits::add);

List<Commit> commits = new ArrayList<>();
Expand Down Expand Up @@ -376,6 +388,7 @@ public void testMultipleCommitSendersSingleStream() {
commits.parallelStream().forEach(workCommitter::commit);
Map<Long, WorkItemCommitRequest> committed =
fakeWindmillServer.waitForAndGetCommits(commits.size());
waitForExpectedSetSize(completeCommits, commits.size());

for (Commit commit : commits) {
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
Expand Down
Loading