diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 944b97e79dc..476d5c4b18d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -1854,6 +1854,7 @@ class ForceAuditorChecksCmd extends MyCommand { + "upon next Auditor startup "); opts.addOption("rc", "replicascheck", false, "Force replicasCheck audit " + "upon next Auditor startup "); + opts.addOption("f", "force", false, "Force re-schedule auditor task to run."); } @Override @@ -1870,7 +1871,7 @@ String getDescription() { @Override String getUsage() { - return "forceauditchecks [-checkallledgerscheck [-placementpolicycheck] [-replicascheck]"; + return "forceauditchecks [-checkallledgerscheck] [-placementpolicycheck] [-replicascheck] [-force]"; } @Override @@ -1878,6 +1879,7 @@ int runCmd(CommandLine cmdLine) throws Exception { boolean checkAllLedgersCheck = cmdLine.hasOption("calc"); boolean placementPolicyCheck = cmdLine.hasOption("ppc"); boolean replicasCheck = cmdLine.hasOption("rc"); + boolean forceScheduleTask = cmdLine.hasOption("f"); if (checkAllLedgersCheck || placementPolicyCheck || replicasCheck) { runFunctionWithLedgerManagerFactory(bkConf, mFactory -> { @@ -1898,6 +1900,10 @@ int runCmd(CommandLine cmdLine) throws Exception { LOG.info("Resetting ReplicasCheckCTime to : " + new Timestamp(time)); underreplicationManager.setReplicasCheckCTime(time); } + if (forceScheduleTask) { + LOG.info("Emitting Reschedule Auditor check tasks execution."); + underreplicationManager.emitRescheduleAuditorTasks(); + } } } catch (InterruptedException | ReplicationException e) { LOG.error("Exception while trying to reset last run time ", e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index 3cd1ceb4b15..a61c0fdc28a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -158,6 +158,39 @@ boolean isLedgerReplicationEnabled() void notifyLedgerReplicationEnabled(GenericCallback cb) throws ReplicationException.UnavailableException; + /** + * Emit Auditor tasks re-schedule. + * + */ + void emitRescheduleAuditorTasks() + throws ReplicationException.UnavailableException; + + /** + * Finishing Auditor tasks re-schedule. + * + */ + void finishedRescheduleAuditorTasks() + throws ReplicationException.UnavailableException; + + /** + * Check whether the Auditor tasks re-schedule is emitted or not. This will return + * true if the schedule task is emitted, otherwise return false. + * + * @return - return true if it is emitted otherwise return false + */ + boolean isAuditorTasksRescheduleEmit() + throws ReplicationException.UnavailableException; + + /** + * Receive notification asynchronously when the schedule auditor tasks + * is emitted. + * + * @param cb + * - callback implementation to receive the notification + */ + void notifyRescheduleAuditorTasksChanged(GenericCallback cb) + throws ReplicationException.UnavailableException; + /** * Creates the zNode for lostBookieRecoveryDelay with the specified value and returns true. * If the node is already existing, then it returns false. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java index 7d2d84381da..ae8c40d5fb1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java @@ -359,6 +359,16 @@ public boolean isLedgerReplicationEnabled() { @Override public void notifyLedgerReplicationEnabled(GenericCallback cb) {} @Override + public void emitRescheduleAuditorTasks() {} + @Override + public void finishedRescheduleAuditorTasks() {} + @Override + public boolean isAuditorTasksRescheduleEmit() { + return false; + } + @Override + public void notifyRescheduleAuditorTasksChanged(GenericCallback cb) {} + @Override public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) { return false; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index a8c30c43270..c8585960be5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -772,6 +772,104 @@ public void process(WatchedEvent e) { } } + @Override + public void emitRescheduleAuditorTasks() + throws ReplicationException.UnavailableException { + List zkAcls = ZkUtils.getACLs(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("emitRescheduleAuditorTasks()"); + } + try { + String znode = basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE; + zkc.create(znode, "".getBytes(UTF_8), zkAcls, CreateMode.PERSISTENT); + LOG.info("Auto Schedule auditor tasks emitted!"); + } catch (KeeperException.NodeExistsException ke) { + LOG.warn("Schedule auditor tasks is already emitted!", ke); + throw new ReplicationException.UnavailableException( + "Schedule auditor tasks is already emitted!", ke); + } catch (KeeperException ke) { + LOG.error("Exception while emitting auto schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Exception while emitting auto schedule auditor tasks", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while emitting auto schedule auditor tasks", ie); + } + } + + @Override + public void finishedRescheduleAuditorTasks() + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("finishedRescheduleAuditorTasks()"); + } + try { + zkc.delete(basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, -1); + LOG.info("Finished automatic schedule auditor tasks"); + } catch (KeeperException.NoNodeException ke) { + LOG.warn("Schedule auditor tasks is already finished!", ke); + throw new ReplicationException.UnavailableException( + "Schedule auditor tasks is already finished!", ke); + } catch (KeeperException ke) { + LOG.error("Exception while finishing schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Exception while finishing schedule auditor tasks", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while finishing schedule auditor tasks", ie); + } + } + + @Override + public boolean isAuditorTasksRescheduleEmit() + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("isAuditorTasksRescheduleEmit()"); + } + try { + return null != zkc.exists(basePath + '/' + + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, false); + } catch (KeeperException ke) { + LOG.error("Error while checking the state of " + + "auditor tasks schedule", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while contacting zookeeper", ie); + } + } + + @Override + public void notifyRescheduleAuditorTasksChanged(final GenericCallback cb) + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("notifyRescheduleAuditorTasksChanged()"); + } + Watcher w = new Watcher() { + @Override + public void process(WatchedEvent e) { + if (e.getType() == Event.EventType.NodeCreated) { + LOG.info("Schedule auditor tasks is emitted externally through Zookeeper, " + + "since SCHEDULE_AUDITOR_NODE ZNode is created"); + cb.operationComplete(0, null); + } + } + }; + try { + zkc.addWatch(basePath + "/" + + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, w, AddWatchMode.PERSISTENT); + } catch (KeeperException ke) { + LOG.error("Error while checking the state of " + + "schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while contacting zookeeper", ie); + } + } + /** * Check whether the ledger is being replicated by any bookie. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 220d52d8e55..5420086d97f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.collections4.CollectionUtils; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,9 @@ public class Auditor implements AutoCloseable { protected AuditorTask auditorPlacementPolicyCheckTask; protected AuditorTask auditorReplicasCheckTask; private final List allAuditorTasks = Lists.newArrayList(); + protected volatile ScheduledFuture auditorCheckAllLedgersTaskFuture; + protected volatile ScheduledFuture auditorPlacementPolicyCheckTaskFuture; + protected volatile ScheduledFuture auditorReplicasCheckTaskFuture; private final AuditorStats auditorStats; @@ -379,6 +384,63 @@ synchronized Future submitLostBookieRecoveryDelayChangedEvent() { }); } + synchronized Future submitRescheduleAuditorTasksChangedEvent() { + if (executor.isShutdown()) { + SettableFuture f = SettableFuture.create(); + f.setException(new BKAuditException("Auditor shutting down")); + return f; + } + return executor.submit(() -> { + boolean reScheduleEmit = true; + try { + waitIfLedgerReplicationDisabled(); + reScheduleEmit = Auditor.this.ledgerUnderreplicationManager.isAuditorTasksRescheduleEmit(); + if (reScheduleEmit) { + if (auditorCheckAllLedgersTaskFuture != null + && !auditorCheckAllLedgersTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorCheckAllLedgersTask"); + auditorCheckAllLedgersTaskFuture.cancel(false); + } + if (auditorPlacementPolicyCheckTaskFuture != null + && !auditorPlacementPolicyCheckTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorPlacementPolicyCheckTask"); + auditorPlacementPolicyCheckTaskFuture.cancel(false); + } + if (auditorReplicasCheckTaskFuture != null + && !auditorReplicasCheckTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorReplicasCheckTask"); + auditorReplicasCheckTaskFuture.cancel(false); + } + + scheduleCheckAllLedgersTask(); + schedulePlacementPolicyCheckTask(); + scheduleReplicasCheckTask(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while for LedgersReplication to be enabled ", ie); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + } catch (UnavailableException ue) { + LOG.error("Exception while reading from ZK", ue); + } finally { + if (reScheduleEmit) { + auditorStats.getNumAuditorTasksRescheduleEmitted().inc(); + try { + okFinishedRescheduleAuditorTasks(); + } catch (UnavailableException e) { + LOG.error("Exception while finished schedule tasks ", e); + submitShutdownTask(); + } + } + } + }); + } + public void start() { LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier); // on startup watching available bookie and based on the @@ -395,6 +457,11 @@ public void start() { .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb()); this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged( new UnderReplicatedLedgersChangedCb()); + // We always finished earlier triggered Reschedule auditor tasks when Auditor starting. + // Because we can directly schedule tasks when the Auditor starts. + okFinishedRescheduleAuditorTasks(); + this.ledgerUnderreplicationManager + .notifyRescheduleAuditorTasksChanged(new RescheduleAuditorTasksChangedCb()); } catch (BKException bke) { LOG.error("Couldn't get bookie list, so exiting", bke); submitShutdownTask(); @@ -412,6 +479,20 @@ public void start() { } } + private void okFinishedRescheduleAuditorTasks() throws UnavailableException { + try { + Auditor.this.ledgerUnderreplicationManager.finishedRescheduleAuditorTasks(); + } catch (UnavailableException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) { + if (LOG.isDebugEnabled()) { + LOG.debug("NoNode is ok while finished schedule tasks."); + } + } else { + throw e; + } + } + } + protected void submitBookieCheckTask() { executor.submit(auditorBookieCheckTask); } @@ -466,7 +547,8 @@ private void scheduleCheckAllLedgersTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS); + auditorCheckAllLedgersTaskFuture = + executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS); } else { LOG.info("Periodic checking disabled"); } @@ -510,7 +592,9 @@ private void schedulePlacementPolicyCheckTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS); + auditorPlacementPolicyCheckTaskFuture = + executor.scheduleAtFixedRate( + auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS); } else { LOG.info("Periodic placementPolicy check disabled"); } @@ -555,7 +639,8 @@ private void scheduleReplicasCheckTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS); + auditorReplicasCheckTaskFuture = + executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS); } private class UnderReplicatedLedgersChangedCb implements GenericCallback { @@ -584,6 +669,22 @@ public void operationComplete(int rc, Void result) { } } + private class RescheduleAuditorTasksChangedCb implements GenericCallback { + @Override + public void operationComplete(int rc, Void result) { + try { + Auditor.this.ledgerUnderreplicationManager + .notifyRescheduleAuditorTasksChanged(RescheduleAuditorTasksChangedCb.this); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + } catch (UnavailableException ae) { + LOG.error("Exception while registering for a RescheduleAuditorTasks notification", ae); + } + Auditor.this.submitRescheduleAuditorTasksChangedEvent(); + } + } + private void waitIfLedgerReplicationDisabled() throws UnavailableException, InterruptedException { if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java index 286ec388b30..c1a560e2df3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.CHECK_ALL_LEDGERS_TIME; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIES_PER_LEDGER; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED; @@ -177,6 +178,11 @@ public class AuditorStats { help = "the times of auditor check task skipped" ) private final Counter numSkippingCheckTaskTimes; + @StatsDoc( + name = NUM_BOOKIE_AUDITS_DELAYED, + help = "the number of auditor check tasks reschedule emitted" + ) + private final Counter numAuditorTasksRescheduleEmitted; public AuditorStats(StatsLogger statsLogger) { this.statsLogger = statsLogger; @@ -205,6 +211,7 @@ public AuditorStats(StatsLogger statsLogger) { .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED); numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS); numSkippingCheckTaskTimes = this.statsLogger.getCounter(NUM_SKIPPING_CHECK_TASK_TIMES); + numAuditorTasksRescheduleEmitted = this.statsLogger.getCounter(NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED); numLedgersNotAdheringToPlacementPolicy = new Gauge() { @Override public Integer getDefaultValue() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index d6814edf094..9e81e1e26a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -68,4 +68,5 @@ public interface ReplicationStats { String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS"; String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED"; String NUM_SKIPPING_CHECK_TASK_TIMES = "NUM_SKIPPING_CHECK_TASK_TIMES"; + String NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED = "NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED"; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java index 107708092f6..3e3ebb5f2aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java @@ -50,6 +50,7 @@ public class BookKeeperConstants { public static final String LAYOUT_ZNODE = "LAYOUT"; public static final String INSTANCEID = "INSTANCEID"; public static final String DISABLE_HEALTH_CHECK = "disableHealthCheck"; + public static final String SCHEDULE_AUDITOR_NODE = "scheduleAuditor"; /** * Set the max log size limit to 1GB. It makes extra room for entry log file before diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java index 18785a58b48..e4edad9d4da 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java @@ -47,7 +47,7 @@ public ForceAuditorChecksCmdTest() { */ @Test public void verifyAuditCTimeReset() throws Exception { - String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc" }; + String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc", "-f" }; long curTime = System.currentTimeMillis(); final ServerConfiguration conf = confByIndex(0); @@ -61,6 +61,9 @@ public void verifyAuditCTimeReset() throws Exception { urM.setCheckAllLedgersCTime(curTime); urM.setPlacementPolicyCheckCTime(curTime); urM.setReplicasCheckCTime(curTime); + + // check current not reschedule auditor task + Assert.assertFalse(urM.isAuditorTasksRescheduleEmit()); } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } @@ -86,6 +89,9 @@ public void verifyAuditCTimeReset() throws Exception { if (replicasCheckCTime > (curTime - (20 * 24 * 60 * 60 * 1000))) { Assert.fail("The replicasCheckCTime should have been reset to atleast 20 days old"); } + if (!urm.isAuditorTasksRescheduleEmit()) { + Assert.fail("The rescheduleTasks should have been emitted."); + } } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 53e139b19e8..8a55fce09ec 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -898,6 +898,68 @@ public void testDelayBookieAuditOfReplicasCheck() throws Exception { auditor.close(); } + @Test + public void testRescheduleAuditorCheckTasks() throws Exception { + for (AuditorElector e : auditorElectors.values()) { + e.shutdown(); + } + + LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); + LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager(); + + ServerConfiguration servConf = new ServerConfiguration(confByIndex(0)); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE); + Counter numAuditorTasksRescheduleEmitted = + statsLogger.getCounter(ReplicationStats.NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED); + + urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE); + + AtomicBoolean canRun = new AtomicBoolean(true); + + final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, + false, statsLogger, canRun); + + // before auditor start we emit an old reschedule tasks. + urm.emitRescheduleAuditorTasks(); + assertTrue(urm.isAuditorTasksRescheduleEmit()); + auditor.start(); + + // verify before emit reschedule auditor tasks can be ok finished. + Awaitility.await().untilAsserted(() -> { + assertFalse(urm.isAuditorTasksRescheduleEmit()); + }); + + // verify before emit reschedule auditor tasks. + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 0, + (long) numAuditorTasksRescheduleEmitted.get()); + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + // emit reschedule auditor check tasks + urm.emitRescheduleAuditorTasks(); + + Awaitility.await().untilAsserted(() -> { + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 1, + (long) numAuditorTasksRescheduleEmitted.get()); + }); + + // check finishedRescheduleAuditorTasks + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + // Verify multiple emit reschedule auditor check tasks + urm.emitRescheduleAuditorTasks(); + Awaitility.await().untilAsserted(() -> { + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 2, + (long) numAuditorTasksRescheduleEmitted.get()); + }); + + // check finishedRescheduleAuditorTasks + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + auditor.close(); + } + static class TestAuditor extends Auditor { final AtomicReference latchRef = new AtomicReference(new CountDownLatch(1));