Skip to content

Commit 474d651

Browse files
authored
Change FakeClock.sleep() to be safely re-entrant in attempt to fix StreamingDataflowWorkerTest flakes. (#36754)
Also fix expectation parameter ordering
1 parent 9516397 commit 474d651

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3623,8 +3623,8 @@ public void testLatencyAttributionToQueuedState() throws Exception {
36233623
worker.stop();
36243624

36253625
assertEquals(
3626-
awrSink.getLatencyAttributionDuration(workToken, State.QUEUED), Duration.millis(1000));
3627-
assertEquals(awrSink.getLatencyAttributionDuration(workToken + 1, State.QUEUED), Duration.ZERO);
3626+
Duration.millis(1000), awrSink.getLatencyAttributionDuration(workToken, State.QUEUED));
3627+
assertEquals(Duration.ZERO, awrSink.getLatencyAttributionDuration(workToken + 1, State.QUEUED));
36283628
}
36293629

36303630
@Test
@@ -3657,7 +3657,7 @@ public void testLatencyAttributionToActiveState() throws Exception {
36573657
worker.stop();
36583658

36593659
assertEquals(
3660-
awrSink.getLatencyAttributionDuration(workToken, State.ACTIVE), Duration.millis(1000));
3660+
Duration.millis(1000), awrSink.getLatencyAttributionDuration(workToken, State.ACTIVE));
36613661
}
36623662

36633663
@Test
@@ -3695,7 +3695,7 @@ public void testLatencyAttributionToReadingState() throws Exception {
36953695
worker.stop();
36963696

36973697
assertEquals(
3698-
awrSink.getLatencyAttributionDuration(workToken, State.READING), Duration.millis(1000));
3698+
Duration.millis(1000), awrSink.getLatencyAttributionDuration(workToken, State.READING));
36993699
}
37003700

37013701
@Test
@@ -3735,7 +3735,7 @@ public void testLatencyAttributionToCommittingState() throws Exception {
37353735
worker.stop();
37363736

37373737
assertEquals(
3738-
awrSink.getLatencyAttributionDuration(workToken, State.COMMITTING), Duration.millis(1000));
3738+
Duration.millis(1000), awrSink.getLatencyAttributionDuration(workToken, State.COMMITTING));
37393739
}
37403740

37413741
@Test
@@ -3784,11 +3784,11 @@ public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
37843784
// Initial fake latency provided to FakeWindmillServer when invoke receiveWork in
37853785
// GetWorkStream().
37863786
assertEquals(
3787-
workItemCommitRequest.get((long) workToken).getPerWorkItemLatencyAttributions(1),
37883787
LatencyAttribution.newBuilder()
37893788
.setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
37903789
.setTotalDurationMillis(1000)
3791-
.build());
3790+
.build(),
3791+
workItemCommitRequest.get((long) workToken).getPerWorkItemLatencyAttributions(1));
37923792
}
37933793
}
37943794

@@ -4475,7 +4475,7 @@ public synchronized void sleep(Duration duration) {
44754475
if (duration.isShorterThan(Duration.ZERO)) {
44764476
throw new UnsupportedOperationException("Cannot sleep backwards in time");
44774477
}
4478-
Instant endOfSleep = now.plus(duration);
4478+
final Instant endOfSleep = now.plus(duration);
44794479
while (true) {
44804480
Job job = jobs.peek();
44814481
if (job == null || job.when.isAfter(endOfSleep)) {
@@ -4485,7 +4485,11 @@ public synchronized void sleep(Duration duration) {
44854485
now = job.when;
44864486
job.work.run();
44874487
}
4488-
now = endOfSleep;
4488+
// Handle possibly re-entrant sleep. The contained sleep may advance now
4489+
// past endOfSleep.
4490+
if (endOfSleep.isAfter(now)) {
4491+
now = endOfSleep;
4492+
}
44894493
}
44904494

44914495
private synchronized void schedule(Duration fromNow, Runnable work) {

0 commit comments

Comments
 (0)