Skip to content

Commit d5c285e

Browse files
committed
Fix flaky test in TestMemoryRevokingScheduler
1 parent 34e068c commit d5c285e

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ void awaitAsynchronousCallbacksRun()
151151
memoryRevocationExecutor.invokeAll(singletonList((Callable<?>) () -> null));
152152
}
153153

154+
@VisibleForTesting
155+
void submitAsynchronousCallable(Callable<?> callable)
156+
{
157+
memoryRevocationExecutor.submit(callable);
158+
}
159+
154160
private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryMemoryReservation)
155161
{
156162
try {

presto-tests/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Optional;
5757
import java.util.Set;
5858
import java.util.concurrent.Callable;
59+
import java.util.concurrent.CompletableFuture;
5960
import java.util.concurrent.Executor;
6061
import java.util.concurrent.ExecutorService;
6162
import java.util.concurrent.ScheduledExecutorService;
@@ -344,12 +345,20 @@ public void testTaskRevokingOrderForRevocableBytes()
344345
try {
345346
scheduler.start();
346347

348+
// Waiting for all existing tasks in scheduler's memoryRevocationExecutor to complete
347349
scheduler.awaitAsynchronousCallbacksRun();
348350
assertMemoryRevokingNotRequested();
349351

352+
CompletableFuture<Void> future = new CompletableFuture<>();
353+
// Submit a task that will only be completed after the following two memory reserving actions have occurred.
354+
// It can make sure that no asynchronous memory revoking task occurs between the two memory reserving actions,
355+
// since `memoryRevocationExecutor` of the scheduler where all these tasks run is a single threaded pool
356+
scheduler.submitAsynchronousCallable(() -> future.get());
350357
operatorContext1.localRevocableMemoryContext().setBytes(11);
351358
operatorContext2.localRevocableMemoryContext().setBytes(12);
359+
future.complete(null);
352360

361+
// Waiting for all existing tasks in scheduler's memoryRevocationExecutor to complete
353362
scheduler.awaitAsynchronousCallbacksRun();
354363
assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2);
355364
assertEquals(TestOperatorContext.firstOperator, "operator2"); // operator2 should revoke first since it (and it's encompassing task) has allocated more bytes

0 commit comments

Comments
 (0)