|
40 | 40 | import io.temporal.internal.replay.ReplayWorkflowFactory;
|
41 | 41 | import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
|
42 | 42 | import io.temporal.serviceclient.WorkflowServiceStubs;
|
| 43 | +import io.temporal.testUtils.Eventually; |
43 | 44 | import io.temporal.testUtils.HistoryUtils;
|
44 | 45 | import io.temporal.worker.MetricsType;
|
45 | 46 | import io.temporal.worker.tuning.FixedSizeSlotSupplier;
|
@@ -192,27 +193,32 @@ public void concurrentPollRequestLockTest() throws Exception {
|
192 | 193 | pollTaskQueueLatch.await();
|
193 | 194 | // Wait until the worker handles at least one WFT
|
194 | 195 | handleTaskLatch.await();
|
195 |
| - // Sleep to allow metrics to be published |
196 |
| - Thread.sleep(100); |
197 |
| - // Since all polls have the same runID only one should get through, the other two should be |
198 |
| - // blocked |
199 |
| - assertEquals(runLockManager.totalLocks(), 1); |
200 | 196 | // Verify 3 slots have been used
|
201 |
| - reporter.assertGauge( |
202 |
| - MetricsType.WORKER_TASK_SLOTS_AVAILABLE, |
203 |
| - ImmutableMap.of("worker_type", "WorkflowWorker"), |
204 |
| - 97.0); |
| 197 | + Eventually.assertEventually( |
| 198 | + Duration.ofSeconds(10), |
| 199 | + () -> { |
| 200 | + // Since all polls have the same runID only one should get through, the other two should |
| 201 | + // be |
| 202 | + // blocked |
| 203 | + assertEquals(runLockManager.totalLocks(), 1); |
| 204 | + reporter.assertGauge( |
| 205 | + MetricsType.WORKER_TASK_SLOTS_AVAILABLE, |
| 206 | + ImmutableMap.of("worker_type", "WorkflowWorker"), |
| 207 | + 97.0); |
| 208 | + }); |
205 | 209 | // Wait for the worker to respond, by this time the other blocked tasks should have timed out
|
206 | 210 | respondTaskLatch.await();
|
207 |
| - // Sleep to allow metrics to be published |
208 |
| - Thread.sleep(100); |
209 |
| - // No task should have the lock anymore |
210 |
| - assertEquals(runLockManager.totalLocks(), 0); |
211 | 211 | // All slots should be available
|
212 |
| - reporter.assertGauge( |
213 |
| - MetricsType.WORKER_TASK_SLOTS_AVAILABLE, |
214 |
| - ImmutableMap.of("worker_type", "WorkflowWorker"), |
215 |
| - 100.0); |
| 212 | + Eventually.assertEventually( |
| 213 | + Duration.ofSeconds(10), |
| 214 | + () -> { |
| 215 | + // No task should have the lock anymore |
| 216 | + assertEquals(runLockManager.totalLocks(), 0); |
| 217 | + reporter.assertGauge( |
| 218 | + MetricsType.WORKER_TASK_SLOTS_AVAILABLE, |
| 219 | + ImmutableMap.of("worker_type", "WorkflowWorker"), |
| 220 | + 100.0); |
| 221 | + }); |
216 | 222 | // Cleanup
|
217 | 223 | worker.shutdown(new ShutdownManager(), false).get();
|
218 | 224 | // Verify we only handled two tasks
|
|
0 commit comments