From 8a868da8ea600282fc8a62699b6407b40742cc6e Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sun, 27 Oct 2024 10:37:41 +0800 Subject: [PATCH] ZOOKEEPER-4882: Fix data loss after rejoin and restart of a node experienced temporary disk error The cause is multifold: 1. Leader will commit a proposal once quorum acked. 2. Proposal is able to be committed in node's memory even if it has not been written to that node's disk. 3. In case of disk error, the txn log could lag behind memory database. This way, node experienced temporary disk error will have hole in its txn log after re-join. Once restarted, data will loss. This commit complains the lag so to reload disk database to memory. This way, the node will not be able to become leader and sync missing txns from leader. Refs: ZOOKEEPER-4882, ZOOKEEPER-4925 --- .../apache/zookeeper/server/ZKDatabase.java | 9 +- .../zookeeper/server/ZooKeeperServer.java | 2 +- .../server/persistence/FileTxnLog.java | 21 +- .../server/persistence/FileTxnSnapLog.java | 7 +- .../zookeeper/server/quorum/QuorumPeer.java | 3 +- .../zookeeper/server/quorum/DIFFSyncTest.java | 2 +- .../server/quorum/SyncDiskErrorTest.java | 277 ++++++++++++++++++ .../org/apache/zookeeper/test/QuorumUtil.java | 10 +- 8 files changed, 318 insertions(+), 13 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index 2fd67da5063..3ab66e12d75 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -299,9 +299,16 @@ public long loadDataBase() throws IOException { /** * Fast-forward the database adding transactions from the committed log into memory. * @return the last valid zxid. - * @throws IOException + * @throws IOException IO or inconsistent database error */ public long fastForwardDataBase() throws IOException { + long lastLoggedZxid = snapLog.getLastLoggedZxid(); + if (lastLoggedZxid < dataTree.lastProcessedZxid) { + String msg = String.format("memory database(zxid: 0x%s) is ahead of disk(zxid: 0x%s)", + Long.toHexString(dataTree.lastProcessedZxid), + Long.toHexString(lastLoggedZxid)); + throw new IOException(msg); + } long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 11ec1fb7413..d5344397968 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -935,7 +935,7 @@ public final synchronized void shutdown(boolean fullyShutDown) { // This will fast-forward the database to the last recorded transaction zkDb.fastForwardDataBase(); } catch (IOException e) { - LOG.error("Error updating DB", e); + LOG.error("Failed to update memory database, will clear it to avoid inconsistency", e); fullyShutDown = true; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java index e14e510c2be..267e17e4cbf 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java @@ -147,6 +147,7 @@ public class FileTxnLog implements TxnLog, Closeable { } long lastZxidSeen; + long lastZxidFlushed; volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; @@ -366,7 +367,13 @@ public static File[] getLogFiles(File[] logDirList, long snapshotZxid) { * get the last zxid that was logged in the transaction logs * @return the last zxid logged in the transaction logs */ - public long getLastLoggedZxid() { + @Override + public long getLastLoggedZxid() throws IOException { + long lastFlushedZxid = getLastFlushedZxid(); + if (lastFlushedZxid > 0) { + return lastFlushedZxid; + } + File[] files = getLogFiles(logDir.listFiles(), 0); long maxLog = files.length > 0 ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1; @@ -381,8 +388,6 @@ public long getLastLoggedZxid() { TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } - } catch (IOException e) { - LOG.warn("Unexpected exception", e); } return zxid; } @@ -427,6 +432,7 @@ public synchronized void commit() throws IOException { ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS); } } + lastZxidFlushed = lastZxidSeen; while (streamsToFlush.size() > 1) { streamsToFlush.poll().close(); } @@ -442,6 +448,10 @@ public synchronized void commit() throws IOException { } } + private synchronized long getLastFlushedZxid() { + return lastZxidFlushed; + } + /** * * @return elapsed sync time of transaction log in milliseconds @@ -494,8 +504,13 @@ public boolean truncate(long zxid) throws IOException { while (itr.goToNextLog()) { if (!itr.logFile.delete()) { LOG.warn("Unable to truncate {}", itr.logFile); + throw new IOException("Unable to truncate " + itr.logFile); } } + synchronized (this) { + lastZxidSeen = zxid; + lastZxidFlushed = zxid; + } } return true; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 2816826046e..2ec275fb29a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -457,9 +457,10 @@ public void processTransaction( * the last logged zxid on the transaction logs * @return the last logged zxid */ - public long getLastLoggedZxid() { - FileTxnLog txnLog = new FileTxnLog(dataDir); - return txnLog.getLastLoggedZxid(); + public long getLastLoggedZxid() throws IOException { + SnapshotInfo snapshotInfo = snapLog.getLastSnapshotInfo(); + long lastSnapZxid = snapshotInfo == null ? -1 : snapshotInfo.zxid; + return Long.max(lastSnapZxid, txnLog.getLastLoggedZxid()); } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 9a18d59d401..378ae9271a0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2263,7 +2263,8 @@ public void setZKDatabase(ZKDatabase database) { this.zkDb = database; } - protected ZKDatabase getZkDb() { + // @VisibleForTesting + public ZKDatabase getZkDb() { return zkDb; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java index a9be09f3973..3da9d9ea2c1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java @@ -293,7 +293,7 @@ public void commit(final Request request) { private void logEpochsAndLastLoggedTxnForAllServers() throws Exception { for (int i = 0; i < SERVER_COUNT; i++) { final QuorumPeer qp = mt[i].getQuorumPeer(); - if (qp != null) { + if (qp != null && qp.getZkDb().isInitialized()) { LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s", qp.getMyId(), qp.getAcceptedEpoch(), qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid()))); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java new file mode 100644 index 00000000000..dcea06a06dd --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.IOException; +import java.util.Comparator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class SyncDiskErrorTest extends ZKTestCase { + private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + QuorumUtil qu; + + @BeforeEach + public void setUp() { + // write and sync every txn + System.setProperty("zookeeper.maxBatchSize", "1"); + } + + @AfterEach + public void tearDown() { + System.clearProperty("zookeeper.maxBatchSize"); + if (qu != null) { + qu.shutdownAll(); + } + } + + @Test + public void testFollowerRejoinAndRestartAfterTemporaryDiskError() throws Exception { + class Context { + final AtomicLong followerId = new AtomicLong(-1); + final CompletableFuture hang = new CompletableFuture<>(); + final CompletableFuture error = new CompletableFuture<>(); + } + Context context = new Context(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + QuorumPeer peer = super.newQuorumPeer(ps); + peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()) { + @Override + public void commit() throws IOException { + if (peer.follower != null && peer.getMyId() == context.followerId.get()) { + context.hang.complete(null); + context.error.join(); + throw new IOException("temporary disk error"); + } + super.commit(); + } + }); + return peer; + } + }; + qu.startAll(); + + int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId(); + String followerConnectString = qu.getConnectionStringForServer(followerId); + + // Connect to leader to avoid connection to faulty node. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: follower disk hang temporarily and error + context.followerId.set(followerId); + + // given: multiple write txn committed meanwhile + for (int i = 1; i < 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + zk.create("/foo" + 10, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + context.hang.join(); + context.followerId.set(-1); + context.error.complete(null); + + // given: re-join after disk error + ClientBase.waitForServerUp(followerConnectString, CONNECTION_TIMEOUT); + + // given: follower state is good + try (ZooKeeper followerZk = ClientBase.createZKClient(followerConnectString)) { + followerZk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(followerZk.exists(path, false), path + " not found"); + } + } + + // given: more write txns + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/bar" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: restart follower node + qu.shutdown(followerId); + qu.restart(followerId); + + // then: follower state should still be good too + try (ZooKeeper zk = ClientBase.createZKClient(followerConnectString)) { + for (int i = 1; i <= 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } + + @Test + public void testDiffSyncAfterTemporaryDiskErrorAndLeading() throws Exception { + class Context { + final AtomicLong followerId = new AtomicLong(-1); + final CompletableFuture hang = new CompletableFuture<>(); + final CompletableFuture error = new CompletableFuture<>(); + } + Context context = new Context(); + // N >= 2, so we can commit with stale follower and error follower + final int N = 2; + qu = new QuorumUtil(N) { + @Override + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + QuorumPeer peer = super.newQuorumPeer(ps); + peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()) { + @Override + public void commit() throws IOException { + if (peer.follower != null && peer.getMyId() == context.followerId.get()) { + context.hang.complete(null); + context.error.join(); + throw new IOException("temporary disk error"); + } + super.commit(); + } + }); + // Force DIFF sync + peer.getZkDb().setSnapshotSizeFactor(1000000); + return peer; + } + }; + qu.startAll(); + + int[] followerIds = qu.getFollowerQuorumPeers() + .stream() + .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed()) + .mapToInt(peer -> (int) peer.getMyId()) + .toArray(); + int followerId = followerIds[0]; + String followerConnectString = qu.getConnectionStringForServer(followerId); + + int staleFollowerId = followerIds[1]; + + // Connect to leader to avoid connection to faulty node. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: another stale follower + qu.shutdown(staleFollowerId); + + // given: follower disk hang temporarily and error + context.followerId.set(followerId); + + // given: multiple write txn committed meanwhile + for (int i = 1; i < 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + zk.create("/foo" + 10, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + context.hang.join(); + context.followerId.set(-1); + context.error.complete(null); + + // given: re-join after disk error + ClientBase.waitForServerUp(followerConnectString, CONNECTION_TIMEOUT); + + // given: follower state is good + try (ZooKeeper followerZk = ClientBase.createZKClient(followerConnectString)) { + followerZk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(followerZk.exists(path, false), path + " not found"); + } + } + + // given: more write txns + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/bar" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: become leader + while (qu.getLeaderServer() != followerId) { + int leaderId = qu.getLeaderServer(); + long syncTimeout = (long) qu.getLeaderQuorumPeer().getTickTime() * qu.getLeaderQuorumPeer().getSyncLimit(); + qu.shutdown(leaderId); + Thread.sleep(syncTimeout); + qu.restart(leaderId); + } + + // and: write some txns + try (ZooKeeper zk = ClientBase.createZKClient(followerConnectString)) { + for (int i = 1; i <= 10; i++) { + zk.create("/foobar" + i, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + + // when: DIFF sync to stale follower + qu.restart(staleFollowerId); + + // then: follower state should still be good too + String staleFollowerConnectString = qu.getConnectionStringForServer(staleFollowerId); + try (ZooKeeper zk = ClientBase.createZKClient(staleFollowerConnectString)) { + zk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java index 865d82b0c7c..0cf1b5bf164 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java @@ -120,7 +120,7 @@ public QuorumUtil(int n, int syncLimit) throws RuntimeException { for (int i = 1; i <= ALL; ++i) { PeerStruct ps = peers.get(i); LOG.info("Creating QuorumPeer {}; public port {}", i, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); assertEquals(ps.clientPort, ps.peer.getClientPort()); } } catch (Exception e) { @@ -143,6 +143,10 @@ public void enableLocalSession(boolean localSessionEnabled) { this.localSessionEnabled = localSessionEnabled; } + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + return new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + } + public void startAll() throws IOException { shutdownAll(); for (int i = 1; i <= ALL; ++i) { @@ -206,7 +210,7 @@ public void startQuorum() throws IOException { public void start(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); } @@ -225,7 +229,7 @@ public void restart(int id) throws IOException { public void startThenShutdown(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); }