diff --git a/exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java b/exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java index 42fda2d6e8b..5ae9d4f7118 100644 --- a/exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java +++ b/exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java @@ -30,12 +30,14 @@ import org.junit.Test; import org.xmldb.api.base.XMLDBException; +import java.util.List; +import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.*; +import java.util.function.Consumer; import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; /** * @author Adam Retter @@ -50,7 +52,7 @@ public void noPrivilegeEscalationThroughBrokerRelease() throws EXistException { //take a broker with the guest user final BrokerPool pool = existEmbeddedServer.getBrokerPool(); final Subject guestUser = pool.getSecurityManager().getGuestSubject(); - try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) { + try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) { assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId()); @@ -70,7 +72,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException { //take a broker with the SYSTEM user final BrokerPool pool = existEmbeddedServer.getBrokerPool(); final Subject sysUser = pool.getSecurityManager().getSystemSubject(); - try(final DBBroker broker1 = pool.get(Optional.of(sysUser))) { + try (final DBBroker broker1 = pool.get(Optional.of(sysUser))) { assertEquals("Expected `SYSTEM` user, but was: " + broker1.getCurrentSubject().getName(), sysUser.getId(), broker1.getCurrentSubject().getId()); @@ -88,7 +90,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException { public void guestDefaultPriviledge() throws EXistException { //take a broker with default perms final BrokerPool pool = existEmbeddedServer.getBrokerPool(); - try(final DBBroker broker1 = pool.getBroker()) { + try (final DBBroker broker1 = pool.getBroker()) { final Subject guestUser = pool.getSecurityManager().getGuestSubject(); @@ -109,7 +111,7 @@ public void noPrivilegeEscalationThroughBrokerRelease_xmldb() throws EXistExcept //take a broker with the guest user final BrokerPool pool = existEmbeddedServer.getBrokerPool(); final Subject guestUser = pool.getSecurityManager().getGuestSubject(); - try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) { + try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) { assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId()); @@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce // test requires at least 2 leasedBrokers to prove the issue assertTrue(maxBrokers > 1); + final ExecutorService executor = Executors.newFixedThreadPool(maxBrokers + 1); + final List> tasks = new ArrayList<>(maxBrokers); final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch(1); final CountDownLatch releaseLatch = new CountDownLatch(1); try { // lease all brokers - final Thread brokerUsers[] = new Thread[maxBrokers]; final CountDownLatch acquiredLatch = new CountDownLatch(maxBrokers); - - final Thread firstBrokerUser = new Thread(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch), "first-brokerUser"); - brokerUsers[0] = firstBrokerUser; - brokerUsers[0].start(); - for (int i = 1; i < maxBrokers; i++) { - brokerUsers[i] = new Thread(new BrokerUser(pool, acquiredLatch, releaseLatch)); - brokerUsers[i].start(); + Future firstBrokerUser = executor.submit(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch)); + for (int count = 1; count < maxBrokers; count++) { + tasks.add(executor.submit(new BrokerUser(pool, acquiredLatch, releaseLatch))); } // wait for all brokers to be acquired @@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce // create a new thread and attempt to get an additional broker final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch(1); - final Thread additionalBrokerUser = new Thread(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch), "additional-brokerUser"); assertEquals(1, additionalBrokerAcquiredLatch.getCount()); - additionalBrokerUser.start(); + executor.submit(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch)); // we should not be able to acquire an additional broker, as we have already leased max Thread.sleep(500); // just to ensure the other thread has done something @@ -172,23 +170,92 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce assertEquals(1, firstBrokerReleaseLatch.getCount()); firstBrokerReleaseLatch.countDown(); assertEquals(0, firstBrokerReleaseLatch.getCount()); - firstBrokerUser.join(); // wait for the first broker lease thread to complete + firstBrokerUser.get(); // wait for the first broker lease thread to complete // check that the additional broker lease has now been acquired Thread.sleep(500); // just to ensure the other thread has done something assertEquals(0, additionalBrokerAcquiredLatch.getCount()); + executor.shutdown(); } finally { // release all brokers from brokerUsers - if(firstBrokerReleaseLatch.getCount() == 1) { + if (firstBrokerReleaseLatch.getCount() == 1) { firstBrokerReleaseLatch.countDown(); } releaseLatch.countDown(); + assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); + for (Future task : tasks) { + task.get(); + } + for (Runnable task : executor.shutdownNow()) { + assertNotNull(task); + } + } + } + + @Test + public void concurrentShutdownAndUse() throws InterruptedException, ExecutionException { + final BrokerPool pool = existEmbeddedServer.getBrokerPool(); + final int maxBrokers = pool.getMax(); + final int taskAmount = maxBrokers * 50; + + // test requires at least 5 leasedBrokers to prove the issue + assertTrue(maxBrokers > 4); + + final CountDownLatch readyLatch = new CountDownLatch(1); + final CountDownLatch executeLatch = new CountDownLatch(1); + final ExecutorService executor = Executors.newFixedThreadPool(taskAmount); + final List> tasks = new ArrayList<>(taskAmount); + final Consumer brokerAquire = brokerPool -> { + try (final DBBroker broker = brokerPool.getBroker()) { + TimeUnit.SECONDS.sleep(1); + } catch (EXistException e) { + throw new IllegalStateException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + }; + for (int count = 0; count < taskAmount; count++) { + tasks.add(executor.submit(new PoolAction(pool, readyLatch, executeLatch, (count % 2 == 0) ? BrokerPool::shutdown : brokerAquire))); + } + executor.shutdown(); + + TimeUnit.SECONDS.sleep(2); + readyLatch.countDown(); + + assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + for (Future task : tasks) { + task.get(); + } + for (Runnable task : executor.shutdownNow()) { + assertNotNull(task); + } + } + + static class PoolAction implements Callable { + private final BrokerPool brokerPool; + private final CountDownLatch readyLatch; + private final CountDownLatch excuteLatch; + private final Consumer action; + + PoolAction(final BrokerPool brokerPool, CountDownLatch readyLatch, CountDownLatch excuteLatch, Consumer action) { + this.brokerPool = brokerPool; + this.readyLatch = readyLatch; + this.excuteLatch = excuteLatch; + this.action = action; + } + + @Override + public Void call() throws InterruptedException { + readyLatch.await(); + action.accept(brokerPool); + return null; } } - public static class BrokerUser implements Runnable { + public static class BrokerUser implements Callable { final BrokerPool brokerPool; private final CountDownLatch acquiredLatch; private final CountDownLatch releaseLatch; @@ -200,8 +267,8 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc } @Override - public void run() { - try(final DBBroker broker = brokerPool.getBroker()) { + public Void call() throws EXistException, InterruptedException { + try (final DBBroker broker = brokerPool.getBroker()) { // signal that we have acquired the broker acquiredLatch.countDown(); @@ -210,9 +277,8 @@ public void run() { // wait for signal to release the broker releaseLatch.await(); - } catch(final EXistException | InterruptedException e) { - fail(e.getMessage()); } + return null; } }