diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f2d872a9..8d936ee86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,14 +43,14 @@ All notable changes to this project will be documented in this file. - nifi: Add [nifi-iceberg-bundle](https://github.com/stackabletech/nifi-iceberg-bundle) for NiFi `2.2.0` ([#1060], [#1106]). - java: Add JDK 24 ([#1097]). - ci: Add golang image to mirror workflow ([#1103]). -- omid: bump version to 1.1.3 ([#1105]) -- hbase: add 2.6.2 and upgrade dependencies ([#1101]) -- kafka: Add `4.0.0` ([#1117]) -- Include `.tar.gz` snapshots of the product source code in container images ([#1126]) -- airflow: OPA authorizer for Airflow 3.x ([#1127]) -- kafka: Add `3.9.1` ([#1149]) -- spark-k8s: Add `3.5.6` ([#1142]) -- spark-connect-client: Add `3.5.6` ([#1142]) +- omid: bump version to 1.1.3 ([#1105]). +- hbase: add 2.6.2 and upgrade dependencies ([#1101]). +- kafka: Add `4.0.0` ([#1117]). +- Include `.tar.gz` snapshots of the product source code in container images ([#1126]). +- airflow: OPA authorizer for Airflow 3.x ([#1127]). +- kafka: Add `3.9.1` ([#1149]). +- spark-k8s: Add `3.5.6` ([#1142]). +- spark-connect-client: Add `3.5.6` ([#1142]). - git-sync: Bump version to 4.4.1 ([#1151]). ### Changed @@ -66,16 +66,17 @@ All notable changes to this project will be documented in this file. - yq: Bump products to use `4.45.2` ([#1090]). - cyclonedx-bom: Bump airflow and superset to use `6.0.0` ([#1090]). - vector: Bump to `0.46.1` ([#1098]). -- spark: update dependencies for 3.5.5 ([#1094]) -- nifi: include NAR SBOMs ([#1119]) +- spark: update dependencies for 3.5.5 ([#1094]). +- nifi: include NAR SBOMs ([#1119]). - nifi: update patch allowing to bypass host header validation starting with NiFi 2.4.0 ([#1125]). - BREAKING: kcat: Stop building kcat image ([#1124]). -- containerdebug updated to 0.2.0 ([#1128]) -- Build Hadoop as `stackable` and configure the Stackable Nexus build-repo for the `root` user ([#1133]) +- containerdebug updated to 0.2.0 ([#1128]). +- Build Hadoop as `stackable` and configure the Stackable Nexus build-repo for the `root` user ([#1133]). - patchable: The base branch is now configured as the git upstream branch ([#1131]). -- airflow: Updates the entrypoint script and removes the check for GID == 0 ([#1138]) +- airflow: Updates the entrypoint script and removes the check for GID == 0 ([#1138]). - druid: Bump druiod-opa-authorizer to `0.7.0` ([#1139]). - vector: Bump to `0.47.0` ([#1152]). +- zookeeper: backport ZOOKEEPER-4846, ZOOKEEPER-4921, ZOOKEEPER-4925 into Zookeeper 3.9.3 ([#1150]). ### Fixed @@ -105,7 +106,7 @@ All notable changes to this project will be documented in this file. Also remove the old release workflow. - zookeeper: Remove 3.9.2 ([#1093]). - Remove ubi8-rust-builder image ([#1091]). -- spark: remove 3.5.2 ([#1094]) +- spark: remove 3.5.2 ([#1094]). - hadoop: Remove `3.3.4` and `3.4.0` ([#1099]). - opa: Remove `0.67.1` ([#1103]). - opa: Remove legacy bundle-builder from container build ([#1103]). @@ -113,8 +114,8 @@ All notable changes to this project will be documented in this file. - hbase: Remove 2.4.18 ([#1101]) - druid: Remove `30.0.0` ([#1110]). - nifi: Remove `2.2.0` ([#1114]). -- kafka: Remove `3.7.1` and `3.8.0` ([#1117]) -- spark-connect-client: Remove `3.5.5` ([#1142]) +- kafka: Remove `3.7.1` and `3.8.0` ([#1117]). +- spark-connect-client: Remove `3.5.5` ([#1142]). [nifi-iceberg-bundle]: https://github.com/stackabletech/nifi-iceberg-bundle [#1025]: https://github.com/stackabletech/docker-images/pull/1025 @@ -169,8 +170,9 @@ All notable changes to this project will be documented in this file. [#1137]: https://github.com/stackabletech/docker-images/pull/1137 [#1138]: https://github.com/stackabletech/docker-images/pull/1138 [#1139]: https://github.com/stackabletech/docker-images/pull/1139 -[#1149]: https://github.com/stackabletech/docker-images/pull/1149 [#1142]: https://github.com/stackabletech/docker-images/pull/1142 +[#1149]: https://github.com/stackabletech/docker-images/pull/1149 +[#1150]: https://github.com/stackabletech/docker-images/pull/1150 [#1151]: https://github.com/stackabletech/docker-images/pull/1151 [#1152]: https://github.com/stackabletech/docker-images/pull/1152 diff --git a/zookeeper/stackable/patches/3.9.3/0001-Add-CycloneDX-plugin.patch b/zookeeper/stackable/patches/3.9.3/0001-Add-CycloneDX-plugin.patch index 901191646..725492275 100644 --- a/zookeeper/stackable/patches/3.9.3/0001-Add-CycloneDX-plugin.patch +++ b/zookeeper/stackable/patches/3.9.3/0001-Add-CycloneDX-plugin.patch @@ -8,7 +8,7 @@ Subject: Add CycloneDX plugin 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml -index 6ef4011f..07ae7538 100644 +index 6ef4011fe..07ae75387 100644 --- a/pom.xml +++ b/pom.xml @@ -925,7 +925,7 @@ diff --git a/zookeeper/stackable/patches/3.9.3/0002-ZOOKEEPER-4846-Failure-to-reload-database-due-to-mis.patch b/zookeeper/stackable/patches/3.9.3/0002-ZOOKEEPER-4846-Failure-to-reload-database-due-to-mis.patch new file mode 100644 index 000000000..e0944a1ac --- /dev/null +++ b/zookeeper/stackable/patches/3.9.3/0002-ZOOKEEPER-4846-Failure-to-reload-database-due-to-mis.patch @@ -0,0 +1,64 @@ +From 4004002a9ff08a539a94842ea12a2a449274e968 Mon Sep 17 00:00:00 2001 +From: =?UTF-8?q?Andor=20Moln=C3=A1r?= +Date: Tue, 11 Feb 2025 10:43:20 -0600 +Subject: ZOOKEEPER-4846: Failure to reload database due to missing ACL + +ZOOKEEPER-4846. Fix ACL reference on existing znode when trying to create +Reviewers: cnauroth, eolivelli, ztzg +Author: anmolnar +Closes #2222 from anmolnar/ZOOKEEPER-4846 +--- + .../org/apache/zookeeper/server/DataTree.java | 5 +++-- + .../apache/zookeeper/server/DataTreeTest.java | 16 ++++++++++++++++ + 2 files changed, 19 insertions(+), 2 deletions(-) + +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +index 3b61c80d8..af937f834 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +@@ -462,8 +462,9 @@ public class DataTree { + // we did for the global sessions. + Long acls = aclCache.convertAcls(acl); + +- Set children = parent.getChildren(); +- if (children.contains(childName)) { ++ DataNode existingChild = nodes.get(path); ++ if (existingChild != null) { ++ existingChild.acl = acls; + throw new NodeExistsException(); + } + +diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +index 07a69f14f..fc20ed320 100644 +--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java ++++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; + import static org.junit.jupiter.api.Assertions.assertNotEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + import static org.junit.jupiter.api.Assertions.assertNull; ++import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + import java.io.ByteArrayInputStream; + import java.io.ByteArrayOutputStream; +@@ -631,6 +632,21 @@ public class DataTreeTest extends ZKTestCase { + } + } + ++ @Test ++ public void testCreateNodeFixMissingACL() throws Exception { ++ DataTree dt = new DataTree(); ++ ReferenceCountedACLCache aclCache = dt.getReferenceCountedAclCache(); ++ ++ dt.createNode("/the_parent", new byte[0], ZooDefs.Ids.CREATOR_ALL_ACL, -1, 1, 1, 0); ++ Long aclId = dt.getNode("/the_parent").acl; ++ aclCache.removeUsage(aclId); ++ aclCache.purgeUnused(); ++ // try to re-create the parent -> throws NodeExistsException, but fixes the deleted ACL ++ assertThrows(NodeExistsException.class, () -> ++ dt.createNode("/the_parent", new byte[0], ZooDefs.Ids.CREATOR_ALL_ACL, -1, 1, 1, 0)); ++ dt.createNode("/the_parent/the_child", new byte[0], ZooDefs.Ids.CREATOR_ALL_ACL, -1, 2, 2, 2); ++ } ++ + private DataTree buildDataTreeForTest() { + final DataTree dt = new DataTree(); + assertEquals(dt.lastProcessedZxid, 0); diff --git a/zookeeper/stackable/patches/3.9.3/0003-ZOOKEEPER-4921-Retry-endlessly-to-establish-a-brand-.patch b/zookeeper/stackable/patches/3.9.3/0003-ZOOKEEPER-4921-Retry-endlessly-to-establish-a-brand-.patch new file mode 100644 index 000000000..65838c2bb --- /dev/null +++ b/zookeeper/stackable/patches/3.9.3/0003-ZOOKEEPER-4921-Retry-endlessly-to-establish-a-brand-.patch @@ -0,0 +1,143 @@ +From 90e8e0f44e8a884765b6e7afe8bd779d59136fad Mon Sep 17 00:00:00 2001 +From: Kezhu Wang +Date: Sat, 26 Apr 2025 12:04:01 +0800 +Subject: ZOOKEEPER-4921: Retry endlessly to establish a brand-new session + +This partially rollback ZOOKEEPER-4508 to keep consistent with versions +prior to 3.9.3 (excluded), so to maintain compatibility with third party +libraries. + +Refs: ZOOKEEPER-4508, ZOOKEEPER-4921, ZOOKEEPER-4923 and +https://lists.apache.org/thread/nfb9z7rhgglbjzfxvg4z2m3pks53b3c1 +--- + .../java/org/apache/zookeeper/ClientCnxn.java | 2 +- + .../zookeeper/test/SessionTimeoutTest.java | 65 +++++++++++++------ + 2 files changed, 47 insertions(+), 20 deletions(-) + +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +index 0bf616c61..207bb8c49 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +@@ -1242,7 +1242,7 @@ public class ClientCnxn { + to = connectTimeout - clientCnxnSocket.getIdleSend(); + } + +- int expiration = expirationTimeout - clientCnxnSocket.getIdleRecv(); ++ int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv(); + if (expiration <= 0) { + String warnInfo = String.format( + "Client session timed out, have not heard from server in %dms for session id 0x%s", +diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java +index 7a59f5eb9..9f5943f68 100644 +--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java ++++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java +@@ -18,6 +18,9 @@ + + package org.apache.zookeeper.test; + ++import static org.hamcrest.MatcherAssert.assertThat; ++import static org.hamcrest.Matchers.greaterThanOrEqualTo; ++import static org.hamcrest.Matchers.lessThan; + import static org.junit.jupiter.api.Assertions.assertNotNull; + import static org.junit.jupiter.api.Assertions.assertNull; + import static org.junit.jupiter.api.Assertions.assertThrows; +@@ -31,12 +34,15 @@ import java.util.List; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.TimeoutException; + import org.apache.zookeeper.CreateMode; + import org.apache.zookeeper.KeeperException; + import org.apache.zookeeper.TestableZooKeeper; + import org.apache.zookeeper.WatchedEvent; + import org.apache.zookeeper.Watcher; + import org.apache.zookeeper.ZooDefs; ++import org.apache.zookeeper.ZooKeeper; ++import org.apache.zookeeper.common.Time; + import org.junit.jupiter.api.BeforeEach; + import org.junit.jupiter.api.Test; + import org.slf4j.Logger; +@@ -54,6 +60,21 @@ public class SessionTimeoutTest extends ClientBase { + zk = createClient(); + } + ++ private static class ExpiredWatcher implements Watcher { ++ public volatile CompletableFuture expired = new CompletableFuture<>(); ++ ++ synchronized void reset() { ++ expired = new CompletableFuture<>(); ++ } ++ ++ @Override ++ public synchronized void process(WatchedEvent event) { ++ if (event.getState() == Event.KeeperState.Expired) { ++ expired.complete(null); ++ } ++ } ++ } ++ + private static class BusyServer implements AutoCloseable { + private final ServerSocket server; + private final Socket client; +@@ -143,17 +164,24 @@ public class SessionTimeoutTest extends ClientBase { + // stop client also to gain less distraction + zk.close(); + +- // small connection timeout to gain quick ci feedback +- int sessionTimeout = 3000; +- CompletableFuture expired = new CompletableFuture<>(); ++ // given: established session ++ int sessionTimeout = 3000; // small connection timeout to gain quick ci feedback ++ ExpiredWatcher watcher = new ExpiredWatcher(); + zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout); +- zk.register(event -> { +- if (event.getState() == Watcher.Event.KeeperState.Expired) { +- expired.complete(null); +- } +- }); ++ zk.register(watcher); ++ ++ // when: all server down ++ long start = Time.currentElapsedTime(); ++ zk.sync("/"); // touch timeout counts + stopServer(); +- expired.join(); ++ ++ // then: get Expired after session timeout ++ watcher.expired.join(); ++ long elapsed = Time.currentElapsedTime() - start; ++ assertThat(elapsed, greaterThanOrEqualTo((long) zk.getSessionTimeout())); ++ assertThat(elapsed, lessThan(zk.getSessionTimeout() * 10L)); ++ ++ // then: future request will get SessionExpiredException + assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); + } + +@@ -162,18 +190,17 @@ public class SessionTimeoutTest extends ClientBase { + // stop client also to gain less distraction + zk.close(); + ++ // given: unavailable cluster + stopServer(); + +- // small connection timeout to gain quick ci feedback +- int sessionTimeout = 3000; +- CompletableFuture expired = new CompletableFuture<>(); +- new TestableZooKeeper(hostPort, sessionTimeout, event -> { +- if (event.getState() == Watcher.Event.KeeperState.Expired) { +- expired.complete(null); +- } +- }); +- expired.join(); +- assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); ++ // when: try to establish a brand-new session ++ int sessionTimeout = 300; // small connection timeout to gain quick ci feedback ++ ExpiredWatcher watcher = new ExpiredWatcher(); ++ try (ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeout, watcher)) { ++ // then: never Expired ++ assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS)); ++ assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null)); ++ } + } + + @Test diff --git a/zookeeper/stackable/patches/3.9.3/0004-ZOOKEEPER-4925-Fix-data-loss-due-to-propagation-of-d.patch b/zookeeper/stackable/patches/3.9.3/0004-ZOOKEEPER-4925-Fix-data-loss-due-to-propagation-of-d.patch new file mode 100644 index 000000000..76df03879 --- /dev/null +++ b/zookeeper/stackable/patches/3.9.3/0004-ZOOKEEPER-4925-Fix-data-loss-due-to-propagation-of-d.patch @@ -0,0 +1,611 @@ +From d16264bd13ce61ae5a4375e30e9d1787c1206747 Mon Sep 17 00:00:00 2001 +From: Kezhu Wang +Date: Wed, 30 Apr 2025 11:45:05 +0800 +Subject: ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous + committedLog + +There are two variants of `ZooKeeperServer::processTxn`. Those two +variants diverge significantly since ZOOKEEPER-3484. +`processTxn(Request request)` pops outstanding change from +`outstandingChanges` and adds txn to `committedLog` for follower to sync +in addition to what `processTxn(TxnHeader hdr, Record txn)` does. The +`Learner` uses `processTxn(TxnHeader hdr, Record txn)` to commit txn to +memory after ZOOKEEPER-4394, which means it leaves `committedLog` +untouched in `SYNCHRONIZATION` phase. + +This way, a stale follower will have hole in its `committedLog` after +joining cluster. The stale follower will propagate the in memory hole +to other stale nodes after becoming leader. This causes data loss. + +The test case fails on master and 3.9.3, and passes on 3.9.2. So only +3.9.3 is affected. + +This commit drops `processTxn(TxnHeader hdr, Record txn)` as +`processTxn(Request request)` is capable in `SYNCHRONIZATION` phase too. + +Also, this commit rejects discontinuous proposals in `syncWithLeader` +and `committedLog`, so to avoid possible data loss. + +Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484 + +Add separated code to enforce continuous proposals +--- + .../org/apache/zookeeper/server/Request.java | 13 +++ + .../apache/zookeeper/server/TxnLogEntry.java | 4 + + .../apache/zookeeper/server/ZKDatabase.java | 28 +++-- + .../zookeeper/server/ZooKeeperServer.java | 21 ++-- + .../zookeeper/server/quorum/Follower.java | 4 +- + .../quorum/FollowerZooKeeperServer.java | 34 ++---- + .../zookeeper/server/quorum/Learner.java | 58 ++++++---- + .../zookeeper/server/quorum/Observer.java | 11 +- + .../zookeeper/server/TxnLogDigestTest.java | 2 + + .../zookeeper/server/ZxidRolloverTest.java | 2 + + .../server/quorum/QuorumSyncTest.java | 100 ++++++++++++++++++ + 11 files changed, 196 insertions(+), 81 deletions(-) + create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java + +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +index c174fdd1e..ad5071375 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +@@ -78,6 +78,19 @@ public class Request { + this.authInfo = null; + } + ++ public Request(TxnHeader hdr, Record txn, TxnDigest digest) { ++ this.sessionId = hdr.getClientId(); ++ this.cxid = hdr.getCxid(); ++ this.type = hdr.getType(); ++ this.hdr = hdr; ++ this.txn = txn; ++ this.zxid = hdr.getZxid(); ++ this.request = null; ++ this.cnxn = null; ++ this.authInfo = null; ++ this.txnDigest = digest; ++ } ++ + public final long sessionId; + + public final int cxid; +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +index 352eb81da..409fd21fa 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +@@ -47,4 +47,8 @@ public final class TxnLogEntry { + public TxnDigest getDigest() { + return digest; + } ++ ++ public Request toRequest() { ++ return new Request(header, txn, digest); ++ } + } +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +index 7258daa7c..7a26d8362 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +@@ -58,6 +58,7 @@ import org.apache.zookeeper.server.quorum.Leader.Proposal; + import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal; + import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; + import org.apache.zookeeper.server.util.SerializeUtils; ++import org.apache.zookeeper.server.util.ZxidUtils; + import org.apache.zookeeper.txn.TxnDigest; + import org.apache.zookeeper.txn.TxnHeader; + import org.slf4j.Logger; +@@ -82,6 +83,8 @@ public class ZKDatabase { + protected FileTxnSnapLog snapLog; + protected long minCommittedLog, maxCommittedLog; + ++ private final boolean allowDiscontinuousProposals = Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals"); ++ + /** + * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot + */ +@@ -170,8 +173,6 @@ public class ZKDatabase { + * data structures in zkdatabase. + */ + public void clear() { +- minCommittedLog = 0; +- maxCommittedLog = 0; + /* to be safe we just create a new + * datatree. + */ +@@ -182,6 +183,8 @@ public class ZKDatabase { + try { + lock.lock(); + committedLog.clear(); ++ minCommittedLog = 0; ++ maxCommittedLog = 0; + } finally { + lock.unlock(); + } +@@ -320,17 +323,30 @@ public class ZKDatabase { + WriteLock wl = logLock.writeLock(); + try { + wl.lock(); +- if (committedLog.size() > commitLogCount) { +- committedLog.remove(); +- minCommittedLog = committedLog.peek().getZxid(); +- } + if (committedLog.isEmpty()) { + minCommittedLog = request.zxid; + maxCommittedLog = request.zxid; ++ } else if (request.zxid <= maxCommittedLog) { ++ // This could happen if lastProcessedZxid is rewinded and database is re-synced. ++ // Currently, it only happens in test codes, but it should also be safe for production path. ++ return; ++ } else if (!allowDiscontinuousProposals ++ && request.zxid != maxCommittedLog + 1 ++ && ZxidUtils.getEpochFromZxid(request.zxid) <= ZxidUtils.getEpochFromZxid(maxCommittedLog)) { ++ String msg = String.format( ++ "Committed proposal cached out of order: 0x%s is not the next proposal of 0x%s", ++ ZxidUtils.zxidToString(request.zxid), ++ ZxidUtils.zxidToString(maxCommittedLog)); ++ LOG.error(msg); ++ throw new IllegalStateException(msg); + } + PureRequestProposal p = new PureRequestProposal(request); + committedLog.add(p); + maxCommittedLog = p.getZxid(); ++ if (committedLog.size() > commitLogCount) { ++ committedLog.remove(); ++ minCommittedLog = committedLog.peek().getZxid(); ++ } + } finally { + wl.unlock(); + } +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +index 6740f6d52..14dd59b8c 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +@@ -1846,13 +1846,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { + cnxn.sendResponse(replyHeader, record, "response"); + } + +- // entry point for quorum/Learner.java +- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { +- processTxnForSessionEvents(null, hdr, txn); +- return processTxnInDB(hdr, txn, null); +- } +- +- // entry point for FinalRequestProcessor.java + public ProcessTxnResult processTxn(Request request) { + TxnHeader hdr = request.getHdr(); + processTxnForSessionEvents(request, hdr, request.getTxn()); +@@ -1864,8 +1857,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { + if (!writeRequest && !quorumRequest) { + return new ProcessTxnResult(); + } ++ ++ ProcessTxnResult rc; + synchronized (outstandingChanges) { +- ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); ++ rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); + + // request.hdr is set for write requests, which are the only ones + // that add to outstandingChanges. +@@ -1886,13 +1881,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { + } + } + } ++ } + +- // do not add non quorum packets to the queue. +- if (quorumRequest) { +- getZKDatabase().addCommittedProposal(request); +- } +- return rc; ++ // do not add non quorum packets to the queue. ++ if (quorumRequest) { ++ getZKDatabase().addCommittedProposal(request); + } ++ return rc; + } + + private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +index 0eff9d248..ca99974cb 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +@@ -35,7 +35,6 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; + import org.apache.zookeeper.server.util.SerializeUtils; + import org.apache.zookeeper.server.util.ZxidUtils; + import org.apache.zookeeper.txn.SetDataTxn; +-import org.apache.zookeeper.txn.TxnDigest; + import org.apache.zookeeper.txn.TxnHeader; + + /** +@@ -164,7 +163,6 @@ public class Follower extends Learner { + TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); + TxnHeader hdr = logEntry.getHeader(); + Record txn = logEntry.getTxn(); +- TxnDigest digest = logEntry.getDigest(); + if (hdr.getZxid() != lastQueued + 1) { + LOG.warn( + "Got zxid 0x{} expected 0x{}", +@@ -179,7 +177,7 @@ public class Follower extends Learner { + self.setLastSeenQuorumVerifier(qv, true); + } + +- fzk.logRequest(hdr, txn, digest); ++ fzk.logRequest(logEntry.toRequest()); + if (hdr != null) { + /* + * Request header is created only by the leader, so this is only set +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +index b67661999..1b0b5cd92 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +@@ -22,7 +22,6 @@ import java.io.IOException; + import java.util.concurrent.ConcurrentLinkedQueue; + import java.util.concurrent.LinkedBlockingQueue; + import javax.management.JMException; +-import org.apache.jute.Record; + import org.apache.zookeeper.jmx.MBeanRegistry; + import org.apache.zookeeper.metrics.MetricsContext; + import org.apache.zookeeper.server.ExitCode; +@@ -33,8 +32,6 @@ import org.apache.zookeeper.server.ServerMetrics; + import org.apache.zookeeper.server.SyncRequestProcessor; + import org.apache.zookeeper.server.ZKDatabase; + import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +-import org.apache.zookeeper.txn.TxnDigest; +-import org.apache.zookeeper.txn.TxnHeader; + import org.apache.zookeeper.util.ServiceUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +@@ -79,20 +76,17 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { + + LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); + +- public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { +- final Request request = buildRequestToProcess(hdr, txn, digest); ++ public void logRequest(Request request) { ++ if ((request.zxid & 0xffffffffL) != 0) { ++ pendingTxns.add(request); ++ } + syncProcessor.processRequest(request); + } + + /** +- * Build a request for the txn and append it to the transaction log +- * @param hdr the txn header +- * @param txn the txn +- * @param digest the digest of txn ++ * Append txn request to the transaction log directly without go through request processors. + */ +- public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { +- final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); +- request.setTxnDigest(digest); ++ public void appendRequest(Request request) throws IOException { + getZKDatabase().append(request); + } + +@@ -188,20 +182,4 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { + rootContext.unregisterGauge("synced_observers"); + + } +- +- /** +- * Build a request for the txn +- * @param hdr the txn header +- * @param txn the txn +- * @param digest the digest of txn +- * @return a request moving through a chain of RequestProcessors +- */ +- private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { +- final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); +- request.setTxnDigest(digest); +- if ((request.zxid & 0xffffffffL) != 0) { +- pendingTxns.add(request); +- } +- return request; +- } + } +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +index 1ef99e50a..adf0ef6e5 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +@@ -82,6 +82,10 @@ public class Learner { + Record rec; + TxnDigest digest; + ++ Request toRequest() { ++ return new Request(hdr, rec, digest); ++ } ++ + } + + QuorumPeer self; +@@ -535,6 +539,27 @@ public class Learner { + } + } + ++ long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws Exception { ++ if (lastQueued == 0) { ++ LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid())); ++ } else if (pif.hdr.getZxid() != lastQueued + 1) { ++ if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { ++ String msg = String.format( ++ "DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s", ++ Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), ++ Long.toHexString(lastQueued + 1)); ++ LOG.error(msg); ++ throw new Exception(msg); ++ } ++ // We can't tell whether it is a data loss. Given that new epoch is rare, ++ // log at warn should not be too verbose. ++ LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}", ++ Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), ++ Long.toHexString(lastQueued + 1)); ++ } ++ return pif.hdr.getZxid(); ++ } ++ + /** + * Finally, synchronize our history with the Leader (if Follower) + * or the LearnerMaster (if Observer). +@@ -609,6 +634,8 @@ public class Learner { + zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); + zk.createSessionTracker(); + ++ // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently ++ // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now. + long lastQueued = 0; + + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 +@@ -630,13 +657,7 @@ public class Learner { + pif.hdr = logEntry.getHeader(); + pif.rec = logEntry.getTxn(); + pif.digest = logEntry.getDigest(); +- if (pif.hdr.getZxid() != lastQueued + 1) { +- LOG.warn( +- "Got zxid 0x{} expected 0x{}", +- Long.toHexString(pif.hdr.getZxid()), +- Long.toHexString(lastQueued + 1)); +- } +- lastQueued = pif.hdr.getZxid(); ++ lastQueued = enforceContinuousProposal(lastQueued, pif); + + if (pif.hdr.getType() == OpCode.reconfig) { + SetDataTxn setDataTxn = (SetDataTxn) pif.rec; +@@ -666,7 +687,7 @@ public class Learner { + Long.toHexString(qp.getZxid()), + Long.toHexString(pif.hdr.getZxid())); + } else { +- zk.processTxn(pif.hdr, pif.rec); ++ zk.processTxn(pif.toRequest()); + packetsNotLogged.remove(); + } + } else { +@@ -696,18 +717,11 @@ public class Learner { + packet.rec = logEntry.getTxn(); + packet.hdr = logEntry.getHeader(); + packet.digest = logEntry.getDigest(); +- // Log warning message if txn comes out-of-order +- if (packet.hdr.getZxid() != lastQueued + 1) { +- LOG.warn( +- "Got zxid 0x{} expected 0x{}", +- Long.toHexString(packet.hdr.getZxid()), +- Long.toHexString(lastQueued + 1)); +- } +- lastQueued = packet.hdr.getZxid(); ++ lastQueued = enforceContinuousProposal(lastQueued, packet); + } + if (!writeToTxnLog) { + // Apply to db directly if we haven't taken the snapshot +- zk.processTxn(packet.hdr, packet.rec); ++ zk.processTxn(packet.toRequest()); + } else { + packetsNotLogged.add(packet); + packetsCommitted.add(qp.getZxid()); +@@ -780,8 +794,9 @@ public class Learner { + continue; + } + packetsNotLogged.removeFirst(); +- fzk.appendRequest(pif.hdr, pif.rec, pif.digest); +- fzk.processTxn(pif.hdr, pif.rec); ++ Request request = pif.toRequest(); ++ fzk.appendRequest(request); ++ fzk.processTxn(request); + } + + // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 +@@ -823,7 +838,7 @@ public class Learner { + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { +- fzk.logRequest(p.hdr, p.rec, p.digest); ++ fzk.logRequest(p.toRequest()); + } + LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size()); + +@@ -847,8 +862,7 @@ public class Learner { + continue; + } + packetsCommitted.remove(); +- Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); +- request.setTxnDigest(p.digest); ++ Request request = p.toRequest(); + ozk.commitRequest(request); + } + } else { +diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +index d3aa41b5f..334fa54c1 100644 +--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java ++++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +@@ -202,12 +202,8 @@ public class Observer extends Learner { + case Leader.INFORM: + ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1); + logEntry = SerializeUtils.deserializeTxn(qp.getData()); +- hdr = logEntry.getHeader(); +- txn = logEntry.getTxn(); +- digest = logEntry.getDigest(); +- Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); ++ Request request = logEntry.toRequest(); + request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY); +- request.setTxnDigest(digest); + ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk; + obs.commitRequest(request); + break; +@@ -219,13 +215,10 @@ public class Observer extends Learner { + byte[] remainingdata = new byte[buffer.remaining()]; + buffer.get(remainingdata); + logEntry = SerializeUtils.deserializeTxn(remainingdata); +- hdr = logEntry.getHeader(); + txn = logEntry.getTxn(); +- digest = logEntry.getDigest(); + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8)); + +- request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); +- request.setTxnDigest(digest); ++ request = logEntry.toRequest(); + obs = (ObserverZooKeeperServer) zk; + + boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); +diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java +index 75d6fe680..b52ea3418 100644 +--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java ++++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java +@@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase { + + @BeforeEach + public void setUp() throws Exception { ++ System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); + super.setUp(); + server = serverFactory.getZooKeeperServer(); + zk = createClient(); +@@ -67,6 +68,7 @@ public class TxnLogDigestTest extends ClientBase { + + @AfterEach + public void tearDown() throws Exception { ++ System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); + // server will be closed in super.tearDown + super.tearDown(); + +diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +index 031ccc2f7..b23fd80a3 100644 +--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java ++++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +@@ -60,6 +60,7 @@ public class ZxidRolloverTest extends ZKTestCase { + @BeforeEach + public void setUp() throws Exception { + System.setProperty("zookeeper.admin.enableServer", "false"); ++ System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); + + // set the snap count to something low so that we force log rollover + // and verify that is working as part of the epoch rollover. +@@ -215,6 +216,7 @@ public class ZxidRolloverTest extends ZKTestCase { + + @AfterEach + public void tearDown() throws Exception { ++ System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); + LOG.info("tearDown starting"); + for (int i = 0; i < zkClients.length; i++) { + zkClients[i].close(); +diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java +new file mode 100644 +index 000000000..c4b7720cf +--- /dev/null ++++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java +@@ -0,0 +1,100 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.zookeeper.server.quorum; ++ ++import static org.junit.jupiter.api.Assertions.assertNotNull; ++import java.util.Comparator; ++import org.apache.zookeeper.CreateMode; ++import org.apache.zookeeper.ZKTestCase; ++import org.apache.zookeeper.ZooDefs; ++import org.apache.zookeeper.ZooKeeper; ++import org.apache.zookeeper.test.ClientBase; ++import org.apache.zookeeper.test.QuorumUtil; ++import org.junit.jupiter.api.AfterEach; ++import org.junit.jupiter.api.Test; ++ ++public class QuorumSyncTest extends ZKTestCase { ++ private QuorumUtil qu; ++ ++ @AfterEach ++ public void tearDown() throws Exception { ++ if (qu != null) { ++ qu.shutdownAll(); ++ } ++ } ++ ++ @Test ++ public void testStaleDiffSync() throws Exception { ++ qu = new QuorumUtil(2); ++ qu.startAll(); ++ ++ int[] followerIds = qu.getFollowerQuorumPeers() ++ .stream() ++ .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed()) ++ .mapToInt(peer -> (int) peer.getMyId()).toArray(); ++ ++ int follower1 = followerIds[0]; ++ int follower2 = followerIds[1]; ++ ++ String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); ++ try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { ++ qu.shutdown(follower2); ++ ++ for (int i = 0; i < 10; i++) { ++ zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ++ } ++ ++ qu.shutdown(follower1); ++ ++ for (int i = 0; i < 10; i++) { ++ zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ++ } ++ ++ qu.restart(follower1); ++ } ++ ++ try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) { ++ for (int i = 0; i < 10; i++) { ++ String path = "/foo" + i; ++ assertNotNull(zk.exists(path, false), path + " not found"); ++ } ++ ++ for (int i = 0; i < 10; i++) { ++ String path = "/bar" + i; ++ assertNotNull(zk.exists(path, false), path + " not found"); ++ } ++ } ++ ++ qu.shutdown(qu.getLeaderServer()); ++ ++ qu.restart(follower2); ++ ++ try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) { ++ for (int i = 0; i < 10; i++) { ++ String path = "/foo" + i; ++ assertNotNull(zk.exists(path, false), path + " not found"); ++ } ++ ++ for (int i = 0; i < 10; i++) { ++ String path = "/bar" + i; ++ assertNotNull(zk.exists(path, false), path + " not found"); ++ } ++ } ++ } ++}