diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index c32487a30bc..6f3e421b2c7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -111,6 +111,11 @@ public class ReplicationWorker implements Runnable { private final long baseBackoffForLockReleaseOfFailedLedger; private final BiConsumer onReadEntryFailureCallback; private final LedgerManager ledgerManager; + enum ReplicateResult { + SUCCESS, + FAILED, + SKIP_OPEN_FRAGMENT + } // Expose Stats private final StatsLogger statsLogger; @@ -246,7 +251,7 @@ public void run() { workerRunning = true; while (workerRunning) { try { - if (!rereplicate()) { + if (rereplicate() == ReplicateResult.FAILED) { LOG.warn("failed while replicating fragments"); waitBackOffTime(rwRereplicateBackoffMs); } @@ -290,24 +295,27 @@ private static void waitBackOffTime(long backoffMs) { * Replicates the under replicated fragments from failed bookie ledger to * targetBookie. */ - private boolean rereplicate() throws InterruptedException, BKException, + private ReplicateResult rereplicate() throws InterruptedException, BKException, UnavailableException { long ledgerIdToReplicate = underreplicationManager .getLedgerToRereplicate(); Stopwatch stopwatch = Stopwatch.createStarted(); - boolean success = false; + + ReplicateResult result = ReplicateResult.FAILED; try { - success = rereplicate(ledgerIdToReplicate); + result = rereplicate(ledgerIdToReplicate); } finally { long latencyMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); - if (success) { + if (result == ReplicateResult.SUCCESS) { rereplicateOpStats.registerSuccessfulEvent(latencyMillis, TimeUnit.MILLISECONDS); - } else { + } else if (result == ReplicateResult.FAILED) { rereplicateOpStats.registerFailedEvent(latencyMillis, TimeUnit.MILLISECONDS); + } else { + LOG.info("Replication of ledger with open fragment is skipped, ledgerId is {}.", ledgerIdToReplicate); } } - return success; + return result; } private void logBKExceptionAndReleaseLedger(BKException e, long ledgerIdToReplicate) @@ -440,7 +448,7 @@ private Set getNeedRepairedPlacementNotAdheringFragments(LedgerH } @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE") - private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException, + private ReplicateResult rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException, UnavailableException { if (LOG.isDebugEnabled()) { LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate); @@ -495,21 +503,21 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) { deferLedgerLockRelease = true; deferLedgerLockRelease(ledgerIdToReplicate); - return false; + return ReplicateResult.SKIP_OPEN_FRAGMENT; } fragments = getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage()); if (fragments.size() == 0) { LOG.info("Ledger replicated successfully. ledger id is: " + ledgerIdToReplicate); underreplicationManager.markLedgerReplicated(ledgerIdToReplicate); - return true; + return ReplicateResult.SUCCESS; } else { deferLedgerLockRelease = true; deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate); numDeferLedgerLockReleaseOfFailedLedger.inc(); // Releasing the underReplication ledger lock and compete // for the replication again for the pending fragments - return false; + return ReplicateResult.FAILED; } } catch (BKNoSuchLedgerExistsOnMetadataServerException e) { @@ -520,13 +528,13 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio + "So, no harm to continue", ledgerIdToReplicate); underreplicationManager.markLedgerReplicated(ledgerIdToReplicate); getExceptionCounter("BKNoSuchLedgerExistsOnMetadataServerException").inc(); - return false; + return ReplicateResult.FAILED; } catch (BKNotEnoughBookiesException e) { logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate); throw e; } catch (BKException e) { logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate); - return false; + return ReplicateResult.FAILED; } finally { // we make sure we always release the underreplicated lock, unless we decided to defer it. If the lock has // already been released, this is a no-op diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index 507f143d5ca..0a1a30f07a7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1248,6 +1248,44 @@ public void testReplicateEmptyOpenStateLedger() throws Exception { } } + @Test + public void testReplicateOpenStateLedger() throws Exception { + LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, + TESTPASSWD); + for (int i = 0; i < 10; i++) { + lh.addEntry(data); + } + + BookieId replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + LOG.info("Killing Bookie : {}", replicaToKill); + killBookie(replicaToKill); + + ReplicationWorker rw = new ReplicationWorker(baseConf); + try { + underReplicationManager.markLedgerUnderreplicated(lh.getId(), + replicaToKill.toString()); + while (!ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh + .getId(), basePath)) { + Thread.sleep(100); + } + + final Method rereplicate = rw.getClass().getDeclaredMethod("rereplicate", long.class); + rereplicate.setAccessible(true); + assertFalse(lh.getLedgerMetadata().isClosed()); + Object result = rereplicate.invoke(rw, lh.getId()); + assertEquals(result, ReplicationWorker.ReplicateResult.SKIP_OPEN_FRAGMENT); + lh.close(); + assertTrue(lh.getLedgerMetadata().isClosed()); + result = rereplicate.invoke(rw, lh.getId()); + assertEquals(result, ReplicationWorker.ReplicateResult.FAILED); + BookieId newBkAddr = startNewBookieAndReturnBookieId(); + LOG.info("New Bookie addr : {}", newBkAddr); + result = rereplicate.invoke(rw, lh.getId()); + assertEquals(result, ReplicationWorker.ReplicateResult.SUCCESS); + } finally { + rw.shutdown(); + } + } @Test public void testRepairedNotAdheringPlacementPolicyLedgerFragmentsOnRack() throws Exception { testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class, null); @@ -1275,14 +1313,14 @@ public void testReplicationStats() throws Exception { 1, numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue()); if (first) { - assertFalse((boolean) result); + assertEquals(result, ReplicationWorker.ReplicateResult.FAILED); assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED", 0, numLedgersReplicatedCounter.get().longValue()); assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED", 0, numNotAdheringPlacementLedgersCounter.get().longValue()); } else { - assertTrue((boolean) result); + assertEquals(result, ReplicationWorker.ReplicateResult.SUCCESS); assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED", 1, numLedgersReplicatedCounter.get().longValue()); assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED",