|
56 | 56 | import java.util.Optional; |
57 | 57 | import java.util.Set; |
58 | 58 | import java.util.concurrent.Callable; |
| 59 | +import java.util.concurrent.CompletableFuture; |
59 | 60 | import java.util.concurrent.Executor; |
60 | 61 | import java.util.concurrent.ExecutorService; |
61 | 62 | import java.util.concurrent.ScheduledExecutorService; |
@@ -344,12 +345,20 @@ public void testTaskRevokingOrderForRevocableBytes() |
344 | 345 | try { |
345 | 346 | scheduler.start(); |
346 | 347 |
|
| 348 | + // Waiting for all existing tasks in scheduler's memoryRevocationExecutor to complete |
347 | 349 | scheduler.awaitAsynchronousCallbacksRun(); |
348 | 350 | assertMemoryRevokingNotRequested(); |
349 | 351 |
|
| 352 | + CompletableFuture<Void> future = new CompletableFuture<>(); |
| 353 | + // Submit a task that will only be completed after the following two memory reserving actions have occurred. |
| 354 | + // It can make sure that no asynchronous memory revoking task occurs between the two memory reserving actions, |
| 355 | + // since `memoryRevocationExecutor` of the scheduler where all these tasks run is a single threaded pool |
| 356 | + scheduler.submitAsynchronousCallable(() -> future.get()); |
350 | 357 | operatorContext1.localRevocableMemoryContext().setBytes(11); |
351 | 358 | operatorContext2.localRevocableMemoryContext().setBytes(12); |
| 359 | + future.complete(null); |
352 | 360 |
|
| 361 | + // Waiting for all existing tasks in scheduler's memoryRevocationExecutor to complete |
353 | 362 | scheduler.awaitAsynchronousCallbacksRun(); |
354 | 363 | assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); |
355 | 364 | assertEquals(TestOperatorContext.firstOperator, "operator2"); // operator2 should revoke first since it (and it's encompassing task) has allocated more bytes |
|
0 commit comments