Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.tests.index.ForceMergePolicy;
import org.apache.lucene.util.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -58,42 +60,48 @@ public void testAcquireLastIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store primaryStore = createStore();
final InternalEngine primaryEngine = createEngine(
defaultSettings,
primaryStore,
createTempDir(),
new ForceMergePolicy(newMergePolicy())
);
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)
) {
// only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2.
final int docCount = 2;
List<Engine.Operation> operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean());
for (Engine.Operation op : operations) {
applyOperation(engine, op);
// Index 2 docs with distinct IDs, this will create segments _0 and _1 and after forcemerge into _2.
for (int i = 0; i < 2; i++) {
final ParsedDocument doc = createParsedDoc(Integer.toString(i), null);
final Engine.Index op = replicaIndexForDoc(doc, 1, i, false);
applyOperation(primaryEngine, op);
applyOperation(nrtEngine, op);
// refresh to create a lot of segments.
engine.refresh("test");
primaryEngine.refresh("test");
}
assertEquals(2, engine.segmentsStats(false, false).getCount());
assertEquals(2, primaryEngine.segmentsStats(false, false).getCount());
// wipe the nrt directory initially so we can sync with primary.
Lucene.cleanLuceneIndex(nrtEngineStore.directory());
for (String file : engine.getLatestSegmentInfos().files(true)) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
for (String file : primaryEngine.getLatestSegmentInfos().files(true)) {
nrtEngineStore.directory().copyFrom(primaryStore.directory(), file, file, IOContext.DEFAULT);
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
nrtEngine.updateSegments(primaryEngine.getLatestSegmentInfos());
nrtEngine.flush(false, true);

// acquire latest commit
GatedCloseable<IndexCommit> indexCommitGatedCloseable = nrtEngine.acquireLastIndexCommit(false);
List<String> replicaFiles = List.of(nrtEngine.store.directory().listAll());

// merge primary down to 1 segment
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
primaryEngine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());

final Collection<String> latestPrimaryFiles = engine.getLatestSegmentInfos().files(false);
final Collection<String> latestPrimaryFiles = primaryEngine.getLatestSegmentInfos().files(false);
// copy new segments in and load reader.
for (String file : latestPrimaryFiles) {
if (replicaFiles.contains(file) == false) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
nrtEngineStore.directory().copyFrom(primaryStore.directory(), file, file, IOContext.DEFAULT);
}
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
nrtEngine.updateSegments(primaryEngine.getLatestSegmentInfos());
nrtEngine.flush(false, true);

// Verify that the files contained in indexCommitGatedCloseable will not be deleted.
Expand Down Expand Up @@ -452,47 +460,53 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store primaryStore = createStore();
final InternalEngine primaryEngine = createEngine(
defaultSettings,
primaryStore,
createTempDir(),
new ForceMergePolicy(newMergePolicy())
);
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)
) {
// only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2.
final int docCount = 2;
List<Engine.Operation> operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean());
for (Engine.Operation op : operations) {
applyOperation(engine, op);
// Index 2 docs with distinct IDs, this will create segments _0 and _1 and after forcemerge into _2.
for (int i = 0; i < 2; i++) {
final ParsedDocument doc = createParsedDoc(Integer.toString(i), null);
final Engine.Index op = replicaIndexForDoc(doc, 1, i, false);
applyOperation(primaryEngine, op);
applyOperation(nrtEngine, op);
// refresh to create a lot of segments.
engine.refresh("test");
primaryEngine.refresh("test");
}
assertEquals(2, engine.segmentsStats(false, false).getCount());
assertEquals(2, primaryEngine.segmentsStats(false, false).getCount());
// wipe the nrt directory initially so we can sync with primary.
Lucene.cleanLuceneIndex(nrtEngineStore.directory());
assertFalse(
Arrays.stream(nrtEngineStore.directory().listAll())
.anyMatch(file -> file.equals("write.lock") == false && file.equals("extra0") == false)
);
for (String file : engine.getLatestSegmentInfos().files(true)) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
for (String file : primaryEngine.getLatestSegmentInfos().files(true)) {
nrtEngineStore.directory().copyFrom(primaryStore.directory(), file, file, IOContext.DEFAULT);
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(engine.getLatestSegmentInfos(), nrtEngine.getLatestSegmentInfos());
nrtEngine.updateSegments(primaryEngine.getLatestSegmentInfos());
assertEquals(primaryEngine.getLatestSegmentInfos(), nrtEngine.getLatestSegmentInfos());
final GatedCloseable<SegmentInfos> snapshot = nrtEngine.getSegmentInfosSnapshot();
final Collection<String> replicaSnapshotFiles = snapshot.get().files(false);
List<String> replicaFiles = List.of(nrtEngine.store.directory().listAll());

// merge primary down to 1 segment
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
primaryEngine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
// we expect a 3rd segment to be created after merge.
assertEquals(3, engine.segmentsStats(false, false).getCount());
final Collection<String> latestPrimaryFiles = engine.getLatestSegmentInfos().files(false);
assertEquals(3, primaryEngine.segmentsStats(false, false).getCount());
final Collection<String> latestPrimaryFiles = primaryEngine.getLatestSegmentInfos().files(false);

// copy new segments in and load reader.
for (String file : latestPrimaryFiles) {
if (replicaFiles.contains(file) == false) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
nrtEngineStore.directory().copyFrom(primaryStore.directory(), file, file, IOContext.DEFAULT);
}
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
nrtEngine.updateSegments(primaryEngine.getLatestSegmentInfos());

replicaFiles = List.of(nrtEngine.store.directory().listAll());
assertTrue(replicaFiles.containsAll(replicaSnapshotFiles));
Expand Down
Loading