Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601))
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
- Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112))
- Add support for forward translog reading ([#20163](https://github.com/opensearch-project/OpenSearch/pull/20163))

### Changed
- Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ public static IndexMergePolicy fromString(String text) {
Property.Dynamic,
Property.IndexScope
);
/**
* Controls whether translog operations are read in forward order (oldest to newest) or backward order (newest to oldest).
* Default is false (backward reading), which is the traditional behavior that naturally handles sequence number collisions
* by prioritizing operations from newer generations.
* <p>
* <b>Note:</b> Enabling forward reading is safe for most use cases. However, in rare edge cases, it may replay stale
* translog operations. Stale operation trimming (via
* {@link org.opensearch.index.shard.IndexShard#trimOperationOfPreviousPrimaryTerms(long)}) occurs during the recovery
* finalization phase. If a replica fails before completing
* {@link org.opensearch.indices.recovery.RecoveryTarget#finalizeRecovery(long, long, org.opensearch.core.action.ActionListener)}
* and there are duplicates of the same translog operations with different primary terms in the translog (could happen during primary failover with network isolation, leaving untrimmed stale operations)
* and no in-sync copies are available, we force-allocate this recovering replica as primary.
* In this scenario, forward reading could return outdated operations from previous primary terms.
*/
public static final Setting<Boolean> INDEX_TRANSLOG_READ_FORWARD_SETTING = Setting.boolSetting(
"index.translog.read_forward",
false,
Property.IndexScope
);
public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting(
"index.warmer.enabled",
true,
Expand Down Expand Up @@ -892,6 +911,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean queryStringAllowLeadingWildcard;
private final boolean defaultAllowUnmappedFields;
private volatile Translog.Durability durability;
private final boolean translogReadForward;
private volatile TimeValue syncInterval;
private volatile TimeValue publishReferencedSegmentsInterval;
private volatile TimeValue refreshInterval;
Expand Down Expand Up @@ -1112,6 +1132,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
this.allowDerivedField = scopedSettings.get(ALLOW_DERIVED_FIELDS);
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
this.translogReadForward = INDEX_TRANSLOG_READ_FORWARD_SETTING.get(settings);
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -2078,6 +2099,13 @@ public boolean isSoftDeleteEnabled() {
return softDeleteEnabled;
}

/**
* Returns {@code true} if translog operations are read in forward (oldest-to-newest) order,
* as configured by {@link #INDEX_TRANSLOG_READ_FORWARD_SETTING}; {@code false} (the default)
* selects the traditional backward (newest-to-oldest) reading order.
*
* @return whether translog read-forward is enabled for this index
*/
public boolean isTranslogReadForward() {
return translogReadForward;
}

public boolean isContextAwareEnabled() {
return contextAwareEnabled && FeatureFlags.isEnabled(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,19 @@ final class MultiSnapshot implements Translog.Snapshot {
private final Closeable onClose;
private int index;
private final SeqNoSet seenSeqNo;
private final boolean readForward;

/**
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
*/
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose, boolean readForward) {
this.translogs = translogs;
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.overriddenOperations = 0;
this.onClose = onClose;
this.seenSeqNo = new SeqNoSet();
this.index = translogs.length - 1;
this.readForward = readForward;
this.index = readForward ? 0 : translogs.length - 1;
}

@Override
Expand All @@ -79,15 +81,30 @@ public int skippedOperations() {

@Override
public Translog.Operation next() throws IOException {
// TODO: Read translog forward in 9.0+
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;
while ((op = current.next()) != null) {
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
overriddenOperations++;
if (readForward) {
// Read forward: from index 0 to translogs.length - 1
for (; index < translogs.length; index++) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;
while ((op = current.next()) != null) {
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
overriddenOperations++;
}
}
}
} else {
// Read backward (original behavior): from translogs.length - 1 to 0
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;
while ((op = current.next()) != null) {
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
overriddenOperations++;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
}
boolean success = false;
try {
Snapshot result = new MultiSnapshot(snapshots, onClose);
boolean readForward = indexSettings().isTranslogReadForward();
Snapshot result = new MultiSnapshot(snapshots, onClose, readForward);
success = true;
return result;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,74 @@ public void testSeqNoCollision() throws Exception {
}
}

/**
* Verifies that sequence-number collisions (the same seqNo written under different primary
* terms after a failover) are handled correctly when {@code index.translog.read_forward}
* is enabled, and that peer recovery does not transfer overridden (stale) operations.
*/
public void testSeqNoCollisionWithReadForward() throws Exception {
try (
ReplicationGroup shards = createGroup(
2,
Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
// Disable translog retention limits so operations stay available for the snapshot checks below.
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
// The setting under test: read translog generations oldest-to-newest.
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
.build()
)
) {
shards.startAll();
int initDocs = shards.indexDocs(randomInt(10));
List<IndexShard> replicas = shards.getReplicas();
IndexShard replica1 = replicas.get(0);
IndexShard replica2 = replicas.get(1);
shards.syncGlobalCheckpoint();

// Index d1 on the primary and replica2 only, so replica1 never sees it ("isolated").
logger.info("--> Isolate replica1");
IndexRequest indexDoc1 = new IndexRequest(index.getName()).id("d1").source("{}", MediaTypeRegistry.JSON);
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
indexOnReplica(replicationRequest, shards, replica2);

// Capture replica2's translog contents before the failover; op1 is the d1 operation.
final Translog.Operation op1;
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
for (int i = 0; i < initDocs; i++) {
Translog.Operation op = snapshot.next();
assertThat(op, is(notNullValue()));
initOperations.add(op);
}
op1 = snapshot.next();
assertThat(op1, notNullValue());
assertThat(snapshot.next(), nullValue());
assertThat(snapshot.skippedOperations(), equalTo(0));
}
// Promoting replica1 (which never saw d1) forces a resync; d2 then reuses op1's seqNo
// under the new, higher primary term — the collision scenario under test.
logger.info("--> Promote replica1 as the primary");
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
shards.index(new IndexRequest(index.getName()).id("d2").source("{}", MediaTypeRegistry.JSON));
final Translog.Operation op2;
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
// With forward reading, the newer-term op2 must win: same seqNo as op1, greater primary term,
// and the stale op1 must not surface as a skipped/overridden operation here.
assertThat(snapshot.totalOperations(), equalTo(1));
op2 = snapshot.next();
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
assertNull(snapshot.next());
assertThat(snapshot.skippedOperations(), equalTo(0));
}

// Make sure that peer-recovery transfers all but non-overridden operations with forward reading.
IndexShard replica3 = shards.addReplica();
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) {
// replica3 should hold the initial docs plus op2 (the surviving collision winner) and nothing stale.
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
shards.assertAllEqual(initDocs + 1);
}
}

/**
* This test ensures the consistency between primary and replica with late and out of order delivery on the replica.
* An index operation on the primary is followed by a delete operation. The delete operation is delivered first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3873,6 +3873,66 @@ public void testSnapshotReadOperationInReverse() throws Exception {
}
}

/**
* Verifies that with {@code index.translog.read_forward} enabled, a translog snapshot
* returns operations in forward order: generations oldest-to-newest, and within each
* generation in the order the operations were written.
*/
public void testSnapshotReadOperationForward() throws Exception {
Path tempDir = createTempDir();
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
.build();
final TranslogConfig forwardConfig = getTranslogConfig(tempDir, settings);

final String translogUUID = Translog.createEmptyTranslog(
forwardConfig.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
primaryTerm.get()
);

// Create a separate translog instance with forward reading enabled
try (
Translog forwardTranslog = new LocalTranslog(
forwardConfig,
translogUUID,
createTranslogDeletionPolicy(forwardConfig.getIndexSettings()),
() -> globalCheckpoint.get(),
primaryTerm::get,
getPersistedSeqNoConsumer(),
TranslogOperationHelper.DEFAULT,
null
)
) {
// views holds one list of operations per translog generation, in creation order.
final List<List<Translog.Operation>> views = new ArrayList<>();
views.add(new ArrayList<>());
final AtomicLong seqNo = new AtomicLong();

// Write a random number of operations across a random number of generations,
// rolling the generation (usually) between batches.
final int generations = randomIntBetween(2, 20);
for (int gen = 0; gen < generations; gen++) {
final int operations = randomIntBetween(1, 100);
for (int i = 0; i < operations; i++) {
Translog.Index op = new Translog.Index(
randomAlphaOfLength(10),
seqNo.getAndIncrement(),
primaryTerm.get(),
new byte[] { 1 }
);
forwardTranslog.add(op);
views.get(views.size() - 1).add(op);
}
if (frequently()) {
forwardTranslog.rollGeneration();
views.add(new ArrayList<>());
}
}
try (Translog.Snapshot snapshot = forwardTranslog.newSnapshot()) {
// Flattening the per-generation lists in order yields the expected forward sequence.
final List<Translog.Operation> expectedSeqNo = new ArrayList<>();
for (List<Translog.Operation> view : views) {
expectedSeqNo.addAll(view);
}
assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo));
}
}
}

public void testSnapshotDedupOperations() throws Exception {
final Map<Long, Translog.Operation> latestOperations = new HashMap<>();
final int generations = between(2, 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,4 +529,45 @@ public void testRecoveryTrimsLocalTranslog() throws Exception {
}
}
}

/**
* Verifies that recovering a demoted primary from its existing on-disk path trims stale
* local translog operations correctly when {@code index.translog.read_forward} is enabled:
* after recovery (and after re-promoting the old primary), all copies agree on doc ids and
* seq nos, and translog history stays consistent with Lucene.
*/
public void testRecoveryTrimsLocalTranslogWithReadForward() throws Exception {
Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true).build();
try (ReplicationGroup shards = createGroup(between(1, 2), settings, new InternalEngineFactory())) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
shards.indexDocs(scaledRandomIntBetween(1, 100));
if (randomBoolean()) {
shards.flush();
}
// Index "in-flight" docs that only a random subset of replicas receive, so the
// old primary's translog diverges from the copies that survive the failover.
int inflightDocs = scaledRandomIntBetween(1, 100);
for (int i = 0; i < inflightDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName()).id("extra_" + i).source("{}", MediaTypeRegistry.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
indexOnReplica(bulkShardRequest, shards, replica);
}
if (rarely()) {
shards.flush();
}
}
shards.syncGlobalCheckpoint();
// Fail over to a replica, then bring the old primary back as a replica reusing its
// existing shard path — its stale local translog must be trimmed during recovery.
shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
oldPrimary.close("demoted", false, false);
oldPrimary.store().close();
oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
for (IndexShard shard : shards) {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
for (IndexShard shard : shards.getReplicas()) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
// Re-promoting the recovered old primary must not resurrect any trimmed operations.
shards.promoteReplicaToPrimary(oldPrimary).get();
for (IndexShard shard : shards) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
}
}
}
Loading