Skip to content

Commit 58a5efd

Browse files
committed
Add support for forward translog reading
Signed-off-by: xuxiong1 <[email protected]>
1 parent 2b6d266 commit 58a5efd

File tree

6 files changed

+226
-12
lines changed

6 files changed

+226
-12
lines changed

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,24 @@ public static IndexMergePolicy fromString(String text) {
196196
Property.Dynamic,
197197
Property.IndexScope
198198
);
199+
/**
200+
* Controls whether translog operations are read in forward order (oldest to newest) or backward order (newest to oldest).
201+
* Default is false (backward reading), which is the traditional behavior that naturally handles sequence number collisions
202+
* by prioritizing operations from newer generations.
203+
* <p>
204+
* <b>Note:</b> Enabling forward reading is safe for most use cases. However, in rare edge cases, it may replay stale
205+
* translog operations. Stale operation trimming (via
206+
* {@link org.opensearch.index.shard.IndexShard#trimOperationOfPreviousPrimaryTerms(long)}) occurs during the recovery
207+
* finalization phase. If a replica fails before completing
208+
* {@link org.opensearch.indices.recovery.RecoveryTarget#finalizeRecovery(long, long, org.opensearch.core.action.ActionListener)}
209+
* (leaving untrimmed stale operations) and no in-sync copies are available, we force-allocate this recovering replica as primary.
210+
* In this scenario, forward reading could return outdated operations from previous primary terms.
211+
*/
212+
public static final Setting<Boolean> INDEX_TRANSLOG_READ_FORWARD_SETTING = Setting.boolSetting(
213+
"index.translog.read_forward",
214+
false,
215+
Property.IndexScope
216+
);
199217
public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting(
200218
"index.warmer.enabled",
201219
true,
@@ -892,6 +910,7 @@ public static IndexMergePolicy fromString(String text) {
892910
private final boolean queryStringAllowLeadingWildcard;
893911
private final boolean defaultAllowUnmappedFields;
894912
private volatile Translog.Durability durability;
913+
private final boolean translogReadForward;
895914
private volatile TimeValue syncInterval;
896915
private volatile TimeValue publishReferencedSegmentsInterval;
897916
private volatile TimeValue refreshInterval;
@@ -1112,6 +1131,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11121131
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
11131132
this.allowDerivedField = scopedSettings.get(ALLOW_DERIVED_FIELDS);
11141133
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
1134+
this.translogReadForward = INDEX_TRANSLOG_READ_FORWARD_SETTING.get(settings);
11151135
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
11161136
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
11171137
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
@@ -2078,6 +2098,13 @@ public boolean isSoftDeleteEnabled() {
20782098
return softDeleteEnabled;
20792099
}
20802100

2101+
/**
2102+
* Returns <code>true</code> if translog read-forward is enabled.
2103+
*/
2104+
public boolean isTranslogReadForward() {
2105+
return translogReadForward;
2106+
}
2107+
20812108
public boolean isContextAwareEnabled() {
20822109
return contextAwareEnabled && FeatureFlags.isEnabled(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING);
20832110
}

server/src/main/java/org/opensearch/index/translog/MultiSnapshot.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,19 @@ final class MultiSnapshot implements Translog.Snapshot {
5454
private final Closeable onClose;
5555
private int index;
5656
private final SeqNoSet seenSeqNo;
57+
private final boolean readForward;
5758

5859
/**
5960
* Creates a new point-in-time snapshot of the given snapshots. They are iterated from the first to the last element when {@code readForward} is true, and from the last to the first otherwise.
6061
*/
61-
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
62+
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose, boolean readForward) {
6263
this.translogs = translogs;
6364
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
6465
this.overriddenOperations = 0;
6566
this.onClose = onClose;
6667
this.seenSeqNo = new SeqNoSet();
67-
this.index = translogs.length - 1;
68+
this.readForward = readForward;
69+
this.index = readForward ? 0 : translogs.length - 1;
6870
}
6971

7072
@Override
@@ -79,15 +81,30 @@ public int skippedOperations() {
7981

8082
@Override
8183
public Translog.Operation next() throws IOException {
82-
// TODO: Read translog forward in 9.0+
83-
for (; index >= 0; index--) {
84-
final TranslogSnapshot current = translogs[index];
85-
Translog.Operation op;
86-
while ((op = current.next()) != null) {
87-
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
88-
return op;
89-
} else {
90-
overriddenOperations++;
84+
if (readForward) {
85+
// Read forward: from index 0 to translogs.length - 1
86+
for (; index < translogs.length; index++) {
87+
final TranslogSnapshot current = translogs[index];
88+
Translog.Operation op;
89+
while ((op = current.next()) != null) {
90+
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
91+
return op;
92+
} else {
93+
overriddenOperations++;
94+
}
95+
}
96+
}
97+
} else {
98+
// Read backward (original behavior): from translogs.length - 1 to 0
99+
for (; index >= 0; index--) {
100+
final TranslogSnapshot current = translogs[index];
101+
Translog.Operation op;
102+
while ((op = current.next()) != null) {
103+
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
104+
return op;
105+
} else {
106+
overriddenOperations++;
107+
}
91108
}
92109
}
93110
}

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,8 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
761761
}
762762
boolean success = false;
763763
try {
764-
Snapshot result = new MultiSnapshot(snapshots, onClose);
764+
boolean readForward = indexSettings().isTranslogReadForward();
765+
Snapshot result = new MultiSnapshot(snapshots, onClose, readForward);
765766
success = true;
766767
return result;
767768
} finally {

server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,74 @@ public void testSeqNoCollision() throws Exception {
635635
}
636636
}
637637

638+
public void testSeqNoCollisionWithReadForward() throws Exception {
639+
try (
640+
ReplicationGroup shards = createGroup(
641+
2,
642+
Settings.builder()
643+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
644+
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
645+
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
646+
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
647+
.build()
648+
)
649+
) {
650+
shards.startAll();
651+
int initDocs = shards.indexDocs(randomInt(10));
652+
List<IndexShard> replicas = shards.getReplicas();
653+
IndexShard replica1 = replicas.get(0);
654+
IndexShard replica2 = replicas.get(1);
655+
shards.syncGlobalCheckpoint();
656+
657+
logger.info("--> Isolate replica1");
658+
IndexRequest indexDoc1 = new IndexRequest(index.getName()).id("d1").source("{}", MediaTypeRegistry.JSON);
659+
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
660+
indexOnReplica(replicationRequest, shards, replica2);
661+
662+
final Translog.Operation op1;
663+
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
664+
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
665+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
666+
for (int i = 0; i < initDocs; i++) {
667+
Translog.Operation op = snapshot.next();
668+
assertThat(op, is(notNullValue()));
669+
initOperations.add(op);
670+
}
671+
op1 = snapshot.next();
672+
assertThat(op1, notNullValue());
673+
assertThat(snapshot.next(), nullValue());
674+
assertThat(snapshot.skippedOperations(), equalTo(0));
675+
}
676+
logger.info("--> Promote replica1 as the primary");
677+
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
678+
shards.index(new IndexRequest(index.getName()).id("d2").source("{}", MediaTypeRegistry.JSON));
679+
final Translog.Operation op2;
680+
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
681+
assertThat(snapshot.totalOperations(), equalTo(1));
682+
op2 = snapshot.next();
683+
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
684+
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
685+
assertNull(snapshot.next());
686+
assertThat(snapshot.skippedOperations(), equalTo(0));
687+
}
688+
689+
// Make sure that peer-recovery transfers only the non-overridden operations with forward reading.
690+
IndexShard replica3 = shards.addReplica();
691+
logger.info("--> Promote replica2 as the primary");
692+
shards.promoteReplicaToPrimary(replica2).get();
693+
logger.info("--> Recover replica3 from replica2");
694+
recoverReplica(replica3, replica2, true);
695+
try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) {
696+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
697+
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
698+
expectedOps.add(op2);
699+
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
700+
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
701+
}
702+
shards.assertAllEqual(initDocs + 1);
703+
}
704+
}
705+
638706
/**
639707
* This test ensures the consistency between primary and replica with late and out of order delivery on the replica.
640708
* An index operation on the primary is followed by a delete operation. The delete operation is delivered first

server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3873,6 +3873,66 @@ public void testSnapshotReadOperationInReverse() throws Exception {
38733873
}
38743874
}
38753875

3876+
public void testSnapshotReadOperationForward() throws Exception {
3877+
Path tempDir = createTempDir();
3878+
final Settings settings = Settings.builder()
3879+
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
3880+
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
3881+
.build();
3882+
final TranslogConfig forwardConfig = getTranslogConfig(tempDir, settings);
3883+
3884+
final String translogUUID = Translog.createEmptyTranslog(
3885+
forwardConfig.getTranslogPath(),
3886+
SequenceNumbers.NO_OPS_PERFORMED,
3887+
shardId,
3888+
primaryTerm.get()
3889+
);
3890+
3891+
// Create a separate translog instance with forward reading enabled
3892+
try (
3893+
Translog forwardTranslog = new LocalTranslog(
3894+
forwardConfig,
3895+
translogUUID,
3896+
createTranslogDeletionPolicy(forwardConfig.getIndexSettings()),
3897+
() -> globalCheckpoint.get(),
3898+
primaryTerm::get,
3899+
getPersistedSeqNoConsumer(),
3900+
TranslogOperationHelper.DEFAULT,
3901+
null
3902+
)
3903+
) {
3904+
final List<List<Translog.Operation>> views = new ArrayList<>();
3905+
views.add(new ArrayList<>());
3906+
final AtomicLong seqNo = new AtomicLong();
3907+
3908+
final int generations = randomIntBetween(2, 20);
3909+
for (int gen = 0; gen < generations; gen++) {
3910+
final int operations = randomIntBetween(1, 100);
3911+
for (int i = 0; i < operations; i++) {
3912+
Translog.Index op = new Translog.Index(
3913+
randomAlphaOfLength(10),
3914+
seqNo.getAndIncrement(),
3915+
primaryTerm.get(),
3916+
new byte[] { 1 }
3917+
);
3918+
forwardTranslog.add(op);
3919+
views.get(views.size() - 1).add(op);
3920+
}
3921+
if (frequently()) {
3922+
forwardTranslog.rollGeneration();
3923+
views.add(new ArrayList<>());
3924+
}
3925+
}
3926+
try (Translog.Snapshot snapshot = forwardTranslog.newSnapshot()) {
3927+
final List<Translog.Operation> expectedSeqNo = new ArrayList<>();
3928+
for (List<Translog.Operation> view : views) {
3929+
expectedSeqNo.addAll(view);
3930+
}
3931+
assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo));
3932+
}
3933+
}
3934+
}
3935+
38763936
public void testSnapshotDedupOperations() throws Exception {
38773937
final Map<Long, Translog.Operation> latestOperations = new HashMap<>();
38783938
final int generations = between(2, 20);

server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,4 +529,45 @@ public void testRecoveryTrimsLocalTranslog() throws Exception {
529529
}
530530
}
531531
}
532+
533+
public void testRecoveryTrimsLocalTranslogWithReadForward() throws Exception {
534+
Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true).build();
535+
try (ReplicationGroup shards = createGroup(between(1, 2), settings, new InternalEngineFactory())) {
536+
shards.startAll();
537+
IndexShard oldPrimary = shards.getPrimary();
538+
shards.indexDocs(scaledRandomIntBetween(1, 100));
539+
if (randomBoolean()) {
540+
shards.flush();
541+
}
542+
int inflightDocs = scaledRandomIntBetween(1, 100);
543+
for (int i = 0; i < inflightDocs; i++) {
544+
final IndexRequest indexRequest = new IndexRequest(index.getName()).id("extra_" + i).source("{}", MediaTypeRegistry.JSON);
545+
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
546+
for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
547+
indexOnReplica(bulkShardRequest, shards, replica);
548+
}
549+
if (rarely()) {
550+
shards.flush();
551+
}
552+
}
553+
shards.syncGlobalCheckpoint();
554+
shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
555+
oldPrimary.close("demoted", false, false);
556+
oldPrimary.store().close();
557+
oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
558+
shards.recoverReplica(oldPrimary);
559+
for (IndexShard shard : shards) {
560+
assertConsistentHistoryBetweenTranslogAndLucene(shard);
561+
}
562+
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
563+
for (IndexShard shard : shards.getReplicas()) {
564+
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
565+
}
566+
shards.promoteReplicaToPrimary(oldPrimary).get();
567+
for (IndexShard shard : shards) {
568+
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
569+
assertConsistentHistoryBetweenTranslogAndLucene(shard);
570+
}
571+
}
572+
}
532573
}

0 commit comments

Comments
 (0)