Skip to content

Commit 80c4aae

Browse files
committed
simplify tests
1 parent 2454876 commit 80c4aae

File tree

1 file changed

+29
-30
lines changed

1 file changed

+29
-30
lines changed

sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,16 @@ public synchronized void releaseLock() {
167167
notifyAll();
168168
}
169169

170-
public synchronized boolean isBlocked() {
171-
return isBlocked;
170+
public void waitUntilBlocking() throws InterruptedException {
171+
while (!isBlocked) {
172+
Thread.sleep(1);
173+
}
174+
}
175+
176+
public void waitUntilUnblocking() throws InterruptedException {
177+
while (isBlocked) {
178+
Thread.sleep(1);
179+
}
172180
}
173181
}
174182

@@ -179,46 +187,37 @@ public void testClaimObserversMaintainBacklogInterfaces() {
179187
assertThat(hasSize, instanceOf(HasProgress.class));
180188
}
181189

182-
private void testBlocking(String testCase) throws InterruptedException {
183-
RestrictionTrackerWithProgress withProgress;
184-
if ("tryClaim".equals(testCase)) {
185-
withProgress = new RestrictionTrackerWithProgress(true, false);
186-
} else if ("trySplit".equals(testCase)) {
187-
withProgress = new RestrictionTrackerWithProgress(false, true);
188-
} else {
189-
throw new IllegalArgumentException("unknown test case " + testCase);
190-
}
190+
@Test
191+
public void testClaimObserversProgressNonBlockingOnTryClaim() throws InterruptedException {
192+
RestrictionTrackerWithProgress withProgress = new RestrictionTrackerWithProgress(true, false);
191193
RestrictionTracker<Object, Object> tracker =
192194
RestrictionTrackers.observe(withProgress, new RestrictionTrackers.NoopClaimObserver<>());
193-
Runnable runnable;
194-
if ("tryClaim".equals(testCase)) {
195-
runnable = () -> tracker.tryClaim(new Object());
196-
} else {
197-
runnable = () -> tracker.trySplit(0.5);
198-
}
199-
Thread blocking = new Thread(runnable);
195+
Thread blocking = new Thread(() -> tracker.tryClaim(new Object()));
200196
blocking.start();
201-
while (!withProgress.isBlocked()) {
202-
Thread.sleep(1);
203-
}
197+
withProgress.waitUntilBlocking();
204198
RestrictionTracker.Progress progress =
205199
((RestrictionTrackers.RestrictionTrackerObserverWithProgress) tracker).getProgress(1);
206200
assertEquals(RestrictionTracker.Progress.NONE, progress);
207201
withProgress.releaseLock();
208-
while (withProgress.isBlocked()) {
209-
Thread.sleep(1);
210-
}
202+
withProgress.waitUntilUnblocking();
211203
progress = ((HasProgress) tracker).getProgress();
212204
assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
213205
}
214206

215-
@Test
216-
public void testClaimObserversProgressNonBlockingOnTryClaim() throws InterruptedException {
217-
testBlocking("tryClaim");
218-
}
219-
220207
@Test
221208
public void testClaimObserversProgressNonBlockingOnTrySplit() throws InterruptedException {
222-
testBlocking("trySplit");
209+
RestrictionTrackerWithProgress withProgress = new RestrictionTrackerWithProgress(false, true);
210+
RestrictionTracker<Object, Object> tracker =
211+
RestrictionTrackers.observe(withProgress, new RestrictionTrackers.NoopClaimObserver<>());
212+
Thread blocking = new Thread(() -> tracker.trySplit(0.5));
213+
blocking.start();
214+
withProgress.waitUntilBlocking();
215+
RestrictionTracker.Progress progress =
216+
((RestrictionTrackers.RestrictionTrackerObserverWithProgress) tracker).getProgress(1);
217+
assertEquals(RestrictionTracker.Progress.NONE, progress);
218+
withProgress.releaseLock();
219+
withProgress.waitUntilUnblocking();
220+
progress = ((HasProgress) tracker).getProgress();
221+
assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
223222
}
224223
}

0 commit comments

Comments
 (0)