From 58a5efd67e7b9f4eb9611379d706f521304ba79e Mon Sep 17 00:00:00 2001 From: xuxiong1 Date: Wed, 3 Dec 2025 17:04:25 -0800 Subject: [PATCH 1/2] Add support for forward translog reading Signed-off-by: xuxiong1 --- .../org/opensearch/index/IndexSettings.java | 27 ++++++++ .../index/translog/MultiSnapshot.java | 39 ++++++++--- .../opensearch/index/translog/Translog.java | 3 +- .../IndexLevelReplicationTests.java | 68 +++++++++++++++++++ .../index/translog/LocalTranslogTests.java | 60 ++++++++++++++++ .../indices/recovery/RecoveryTests.java | 41 +++++++++++ 6 files changed, 226 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 7c5be554a7760..ae3ee4aa60d78 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -196,6 +196,24 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic, Property.IndexScope ); + /** + * Controls whether translog operations are read in forward order (oldest to newest) or backward order (newest to oldest). + * Default is false (backward reading), which is the traditional behavior that naturally handles sequence number collisions + * by prioritizing operations from newer generations. + *

+ * Note: Enabling forward reading is safe for most use cases. However, in rare edge cases, it may replay stale + * translog operations. Stale operation trimming (via + * {@link org.opensearch.index.shard.IndexShard#trimOperationOfPreviousPrimaryTerms(long)}) occurs during the recovery + * finalization phase. If a replica fails before completing + * {@link org.opensearch.indices.recovery.RecoveryTarget#finalizeRecovery(long, long, org.opensearch.core.action.ActionListener)} + * (leaving untrimmed stale operations) and no in-sync copies are available, we force-allocate this recovering replica as primary. + * In this scenario, forward reading could return outdated operations from previous primary terms. + */ + public static final Setting INDEX_TRANSLOG_READ_FORWARD_SETTING = Setting.boolSetting( + "index.translog.read_forward", + false, + Property.IndexScope + ); public static final Setting INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting( "index.warmer.enabled", true, @@ -892,6 +910,7 @@ public static IndexMergePolicy fromString(String text) { private final boolean queryStringAllowLeadingWildcard; private final boolean defaultAllowUnmappedFields; private volatile Translog.Durability durability; + private final boolean translogReadForward; private volatile TimeValue syncInterval; private volatile TimeValue publishReferencedSegmentsInterval; private volatile TimeValue refreshInterval; @@ -1112,6 +1131,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED); this.allowDerivedField = scopedSettings.get(ALLOW_DERIVED_FIELDS); this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING); + this.translogReadForward = INDEX_TRANSLOG_READ_FORWARD_SETTING.get(settings); defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); publishReferencedSegmentsInterval = 
INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings); @@ -2078,6 +2098,13 @@ public boolean isSoftDeleteEnabled() { return softDeleteEnabled; } + /** + * Returns true if translog read-forward is enabled. + */ + public boolean isTranslogReadForward() { + return translogReadForward; + } + public boolean isContextAwareEnabled() { return contextAwareEnabled && FeatureFlags.isEnabled(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING); } diff --git a/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java b/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java index 941283afe5908..d3bb51d43701a 100644 --- a/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java @@ -54,17 +54,19 @@ final class MultiSnapshot implements Translog.Snapshot { private final Closeable onClose; private int index; private final SeqNoSet seenSeqNo; + private final boolean readForward; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ - MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { + MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose, boolean readForward) { this.translogs = translogs; this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); this.overriddenOperations = 0; this.onClose = onClose; this.seenSeqNo = new SeqNoSet(); - this.index = translogs.length - 1; + this.readForward = readForward; + this.index = readForward ? 
0 : translogs.length - 1; } @Override @@ -79,15 +81,30 @@ public int skippedOperations() { @Override public Translog.Operation next() throws IOException { - // TODO: Read translog forward in 9.0+ - for (; index >= 0; index--) { - final TranslogSnapshot current = translogs[index]; - Translog.Operation op; - while ((op = current.next()) != null) { - if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { - return op; - } else { - overriddenOperations++; + if (readForward) { + // Read forward: from index 0 to translogs.length - 1 + for (; index < translogs.length; index++) { + final TranslogSnapshot current = translogs[index]; + Translog.Operation op; + while ((op = current.next()) != null) { + if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { + return op; + } else { + overriddenOperations++; + } + } + } + } else { + // Read backward (original behavior): from translogs.length - 1 to 0 + for (; index >= 0; index--) { + final TranslogSnapshot current = translogs[index]; + Translog.Operation op; + while ((op = current.next()) != null) { + if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { + return op; + } else { + overriddenOperations++; + } } } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index fd34f6766d2a8..e2a52fb90b77b 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -761,7 +761,8 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti } boolean success = false; try { - Snapshot result = new MultiSnapshot(snapshots, onClose); + boolean readForward = indexSettings().isTranslogReadForward(); + Snapshot result = new MultiSnapshot(snapshots, onClose, readForward); success = true; return result; } finally 
{ diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java index afe306625b6bc..4730360c0c782 100644 --- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java @@ -635,6 +635,74 @@ public void testSeqNoCollision() throws Exception { } } + public void testSeqNoCollisionWithReadForward() throws Exception { + try ( + ReplicationGroup shards = createGroup( + 2, + Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true) + .build() + ) + ) { + shards.startAll(); + int initDocs = shards.indexDocs(randomInt(10)); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); + IndexShard replica2 = replicas.get(1); + shards.syncGlobalCheckpoint(); + + logger.info("--> Isolate replica1"); + IndexRequest indexDoc1 = new IndexRequest(index.getName()).id("d1").source("{}", MediaTypeRegistry.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); + indexOnReplica(replicationRequest, shards, replica2); + + final Translog.Operation op1; + final List initOperations = new ArrayList<>(initDocs); + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + for (int i = 0; i < initDocs; i++) { + Translog.Operation op = snapshot.next(); + assertThat(op, is(notNullValue())); + initOperations.add(op); + } + op1 = snapshot.next(); + assertThat(op1, notNullValue()); + assertThat(snapshot.next(), nullValue()); + assertThat(snapshot.skippedOperations(), 
equalTo(0)); + } + logger.info("--> Promote replica1 as the primary"); + shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. + shards.index(new IndexRequest(index.getName()).id("d2").source("{}", MediaTypeRegistry.JSON)); + final Translog.Operation op2; + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(1)); + op2 = snapshot.next(); + assertThat(op2.seqNo(), equalTo(op1.seqNo())); + assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); + assertNull(snapshot.next()); + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + + // Make sure that peer-recovery transfers all but non-overridden operations with forward reading. + IndexShard replica3 = shards.addReplica(); + logger.info("--> Promote replica2 as the primary"); + shards.promoteReplicaToPrimary(replica2).get(); + logger.info("--> Recover replica3 from replica2"); + recoverReplica(replica3, replica2, true); + try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + final List expectedOps = new ArrayList<>(initOperations); + expectedOps.add(op2); + assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); + assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); + } + shards.assertAllEqual(initDocs + 1); + } + } + /** * This test ensures the consistency between primary and replica with late and out of order delivery on the replica. * An index operation on the primary is followed by a delete operation. 
The delete operation is delivered first diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index 14fb71e2a1b48..aba1cdde2c52d 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -3873,6 +3873,66 @@ public void testSnapshotReadOperationInReverse() throws Exception { } } + public void testSnapshotReadOperationForward() throws Exception { + Path tempDir = createTempDir(); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true) + .build(); + final TranslogConfig forwardConfig = getTranslogConfig(tempDir, settings); + + final String translogUUID = Translog.createEmptyTranslog( + forwardConfig.getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTerm.get() + ); + + // Create a separate translog instance with forward reading enabled + try ( + Translog forwardTranslog = new LocalTranslog( + forwardConfig, + translogUUID, + createTranslogDeletionPolicy(forwardConfig.getIndexSettings()), + () -> globalCheckpoint.get(), + primaryTerm::get, + getPersistedSeqNoConsumer(), + TranslogOperationHelper.DEFAULT, + null + ) + ) { + final List> views = new ArrayList<>(); + views.add(new ArrayList<>()); + final AtomicLong seqNo = new AtomicLong(); + + final int generations = randomIntBetween(2, 20); + for (int gen = 0; gen < generations; gen++) { + final int operations = randomIntBetween(1, 100); + for (int i = 0; i < operations; i++) { + Translog.Index op = new Translog.Index( + randomAlphaOfLength(10), + seqNo.getAndIncrement(), + primaryTerm.get(), + new byte[] { 1 } + ); + forwardTranslog.add(op); + views.get(views.size() - 1).add(op); + } + if (frequently()) { + 
forwardTranslog.rollGeneration(); + views.add(new ArrayList<>()); + } + } + try (Translog.Snapshot snapshot = forwardTranslog.newSnapshot()) { + final List expectedSeqNo = new ArrayList<>(); + for (List view : views) { + expectedSeqNo.addAll(view); + } + assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo)); + } + } + } + public void testSnapshotDedupOperations() throws Exception { final Map latestOperations = new HashMap<>(); final int generations = between(2, 20); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 71d89e2856c6e..b065456373935 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -529,4 +529,45 @@ public void testRecoveryTrimsLocalTranslog() throws Exception { } } } + + public void testRecoveryTrimsLocalTranslogWithReadForward() throws Exception { + Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true).build(); + try (ReplicationGroup shards = createGroup(between(1, 2), settings, new InternalEngineFactory())) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + shards.indexDocs(scaledRandomIntBetween(1, 100)); + if (randomBoolean()) { + shards.flush(); + } + int inflightDocs = scaledRandomIntBetween(1, 100); + for (int i = 0; i < inflightDocs; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName()).id("extra_" + i).source("{}", MediaTypeRegistry.JSON); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); + for (IndexShard replica : randomSubsetOf(shards.getReplicas())) { + indexOnReplica(bulkShardRequest, shards, replica); + } + if (rarely()) { + shards.flush(); + } + } + shards.syncGlobalCheckpoint(); + shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get(); + 
oldPrimary.close("demoted", false, false); + oldPrimary.store().close(); + oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + shards.promoteReplicaToPrimary(oldPrimary).get(); + for (IndexShard shard : shards) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + } + } } From c968213e127175aa6d29a5d5bf3e74f34e37db38 Mon Sep 17 00:00:00 2001 From: xuxiong1 Date: Wed, 3 Dec 2025 17:08:54 -0800 Subject: [PATCH 2/2] Add changelog Signed-off-by: xuxiong1 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e2be80e8201c..28b0fa707f89c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)) - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601)) - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963)) +- Add support for forward translog reading ([#20163](https://github.com/opensearch-project/OpenSearch/pull/20163)) ### Changed - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))