Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Integration test checking that promoting a segment-replication replica to primary does not lose
 * a document that was still in flight on the old primary.
 *
 * <p>The test installs {@link MockEnginePlugin} in place of the default mock engine factory plugin so
 * that it can pause an indexing operation on the primary and observe indexing on the replica through
 * a set of static latches shared between the test thread and the engine threads.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBaseIT {
    // volatile: flipped by the test thread in the middle of the test and read by engine threads.
    private static volatile boolean lockEnable;
    // Released by the test to let the paused primary operation continue.
    private static CountDownLatch indexLuceneLatch;
    // Counted down by the primary engine once an operation has obtained its sequence number.
    private static CountDownLatch flushLatch;
    // Counted down by the replica engine after it has processed an index operation.
    private static CountDownLatch refreshLatch;

    /**
     * Resets the shared flag and latches and starts a dedicated cluster-manager node.
     */
    @Before
    public void setup() {
        lockEnable = false;
        indexLuceneLatch = new CountDownLatch(1);
        flushLatch = new CountDownLatch(1);
        refreshLatch = new CountDownLatch(1);
        internalCluster().startClusterManagerOnlyNode();
    }

    /**
     * Replaces any plugin whose class name contains {@code MockEngineFactoryPlugin} with
     * {@link MockEnginePlugin}, keeping all other mock plugins supplied by the parent class.
     */
    @Override
    protected Collection<Class<? extends Plugin>> getMockPlugins() {
        List<Class<? extends Plugin>> plugins = super.getMockPlugins()
            .stream()
            .filter(plugin -> !plugin.getName().contains("MockEngineFactoryPlugin"))
            .collect(java.util.stream.Collectors.toList());
        plugins.add(MockEnginePlugin.class);
        return plugins;
    }

    /**
     * Engine plugin that always supplies a {@link MockEngineFactory}.
     */
    public static class MockEnginePlugin extends Plugin implements EnginePlugin {
        @Override
        public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
            return Optional.of(new MockEngineFactory());
        }
    }

    /**
     * Engine factory returning {@link MockNRTReplicationEngine} for read-only replicas and
     * {@link MockInternalEngine} otherwise.
     */
    public static class MockEngineFactory implements EngineFactory {
        @Override
        public Engine newReadWriteEngine(EngineConfig config) {
            return config.isReadOnlyReplica() ? new MockNRTReplicationEngine(config) : new MockInternalEngine(config);
        }
    }

    /**
     * Internal engine that, while {@code lockEnable} is set, pauses each primary operation right after
     * its sequence number has been generated until the test releases {@code indexLuceneLatch}.
     */
    public static class MockInternalEngine extends InternalEngine {
        MockInternalEngine(EngineConfig config) throws EngineException {
            super(config);
        }

        @Override
        protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
            long seqNo = super.generateSeqNoForOperationOnPrimary(operation);
            if (lockEnable) {
                // Tell the test the sequence number exists, then park until the test lets us continue.
                flushLatch.countDown();
                try {
                    indexLuceneLatch.await();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return seqNo;
        }
    }

    /**
     * Replication engine that, while {@code lockEnable} is set, signals {@code refreshLatch} after
     * every index operation it has processed.
     */
    public static class MockNRTReplicationEngine extends NRTReplicationEngine {
        MockNRTReplicationEngine(EngineConfig config) throws EngineException {
            super(config);
        }

        @Override
        public IndexResult index(Index index) throws IOException {
            IndexResult indexResult = super.index(index);
            if (lockEnable) {
                refreshLatch.countDown();
            }
            return indexResult;
        }
    }

    // Used to test that primary promotion does not result in data loss.
    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
        final String primary = internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
        ensureYellowAndNoInitializingShards(INDEX_NAME);
        final String replica = internalCluster().startDataOnlyNode();
        ensureGreen(INDEX_NAME);

        // Looked up early so the cleanup in the finally block can always reach it.
        final MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
            TransportService.class,
            replica
        ));

        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
        lockEnable = true;
        // The second document is indexed on a separate thread because the primary engine will park it.
        Thread writeThread = new Thread(() -> { client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get(); });
        writeThread.start();
        try {
            assertTrue("flushLatch timed out", flushLatch.await(30, TimeUnit.SECONDS));

            flush(INDEX_NAME);

            waitForSearchableDocs(1, replica);

            // mock network exception
            replicaTransportService.addRequestHandlingBehavior(
                PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
                (handler, request, channel, task) -> {
                    throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
                }
            );

            refresh(INDEX_NAME);
            waitForSearchableDocs(1, primary);
            indexLuceneLatch.countDown();
            assertTrue("refreshLatch timed out", refreshLatch.await(30, TimeUnit.SECONDS));
            writeThread.join();

            logger.info("refresh index");
            refresh(INDEX_NAME);
            flush(INDEX_NAME);
            waitForSearchableDocs(2, primary);
            waitForSearchableDocs(1, replica);

            // stop the primary node - we only have one shard on here.
            internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
            ensureYellowAndNoInitializingShards(INDEX_NAME);

            final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
            assertNotNull(replicaShardRouting);
            assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());

            refresh(INDEX_NAME);
            SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
            assertEquals(2L, response.getHits().getTotalHits().value());
        } finally {
            // Always release the parked indexing thread and drop the injected transport failure, even
            // when an assertion above fails; countDown() is a no-op once the latch has reached zero.
            indexLuceneLatch.countDown();
            replicaTransportService.clearAllRules();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.IngestionEngine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.MergedSegmentWarmerFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
Expand Down Expand Up @@ -4024,6 +4025,13 @@ protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}

/**
 * Returns the value of {@code lastRefreshedCheckpoint()} from this shard's current engine.
 * Only used for initializing segment replication CopyState.
 *
 * <p>The current engine is expected to be an {@link InternalEngine}; this is asserted and the
 * engine is then cast unconditionally.
 *
 * @return the last refreshed checkpoint reported by the engine
 */
public long getLastRefreshedCheckpoint() {
    final Engine currentEngine = getEngine();
    assert currentEngine instanceof InternalEngine;
    final InternalEngine internalEngine = (InternalEngine) currentEngine;
    return internalEngine.lastRefreshedCheckpoint();
}

public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand All @@ -22,6 +23,8 @@
import java.io.UncheckedIOException;
import java.util.Map;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* An Opensearch-specific version of Lucene's CopyState class that
* holds incRef'd file level details for one point-in-time segment infos.
Expand All @@ -38,15 +41,23 @@ public class CopyState implements Closeable {

/**
 * Captures the shard's latest segment infos and replication checkpoint and serialises a copy of
 * the segment infos whose user data carries the shard's last refreshed checkpoint as both the
 * local checkpoint and the max sequence number.
 *
 * @param shard the shard to snapshot
 * @throws IOException if acquiring or serialising the segment infos fails
 */
public CopyState(IndexShard shard) throws IOException {
    this.shard = shard;
    // Read before acquiring the segment infos below.
    // NOTE(review): presumably so the recorded checkpoint never runs ahead of the snapshot - confirm.
    final long lastRefreshedCheckpoint = shard.getLastRefreshedCheckpoint();
    final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard
        .getLatestSegmentInfosAndCheckpoint();
    this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1();
    this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2();
    final SegmentInfos segmentInfos = this.segmentInfosRef.get();

    // Apply the overrides to a clone rather than to the SegmentInfos held by segmentInfosRef.
    final SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
    final Map<String, String> userData = segmentInfosSnapshot.getUserData();
    final String checkpointValue = Long.toString(lastRefreshedCheckpoint);
    userData.put(LOCAL_CHECKPOINT_KEY, checkpointValue);
    userData.put(SequenceNumbers.MAX_SEQ_NO, checkpointValue);
    segmentInfosSnapshot.setUserData(userData, false);

    ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
    // resource description and name are not used, but resource description cannot be null
    try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
        // Serialise only the patched snapshot; writing the unpatched infos as well would put them
        // first in infosBytes and hide the user data overrides from readers.
        segmentInfosSnapshot.write(indexOutput);
    }
    this.infosBytes = buffer.toArrayCopy();
}
Expand Down
Loading