Skip to content

Commit a786542

Browse files
authored
Refactoring merge code to act at composite level instead of Parquet (#20119)
Signed-off-by: Dhwanil Patel <[email protected]>
1 parent 23e4717 commit a786542

File tree

14 files changed

+214
-280
lines changed

14 files changed

+214
-280
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMergeExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
package com.parquet.parquetdataformat.merge;
1010

1111
import org.opensearch.index.engine.exec.FileMetadata;
12+
import org.opensearch.index.engine.exec.WriterFileSet;
1213
import org.opensearch.index.engine.exec.merge.MergeResult;
1314
import java.util.Collection;
15+
import java.util.List;
1416

1517
/**
1618
* Executes Parquet merge operations using a chosen compaction strategy.
@@ -24,7 +26,7 @@ public ParquetMergeExecutor(CompactionStrategy compactionStrategy) {
2426
}
2527

2628
@Override
27-
public MergeResult merge(Collection<FileMetadata> fileMetadataList, long writerGeneration) {
29+
public MergeResult merge(List<WriterFileSet> fileMetadataList, long writerGeneration) {
2830
MergeResult result = strategy.mergeParquetFiles(fileMetadataList, writerGeneration);
2931
strategy.postMerge();
3032
return result;

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMergeStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111

1212
import org.opensearch.index.engine.exec.FileMetadata;
13+
import org.opensearch.index.engine.exec.WriterFileSet;
1314
import org.opensearch.index.engine.exec.merge.MergeResult;
1415

1516
import java.util.Collection;
17+
import java.util.List;
18+
1619
/**
1720
* Interface defining a Parquet merge strategy.
1821
*/
@@ -21,7 +24,7 @@ public interface ParquetMergeStrategy {
2124
/**
2225
* Performs the actual Parquet merge.
2326
*/
24-
MergeResult mergeParquetFiles(Collection<FileMetadata> files, long writerGeneration);
27+
MergeResult mergeParquetFiles(List<WriterFileSet> files, long writerGeneration);
2528

2629
/**
2730
* Optional post-merge hook.

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMerger.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@
1010

1111
import org.opensearch.index.engine.exec.FileMetadata;
1212
import org.opensearch.index.engine.exec.Merger;
13+
import org.opensearch.index.engine.exec.WriterFileSet;
1314
import org.opensearch.index.engine.exec.merge.MergeResult;
1415
import org.opensearch.index.engine.exec.merge.RowIdMapping;
1516

1617
import java.util.Collection;
18+
import java.util.List;
1719

1820
public abstract class ParquetMerger implements Merger {
1921
@Override
20-
public MergeResult merge(Collection<FileMetadata> fileMetadataList, RowIdMapping rowIdMapping, long writerGeneration) {
22+
public MergeResult merge(List<WriterFileSet> fileMetadataList, RowIdMapping rowIdMapping, long writerGeneration) {
2123
throw new UnsupportedOperationException("Not supported parquet as secondary data format yet.");
2224
}
2325
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/RecordBatchMergeStrategy.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.index.engine.exec.merge.RowIdMapping;
1919

2020
import java.nio.file.Path;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.HashMap;
@@ -34,17 +35,17 @@
3435
public class RecordBatchMergeStrategy implements ParquetMergeStrategy {
3536

3637
@Override
37-
public MergeResult mergeParquetFiles(Collection<FileMetadata> files, long writerGeneration) {
38+
public MergeResult mergeParquetFiles(List<WriterFileSet> files, long writerGeneration) {
3839

3940
if (files.isEmpty()) {
4041
throw new IllegalArgumentException("No files to merge");
4142
}
4243

43-
List<Path> filePaths = files.stream()
44-
.map(fm -> Path.of(fm.directory(), fm.file()))
45-
.collect(Collectors.toList());
44+
List<Path> filePaths = new ArrayList<>();
45+
files.forEach(writerFileSet -> writerFileSet.getFiles().forEach(
46+
file -> filePaths.add(Path.of(writerFileSet.getDirectory(), file))));
4647

47-
String outputDirectory = files.iterator().next().directory();
48+
String outputDirectory = files.iterator().next().getDirectory();
4849
String mergedFilePath = getMergedFilePath(writerGeneration, outputDirectory);
4950
String mergedFileName = getMergedFileName(writerGeneration);
5051

server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,8 @@
3939
import org.apache.lucene.index.TieredMergePolicy;
4040
import org.opensearch.common.settings.Setting;
4141
import org.opensearch.common.settings.Setting.Property;
42-
import org.opensearch.common.util.FeatureFlags;
4342
import org.opensearch.core.common.unit.ByteSizeUnit;
4443
import org.opensearch.core.common.unit.ByteSizeValue;
45-
import org.opensearch.index.engine.exec.merge.ParquetTieredMergePolicy;
4644

4745
/**
4846
* A shard in opensearch is a Lucene index, and a Lucene index is broken

server/src/main/java/org/opensearch/index/engine/exec/Merger.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,21 @@
1111
import org.opensearch.index.engine.exec.merge.MergeResult;
1212
import org.opensearch.index.engine.exec.merge.RowIdMapping;
1313

14-
import java.util.Collection;
14+
import java.util.List;
1515

1616
public interface Merger {
1717
/**
1818
*
1919
* @param fileMetadataList list of WriterFileSet entries to merge
2020
* @return MergeResult - having RowIdMapping and mergedFileMetadata
2121
*/
22-
MergeResult merge(Collection<FileMetadata> fileMetadataList, long writerGeneration);
22+
MergeResult merge(List<WriterFileSet> fileMetadataList, long writerGeneration);
2323

2424
/**
2525
*
2626
* @param fileMetadataList list of WriterFileSet entries to merge
2727
* @param rowIdMapping Mapping of old segment + old rowId to new rowId
2828
* @return MergeResult - having mergedFileMetadata
2929
*/
30-
MergeResult merge(Collection<FileMetadata> fileMetadataList, RowIdMapping rowIdMapping, long writerGeneration);
30+
MergeResult merge(List<WriterFileSet> fileMetadataList, RowIdMapping rowIdMapping, long writerGeneration);
3131
}

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,14 @@ public synchronized void applyMergeResults(MergeResult mergeResult, OneMerge one
7777
List<CatalogSnapshot.Segment> segmentList = latestCatalogSnapshot.getSegments();
7878

7979
CatalogSnapshot.Segment segmentToAdd = getSegment(mergeResult.getMergedWriterFileSet());
80-
81-
Set<FileMetadata> filesToRemove = new HashSet<>();
82-
oneMerge.getFilesToMerge().forEach(file -> filesToRemove.add(file));
80+
Set<CatalogSnapshot.Segment> segmentsToRemove = new HashSet<>(oneMerge.getSegmentsToMerge());
8381

8482
boolean inserted = false;
8583
int newSegIdx = 0;
8684
for (int segIdx = 0, cnt = segmentList.size(); segIdx < cnt; segIdx++) {
8785
assert segIdx >= newSegIdx;
8886
CatalogSnapshot.Segment currSegment = segmentList.get(segIdx);
89-
if(filesToRemove.containsAll(currSegment.getSearchableFiles(oneMerge.getDataFormat().name()))) {
87+
if(segmentsToRemove.contains(currSegment)) {
9088
if (!inserted) {
9189
segmentList.set(segIdx, segmentToAdd);
9290
inserted = true;

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.opensearch.index.engine.exec.merge.MergeResult;
3939
import org.opensearch.index.engine.exec.merge.MergeScheduler;
4040
import org.opensearch.index.engine.exec.merge.OneMerge;
41-
import org.opensearch.index.engine.exec.merge.ParquetMergeHandler;
41+
import org.opensearch.index.engine.exec.merge.CompositeMergeHandler;
4242
import org.opensearch.index.mapper.IdFieldMapper;
4343
import org.opensearch.index.mapper.MapperService;
4444
import org.opensearch.index.mapper.SeqNoFieldMapper;
@@ -268,7 +268,7 @@ public void onFailure(String reason, Exception ex) {
268268
this.throttle = new IndexThrottle();
269269
this.historyUUID = loadHistoryUUID(userData);
270270
this.mergeHandler =
271-
new ParquetMergeHandler(this, this.engine, this.engine.getDataFormat(), indexSettings);
271+
new CompositeMergeHandler(this, this.engine, this.engine.getDataFormat(), indexSettings);
272272
this.mergeScheduler = new MergeScheduler(this.mergeHandler, this);
273273

274274
// Refresh here so that catalog snapshot gets initialized
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec.merge;
10+
11+
import org.opensearch.index.IndexSettings;
12+
import org.opensearch.index.engine.exec.composite.CompositeIndexingExecutionEngine;
13+
import org.opensearch.index.engine.exec.coord.Any;
14+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
15+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
16+
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import java.util.List;
20+
21+
public class CompositeMergeHandler extends MergeHandler {
22+
23+
private final CompositeMergePolicy mergePolicy;
24+
private final CompositeEngine compositeEngine;
25+
private final CompositeIndexingExecutionEngine compositeIndexingExecutionEngine;
26+
27+
public CompositeMergeHandler(
28+
CompositeEngine compositeEngine,
29+
CompositeIndexingExecutionEngine compositeIndexingExecutionEngine,
30+
Any dataFormats,
31+
IndexSettings indexSettings
32+
) {
33+
super(compositeEngine, compositeIndexingExecutionEngine, dataFormats);
34+
this.compositeEngine = compositeEngine;
35+
this.compositeIndexingExecutionEngine = compositeIndexingExecutionEngine;
36+
37+
mergePolicy = new CompositeMergePolicy(indexSettings.getMergePolicy(true));
38+
}
39+
40+
@Override
41+
public Collection<OneMerge> findForceMerges(int maxSegmentCount) {
42+
List<OneMerge> oneMerges = new ArrayList<>();
43+
try (CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshotReleasableRef = compositeEngine.acquireSnapshot()) {
44+
CatalogSnapshot catalogSnapshot = catalogSnapshotReleasableRef.getRef();
45+
46+
List<CatalogSnapshot.Segment> segmentList = catalogSnapshot.getSegments();
47+
List<List<CatalogSnapshot.Segment>> mergeCandidates =
48+
mergePolicy.findForceMergeCandidates(segmentList, maxSegmentCount);
49+
50+
// Process merge candidates
51+
for (List<CatalogSnapshot.Segment> mergeGroup : mergeCandidates) {
52+
oneMerges.add(new OneMerge(mergeGroup));
53+
}
54+
} catch (Exception e) {
55+
throw new RuntimeException(e);
56+
}
57+
return oneMerges;
58+
}
59+
60+
@Override
61+
public Collection<OneMerge> findMerges() {
62+
List<OneMerge> oneMerges = new ArrayList<>();
63+
try (CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshotReleasableRef = compositeEngine.acquireSnapshot()) {
64+
CatalogSnapshot catalogSnapshot = catalogSnapshotReleasableRef.getRef();
65+
66+
List<CatalogSnapshot.Segment> segmentList = catalogSnapshot.getSegments();
67+
List<List<CatalogSnapshot.Segment>> mergeCandidates =
68+
mergePolicy.findMergeCandidates(segmentList);
69+
70+
// Process merge candidates
71+
for (List<CatalogSnapshot.Segment> mergeGroup : mergeCandidates) {
72+
oneMerges.add(new OneMerge(mergeGroup));
73+
}
74+
} catch (Exception e) {
75+
e.printStackTrace();
76+
throw new RuntimeException(e);
77+
}
78+
return oneMerges;
79+
}
80+
81+
public synchronized void registerMerge(OneMerge oneMerge) {
82+
super.registerMerge(oneMerge);
83+
mergePolicy.addMergingSegment(oneMerge.getSegmentsToMerge());
84+
}
85+
86+
public synchronized void onMergeFinished(OneMerge oneMerge) {
87+
super.onMergeFinished(oneMerge);
88+
mergePolicy.removeMergingSegment(oneMerge.getSegmentsToMerge());
89+
}
90+
91+
public synchronized void onMergeFailure(OneMerge oneMerge) {
92+
super.onMergeFailure(oneMerge);
93+
mergePolicy.removeMergingSegment(oneMerge.getSegmentsToMerge());
94+
}
95+
}

0 commit comments

Comments
 (0)