diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 7d1f67a2e4b..6872a3e81b9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -2271,27 +2271,43 @@ void ensembleChangeLoop(List origEnsemble, Map fail List newEnsemble = null; Set replaced = null; + synchronized (metadataLock) { + if (delayedWriteFailedBookies.isEmpty()) { + newEnsemble = getCurrentEnsemble(); + replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble); + LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, ledgerId); + } + } + + if (newEnsemble != null) { + // Resend write requests while changingEnsemble is true (unsetSuccess outside the lock). + // sendAddSuccessCallbacks() is skipped in this state, + // preventing pending adds from being marked complete prematurely. + unsetSuccessAndSendWriteRequest(newEnsemble, replaced); + } Map toReplace = null; + List nextOrigEnsemble = null; + boolean triggerCallbacks = false; synchronized (metadataLock) { if (!delayedWriteFailedBookies.isEmpty()) { toReplace = new HashMap<>(delayedWriteFailedBookies); delayedWriteFailedBookies.clear(); + nextOrigEnsemble = newEnsemble != null ? newEnsemble : origEnsemble; } else { - newEnsemble = getCurrentEnsemble(); - replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble); - LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, ledgerId); - changingEnsemble = false; + triggerCallbacks = true; } } - if (toReplace != null && !toReplace.isEmpty()) { - ensembleChangeLoop(origEnsemble, toReplace); + if (triggerCallbacks) { + // Trigger sendAddSuccessCallbacks() after changingEnsemble is set to false, + // so that pending adds can complete once the ensemble change is finished. 
+ sendAddSuccessCallbacks(); } - if (newEnsemble != null) { // unsetSuccess outside of lock - unsetSuccessAndSendWriteRequest(newEnsemble, replaced); + if (toReplace != null && !toReplace.isEmpty()) { + ensembleChangeLoop(nextOrigEnsemble, toReplace); } } }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); @@ -2303,6 +2319,21 @@ void unsetSuccessAndSendWriteRequest(List ensemble, final Set pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex); } } + // Suppose that unset doesn't happen on the write set of an entry. In this + // case we don't need to resend the write request upon an ensemble change. + // We do need to invoke #sendAddSuccessCallbacks() for such entries because + // they may have already completed, but they are just waiting for the ensemble + // to change. + // E.g. + // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change + // happens to replace C with E. Entry k does not complete until C is + // replaced with E successfully. When the ensemble change completes, it tries + // to unset entry k. C however is not in k's write set, so no entry is written + // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never + // completes. + // + // We call #sendAddSuccessCallbacks() here to cover this case. + sendAddSuccessCallbacks(); } void registerOperationFailureOnBookie(BookieId bookie, long entryId) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 6b82200b3fa..4ae0af75904 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -188,20 +188,7 @@ synchronized void unsetSuccessAndSendWriteRequest(List ensemble, int b } // Suppose that unset doesn't happen on the write set of an entry. 
In this // case we don't need to resend the write request upon an ensemble change. - // We do need to invoke #sendAddSuccessCallbacks() for such entries because - // they may have already completed, but they are just waiting for the ensemble - // to change. - // E.g. - // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change - // happens to replace C with E. Entry k does not complete until C is - // replaced with E successfully. When the ensemble change completes, it tries - // to unset entry k. C however is not in k's write set, so no entry is written - // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never - // completes. - // - // We call sendAddSuccessCallback when unsetting t cover this case. if (!lh.distributionSchedule.hasEntry(entryId, bookieIndex)) { - lh.sendAddSuccessCallbacks(); return; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 93ace7eacb1..9e4a0afe6e4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -101,9 +101,7 @@ public void testConstructionZkDelay() throws Exception { conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()) .setZkTimeout(20000); - CountDownLatch l = new CountDownLatch(1); - zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l); - l.await(); + zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS); BookKeeper bkc = new BookKeeper(conf); bkc.createLedger(digestType, "testPasswd".getBytes()).close(); @@ -117,9 +115,7 @@ public void testConstructionNotConnectedExplicitZk() throws Exception { conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()) .setZkTimeout(20000); - CountDownLatch l = new CountDownLatch(1); - zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l); - l.await(); + zkUtil.sleepCluster(200, 
TimeUnit.MILLISECONDS); ZooKeeper zk = new ZooKeeper( zkUtil.getZooKeeperConnectString(), diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index ea0d1b56b49..88679d70d68 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -37,6 +37,7 @@ import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -47,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -1538,7 +1540,142 @@ public void testLedgerMetadataTest() throws Exception { lh.close(); } + @Test + public void testToDelayEnsembleReplacementAndRewriteEntry() throws Exception { + lh = bkc.createLedger(4, 2, digestType, ledgerPassword); + + // Put Bookie0 to sleep. + List currentEnsemble = lh.getLedgerMetadata() + .getAllEnsembles().entrySet().iterator().next().getValue(); + CountDownLatch bookie0Latch = new CountDownLatch(1); + sleepBookie(currentEnsemble.get(0), bookie0Latch); + + // Write entry0,1,2,3 to Bookie. 
+ int sendCount = 7; + CountDownLatch addCompleteLatch = new CountDownLatch(sendCount); + for (int count = 0; count < 4; count++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(count); + entry.position(0); + entries1.add(entry.array()); + + lh.asyncAddEntry(entry.array(), new AddCallback() { + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + CountDownLatch addCompleteLatch = (CountDownLatch) ctx; + addCompleteLatch.countDown(); + } + }, addCompleteLatch); + } + + // Expected state of entries. + // entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0. + // entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending. + // entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending. + // entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0. + Field fieldPendingAddOps = lh.getClass().getDeclaredField("pendingAddOps"); + fieldPendingAddOps.setAccessible(true); + int completedCount; + do { + Thread.sleep(100); + + completedCount = 0; + for (PendingAddOp pendingAddOp : (Queue) fieldPendingAddOps.get(lh)) { + if (pendingAddOp.completed) { + completedCount++; + } + } + } while (completedCount != 2); + + // Kill Bookie2,3 and start a new Bookie. + killBookie(currentEnsemble.get(2)); + killBookie(currentEnsemble.get(3)); + startNewBookie(); + + // Put ZK cluster to sleep to delay ensemble replacement. + CountDownLatch zkLatch = new CountDownLatch(1); + sleepZKCluster(zkLatch); + + // Write entry4,5,6 to Bookie. 
+ for (int count = 4; count < sendCount; count++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(count); + entry.position(0); + entries1.add(entry.array()); + + lh.asyncAddEntry(entry.array(), new AddCallback() { + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + CountDownLatch addCompleteLatch = (CountDownLatch) ctx; + addCompleteLatch.countDown(); + } + }, addCompleteLatch); + } + + // Expected state of entries. + // entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0. + // entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending. + // entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending. + // entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0. + // entry: 4(Bookie0, Bookie1) -> Waiting for successful write to Bookie0. + // entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2. + // entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3. + Field fieldChangingEnsemble = lh.getClass().getDeclaredField("changingEnsemble"); + fieldChangingEnsemble.setAccessible(true); + boolean changingEnsemble; + do { + Thread.sleep(100); + + changingEnsemble = (boolean) fieldChangingEnsemble.get(lh); + } while (!changingEnsemble); + + // Wake up Bookie0; the pending writes to Bookie0 then succeed. + // + // Expected state of entries. + // entry: 0(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending. + // entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending. + // entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending. + // entry: 3(Bookie3, Bookie0) -> Writing to Bookie3,0 was successful, but its completion is pending. + // entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending. + // entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2. 
+ // entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3. + bookie0Latch.countDown(); + do { + Thread.sleep(100); + + completedCount = 0; + for (PendingAddOp pendingAddOp : (Queue) fieldPendingAddOps.get(lh)) { + if (pendingAddOp.completed) { + completedCount++; + } + } + } while (completedCount != 5); + + // Wake up the ZK cluster so that the ensemble replacement can complete. + // + // Expected state of entries. + // entry: 0(Bookie0, Bookie1) -> Entry write is completed. + // entry: 1(Bookie1, Bookie4) -> Write to Bookie4. + // entry: 2(Bookie4, Bookie5) -> Write to Bookie4,5. + // entry: 3(Bookie5, Bookie0) -> Write to Bookie5. + // entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending. + // entry: 5(Bookie1, Bookie4) -> Write to Bookie4. + // entry: 6(Bookie4, Bookie5) -> Write to Bookie4,5. + zkLatch.countDown(); + + // Wait for all entry writes to complete. + addCompleteLatch.await(); + + readEntries(lh, entries1, sendCount); + lh.close(); + } + private void readEntries(LedgerHandle lh, List entries) throws InterruptedException, BKException { + readEntries(lh, entries, numEntriesToWrite); + } + + private void readEntries(LedgerHandle lh, List entries, int numEntriesToWrite) + throws InterruptedException, BKException { ls = lh.readEntries(0, numEntriesToWrite - 1); int index = 0; while (ls.hasMoreElements()) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java index 2b0c01bf8ec..a669002312c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.client.api; -import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.TimeUnit; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; @@ -44,7 +43,7 @@ public void testDriverMetadataServiceAvailable() conf.setZkTimeout(3000); try (BookKeeper bkc = BookKeeper.newBuilder(conf).build()) { Awaitility.await().until(() -> bkc.isDriverMetadataServiceAvailable().get()); - zkUtil.sleepCluster(5, TimeUnit.SECONDS, new CountDownLatch(1)); + zkUtil.sleepCluster(5, TimeUnit.SECONDS); Awaitility.await().until(() -> !bkc.isDriverMetadataServiceAvailable().get()); Awaitility.await().until(() -> bkc.isDriverMetadataServiceAvailable().get()); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index b73a3ee7b44..daa62b2c4c1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -267,6 +267,11 @@ protected void stopZKCluster() throws Exception { zkUtil.killCluster(); } + protected void sleepZKCluster(final CountDownLatch l) + throws InterruptedException, IOException { + zkUtil.sleepCluster(l); + } + /** * Start cluster. Also, starts the auto recovery process for each bookie, if * isAutoRecoveryEnabled is true. 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java index b0e828bd5ca..9f1cebdb37e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java @@ -58,7 +58,10 @@ public interface ZooKeeperCluster { void killCluster() throws Exception; - void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) + void sleepCluster(int time, TimeUnit timeUnit) + throws InterruptedException, IOException; + + void sleepCluster(CountDownLatch l) throws InterruptedException, IOException; default void expireSession(ZooKeeper zk) throws Exception { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java index 6dbf182110f..342eaa30abb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java @@ -136,7 +136,12 @@ public void killCluster() throws Exception { } @Override - public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException { + public void sleepCluster(int time, TimeUnit timeUnit) throws InterruptedException, IOException { + throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil"); + } + + @Override + public void sleepCluster(CountDownLatch l) throws InterruptedException, IOException { throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil"); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java index 
dcaa0506afe..095e7477fe5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java @@ -140,19 +140,19 @@ public void restartCluster() throws Exception { @Override public void sleepCluster(final int time, - final TimeUnit timeUnit, - final CountDownLatch l) + final TimeUnit timeUnit) throws InterruptedException, IOException { Thread[] allthreads = new Thread[Thread.activeCount()]; Thread.enumerate(allthreads); for (final Thread t : allthreads) { if (t.getName().contains("SyncThread:0")) { + final CountDownLatch suspendLatch = new CountDownLatch(1); Thread sleeper = new Thread() { @SuppressWarnings("deprecation") public void run() { try { t.suspend(); - l.countDown(); + suspendLatch.countDown(); timeUnit.sleep(time); t.resume(); } catch (Exception e) { @@ -161,6 +161,36 @@ public void run() { } }; sleeper.start(); + suspendLatch.await(); + return; + } + } + throw new IOException("ZooKeeper thread not found"); + } + + @Override + public void sleepCluster(final CountDownLatch l) + throws InterruptedException, IOException { + Thread[] allthreads = new Thread[Thread.activeCount()]; + Thread.enumerate(allthreads); + for (final Thread t : allthreads) { + if (t.getName().contains("SyncThread:0")) { + final CountDownLatch suspendLatch = new CountDownLatch(1); + Thread sleeper = new Thread() { + @SuppressWarnings("deprecation") + public void run() { + try { + t.suspend(); + suspendLatch.countDown(); + l.await(); + t.resume(); + } catch (Exception e) { + LOG.error("Error suspending thread", e); + } + } + }; + sleeper.start(); + suspendLatch.await(); return; } }