diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java index 1b5a89821b24..f9649fd15f35 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java @@ -22,6 +22,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -115,49 +117,52 @@ public void cancelNotRunningExecutionReturnsError() { @Test public void cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() { - CountDownLatch latch = new CountDownLatch(2); + int executions = 2; + CountDownLatch latch = new CountDownLatch(executions); + ExecutorService executorService = Executors.newFixedThreadPool(executions); Callable firstExecution = () -> { latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS); return null; }; - CompletableFuture - .supplyAsync(() -> { - try { - return service.execute(firstExecution, "myRegion", "mySender1"); - } catch (Exception e) { - return null; - } - }); + executorService.submit(() -> { + try { + return service.execute(firstExecution, "myRegion", "mySender1"); + } catch (Exception e) { + return null; + } + }); Callable secondExecution = () -> { latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS); return null; }; - CompletableFuture - .supplyAsync(() -> { - try { - return service.execute(secondExecution, "myRegion", "mySender"); - } catch (Exception e) { - return null; - } - }); + executorService.submit(() -> { + try { + return service.execute(secondExecution, "myRegion", "mySender"); + } catch (Exception e) { + return null; + } + }); // Wait for the functions to start execution - await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2)); + await().untilAsserted( + () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions)); // Cancel the function execution String executionsString = service.cancelAll(); assertThat(executionsString).isEqualTo("[(myRegion,mySender1), (myRegion,mySender)]"); await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0)); + executorService.shutdown(); } @Test public void severalExecuteWithDifferentRegionOrSenderAreAllowed() { int executions = 5; CountDownLatch latch = new CountDownLatch(executions); + ExecutorService executorService = Executors.newFixedThreadPool(executions); for (int i = 0; i < executions; i++) { Callable execution = () -> { latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS); @@ -165,14 +170,13 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() { }; final String regionName = String.valueOf(i); - CompletableFuture - .supplyAsync(() -> { - try { - return service.execute(execution, regionName, "mySender1"); - } catch (Exception e) { - return null; - } - }); + executorService.submit(() -> { + try { + return service.execute(execution, regionName, "mySender1"); + } catch (Exception e) { + return null; + } + }); } // Wait for the functions to start execution @@ -183,6 +187,7 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() { for (int i = 0; i < executions; i++) { latch.countDown(); } + executorService.shutdown(); } @Test @@ -193,6 +198,7 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() { int executions = 4; CountDownLatch latch = new CountDownLatch(executions); AtomicInteger concurrentExecutions = new AtomicInteger(0); + ExecutorService executorService = Executors.newFixedThreadPool(executions); for (int i = 0; i < executions; i++) { Callable execution = () -> { concurrentExecutions.incrementAndGet(); @@ -202,14 +208,13 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() { }; final String regionName = String.valueOf(i); - CompletableFuture - .supplyAsync(() -> { - try { - return service.execute(execution, regionName, "mySender1"); - } catch (Exception e) { - return null; - } - }); + executorService.submit(() -> { + try { + return service.execute(execution, regionName, "mySender1"); + } catch (Exception e) { + return null; + } + }); } // Wait for the functions to start execution @@ -225,6 +230,7 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() { } await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0)); + executorService.shutdown(); } }