From 9264f2d3404f0de844854b6d5f694dfe9a5396c2 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Thu, 24 Oct 2024 09:25:01 +0800 Subject: [PATCH] ZOOKEEPER-4883: Rollover leader epoch when zxid counter is exhausted The rollover procedure: 1. Treats last proposal of an epoch as rollover proposal. 2. Requests from next epoch are proposed normally. 3. Fences next epoch once rollover proposal persisted. 4. Proposals from next epoch will not be written to disk before rollover committed. 5. Leader commits rollover proposal once it gets quorum ACKs. 6. Blocked new epoch proposals are logged once rollover proposal is committed in corresponding nodes. This results in: 1. No other leader could lead using next epoch number once rollover proposal is considered committed. 2. No proposals from next epoch will be written to disk before rollover proposal is considered committed. Refs: ZOOKEEPER-1277, ZOOKEEPER-2789, ZOOKEEPER-4870, ZOOKEEPER-4882, ZOOKEEPER-4570 and ZOOKEEPER-4571 --- .../org/apache/zookeeper/server/Request.java | 5 + .../server/SyncRequestProcessor.java | 22 +- .../zookeeper/server/quorum/Follower.java | 3 +- .../quorum/FollowerZooKeeperServer.java | 17 +- .../zookeeper/server/quorum/Leader.java | 68 +- .../server/quorum/LeaderZooKeeperServer.java | 28 +- .../zookeeper/server/quorum/Learner.java | 15 + .../server/quorum/LearnerHandler.java | 2 + .../quorum/ParticipantRequestSyncer.java | 68 ++ .../quorum/ProposalRequestProcessor.java | 24 +- .../zookeeper/server/quorum/QuorumPeer.java | 4 +- .../server/quorum/QuorumZooKeeperServer.java | 37 + .../zookeeper/server/util/ZxidUtils.java | 4 + .../zookeeper/server/ZxidRolloverTest.java | 42 +- .../server/quorum/LeaderBeanTest.java | 6 +- .../server/quorum/RaceConditionTest.java | 21 +- .../zookeeper/server/quorum/Zab1_0Test.java | 2 +- .../server/quorum/ZxidRolloverCrashTest.java | 969 ++++++++++++++++++ .../org/apache/zookeeper/test/ClientBase.java | 10 +- .../org/apache/zookeeper/test/QuorumUtil.java | 46 +- 20 files 
changed, 1270 insertions(+), 123 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 8b61091719d..e26e6edd118 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -34,6 +34,7 @@ import org.apache.zookeeper.server.quorum.LearnerHandler; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.AuthUtil; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; @@ -377,6 +378,10 @@ public boolean isQuorum() { } } + public boolean isRollover() { + return isQuorum() && zxid > 0 && ZxidUtils.isLastEpochZxid(zxid); + } + public static String op2String(int op) { switch (op) { case OpCode.notification: diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 0dc1a86860e..a3e172d15b7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -29,6 +29,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer; +import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; +import org.apache.zookeeper.server.util.ZxidUtils; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -205,7 +208,7 @@ public void run() { // iff this is a read or a throttled request(which doesn't need to be written to the disk), // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { - nextProcessor.processRequest(si); + handover(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } @@ -213,7 +216,7 @@ public void run() { continue; } toFlush.add(si); - if (shouldFlush()) { + if (si.isRollover() || shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); @@ -224,6 +227,19 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + private void handover(Request request) throws IOException, RequestProcessorException { + if (request.isRollover() && zks instanceof QuorumZooKeeperServer) { + long nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1; + // Fences upcoming epoch in leader election. So there will be no chance for other peer + // to lead next epoch if this request is considered committed. 
+ ((QuorumZooKeeperServer) zks).fenceRolloverEpoch(nextEpoch); + if (zks instanceof ObserverZooKeeperServer) { + ((ObserverZooKeeperServer) zks).confirmRolloverEpoch(nextEpoch); + } + } + nextProcessor.processRequest(request); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; @@ -242,7 +258,7 @@ private void flush() throws IOException, RequestProcessorException { final Request i = this.toFlush.remove(); long latency = Time.currentElapsedTime() - i.syncQueueStartTime; ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); - this.nextProcessor.processRequest(i); + handover(i); } if (this.nextProcessor instanceof Flushable) { ((Flushable) this.nextProcessor).flush(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index ca99974cb52..baf7fd865cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -48,7 +48,8 @@ public class Follower extends Learner { ObserverMaster om; - Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) { + // VisibleForTesting + public Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) { this.self = Objects.requireNonNull(self); this.fzk = Objects.requireNonNull(zk); this.zk = zk; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 1b0b5cd9276..81a9dd31f64 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -47,6 +47,10 @@ public class FollowerZooKeeperServer extends 
LearnerZooKeeperServer { private static final Logger LOG = LoggerFactory.getLogger(FollowerZooKeeperServer.class); + // This should be final as it is constructed with no external variables. It is not to allow mockito spy which + // intercepts `this`. + private ParticipantRequestSyncer requestSyncer; + /* * Pending sync requests */ ConcurrentLinkedQueue pendingSyncs; @@ -54,7 +58,8 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { /** * @throws IOException */ - FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + // VisibleForTesting + public FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); this.pendingSyncs = new ConcurrentLinkedQueue<>(); } @@ -72,14 +77,17 @@ protected void setupRequestProcessors() { ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower())); syncProcessor.start(); + requestSyncer = new ParticipantRequestSyncer(this, LOG, this::syncRequest); } LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); public void logRequest(Request request) { - if ((request.zxid & 0xffffffffL) != 0) { - pendingTxns.add(request); - } + requestSyncer.syncRequest(request); + } + + private void syncRequest(Request request) { + pendingTxns.add(request); syncProcessor.processRequest(request); } @@ -110,6 +118,7 @@ public void commit(long zxid) { Request request = pendingTxns.remove(); request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY); commitProcessor.commit(request); + requestSyncer.finishCommit(request.zxid); } public synchronized void sync() { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 1561a7dae18..eb39a5428ff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -1025,14 +1025,13 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // we're sending the designated leader, and if the leader is changing the followers are // responsible for closing the connection - this way we are sure that at least a majority of them // receive the commit message. - commitAndActivate(zxid, designatedLeader); + commitAndActivate(p, designatedLeader); informAndActivate(p, designatedLeader); } else { p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY); - commit(zxid); + commit(p); inform(p); } - zk.commitProcessor.commit(p.request); if (pendingSyncs.containsKey(zxid)) { for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) { sendSync(r); @@ -1065,16 +1064,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA LOG.trace("outstanding proposals all"); } - if ((zxid & 0xffffffffL) == 0) { - /* - * We no longer process NEWLEADER ack with this method. However, - * the learner sends an ack back to the leader after it gets - * UPTODATE, so we just ignore the message. - */ - return; - } - - if (outstandingProposals.size() == 0) { + if (outstandingProposals.isEmpty()) { LOG.debug("outstanding is 0"); return; } @@ -1212,25 +1202,30 @@ void sendObserverPacket(QuorumPacket qp) { long lastCommitted = -1; /** - * Create a commit packet and send it to all the members of the quorum - * - * @param zxid + * Commit proposal to all connected followers including itself. 
*/ - public void commit(long zxid) { + public void commit(Proposal p) { + long zxid = p.getZxid(); synchronized (this) { lastCommitted = zxid; } + + zk.commit(p.request); + QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); sendPacket(qp); ServerMetrics.getMetrics().COMMIT_COUNT.add(1); } //commit and send some info - public void commitAndActivate(long zxid, long designatedLeader) { + public void commitAndActivate(Proposal p, long designatedLeader) { + long zxid = p.getZxid(); synchronized (this) { lastCommitted = zxid; } + zk.commit(p.request); + byte[] data = new byte[8]; ByteBuffer buffer = ByteBuffer.wrap(data); buffer.putLong(designatedLeader); @@ -1277,35 +1272,17 @@ public long getEpoch() { return ZxidUtils.getEpochFromZxid(lastProposed); } - @SuppressWarnings("serial") - public static class XidRolloverException extends Exception { - - public XidRolloverException(String message) { - super(message); - } - - } - /** * create a proposal and send it out to all the members * * @param request * @return the proposal that is queued to send to all the members */ - public Proposal propose(Request request) throws XidRolloverException { + public Proposal propose(Request request) { if (request.isThrottled()) { LOG.error("Throttled request send as proposal: {}. Exiting.", request); ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } - /** - * Address the rollover issue. All lower 32bits set indicate a new leader - * election. Force a re-election instead. 
See ZOOKEEPER-1277 - */ - if ((request.zxid & 0xffffffffL) == 0xffffffffL) { - String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start"; - shutdown(msg); - throw new XidRolloverException(msg); - } byte[] data = request.getSerializeData(); proposalStats.setLastBufferSize(data.length); @@ -1331,6 +1308,7 @@ public Proposal propose(Request request) throws XidRolloverException { sendPacket(pp); } ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1); + zk.logRequest(request); return p; } @@ -1465,6 +1443,22 @@ public void reportLookingSid(long sid) { } } + /** + * Comparing to {@link #getEpochToPropose(long, long)}, this method does not bump `acceptedEpoch` + * as the rollover txn may not be persisted yet. + */ + public void rolloverLeaderEpoch(long newEpoch) { + synchronized (connectingFollowers) { + if (waitingForNewEpoch) { + throw new IllegalStateException("ZAB is still waiting new epoch"); + } else if (newEpoch != epoch + 1) { + String msg = String.format("can not rollover leader epoch to %s, current epoch is %s", newEpoch, epoch); + throw new IllegalArgumentException(msg); + } + epoch = newEpoch; + } + } + @Override public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized (connectingFollowers) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 799b8f96148..ab93cfe1273 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -33,8 +33,11 @@ import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.SyncRequestProcessor; import 
org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -44,6 +47,7 @@ * FinalRequestProcessor */ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { + private static final Logger LOG = LoggerFactory.getLogger(LeaderZooKeeperServer.class); private ContainerManager containerManager; // guarded by sync @@ -51,6 +55,10 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { PrepRequestProcessor prepRequestProcessor; + SyncRequestProcessor syncProcessor; + private final ParticipantRequestSyncer requestSyncer = + new ParticipantRequestSyncer(this, LOG, r -> syncProcessor.processRequest(r)); + /** * @throws IOException */ @@ -68,8 +76,10 @@ protected void setupRequestProcessors() { RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); + AckRequestProcessor ackProcessor = new AckRequestProcessor(getLeader()); + syncProcessor = new SyncRequestProcessor(this, ackProcessor); + syncProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); - proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); @@ -159,6 +169,7 @@ protected synchronized void shutdownComponents() { if (containerManager != null) { containerManager.stop(); } + syncProcessor.shutdown(); super.shutdownComponents(); } @@ -169,6 +180,21 @@ public int getGlobalOutstandingLimit() { return globalOutstandingLimit; } + @Override + public void confirmRolloverEpoch(long newEpoch) { + getLeader().rolloverLeaderEpoch(newEpoch); + super.confirmRolloverEpoch(newEpoch); + } 
+ + public void logRequest(Request request) { + requestSyncer.syncRequest(request); + } + + public void commit(Request request) { + commitProcessor.commit(request); + requestSyncer.finishCommit(request.zxid); + } + @Override public void createSessionTracker() { sessionTracker = new LeaderSessionTracker( diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index adf0ef6e510..a6583df2bd2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -825,6 +825,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startup(); + + long lastCommittedZxid = zk.getLastProcessedZxid(); + long lastCommittedEpoch = ZxidUtils.getEpochFromZxid(lastCommittedZxid); + if (ZxidUtils.isLastEpochZxid(lastCommittedZxid)) { + lastCommittedEpoch += 1; + } + LOG.debug("lastCommittedZxid {}, lastCommittedEpoch {} newEpoch {}", + Long.toHexString(lastCommittedZxid), lastCommittedEpoch, newEpoch); + if (lastCommittedEpoch > newEpoch) { + LOG.info("Switch to new leader epoch {} from {}", lastCommittedEpoch, newEpoch); + newEpoch = lastCommittedEpoch; + self.setAcceptedEpoch(newEpoch); + self.setCurrentEpoch(newEpoch); + } + /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 57947daa86c..d072dabc954 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -587,6 
+587,8 @@ public void run() { ServerMetrics.getMetrics().SNAP_COUNT.add(1); } } else { + LOG.info("Sending diffs last zxid of peer is 0x{}, zxid of leader is 0x{}", + Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid)); syncThrottler = learnerMaster.getLearnerDiffSyncThrottler(); syncThrottler.beginSync(exemptFromThrottle); ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java new file mode 100644 index 00000000000..2ed54014422 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.slf4j.Logger; + +public class ParticipantRequestSyncer { + private QuorumZooKeeperServer server; + private final Logger log; + private final Consumer syncRequest; + private long nextEpoch = 0; + private final List nextEpochTxns = new ArrayList<>(); + + public ParticipantRequestSyncer(QuorumZooKeeperServer server, Logger log, Consumer syncRequest) { + this.server = server; + this.log = log; + this.syncRequest = syncRequest; + } + + public void syncRequest(Request request) { + if (nextEpoch != 0) { + // We can't persist requests from new leader epoch for now, as the new leader epoch + // has not been committed yet. Otherwise, we could run into inconsistent if another + // peer wins election and leads the same epoch. + // + // See also https://issues.apache.org/jira/browse/ZOOKEEPER-1277?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13149973 + log.debug("Block request(zxid: {}) on new leader epoch {}.", Long.toHexString(request.zxid), nextEpoch); + nextEpochTxns.add(request); + return; + } + if (ZxidUtils.isLastEpochZxid(request.zxid)) { + nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1; + log.info("Receive last epoch zxid {}, preparing next epoch {}", Long.toHexString(request.zxid), nextEpoch); + } + syncRequest.accept(request); + } + + public void finishCommit(long zxid) { + if (ZxidUtils.isLastEpochZxid(zxid)) { + log.info("Switch to new leader epoch {}", nextEpoch); + server.confirmRolloverEpoch(nextEpoch); + nextEpoch = 0; + nextEpochTxns.forEach(syncRequest); + nextEpochTxns.clear(); + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java index c1e2fe16e43..41ae330d99b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java @@ -21,14 +21,11 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; -import org.apache.zookeeper.server.SyncRequestProcessor; -import org.apache.zookeeper.server.quorum.Leader.XidRolloverException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This RequestProcessor simply forwards requests to an AckRequestProcessor and - * SyncRequestProcessor. + * This RequestProcessor simply forwards requests to {@link Leader#propose(Request)}. */ public class ProposalRequestProcessor implements RequestProcessor { @@ -38,8 +35,6 @@ public class ProposalRequestProcessor implements RequestProcessor { RequestProcessor nextProcessor; - SyncRequestProcessor syncProcessor; - // If this property is set, requests from Learners won't be forwarded // to the CommitProcessor in order to save resources public static final String FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED = @@ -49,8 +44,6 @@ public class ProposalRequestProcessor implements RequestProcessor { public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { this.zks = zks; this.nextProcessor = nextProcessor; - AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); - syncProcessor = new SyncRequestProcessor(zks, ackProcessor); forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean( FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED); @@ -58,13 +51,6 @@ public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor next forwardLearnerRequestsToCommitProcessorDisabled); } - /** - * initialize 
this processor - */ - public void initialize() { - syncProcessor.start(); - } - public void processRequest(Request request) throws RequestProcessorException { /* In the following IF-THEN-ELSE block, we process syncs on the leader. * If the sync is coming from a follower, then the follower @@ -81,12 +67,7 @@ public void processRequest(Request request) throws RequestProcessorException { } if (request.getHdr() != null) { // We need to sync and get consensus on any transactions - try { - zks.getLeader().propose(request); - } catch (XidRolloverException e) { - throw new RequestProcessorException(e.getMessage(), e); - } - syncProcessor.processRequest(request); + zks.getLeader().propose(request); } } } @@ -94,7 +75,6 @@ public void processRequest(Request request) throws RequestProcessorException { public void shutdown() { LOG.info("Shutting down"); nextProcessor.shutdown(); - syncProcessor.shutdown(); } private boolean shouldForwardToNextProcessor(Request request) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 9a18d59d401..23e9d8ca0b7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1235,7 +1235,7 @@ private void loadDataBase() { } else { throw new IOException( "The current epoch, " + ZxidUtils.zxidToString(currentEpoch) - + ", is older than the last zxid, " + lastProcessedZxid); + + ", is older than the last zxid, " + Long.toHexString(lastProcessedZxid)); } } try { @@ -2263,7 +2263,7 @@ public void setZKDatabase(ZKDatabase database) { this.zkDb = database; } - protected ZKDatabase getZkDb() { + public ZKDatabase getZkDb() { return zkDb; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 240936956fc..a70e5317859 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -122,6 +122,43 @@ private Request makeUpgradeRequest(long sessionId) { return null; } + /** + * This must only be called after the rollover txn has been persisted as it bumps `currentEpoch`. + * + *
<p>Note that it is possible for leader/follower to commit a proposal while it has not yet persisted + * that proposal. + * + *
<p>See {@link Learner#registerWithLeader(int)} and {@link Leader#getEpochToPropose(long, long)} for details + * of {@link org.apache.zookeeper.server.quorum.QuorumPeer.ZabState#DISCOVERY} phase. + * + *
<p>See {@link Learner#syncWithLeader(long)} for details of + * {@link org.apache.zookeeper.server.quorum.QuorumPeer.ZabState#SYNCHRONIZATION} phase. + */ + public void fenceRolloverEpoch(long newEpoch) throws IOException { + // Once majority nodes increase their `acceptedEpoch`, the `newEpoch` is fenced in future elections. + self.setAcceptedEpoch(newEpoch); + + // There is no DIFF to synchronize. + // + // It is erroneous to bump `currentEpoch` before persisting the rollover proposal. + // + // 1. Leader is able to commit a proposal even if it has not yet persisted the proposal. + // 2. `peerEpoch`(a.k.a. `currentEpoch`) take higher priority than `lastLoggedZxid`. + // + // So, if `currentEpoch` is bumped before persisting the rollover proposal, restarted + // leader could win election and truncate committed proposals in other nodes. + // + // The above applies to followers also. + self.setCurrentEpoch(newEpoch); + } + + public void confirmRolloverEpoch(long newEpoch) { + // Quorum confirms this, there is no chance for others to rule + // this new epoch. So it is safe to broadcast this fact, even + // though we may not persist rollover txn yet. 
+ self.updateElectionVote(newEpoch); + } + /** * Implements the SessionUpgrader interface, * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java index b3b6935d5c7..5f80b8e0f39 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.util; public class ZxidUtils { + public static final long MAX_COUNTER = 0x00000000ffffffffL; public static long getEpochFromZxid(long zxid) { return zxid >> 32L; @@ -33,4 +34,7 @@ public static String zxidToString(long zxid) { return Long.toHexString(zxid); } + public static boolean isLastEpochZxid(long zxid) { + return getCounterFromZxid(zxid) == MAX_COUNTER; + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java index 495c1507e43..3a992e5d00a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,8 +27,9 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.test.ClientBase; import 
org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.apache.zookeeper.test.ClientTest; @@ -230,18 +232,14 @@ public void tearDown() throws Exception { */ private int createNodes(ZooKeeper zk, int start, int count) throws Exception { LOG.info("Creating nodes {} thru {}", start, (start + count)); - int j = 0; - try { - for (int i = start; i < start + count; i++) { - zk.create("/foo" + i, new byte[0], Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL); - j++; - } - } catch (ConnectionLossException e) { - // this is ok - the leader has dropped leadership - waitForClientsConnected(); + for (int i = start; i < start + count; i++) { + Stat stat = new Stat(); + zk.create("/foo" + i, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL, stat); + LOG.info("STAT: {}", Long.toHexString(stat.getCzxid())); } - return j; + return count; } + /** * Verify the expected znodes were created and that the last znode, which * caused the roll-over, did not. @@ -249,15 +247,15 @@ private int createNodes(ZooKeeper zk, int start, int count) throws Exception { private void checkNodes(ZooKeeper zk, int start, int count) throws Exception { LOG.info("Validating nodes {} thru {}", start, (start + count)); for (int i = start; i < start + count; i++) { - assertNotNull(zk.exists("/foo" + i, false)); - LOG.error("Exists zxid:{}", Long.toHexString(zk.exists("/foo" + i, false).getCzxid())); + Stat stat = zk.exists("/foo" + i, false); + assertNotNull(stat); + LOG.info("Exists zxid:{}", Long.toHexString(stat.getCzxid())); } assertNull(zk.exists("/foo" + (start + count), false)); } /** - * Prior to the fix this test would hang for a while, then fail with - * connection loss. + * Verify that operations during zxid rollover should be fine. 
*/ @Test public void testSimpleRolloverFollower() throws Exception { @@ -267,6 +265,8 @@ public void testSimpleRolloverFollower() throws Exception { int countCreated = createNodes(zk, 0, 10); checkNodes(zk, 0, countCreated); + + assertEquals(10, countCreated); } /** @@ -310,8 +310,7 @@ public void testRolloverThenRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 60); + assertEquals(60, countCreated); } /** @@ -352,8 +351,7 @@ public void testRolloverThenFollowerRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 60); + assertEquals(60, countCreated); } /** @@ -397,8 +395,7 @@ public void testRolloverThenLeaderRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 50); + assertEquals(50, countCreated); } /** @@ -444,8 +441,7 @@ public void testMultipleRollover() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 70); + assertEquals(70, countCreated); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java index 9c4e5214d4c..225e4110c52 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java @@ -44,6 +44,7 @@ import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import 
org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; @@ -93,6 +94,7 @@ public void setUp() throws IOException, X509Exception { ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog); zks = new LeaderZooKeeperServer(fileTxnSnapLog, qp, zkDb); + zks.syncProcessor = mock(SyncRequestProcessor.class); leader = new Leader(qp, zks); leaderBean = new LeaderBean(leader, zks); } @@ -135,7 +137,7 @@ public void testGetElectionTimeTaken() { } @Test - public void testGetProposalSize() throws IOException, Leader.XidRolloverException { + public void testGetProposalSize() throws IOException { // Arrange Request req = createMockRequest(); @@ -150,7 +152,7 @@ public void testGetProposalSize() throws IOException, Leader.XidRolloverExceptio } @Test - public void testResetProposalStats() throws IOException, Leader.XidRolloverException { + public void testResetProposalStats() throws IOException { // Arrange int initialProposalSize = leaderBean.getLastProposalSize(); Request req = createMockRequest(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index 5216eb70324..1341ca87310 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -196,8 +196,10 @@ protected void setupRequestProcessors() { RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); - ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor); - proposalProcessor.initialize(); + AckRequestProcessor ackProcessor = new 
AckRequestProcessor(this.getLeader()); + syncProcessor = new MockSyncRequestProcessor(this, ackProcessor); + syncProcessor.start(); + ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); @@ -229,21 +231,6 @@ public void shutdown() { } - private static class MockProposalRequestProcessor extends ProposalRequestProcessor { - - public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { - super(zks, nextProcessor); - - /** - * The only purpose here is to inject the mocked - * SyncRequestProcessor - */ - AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); - syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor); - } - - } - private static class MockTestQPMain extends TestQPMain { @Override diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index d374062e293..1b788b2097a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -987,7 +987,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro @Test public void testTxnTimeout(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { - public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { + public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { assertEquals(0, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java new file mode 100644 index 00000000000..f7df376cfb0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java @@ -0,0 +1,969 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZxidRolloverCrashTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(ZxidRolloverCrashTest.class); + private static final long MAX_ZXID_COUNTER = ZxidUtils.MAX_COUNTER; + private static final int 
CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + QuorumUtil qu; + + @BeforeEach + public void setUp() { + // write and sync + System.setProperty("zookeeper.maxBatchSize", "1"); + // allow discontinuous proposals so we can reset zxid and rollover epoch + System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); + } + + @AfterEach + public void tearDown() { + System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); + if (qu != null) { + qu.shutdownAll(); + } + } + + private long setZxidCounter(long zxid, long counter) { + return ZxidUtils.makeZxid(ZxidUtils.getEpochFromZxid(zxid), counter); + } + + private long maximizeZxid(long zxid) { + return setZxidCounter(zxid, MAX_ZXID_COUNTER); + } + + static class LeaderContext { + private final AtomicReference leader = new AtomicReference<>(); + private final AtomicLong zxid = new AtomicLong(Long.MAX_VALUE); + private final AtomicInteger acks = new AtomicInteger(); + private final CompletableFuture completed = new CompletableFuture<>(); + + Leader setLeader(Leader leader) { + this.leader.compareAndSet(null, leader); + return leader; + } + + boolean isLeader(Leader leader) { + return this.leader.get() == leader; + } + + int incAcks() { + return acks.incrementAndGet(); + } + } + + @Test + public void testLeaderCrashAfterRolloverMajorityReplicated() throws Exception { + // Intercepts leader to replicate rollover proposal only to n+1 of 2n+1 nodes(including leader). 
+ LeaderContext context = new LeaderContext(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + if (context.isLeader(this) && zxid == context.zxid.get()) { + LOG.info("Ack to 0x{} from peer {}", ZxidUtils.zxidToString(zxid), sid); + if (context.incAcks() >= N + 1) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + return; + } + super.processAck(sid, zxid, followerAddr); + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + getForwardingFollowers().stream().limit(N).forEach(follower -> { + follower.queuePacket(qp); + }); + } else { + super.sendPacket(qp); + } + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + int leaderId = qu.getLeaderServer(); + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(qu.getLeaderQuorumPeer()))) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(maximizeZxid(zkLeader.getZxid())); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: leader crash after rollover proposal replicated to majority, + // but before claiming it "committed" + context.completed.join(); + qu.shutdown(leaderId); + } + + qu.restart(leaderId); + + // then: after all servers up, every node should meet following conditions + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals up to and including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i <= MAX_ZXID_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // then: new epoch proposals after rollover don't persist as leader declares no leadership yet + assertNull(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER + 1), false)); + + // then: new epoch must be greater than `epoch + 1` as at least n+1 nodes fenced it + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + long newEpoch = ZxidUtils.getEpochFromZxid(stat.getCzxid()); + assertEquals(epoch + 2, newEpoch); + } + } + } + + @Test + public void testLeaderCrashAfterRolloverMinorityReplicated() throws Exception { + // Intercepts leader to replicate rollover proposal only to n of 2n+1 nodes(including leader). 
+ LeaderContext context = new LeaderContext(); + final int N = 2; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + if (context.isLeader(this) && zxid == context.zxid.get()) { + LOG.info("Ack to 0x{} from peer {}", ZxidUtils.zxidToString(zxid), sid); + if (context.incAcks() >= N) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + return; + } + super.processAck(sid, zxid, followerAddr); + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + getForwardingFollowers().stream().limit(N - 1).forEach(follower -> { + follower.queuePacket(qp); + }); + } else { + super.sendPacket(qp); + } + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + int leaderId = qu.getLeaderServer(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + final long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(qu.getLeaderQuorumPeer()))) { + // given: leader with about to rollover zxid + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(maximizeZxid(zkLeader.getZxid())); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: leader crash after rollover proposal replicated to minority + context.completed.join(); + qu.shutdown(leaderId); + } + + qu.restart(leaderId); + + // then: after all servers up, every node should meet following conditions + long rolloverProposalZxid = 0; + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnString())) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < MAX_ZXID_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // It is indeterminate which part will win. The minority could win as they have higher `currentEpoch`. + // + // We can't make aggressive assertion like `equalTo(epoch + 1)` even when majority wins. As the new + // leader epoch is negotiated to greater than all `acceptedEpoch` in the quorum. So, it is possible + // for the leader epoch to be `greaterThan(epoch + 1)`. + // + // The situation is similar to leader crashed after minority has persisted `currentEpoch` in a + // normal "DISCOVERY" phase. 
+ // + // See "Phase 1: Establish an epoch" of https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0 + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + long newEpoch = ZxidUtils.getEpochFromZxid(stat.getCzxid()); + assertThat(newEpoch, greaterThanOrEqualTo(epoch + 1)); + + // then: it is indeterminate whether the rollover proposal has been committed, + // but it must be consistent among all nodes + if (rolloverProposalZxid == 0) { + rolloverProposalZxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid)); + } else { + long zxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid)); + } + } + } + } + + @Test + public void testLeaderCrashBeforeRolloverReplication() throws Exception { + // Intercepts leader not to replicate rollover proposal. 
+ LeaderContext context = new LeaderContext(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + super.processAck(sid, zxid, followerAddr); + if (!context.isLeader(this)) { + return; + } + if (zxid == context.zxid.get()) { + context.acks.incrementAndGet(); + } + if (context.acks.get() != 0 && outstandingProposals.get(context.zxid.get() - 1) == null) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + return; + } + super.sendPacket(qp); + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + + int leaderId = qu.getLeaderServer(); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(setZxidCounter(zkLeader.getZxid(), MAX_ZXID_COUNTER)); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name, stat) -> { + }, + null); + } + + // when: leader crash before broadcasting rollover proposal + context.completed.join(); + qu.shutdown(leaderId); + } + + String connString = qu.getConnString() + .replace(leaderConnectString + ",", "") + .replace(leaderConnectString, ""); + boolean restarted = false; + for (int j = 0; true; j++) { + try (ZooKeeper zk = ClientBase.createZKClient(connString, ClientBase.CONNECTION_TIMEOUT)) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // then: the rollover proposal is lost even after old leader with higher `currentEpoch` re-join. + assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER), false)); + + // then: new epoch will be `epoch + 1`. 
+ Stat stat = new Stat(); + zk.create("/bar" + j, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEquals(epoch + 1, ZxidUtils.getEpochFromZxid(stat.getCzxid())); + } + + if (restarted) { + break; + } + + // when: rejoin old leader which fences `epoch + 1` + // then: all above holds as majority has formed + qu.start(leaderId); + restarted = true; + connString = qu.getConnectionStringForServer(leaderId); + } + } + + @Test + public void testMinorityFollowersCrashBeforeWriteRolloverToDisk() throws Exception { + final int N = 1; + final int minorityN = N; + class Context { + final BlockingQueue followers = new ArrayBlockingQueue<>(minorityN); + final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE); + final Set crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue crashing_servers = new LinkedBlockingQueue<>(); + + boolean bypass(Follower follower, long zxid) { + boolean minority = followers.offer(follower) || followers.contains(follower); + if (minority && zxid >= rolloverZxid.get()) { + if (crashed_servers.add(follower.self.getMyId())) { + crashing_servers.add(follower.self.getMyId()); + } + return true; + } + return false; + } + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void logRequest(Request request) { + if (!context.bypass(getFollower(), request.zxid)) { + super.logRequest(request); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. 
+ String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER)); + + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: minority followers crashed before persisting rollover proposal + for (int i = 0; i < minorityN; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + } + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + // then: after all servers up, every node should meet following conditions + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals must be replicated + for (int i = 1; i <= 10; i++) { + String path = "/foo" + Long.toHexString(exclusiveStartCounter + i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: we have rollover to nextEpoch + long nextEpoch = epoch + 1; + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(qu.getPeer(serverId).peer.getZkDb().getDataTreeLastProcessedZxid())); + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(stat.getMzxid())); + } + } + } + + @Test + public void testMajorityFollowersCrashBeforeWriteRolloverToDisk() throws Exception { + final int 
N = 1; + final int majorityN = N + 1; + class Context { + final BlockingQueue followers = new ArrayBlockingQueue<>(majorityN); + final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE); + final Set crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue crashing_servers = new LinkedBlockingQueue<>(); + + boolean bypass(Follower follower, long zxid) { + boolean majority = followers.offer(follower) || followers.contains(follower); + if (majority && zxid >= rolloverZxid.get()) { + if (crashed_servers.add(follower.self.getMyId())) { + crashing_servers.add(follower.self.getMyId()); + } + return true; + } + return false; + } + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void logRequest(Request request) { + if (!context.bypass(getFollower(), request.zxid)) { + super.logRequest(request); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString, watcher)) { + context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER)); + + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: majority followers crashed before persisting rollover proposal + for (int i = 0; i < majorityN; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + + watcher.waitForDisconnected(CONNECTION_TIMEOUT); + watcher.reset(); + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + watcher.waitForConnected(CONNECTION_TIMEOUT); + } + + // then: after quorum reformed, every node should meet following conditions + long rolloverProposalZxid = 0; + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: it is indeterminate whether the rollover proposal has been committed, + // but it must be consistent among all nodes + if (rolloverProposalZxid == 0) { + rolloverProposalZxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid)); + } else { + long zxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid)); + } + + // then: new epoch proposal must not be committed + assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER + 1), false)); + } + } + 
} + + @Test + public void testMinorityFollowersCrashAfterWriteRolloverToDisk() throws Exception { + final int N = 1; + class Context { + final Set crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue crashing_servers = new LinkedBlockingQueue<>(); + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void fenceRolloverEpoch(long newEpoch) throws IOException { + super.fenceRolloverEpoch(newEpoch); + long myId = self.getMyId(); + if (context.crashed_servers.size() < N && !context.crashed_servers.contains(myId)) { + context.crashed_servers.add(myId); + context.crashing_servers.add(myId); + throw new IOException("crash peer " + myId + "after persist max epoch zxid"); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+                zk.create(
+                        "/foo" + Long.toHexString(exclusiveStartCounter + i),
+                        new byte[0],
+                        ZooDefs.Ids.READ_ACL_UNSAFE,
+                        CreateMode.PERSISTENT,
+                        (rc, path, ctx, name) -> {},
+                        null);
+            }
+
+            // when: minority followers crashed after replication
+            for (int i = 0; i < N; i++) {
+                long serverId = context.crashing_servers.take();
+                qu.shutdown((int) serverId);
+            }
+        }
+
+        for (long serverId : context.crashed_servers) {
+            qu.restart((int) serverId);
+        }
+
+        // then: after all servers up, every node should meet following conditions
+        for (int serverId = 1; serverId <= 2 * N + 1; serverId++) {
+            String connectString = qu.getConnectionStringForServer(serverId);
+            try (ZooKeeper zk = ClientBase.createZKClient(connectString)) {
+                // then: all proposals must be replicated
+                for (int i = 1; i <= 10; i++) {
+                    String path = "/foo" + Long.toHexString(exclusiveStartCounter + i);
+                    Stat stat = zk.exists(path, false);
+                    assertNotNull(stat, path + " not found");
+                }
+
+                // then: we have rollover to nextEpoch
+                long nextEpoch = epoch + 1;
+                assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(qu.getPeer(serverId).peer.getZkDb().getDataTreeLastProcessedZxid()));
+                Stat stat = new Stat();
+                zk.create("/server" + serverId, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+                assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(stat.getMzxid()));
+            }
+        }
+    }
+
+    @Test
+    public void testMajorityFollowersCrashAfterWriteRolloverToDisk() throws Exception {
+        final int N = 1;
+        class Context {
+            final Set<Long> crashed_servers = Collections.synchronizedSet(new LinkedHashSet<>());
+            final BlockingQueue<Long> crashing_servers = new LinkedBlockingQueue<>();
+        }
+        Context context = new Context();
+        qu = new QuorumUtil(N) {
+            @Override
+            protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException {
+                return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) {
+                    @Override
+                    public void fenceRolloverEpoch(long newEpoch) throws IOException {
+                        super.fenceRolloverEpoch(newEpoch);
+                        long myId = self.getMyId();
+                        if (context.crashed_servers.size() < N + 1 && !context.crashed_servers.contains(myId)) {
+                            context.crashed_servers.add(myId);
+                            context.crashing_servers.add(myId);
+                            throw new IOException("crash peer " + myId + " after persist max epoch zxid");
+                        }
+                    }
+                });
+            }
+        };
+        qu.startAll();
+
+        ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer();
+        long exclusiveStartCounter = MAX_ZXID_COUNTER - 5;
+
+        // Connect only to leader to avoid re-connect attempts.
+        String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer());
+        ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
+        try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString, watcher)) {
+            // given: leader about to rollover
+            zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter));
+
+            // when: multiple proposals during rollover to next epoch
+            for (int i = 1; i <= 10; i++) {
+                // Create them asynchronously to mimic concurrent operations.
+                zk.create(
+                        "/foo" + Long.toHexString(exclusiveStartCounter + i),
+                        new byte[0],
+                        ZooDefs.Ids.READ_ACL_UNSAFE,
+                        CreateMode.PERSISTENT,
+                        (rc, path, ctx, name) -> {},
+                        null);
+            }
+
+            // when: majority followers crashed after replicating rollover proposal
+            for (int i = 0; i < N + 1; i++) {
+                long serverId = context.crashing_servers.take();
+                qu.shutdown((int) serverId);
+            }
+
+            watcher.waitForDisconnected(CONNECTION_TIMEOUT);
+            watcher.reset();
+
+            for (long serverId : context.crashed_servers) {
+                qu.restart((int) serverId);
+            }
+
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+        }
+
+        // then: after quorum reformed, every node should meet following conditions
+        long rolloverProposalZxid = 0;
+        for (int serverId = 1; serverId <= 2 * N + 1; serverId++) {
+            String connectString = qu.getConnectionStringForServer(serverId);
+            try (ZooKeeper zk = ClientBase.createZKClient(connectString)) {
+                // then: all proposals up to rollover proposal must be replicated
+                for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) {
+                    String path = "/foo" + Long.toHexString(i);
+                    Stat stat = zk.exists(path, false);
+                    assertNotNull(stat, path + " not found");
+                }
+
+                // then: it is indeterminate whether the rollover proposal has been committed,
+                // but it must be consistent among all nodes
+                if (rolloverProposalZxid == 0) {
+                    rolloverProposalZxid =
+                            Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false))
+                                    .map(Stat::getCzxid).orElse(-1L);
+                    LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid));
+                } else {
+                    long zxid =
+                            Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false))
+                                    .map(Stat::getCzxid).orElse(-1L);
+                    assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid));
+                }
+
+                // then: new epoch proposal must not be committed
+                assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER + 1), false));
+            }
+        }
+    }
+
+    @Test
+    public void testLearnerRejoinDuringLeaderRolloverEpoch() throws Exception {
+        final int N = 1;
+        class Context {
+            private final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE);
+            private final AtomicLong followerId = new AtomicLong(-1);
+            private final CompletableFuture<Void> rolloverCommitting = new CompletableFuture<>();
+            private final CompletableFuture<Void> rolloverCommitted = new CompletableFuture<>();
+        }
+        Context context = new Context();
+        qu = new QuorumUtil(N) {
+            @Override
+            protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception {
+                return new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb()) {
+                    @Override
+                    public DataTree.ProcessTxnResult processTxn(Request request) {
+                        DataTree.ProcessTxnResult result = super.processTxn(request);
+                        // Leader is about to rollover, combining with below randomness,
+                        // we can test all cases:
+                        // 1. Sync before rollover proposal committed.
+                        // 2. Sync after rollover proposal committed.
+                        // 3. Sync after proposals from new epoch committed.
+                        if (request.zxid + 2 >= context.rolloverZxid.get()) {
+                            context.rolloverCommitting.join();
+                            context.rolloverCommitted.complete(null);
+                        }
+                        return result;
+                    }
+                }) {
+                    @Override
+                    public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
+                        super.waitForEpochAck(id, ss);
+                        if (id == context.followerId.get()) {
+                            context.rolloverCommitted.join();
+                            // Sleep a bit before sync to allow more proposals to come.
+                            Thread.sleep(new Random().nextInt(10));
+                        }
+                    }
+                };
+            }
+
+            @Override
+            protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException {
+                return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb())) {
+                    @Override
+                    protected long registerWithLeader(int pktType) throws IOException {
+                        long leaderZxid = super.registerWithLeader(pktType);
+                        if (self.getMyId() == context.followerId.get()) {
+                            context.rolloverCommitting.complete(null);
+                        }
+                        return leaderZxid;
+                    }
+                };
+            }
+        };
+        qu.startAll();
+
+        int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId();
+        CompletableFuture<Void> restarted = new CompletableFuture<>();
+
+        ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer();
+        long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid());
+        long exclusiveStartCounter = MAX_ZXID_COUNTER - 5;
+        // Connect only to leader to avoid re-connect attempts.
+        String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer());
+        try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) {
+            // given: a re-joining follower
+            qu.shutdown(followerId);
+
+            ForkJoinPool.commonPool().submit(() -> {
+                context.followerId.set(followerId);
+                // Report a failed restart through the future so restarted.join() fails instead of hanging.
+                try {
+                    qu.restart(followerId);
+                    restarted.complete(null);
+                } catch (Throwable t) {
+                    restarted.completeExceptionally(t);
+                }
+                return null;
+            });
+
+            // given: leader rollover to next epoch
+            zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter));
+            context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER));
+            for (int i = 1; i <= 10; i++) {
+                // Create them asynchronously to mimic concurrent operations.
+                zk.create(
+                        "/foo" + Long.toHexString(exclusiveStartCounter + i),
+                        new byte[0],
+                        ZooDefs.Ids.READ_ACL_UNSAFE,
+                        CreateMode.PERSISTENT,
+                        (rc, path, ctx, name) -> {},
+                        null);
+            }
+        }
+
+        // when: follower rejoin
+        restarted.join();
+
+        String followerAddr = qu.getConnectionStringForServer(followerId);
+        try (ZooKeeper zk = ClientBase.createZKClient(followerAddr)) {
+            zk.sync("/");
+            // then: all proposals must be replicated
+            for (int i = 1; i <= 10; i++) {
+                String path = "/foo" + Long.toHexString(exclusiveStartCounter + i);
+                Stat stat = zk.exists(path, false);
+                assertNotNull(stat, path + " not found");
+            }
+            QuorumPeer follower = qu.getPeer(followerId).peer;
+            assertEquals(epoch + 1, follower.getAcceptedEpoch());
+            assertEquals(epoch + 1, follower.getCurrentEpoch());
+            assertEquals(epoch + 1, follower.getCurrentVote().getPeerEpoch());
+        }
+    }
+
+    @Test
+    public void testLearnerRejoinAfterLeaderRolloverEpoch() throws Exception {
+        final int N = 1;
+        qu = new QuorumUtil(N);
+        qu.startAll();
+
+        int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId();
+
+        ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer();
+        long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid());
+        long exclusiveStartCounter = MAX_ZXID_COUNTER - 5;
+        // Connect only to leader to avoid re-connect attempts.
+        String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer());
+        try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) {
+            // given: a shutdown follower
+            qu.shutdown(followerId);
+
+            // given: leader rollover to next epoch
+            zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter));
+            for (int i = 1; i <= 10; i++) {
+                // Create them asynchronously to mimic concurrent operations.
+                zk.create(
+                        "/foo" + Long.toHexString(exclusiveStartCounter + i),
+                        new byte[0],
+                        ZooDefs.Ids.READ_ACL_UNSAFE,
+                        CreateMode.PERSISTENT,
+                        (rc, path, ctx, name) -> {},
+                        null);
+            }
+        }
+
+        // when: follower rejoin
+        qu.restart(followerId);
+
+        String followerAddr = qu.getConnectionStringForServer(followerId);
+        try (ZooKeeper zk = ClientBase.createZKClient(followerAddr)) {
+            zk.sync("/");
+            // then: all proposals must be replicated
+            for (int i = 1; i <= 10; i++) {
+                String path = "/foo" + Long.toHexString(exclusiveStartCounter + i);
+                Stat stat = zk.exists(path, false);
+                assertNotNull(stat, path + " not found");
+            }
+            QuorumPeer follower = qu.getPeer(followerId).peer;
+            assertEquals(epoch + 1, follower.getAcceptedEpoch());
+            assertEquals(epoch + 1, follower.getCurrentEpoch());
+            assertEquals(epoch + 1, follower.getCurrentVote().getPeerEpoch());
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
index 3021421b5e4..66f5e61bd61 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
@@ -735,6 +735,10 @@ public static ZooKeeper createZKClient(String cxnString) throws Exception {
         return createZKClient(cxnString, CONNECTION_TIMEOUT);
     }

+    public static ZooKeeper createZKClient(String cxnString, CountdownWatcher watcher) throws Exception {
+        return createZKClient(cxnString, CONNECTION_TIMEOUT, CONNECTION_TIMEOUT, new ZKClientConfig(), watcher);
+    }
+
     /**
      * Returns ZooKeeper client after connecting to ZooKeeper Server. Session
      * timeout is {@link #CONNECTION_TIMEOUT}
@@ -757,6 +761,11 @@ public static ZooKeeper createZKClient(String cxnString, int sessionTimeout,
     public static ZooKeeper createZKClient(String cxnString, int sessionTimeout,
         long connectionTimeout, ZKClientConfig config) throws IOException {
         CountdownWatcher watcher = new CountdownWatcher();
+        return createZKClient(cxnString, sessionTimeout, connectionTimeout, config, watcher);
+    }
+
+    public static ZooKeeper createZKClient(String cxnString, int sessionTimeout,
+        long connectionTimeout, ZKClientConfig config, CountdownWatcher watcher) throws IOException {
         ZooKeeper zk = new ZooKeeper(cxnString, sessionTimeout, watcher, config);
         try {
             watcher.waitForConnected(connectionTimeout);
@@ -765,5 +774,4 @@ public static ZooKeeper createZKClient(String cxnString, int sessionTimeout,
         }
         return zk;
     }
-
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 865d82b0c7c..e60e43a93c9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -34,7 +34,13 @@
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.Follower;
+import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -65,7 +71,7 @@ public static class PeerStruct {

     }

-    private final Map<Long, QuorumServer> peersView = new HashMap<>();
+    protected final Map<Long, QuorumServer> peersView = new HashMap<>();

     private final Map<Integer, PeerStruct> peers = new HashMap<>();

@@ -75,15 +81,15 @@ public static class PeerStruct {

     private String hostPort;

-    private int tickTime;
+    protected int tickTime;

-    private int initLimit;
+    protected int initLimit;

-    private int syncLimit;
+    protected int syncLimit;

-    private int connectToLearnerMasterLimit;
+    protected int connectToLearnerMasterLimit;

-    private int electionAlg;
+    protected int electionAlg;

     private boolean localSessionEnabled;

@@ -120,7 +126,7 @@ public QuorumUtil(int n, int syncLimit) throws RuntimeException {
             for (int i = 1; i <= ALL; ++i) {
                 PeerStruct ps = peers.get(i);
                 LOG.info("Creating QuorumPeer {}; public port {}", i, ps.clientPort);
-                ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+                ps.peer = newQuorumPeer(ps);
                 assertEquals(ps.clientPort, ps.peer.getClientPort());
             }
         } catch (Exception e) {
@@ -143,6 +149,28 @@ public void enableLocalSession(boolean localSessionEnabled) {
         this.localSessionEnabled = localSessionEnabled;
     }

+    protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException {
+        return new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit) {
+            @Override
+            protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
+                return QuorumUtil.this.makeLeader(this, logFactory);
+            }
+
+            @Override
+            protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+                return QuorumUtil.this.makeFollower(this, logFactory);
+            }
+        };
+    }
+
+    protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception {
+        return new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb()));
+    }
+
+    protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException {
+        return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()));
+    }
+
     public void startAll() throws IOException {
         shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
@@ -206,7 +234,7 @@ public void startQuorum() throws IOException {
     public void start(int id) throws IOException {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort);
-        ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+        ps.peer = newQuorumPeer(ps);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }
@@ -225,7 +253,7 @@ public void restart(int id) throws IOException {
     public void startThenShutdown(int id) throws IOException {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort);
-        ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+        ps.peer = newQuorumPeer(ps);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }