diff --git a/CHANGELOG.md b/CHANGELOG.md index c69c928a7ab56..4e156b284980a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601)) - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963)) - Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112)) +- Add support for forward translog reading ([#20163](https://github.com/opensearch-project/OpenSearch/pull/20163)) ### Changed - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 7c5be554a7760..4d5c2ce27b7c4 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -196,6 +196,26 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic, Property.IndexScope ); + /** + * Controls whether translog operations are read in forward order (oldest to newest) or backward order (newest to oldest). + * Default is false (backward reading), which is the traditional behavior that naturally handles sequence number collisions + * by prioritizing operations from newer generations. + *
+ * Note: Enabling forward reading is safe for most use cases. However, in rare edge cases, it may replay stale
+ * translog operations. Stale operation trimming (via
+ * {@link org.opensearch.index.shard.IndexShard#trimOperationOfPreviousPrimaryTerms(long)}) occurs during the recovery
+ * finalization phase. If a replica fails before completing
+ * {@link org.opensearch.indices.recovery.RecoveryTarget#finalizeRecovery(long, long, org.opensearch.core.action.ActionListener)}
+ + and there are duplicates of the same translog operations with different primary terms in the translog
+ + (for example, during a primary failover with network isolation that leaves stale operations untrimmed)
+ * and no in-sync copies are available, we force-allocate this recovering replica as primary.
+ * In this scenario, forward reading could return outdated operations from previous primary terms.
+ */
+ public static final Settingtrue if translog read-forward is enabled.
+ */
+ public boolean isTranslogReadForward() {
+ return translogReadForward;
+ }
+
public boolean isContextAwareEnabled() {
return contextAwareEnabled && FeatureFlags.isEnabled(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING);
}
diff --git a/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java b/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java
index 941283afe5908..d3bb51d43701a 100644
--- a/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java
+++ b/server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java
@@ -54,17 +54,19 @@ final class MultiSnapshot implements Translog.Snapshot {
private final Closeable onClose;
private int index;
private final SeqNoSet seenSeqNo;
+ private final boolean readForward;
/**
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
*/
- MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
+ MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose, boolean readForward) {
this.translogs = translogs;
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.overriddenOperations = 0;
this.onClose = onClose;
this.seenSeqNo = new SeqNoSet();
- this.index = translogs.length - 1;
+ this.readForward = readForward;
+ this.index = readForward ? 0 : translogs.length - 1;
}
@Override
@@ -79,15 +81,30 @@ public int skippedOperations() {
@Override
public Translog.Operation next() throws IOException {
- // TODO: Read translog forward in 9.0+
- for (; index >= 0; index--) {
- final TranslogSnapshot current = translogs[index];
- Translog.Operation op;
- while ((op = current.next()) != null) {
- if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
- return op;
- } else {
- overriddenOperations++;
+ if (readForward) {
+ // Read forward: from index 0 to translogs.length - 1
+ for (; index < translogs.length; index++) {
+ final TranslogSnapshot current = translogs[index];
+ Translog.Operation op;
+ while ((op = current.next()) != null) {
+ if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
+ return op;
+ } else {
+ overriddenOperations++;
+ }
+ }
+ }
+ } else {
+ // Read backward (original behavior): from translogs.length - 1 to 0
+ for (; index >= 0; index--) {
+ final TranslogSnapshot current = translogs[index];
+ Translog.Operation op;
+ while ((op = current.next()) != null) {
+ if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
+ return op;
+ } else {
+ overriddenOperations++;
+ }
}
}
}
diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java
index fd34f6766d2a8..e2a52fb90b77b 100644
--- a/server/src/main/java/org/opensearch/index/translog/Translog.java
+++ b/server/src/main/java/org/opensearch/index/translog/Translog.java
@@ -761,7 +761,8 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
}
boolean success = false;
try {
- Snapshot result = new MultiSnapshot(snapshots, onClose);
+ boolean readForward = indexSettings().isTranslogReadForward();
+ Snapshot result = new MultiSnapshot(snapshots, onClose, readForward);
success = true;
return result;
} finally {
diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
index afe306625b6bc..4730360c0c782 100644
--- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
@@ -635,6 +635,74 @@ public void testSeqNoCollision() throws Exception {
}
}
+ public void testSeqNoCollisionWithReadForward() throws Exception {
+ try (
+ ReplicationGroup shards = createGroup(
+ 2,
+ Settings.builder()
+ .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+ .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+ .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+ .put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
+ .build()
+ )
+ ) {
+ shards.startAll();
+ int initDocs = shards.indexDocs(randomInt(10));
+ List> views = new ArrayList<>();
+ views.add(new ArrayList<>());
+ final AtomicLong seqNo = new AtomicLong();
+
+ final int generations = randomIntBetween(2, 20);
+ for (int gen = 0; gen < generations; gen++) {
+ final int operations = randomIntBetween(1, 100);
+ for (int i = 0; i < operations; i++) {
+ Translog.Index op = new Translog.Index(
+ randomAlphaOfLength(10),
+ seqNo.getAndIncrement(),
+ primaryTerm.get(),
+ new byte[] { 1 }
+ );
+ forwardTranslog.add(op);
+ views.get(views.size() - 1).add(op);
+ }
+ if (frequently()) {
+ forwardTranslog.rollGeneration();
+ views.add(new ArrayList<>());
+ }
+ }
+ try (Translog.Snapshot snapshot = forwardTranslog.newSnapshot()) {
+ final List