1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765))
- Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601))
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
- Add support for forward translog reading ([#20163](https://github.com/opensearch-project/OpenSearch/pull/20163))

### Changed
- Combining filter rewrite and skip list to optimize sub aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -196,6 +196,24 @@ public static IndexMergePolicy fromString(String text) {
Property.Dynamic,
Property.IndexScope
);
/**
* Controls whether translog operations are read in forward order (oldest to newest) or backward order (newest to oldest).
* Default is false (backward reading), which is the traditional behavior that naturally handles sequence number collisions
* by prioritizing operations from newer generations.
* <p>
* <b>Note:</b> Enabling forward reading is safe for most use cases. However, in rare edge cases, it may replay stale
* translog operations. Stale operation trimming (via
* {@link org.opensearch.index.shard.IndexShard#trimOperationOfPreviousPrimaryTerms(long)}) occurs during the recovery
* finalization phase. If a replica fails before completing
* {@link org.opensearch.indices.recovery.RecoveryTarget#finalizeRecovery(long, long, org.opensearch.core.action.ActionListener)}
* (leaving untrimmed stale operations) and no in-sync copies are available, we force-allocate this recovering replica as primary.
* In this scenario, forward reading could return outdated operations from previous primary terms.
*/
public static final Setting<Boolean> INDEX_TRANSLOG_READ_FORWARD_SETTING = Setting.boolSetting(
"index.translog.read_forward",
false,
Property.IndexScope
);
public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting(
"index.warmer.enabled",
true,
@@ -892,6 +910,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean queryStringAllowLeadingWildcard;
private final boolean defaultAllowUnmappedFields;
private volatile Translog.Durability durability;
private final boolean translogReadForward;
private volatile TimeValue syncInterval;
private volatile TimeValue publishReferencedSegmentsInterval;
private volatile TimeValue refreshInterval;
@@ -1112,6 +1131,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSettings
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
this.allowDerivedField = scopedSettings.get(ALLOW_DERIVED_FIELDS);
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
this.translogReadForward = INDEX_TRANSLOG_READ_FORWARD_SETTING.get(settings);
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
@@ -2078,6 +2098,13 @@ public boolean isSoftDeleteEnabled() {
return softDeleteEnabled;
}

/**
* Returns <code>true</code> if translog read-forward is enabled.
*/
public boolean isTranslogReadForward() {
return translogReadForward;
}

public boolean isContextAwareEnabled() {
return contextAwareEnabled && FeatureFlags.isEnabled(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING);
}
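The new setting is index-scoped and non-dynamic, so it can only be supplied when the index is created. A minimal sketch of opting an index in; the builder call mirrors the test settings later in this diff, while the wrapper class name is illustrative only:

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

// Sketch: build index settings that opt into forward translog reading.
// The setting is IndexScope-only (no Property.Dynamic), so it cannot be
// flipped on a live index.
public final class ReadForwardSettingExample {
    public static Settings readForwardIndexSettings() {
        return Settings.builder()
            .put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true) // "index.translog.read_forward"
            .build();
    }
}
```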
@@ -54,17 +54,19 @@ final class MultiSnapshot implements Translog.Snapshot {
private final Closeable onClose;
private int index;
private final SeqNoSet seenSeqNo;
private final boolean readForward;

/**
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
*/
- MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
+ MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose, boolean readForward) {
this.translogs = translogs;
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.overriddenOperations = 0;
this.onClose = onClose;
this.seenSeqNo = new SeqNoSet();
- this.index = translogs.length - 1;
+ this.readForward = readForward;
+ this.index = readForward ? 0 : translogs.length - 1;
}

@Override
@@ -79,15 +81,30 @@ public int skippedOperations() {

@Override
public Translog.Operation next() throws IOException {
-     // TODO: Read translog forward in 9.0+
-     for (; index >= 0; index--) {
-         final TranslogSnapshot current = translogs[index];
-         Translog.Operation op;
-         while ((op = current.next()) != null) {
-             if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
-                 return op;
-             } else {
-                 overriddenOperations++;
+     if (readForward) {
+         // Read forward: from index 0 to translogs.length - 1
+         for (; index < translogs.length; index++) {
+             final TranslogSnapshot current = translogs[index];
+             Translog.Operation op;
+             while ((op = current.next()) != null) {
+                 if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
+                     return op;
+                 } else {
+                     overriddenOperations++;
+                 }
+             }
+         }
+     } else {
+         // Read backward (original behavior): from translogs.length - 1 to 0
+         for (; index >= 0; index--) {
+             final TranslogSnapshot current = translogs[index];
+             Translog.Operation op;
+             while ((op = current.next()) != null) {
+                 if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
+                     return op;
+                 } else {
+                     overriddenOperations++;
+                 }
+             }
+         }
+     }
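Both branches share the `seenSeqNo` dedup, so the iteration direction decides which copy of a colliding sequence number wins: reading backward, the newest generation's copy is returned and older duplicates are counted as overridden; reading forward, the oldest copy wins, which is exactly the edge case the setting's javadoc warns about when stale operations were never trimmed. A self-contained toy model of that difference (plain JDK types, not the OpenSearch classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of MultiSnapshot's dedup: each inner list is one translog
// generation (oldest first); a long[]{seqNo, primaryTerm} pair stands in
// for a Translog.Operation. The first occurrence of a seqNo wins.
public final class SnapshotOrderDemo {
    static List<long[]> read(List<List<long[]>> generations, boolean forward) {
        List<List<long[]>> order = new ArrayList<>(generations);
        if (forward == false) {
            Collections.reverse(order); // backward: newest generation first
        }
        Set<Long> seen = new HashSet<>();
        List<long[]> out = new ArrayList<>();
        for (List<long[]> generation : order) {
            for (long[] op : generation) {
                if (seen.add(op[0])) { // like seenSeqNo.getAndSet(...) == false
                    out.add(op);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<long[]>> generations = List.of(
            List.of(new long[] { 0, 1 }), // gen 0: seqNo 0 written in term 1
            List.of(new long[] { 0, 2 })  // gen 1: seqNo 0 rewritten in term 2
        );
        System.out.println(read(generations, false).get(0)[1]); // 2 -> newest copy wins
        System.out.println(read(generations, true).get(0)[1]);  // 1 -> stale copy, unless trimmed
    }
}
```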
@@ -761,7 +761,8 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException
}
boolean success = false;
try {
- Snapshot result = new MultiSnapshot(snapshots, onClose);
+ boolean readForward = indexSettings().isTranslogReadForward();
+ Snapshot result = new MultiSnapshot(snapshots, onClose, readForward);
success = true;
return result;
} finally {
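Snapshot consumers are unaffected API-wise; the setting only changes the order in which `next()` yields operations. A hedged consumer-side sketch (the `drain` helper is hypothetical; `newSnapshot()` and `Snapshot.next()` are the existing APIs exercised by the tests below):

```java
import java.io.IOException;

import org.opensearch.index.translog.Translog;

// Sketch: draining a snapshot. With index.translog.read_forward enabled,
// operations arrive oldest-to-newest; otherwise the newest generation is
// read first (the long-standing default).
final class SnapshotDrainExample {
    static int drain(Translog translog) throws IOException {
        int count = 0;
        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                count++; // a real consumer would replay or index op here
            }
        }
        return count;
    }
}
```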
@@ -635,6 +635,74 @@ public void testSeqNoCollision() throws Exception {
}
}

public void testSeqNoCollisionWithReadForward() throws Exception {
try (
ReplicationGroup shards = createGroup(
2,
Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
.build()
)
) {
shards.startAll();
int initDocs = shards.indexDocs(randomInt(10));
List<IndexShard> replicas = shards.getReplicas();
IndexShard replica1 = replicas.get(0);
IndexShard replica2 = replicas.get(1);
shards.syncGlobalCheckpoint();

logger.info("--> Isolate replica1");
IndexRequest indexDoc1 = new IndexRequest(index.getName()).id("d1").source("{}", MediaTypeRegistry.JSON);
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
indexOnReplica(replicationRequest, shards, replica2);

final Translog.Operation op1;
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
for (int i = 0; i < initDocs; i++) {
Translog.Operation op = snapshot.next();
assertThat(op, is(notNullValue()));
initOperations.add(op);
}
op1 = snapshot.next();
assertThat(op1, notNullValue());
assertThat(snapshot.next(), nullValue());
assertThat(snapshot.skippedOperations(), equalTo(0));
}
logger.info("--> Promote replica1 as the primary");
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
shards.index(new IndexRequest(index.getName()).id("d2").source("{}", MediaTypeRegistry.JSON));
final Translog.Operation op2;
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(1));
op2 = snapshot.next();
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
assertNull(snapshot.next());
assertThat(snapshot.skippedOperations(), equalTo(0));
}

// Make sure that peer-recovery transfers all operations except the overridden ones when forward reading is enabled.
IndexShard replica3 = shards.addReplica();
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
shards.assertAllEqual(initDocs + 1);
}
}

/**
* This test ensures the consistency between primary and replica with late and out of order delivery on the replica.
* An index operation on the primary is followed by a delete operation. The delete operation is delivered first
@@ -3873,6 +3873,66 @@ public void testSnapshotReadOperationInReverse() throws Exception {
}
}

public void testSnapshotReadOperationForward() throws Exception {
Path tempDir = createTempDir();
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true)
.build();
final TranslogConfig forwardConfig = getTranslogConfig(tempDir, settings);

final String translogUUID = Translog.createEmptyTranslog(
forwardConfig.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
primaryTerm.get()
);

// Create a separate translog instance with forward reading enabled
try (
Translog forwardTranslog = new LocalTranslog(
forwardConfig,
translogUUID,
createTranslogDeletionPolicy(forwardConfig.getIndexSettings()),
() -> globalCheckpoint.get(),
primaryTerm::get,
getPersistedSeqNoConsumer(),
TranslogOperationHelper.DEFAULT,
null
)
) {
final List<List<Translog.Operation>> views = new ArrayList<>();
views.add(new ArrayList<>());
final AtomicLong seqNo = new AtomicLong();

final int generations = randomIntBetween(2, 20);
for (int gen = 0; gen < generations; gen++) {
final int operations = randomIntBetween(1, 100);
for (int i = 0; i < operations; i++) {
Translog.Index op = new Translog.Index(
randomAlphaOfLength(10),
seqNo.getAndIncrement(),
primaryTerm.get(),
new byte[] { 1 }
);
forwardTranslog.add(op);
views.get(views.size() - 1).add(op);
}
if (frequently()) {
forwardTranslog.rollGeneration();
views.add(new ArrayList<>());
}
}
try (Translog.Snapshot snapshot = forwardTranslog.newSnapshot()) {
final List<Translog.Operation> expectedSeqNo = new ArrayList<>();
for (List<Translog.Operation> view : views) {
expectedSeqNo.addAll(view);
}
assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo));
}
}
}

public void testSnapshotDedupOperations() throws Exception {
final Map<Long, Translog.Operation> latestOperations = new HashMap<>();
final int generations = between(2, 20);
@@ -529,4 +529,45 @@ public void testRecoveryTrimsLocalTranslog() throws Exception {
}
}
}

public void testRecoveryTrimsLocalTranslogWithReadForward() throws Exception {
Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_READ_FORWARD_SETTING.getKey(), true).build();
try (ReplicationGroup shards = createGroup(between(1, 2), settings, new InternalEngineFactory())) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
shards.indexDocs(scaledRandomIntBetween(1, 100));
if (randomBoolean()) {
shards.flush();
}
int inflightDocs = scaledRandomIntBetween(1, 100);
for (int i = 0; i < inflightDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName()).id("extra_" + i).source("{}", MediaTypeRegistry.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
indexOnReplica(bulkShardRequest, shards, replica);
}
if (rarely()) {
shards.flush();
}
}
shards.syncGlobalCheckpoint();
shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
oldPrimary.close("demoted", false, false);
oldPrimary.store().close();
oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
for (IndexShard shard : shards) {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
for (IndexShard shard : shards.getReplicas()) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
shards.promoteReplicaToPrimary(oldPrimary).get();
for (IndexShard shard : shards) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
}
}
}