Skip to content

Commit 11a7ca6

Browse files
Fix PollUtils.poll to use wall-clock time for timeout instead of accumulating sleep durations (#1000)
* Fix PollUtils.poll to use wall-clock time for timeout instead of accumulating sleep durations * Added TCs
1 parent fa517b3 commit 11a7ca6

File tree

2 files changed

+175
-6
lines changed

2 files changed

+175
-6
lines changed

datastream-utils/src/main/java/com/linkedin/datastream/common/PollUtils.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public static boolean poll(BooleanSupplier cond, long periodMs, long timeoutMs)
121121
* @return true if condition is met, false otherwise
122122
*/
123123
public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, long timeoutMs, T arg) {
124-
long elapsedMs = 0;
124+
long startMs = System.currentTimeMillis();
125125
if (timeoutMs > 0 && periodMs > timeoutMs) {
126126
return false;
127127
}
@@ -134,8 +134,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
134134
} catch (InterruptedException e) {
135135
break;
136136
}
137-
elapsedMs += periodMs;
138-
if (timeoutMs > 0 && elapsedMs >= timeoutMs) {
137+
if (timeoutMs > 0 && System.currentTimeMillis() - startMs >= timeoutMs) {
139138
break;
140139
}
141140
}
@@ -152,7 +151,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
152151
*/
153152
public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplier, InterruptablePredicate<T> cond,
154153
long periodMs, long timeoutMs) {
155-
long elapsedMs = 0;
154+
long startMs = System.currentTimeMillis();
156155
if (periodMs > timeoutMs) {
157156
return Optional.empty();
158157
}
@@ -167,8 +166,7 @@ public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplie
167166
} catch (InterruptedException e) {
168167
break;
169168
}
170-
elapsedMs += periodMs;
171-
if (elapsedMs >= timeoutMs) {
169+
if (System.currentTimeMillis() - startMs >= timeoutMs) {
172170
break;
173171
}
174172
}

datastream-utils/src/test/java/com/linkedin/datastream/common/TestPollUtils.java

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package com.linkedin.datastream.common;
77

8+
import java.util.Optional;
9+
import java.util.concurrent.atomic.AtomicInteger;
810
import java.util.function.BooleanSupplier;
911

1012
import org.testng.Assert;
@@ -55,4 +57,173 @@ public void testpollWithPredicate() {
5557
long now2 = System.currentTimeMillis();
5658
Assert.assertTrue(now2 - now1 >= 350);
5759
}
60+
61+
/**
62+
* Validates that the predicate overload uses wall-clock time for timeout.
63+
* The predicate simulates a slow operation (50ms per call) with a short sleep period (10ms).
64+
* With the old accumulator-based approach, only 10ms would count per iteration toward the
65+
* 500ms timeout, so it would take ~3000ms wall-clock time (500/10 * 60ms per iteration).
66+
* With wall-clock timeout, it should complete within ~600ms (500ms timeout + tolerance).
67+
*/
68+
@Test
69+
public void testPredicatePollTimesOutByWallClock() {
70+
AtomicInteger invocationCount = new AtomicInteger(0);
71+
long periodMs = 10;
72+
long timeoutMs = 500;
73+
74+
long startMs = System.currentTimeMillis();
75+
boolean result = PollUtils.poll((ignored) -> {
76+
invocationCount.incrementAndGet();
77+
try {
78+
// Simulate a slow predicate (e.g., network call) that takes 50ms each time
79+
Thread.sleep(50);
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
82+
}
83+
return false; // Never satisfy the condition
84+
}, periodMs, timeoutMs, null);
85+
long elapsedMs = System.currentTimeMillis() - startMs;
86+
87+
Assert.assertFalse(result, "Poll should return false since the condition is never met");
88+
// Wall-clock elapsed time should be close to the timeout, not inflated
89+
// With old code: elapsed would be ~invocations * (50ms + 10ms) = many seconds
90+
// With fix: elapsed should be approximately timeoutMs
91+
Assert.assertTrue(elapsedMs < timeoutMs + 200,
92+
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
93+
+ timeoutMs + "ms). Predicate was invoked " + invocationCount.get() + " times.");
94+
Assert.assertTrue(elapsedMs >= timeoutMs - 100,
95+
"Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
96+
}
97+
98+
/**
99+
* Validates that the supplier overload uses wall-clock time for timeout.
100+
* The supplier simulates a slow operation (50ms per call) with a short sleep period (10ms).
101+
*/
102+
@Test
103+
public void testSupplierPollTimesOutByWallClock() {
104+
AtomicInteger invocationCount = new AtomicInteger(0);
105+
long periodMs = 10;
106+
long timeoutMs = 500;
107+
108+
long startMs = System.currentTimeMillis();
109+
Optional<String> result = PollUtils.poll(() -> {
110+
invocationCount.incrementAndGet();
111+
try {
112+
// Simulate a slow supplier (e.g., schema registry lookup) that takes 50ms
113+
Thread.sleep(50);
114+
} catch (InterruptedException e) {
115+
Thread.currentThread().interrupt();
116+
}
117+
return null; // Return null so the condition check below fails
118+
}, (val) -> val != null, periodMs, timeoutMs);
119+
long elapsedMs = System.currentTimeMillis() - startMs;
120+
121+
Assert.assertFalse(result.isPresent(), "Poll should return empty since supplier never returns non-null");
122+
Assert.assertTrue(elapsedMs < timeoutMs + 200,
123+
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
124+
+ timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
125+
Assert.assertTrue(elapsedMs >= timeoutMs - 100,
126+
"Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
127+
}
128+
129+
/**
130+
* Validates that the predicate overload still returns true promptly when the condition
131+
* is met, even when the predicate takes time to execute.
132+
*/
133+
@Test
134+
public void testPredicatePollSucceedsWithSlowPredicate() {
135+
AtomicInteger invocationCount = new AtomicInteger(0);
136+
137+
long startMs = System.currentTimeMillis();
138+
boolean result = PollUtils.poll((ignored) -> {
139+
int count = invocationCount.incrementAndGet();
140+
try {
141+
Thread.sleep(30); // Each invocation takes 30ms
142+
} catch (InterruptedException e) {
143+
Thread.currentThread().interrupt();
144+
}
145+
return count >= 3; // Succeed on the 3rd invocation
146+
}, 10, 2000, null);
147+
long elapsedMs = System.currentTimeMillis() - startMs;
148+
149+
Assert.assertTrue(result, "Poll should succeed on the 3rd invocation");
150+
Assert.assertEquals(invocationCount.get(), 3);
151+
// 3 invocations * ~40ms each (30ms work + 10ms sleep) = ~120ms, well under 2s timeout
152+
Assert.assertTrue(elapsedMs < 500,
153+
"Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
154+
}
155+
156+
/**
157+
* Validates that the supplier overload still returns the result promptly when
158+
* the condition is met, even when the supplier takes time to execute.
159+
*/
160+
@Test
161+
public void testSupplierPollSucceedsWithSlowSupplier() {
162+
AtomicInteger invocationCount = new AtomicInteger(0);
163+
164+
long startMs = System.currentTimeMillis();
165+
Optional<String> result = PollUtils.poll(() -> {
166+
int count = invocationCount.incrementAndGet();
167+
try {
168+
Thread.sleep(30); // Each invocation takes 30ms
169+
} catch (InterruptedException e) {
170+
Thread.currentThread().interrupt();
171+
}
172+
return count >= 3 ? "success" : null;
173+
}, (val) -> val != null, 10, 2000);
174+
long elapsedMs = System.currentTimeMillis() - startMs;
175+
176+
Assert.assertTrue(result.isPresent(), "Poll should return a value on the 3rd invocation");
177+
Assert.assertEquals(result.get(), "success");
178+
Assert.assertEquals(invocationCount.get(), 3);
179+
Assert.assertTrue(elapsedMs < 500,
180+
"Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
181+
}
182+
183+
/**
184+
* Validates that the BooleanSupplier overload (which delegates to the predicate overload)
185+
* also respects wall-clock timeout when the supplier is slow.
186+
*/
187+
@Test
188+
public void testBooleanSupplierPollTimesOutByWallClock() {
189+
AtomicInteger invocationCount = new AtomicInteger(0);
190+
long periodMs = 10;
191+
long timeoutMs = 500;
192+
193+
long startMs = System.currentTimeMillis();
194+
boolean result = PollUtils.poll(() -> {
195+
invocationCount.incrementAndGet();
196+
try {
197+
Thread.sleep(50);
198+
} catch (InterruptedException e) {
199+
Thread.currentThread().interrupt();
200+
}
201+
return false;
202+
}, periodMs, timeoutMs);
203+
long elapsedMs = System.currentTimeMillis() - startMs;
204+
205+
Assert.assertFalse(result);
206+
Assert.assertTrue(elapsedMs < timeoutMs + 200,
207+
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
208+
+ timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
209+
}
210+
211+
/**
212+
* Validates that periodMs > timeoutMs still returns false/empty immediately
213+
* for both overloads (edge case preserved by the fix).
214+
*/
215+
@Test
216+
public void testPeriodExceedsTimeoutReturnsImmediately() {
217+
long startMs = System.currentTimeMillis();
218+
219+
// Predicate overload with positive timeout
220+
Assert.assertFalse(PollUtils.poll((ignored) -> true, 200, 100, null));
221+
222+
// Supplier overload
223+
Optional<String> result = PollUtils.poll(() -> "value", (val) -> true, 200, 100);
224+
Assert.assertFalse(result.isPresent());
225+
226+
long elapsedMs = System.currentTimeMillis() - startMs;
227+
Assert.assertTrue(elapsedMs < 50, "Should return immediately when periodMs > timeoutMs");
228+
}
58229
}

0 commit comments

Comments
 (0)