Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ class ForceAuditorChecksCmd extends MyCommand {
+ "upon next Auditor startup ");
opts.addOption("rc", "replicascheck", false, "Force replicasCheck audit "
+ "upon next Auditor startup ");
opts.addOption("f", "force", false, "Force re-schedule auditor task to run.");
}

@Override
Expand All @@ -1870,14 +1871,15 @@ String getDescription() {

@Override
String getUsage() {
return "forceauditchecks [-checkallledgerscheck [-placementpolicycheck] [-replicascheck]";
return "forceauditchecks [-checkallledgerscheck] [-placementpolicycheck] [-replicascheck] [-force]";
}

@Override
int runCmd(CommandLine cmdLine) throws Exception {
boolean checkAllLedgersCheck = cmdLine.hasOption("calc");
boolean placementPolicyCheck = cmdLine.hasOption("ppc");
boolean replicasCheck = cmdLine.hasOption("rc");
boolean forceScheduleTask = cmdLine.hasOption("f");

if (checkAllLedgersCheck || placementPolicyCheck || replicasCheck) {
runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
Expand All @@ -1898,6 +1900,10 @@ int runCmd(CommandLine cmdLine) throws Exception {
LOG.info("Resetting ReplicasCheckCTime to : " + new Timestamp(time));
underreplicationManager.setReplicasCheckCTime(time);
}
if (forceScheduleTask) {
LOG.info("Emitting Reschedule Auditor check tasks execution.");
underreplicationManager.emitRescheduleAuditorTasks();
}
}
} catch (InterruptedException | ReplicationException e) {
LOG.error("Exception while trying to reset last run time ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,39 @@ boolean isLedgerReplicationEnabled()
void notifyLedgerReplicationEnabled(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;

/**
* Emit Auditor tasks re-schedule.
*
*/
void emitRescheduleAuditorTasks()
throws ReplicationException.UnavailableException;

/**
* Finishing Auditor tasks re-schedule.
*
*/
void finishedRescheduleAuditorTasks()
throws ReplicationException.UnavailableException;

/**
* Check whether the Auditor tasks re-schedule is emitted or not. This will return
* true if the schedule task is emitted, otherwise return false.
*
* @return - return true if it is emitted otherwise return false
*/
boolean isAuditorTasksRescheduleEmit()
throws ReplicationException.UnavailableException;

/**
* Receive notification asynchronously when the schedule auditor tasks
* is emitted.
*
* @param cb
* - callback implementation to receive the notification
*/
void notifyRescheduleAuditorTasksChanged(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;

/**
* Creates the zNode for lostBookieRecoveryDelay with the specified value and returns true.
* If the node is already existing, then it returns false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,16 @@ public boolean isLedgerReplicationEnabled() {
@Override
public void notifyLedgerReplicationEnabled(GenericCallback<Void> cb) {}
@Override
public void emitRescheduleAuditorTasks() {}
@Override
public void finishedRescheduleAuditorTasks() {}
@Override
public boolean isAuditorTasksRescheduleEmit() {
return false;
}
@Override
public void notifyRescheduleAuditorTasksChanged(GenericCallback<Void> cb) {}
@Override
public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,104 @@ public void process(WatchedEvent e) {
}
}

@Override
public void emitRescheduleAuditorTasks()
throws ReplicationException.UnavailableException {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("emitRescheduleAuditorTasks()");
}
try {
String znode = basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE;
zkc.create(znode, "".getBytes(UTF_8), zkAcls, CreateMode.PERSISTENT);
LOG.info("Auto Schedule auditor tasks emitted!");
} catch (KeeperException.NodeExistsException ke) {
LOG.warn("Schedule auditor tasks is already emitted!", ke);
throw new ReplicationException.UnavailableException(
"Schedule auditor tasks is already emitted!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while emitting auto schedule auditor tasks", ke);
throw ReplicationException.fromKeeperException("Exception while emitting auto schedule auditor tasks", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
"Interrupted while emitting auto schedule auditor tasks", ie);
}
}

@Override
public void finishedRescheduleAuditorTasks()
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("finishedRescheduleAuditorTasks()");
}
try {
zkc.delete(basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, -1);
LOG.info("Finished automatic schedule auditor tasks");
} catch (KeeperException.NoNodeException ke) {
LOG.warn("Schedule auditor tasks is already finished!", ke);
throw new ReplicationException.UnavailableException(
"Schedule auditor tasks is already finished!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while finishing schedule auditor tasks", ke);
throw ReplicationException.fromKeeperException("Exception while finishing schedule auditor tasks", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
"Interrupted while finishing schedule auditor tasks", ie);
}
}

@Override
public boolean isAuditorTasksRescheduleEmit()
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("isAuditorTasksRescheduleEmit()");
}
try {
return null != zkc.exists(basePath + '/'
+ BookKeeperConstants.SCHEDULE_AUDITOR_NODE, false);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "auditor tasks schedule", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
"Interrupted while contacting zookeeper", ie);
}
}

@Override
public void notifyRescheduleAuditorTasksChanged(final GenericCallback<Void> cb)
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("notifyRescheduleAuditorTasksChanged()");
}
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent e) {
if (e.getType() == Event.EventType.NodeCreated) {
LOG.info("Schedule auditor tasks is emitted externally through Zookeeper, "
+ "since SCHEDULE_AUDITOR_NODE ZNode is created");
cb.operationComplete(0, null);
}
}
};
try {
zkc.addWatch(basePath + "/"
+ BookKeeperConstants.SCHEDULE_AUDITOR_NODE, w, AddWatchMode.PERSISTENT);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "schedule auditor tasks", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
"Interrupted while contacting zookeeper", ie);
}
}

/**
* Check whether the ledger is being replicated by any bookie.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -55,6 +56,7 @@
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,6 +90,9 @@ public class Auditor implements AutoCloseable {
protected AuditorTask auditorPlacementPolicyCheckTask;
protected AuditorTask auditorReplicasCheckTask;
private final List<AuditorTask> allAuditorTasks = Lists.newArrayList();
protected volatile ScheduledFuture<?> auditorCheckAllLedgersTaskFuture;
protected volatile ScheduledFuture<?> auditorPlacementPolicyCheckTaskFuture;
protected volatile ScheduledFuture<?> auditorReplicasCheckTaskFuture;

private final AuditorStats auditorStats;

Expand Down Expand Up @@ -379,6 +384,63 @@ synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
});
}

synchronized Future<?> submitRescheduleAuditorTasksChangedEvent() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void>create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(() -> {
boolean reScheduleEmit = true;
try {
waitIfLedgerReplicationDisabled();
reScheduleEmit = Auditor.this.ledgerUnderreplicationManager.isAuditorTasksRescheduleEmit();
if (reScheduleEmit) {
if (auditorCheckAllLedgersTaskFuture != null
&& !auditorCheckAllLedgersTaskFuture.isCancelled()) {
LOG.info("RescheduleAuditorTasks has been emitted "
+ "so canceling the pending auditorCheckAllLedgersTask");
auditorCheckAllLedgersTaskFuture.cancel(false);
}
if (auditorPlacementPolicyCheckTaskFuture != null
&& !auditorPlacementPolicyCheckTaskFuture.isCancelled()) {
LOG.info("RescheduleAuditorTasks has been emitted "
+ "so canceling the pending auditorPlacementPolicyCheckTask");
auditorPlacementPolicyCheckTaskFuture.cancel(false);
}
if (auditorReplicasCheckTaskFuture != null
&& !auditorReplicasCheckTaskFuture.isCancelled()) {
LOG.info("RescheduleAuditorTasks has been emitted "
+ "so canceling the pending auditorReplicasCheckTask");
auditorReplicasCheckTaskFuture.cancel(false);
}

scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
scheduleReplicasCheckTask();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
} catch (ReplicationException.NonRecoverableReplicationException nre) {
LOG.error("Non Recoverable Exception while reading from ZK", nre);
submitShutdownTask();
} catch (UnavailableException ue) {
LOG.error("Exception while reading from ZK", ue);
} finally {
if (reScheduleEmit) {
auditorStats.getNumAuditorTasksRescheduleEmitted().inc();
try {
okFinishedRescheduleAuditorTasks();
} catch (UnavailableException e) {
LOG.error("Exception while finished schedule tasks ", e);
submitShutdownTask();
}
}
}
});
}

public void start() {
LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
// on startup watching available bookie and based on the
Expand All @@ -395,6 +457,11 @@ public void start() {
.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
new UnderReplicatedLedgersChangedCb());
// We always finished earlier triggered Reschedule auditor tasks when Auditor starting.
// Because we can directly schedule tasks when the Auditor starts.
okFinishedRescheduleAuditorTasks();
this.ledgerUnderreplicationManager
.notifyRescheduleAuditorTasksChanged(new RescheduleAuditorTasksChangedCb());
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, so exiting", bke);
submitShutdownTask();
Expand All @@ -412,6 +479,20 @@ public void start() {
}
}

private void okFinishedRescheduleAuditorTasks() throws UnavailableException {
try {
Auditor.this.ledgerUnderreplicationManager.finishedRescheduleAuditorTasks();
} catch (UnavailableException e) {
if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) {
if (LOG.isDebugEnabled()) {
LOG.debug("NoNode is ok while finished schedule tasks.");
}
} else {
throw e;
}
}
}

protected void submitBookieCheckTask() {
executor.submit(auditorBookieCheckTask);
}
Expand Down Expand Up @@ -466,7 +547,8 @@ private void scheduleCheckAllLedgersTask() {
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);

executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS);
auditorCheckAllLedgersTaskFuture =
executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic checking disabled");
}
Expand Down Expand Up @@ -510,7 +592,9 @@ private void schedulePlacementPolicyCheckTask() {
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);

executor.scheduleAtFixedRate(auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS);
auditorPlacementPolicyCheckTaskFuture =
executor.scheduleAtFixedRate(
auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic placementPolicy check disabled");
}
Expand Down Expand Up @@ -555,7 +639,8 @@ private void scheduleReplicasCheckTask() {
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);

executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS);
auditorReplicasCheckTaskFuture =
executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS);
}

private class UnderReplicatedLedgersChangedCb implements GenericCallback<Void> {
Expand Down Expand Up @@ -584,6 +669,22 @@ public void operationComplete(int rc, Void result) {
}
}

private class RescheduleAuditorTasksChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
try {
Auditor.this.ledgerUnderreplicationManager
.notifyRescheduleAuditorTasksChanged(RescheduleAuditorTasksChangedCb.this);
} catch (ReplicationException.NonRecoverableReplicationException nre) {
LOG.error("Non Recoverable Exception while reading from ZK", nre);
submitShutdownTask();
} catch (UnavailableException ae) {
LOG.error("Exception while registering for a RescheduleAuditorTasks notification", ae);
}
Auditor.this.submitRescheduleAuditorTasksChangedEvent();
}
}

private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
Expand Down
Loading