Skip to content

Commit d8df6c9

Browse files
Authored by Shailesh-Kumar-Singh, dhwanilpatel, and mgodwan (Shailesh Singh, Dhwanil Patel, Mohit Godwani)
Add sorting support (#20798)
* Adding sort support during merge operation

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>

* Add sorting support
* Fix sort flow
* Fix date field type for sort in merge
* Renamed @timestamp to timestamp
* Changing sort to EventDate
* Fix memory issue for larger merge
* Using polars for merging
* Added hybrid approach of polars and heap
* optimize merge by grouping contiguous rows
* merge with streaming using sink_batches
* streaming k way merge
* using sink_batch to stream one batch per file
* optimize - materialize chunks at flush time
* replace threaded sink_batches with sequential ParquetReader slicing to reduce memory usage
* optimize - binary search to find cut point in batch + emit entire batch by checking last row in batch
* use arrow writer - ipc
* use rayon to parallelize writes across columns
* add tokio for parallel column writes
* add rayon with thread pool for column encoding during flush
* add tokio for async write and rayon parallel decoding during reads through the shared rayon thread pool
* temp commit
* take sort column from index setting during indexing flow
* support reverse sort in merge
* store and look up settings for every index from settings store
* refactor and address comments
* add exception handling + take union of all schema to make code extensible for dynamic mapping support
* add profiler

---------

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
Co-authored-by: Dhwanil Patel <dhwanip@amazon.com>
Co-authored-by: Mohit Godwani <mgodwan@amazon.com>
Co-authored-by: Shailesh Singh <shaikumm@amazon.com>
Co-authored-by: Shailesh Singh <shaileshkumarsingh260@gmaill.com>
1 parent d60b7ae commit d8df6c9

30 files changed

+2350
-592
lines changed

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterCloseBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ public void setup() throws IOException {
6060
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
6161
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
6262
filePath = generateTempFilePath();
63-
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
63+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress(), "sortColumn", false);
6464
RustBridge.write(filePath, writerWriteBenchmarkData.getArrowArray().memoryAddress(), writerWriteBenchmarkData.getArrowSchema().memoryAddress());
6565
}
6666

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterCreateBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -81,7 +81,7 @@ public void tearDown() throws IOException {
8181
@Benchmark
8282
public void benchmarkCreate() throws IOException {
8383
// This is what we're benchmarking - just writer creation
84-
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
84+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress(), null, false);
8585
}
8686

8787
private String generateTempFilePath() {

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterWriteBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ public void setup() throws IOException {
3939
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
4040
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
4141
filePath = generateTempFilePath();
42-
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
42+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress(), null, false);
4343
}
4444

4545
@Benchmark

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,12 +23,15 @@ public class NativeParquetWriter implements Closeable {
2323
/**
2424
* Creates a new native Parquet writer.
2525
* @param filePath path to the Parquet file
26+
* @param indexName name of the index (used for settings lookup)
2627
* @param schemaAddress Arrow C Data Interface schema pointer
28+
* @param sortColumn column to sort by
29+
* @param reverseSort whether to sort in reverse order
2730
* @throws IOException if writer creation fails
2831
*/
29-
public NativeParquetWriter(String filePath, String indexName, long schemaAddress) throws IOException {
32+
public NativeParquetWriter(String filePath, String indexName, long schemaAddress, String sortColumn, boolean reverseSort) throws IOException {
3033
this.filePath = filePath;
31-
RustBridge.createWriter(filePath, indexName, schemaAddress);
34+
RustBridge.createWriter(filePath, indexName, schemaAddress, sortColumn, reverseSort);
3235
}
3336

3437
/**

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ public class RustBridge {
2929
public static native void initLogger();
3030

3131
// Enhanced native methods that handle validation and provide better error reporting
32-
public static native void createWriter(String file, String indexName, long schemaAddress) throws IOException;
32+
public static native void createWriter(String file, String indexName, long schemaAddress, String sortColumn, boolean reverseSort) throws IOException;
3333
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
3434
public static native ParquetFileMetadata closeWriter(String file) throws IOException;
3535
public static native void flushToDisk(String file) throws IOException;
@@ -42,5 +42,5 @@ public class RustBridge {
4242

4343

4444
// Native method declarations - these will be implemented in the JNI library
45-
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile, String indexName);
45+
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile, String indexName, String sortKey, boolean isReverse);
4646
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,8 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa
7777
private final ParquetMerger parquetMerger;
7878
private final ArrowBufferPool arrowBufferPool;
7979
private final IndexSettings indexSettings;
80+
private volatile String sortColumn;
81+
private volatile boolean reverseSort;
8082
private final boolean isPrimaryEngine;
8183

8284
public ParquetExecutionEngine(
@@ -90,7 +92,7 @@ public ParquetExecutionEngine(
9092
this.shardPath = shardPath;
9193
this.arrowBufferPool = new ArrowBufferPool(settings);
9294
this.indexSettings = indexSettings;
93-
this.parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH, indexSettings.getIndex().getName());
95+
this.parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
9496
this.isPrimaryEngine = isPrimaryEngine;
9597
// Push current settings to Rust store once on construction, then keep in sync on updates
9698
pushSettingsToRust(indexSettings);
@@ -108,6 +110,16 @@ public ParquetExecutionEngine(
108110
// );
109111
}
110112

113+
@Override
114+
public void setSortColumn(String sortColumn) {
115+
this.sortColumn = sortColumn;
116+
}
117+
118+
@Override
119+
public void setReverseSort(boolean reverseSort) {
120+
this.reverseSort = reverseSort;
121+
}
122+
111123
private void pushSettingsToRust(IndexSettings indexSettings) {
112124
NativeSettings config = new NativeSettings();
113125
config.setIndexName(indexSettings.getIndex().getName());
@@ -155,7 +167,7 @@ public List<String> supportedFieldTypes(boolean isPrimaryEngine) {
155167
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
156168
String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
157169
EngineRole role = isPrimaryEngine ? EngineRole.PRIMARY : EngineRole.SECONDARY;
158-
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings, role);
170+
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings, sortColumn, reverseSort, role);
159171
}
160172

161173
@Override

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMergeExecutor.java

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -8,25 +8,21 @@
88

99
package com.parquet.parquetdataformat.merge;
1010

11-
import org.opensearch.index.engine.exec.WriterFileSet;
11+
import org.opensearch.index.engine.exec.merge.MergeInput;
1212
import org.opensearch.index.engine.exec.merge.MergeResult;
13-
import java.util.List;
14-
1513
/**
1614
* Executes Parquet merge operations using a chosen compaction strategy.
1715
*/
1816
public class ParquetMergeExecutor extends ParquetMerger {
1917

2018
private final ParquetMergeStrategy strategy;
21-
private final String indexName;
2219

23-
public ParquetMergeExecutor(CompactionStrategy compactionStrategy, String indexName) {
20+
public ParquetMergeExecutor(CompactionStrategy compactionStrategy) {
2421
this.strategy = ParquetMergeStrategyFactory.getStrategy(compactionStrategy);
25-
this.indexName = indexName;
2622
}
2723

2824
@Override
29-
public MergeResult merge(List<WriterFileSet> fileMetadataList, long writerGeneration) {
30-
return strategy.mergeParquetFiles(fileMetadataList, writerGeneration, indexName);
25+
public MergeResult merge(MergeInput mergeInput) {
26+
return strategy.mergeParquetFiles(mergeInput);
3127
}
3228
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMergeStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,9 @@
88

99
package com.parquet.parquetdataformat.merge;
1010

11-
import org.opensearch.index.engine.exec.WriterFileSet;
11+
12+
import org.opensearch.index.engine.exec.merge.MergeInput;
1213
import org.opensearch.index.engine.exec.merge.MergeResult;
13-
import java.util.List;
1414

1515
/**
1616
* Interface defining a Parquet merge strategy.
@@ -20,6 +20,6 @@ public interface ParquetMergeStrategy {
2020
/**
2121
* Performs the actual Parquet merge.
2222
*/
23-
MergeResult mergeParquetFiles(List<WriterFileSet> files, long writerGeneration, String indexName);
23+
MergeResult mergeParquetFiles(MergeInput mergeInput);
2424

2525
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMergeStrategyFactory.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ public static ParquetMergeStrategy getStrategy(CompactionStrategy compactionStra
1717
switch (compactionStrategy) {
1818
case RECORD_BATCH:
1919
default:
20-
return new RecordBatchMergeStrategy();
20+
return new StreamingParquetMergeStrategy();
2121
}
2222
}
2323
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/merge/ParquetMerger.java

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -8,18 +8,14 @@
88

99
package com.parquet.parquetdataformat.merge;
1010

11-
import org.opensearch.index.engine.exec.FileMetadata;
11+
import org.opensearch.index.engine.exec.merge.MergeInput;
1212
import org.opensearch.index.engine.exec.Merger;
13-
import org.opensearch.index.engine.exec.WriterFileSet;
1413
import org.opensearch.index.engine.exec.merge.MergeResult;
1514
import org.opensearch.index.engine.exec.merge.RowIdMapping;
1615

17-
import java.util.Collection;
18-
import java.util.List;
19-
2016
public abstract class ParquetMerger implements Merger {
2117
@Override
22-
public MergeResult merge(List<WriterFileSet> fileMetadataList, RowIdMapping rowIdMapping, long writerGeneration) {
18+
public MergeResult merge(MergeInput mergeInput, RowIdMapping rowIdMapping) {
2319
throw new UnsupportedOperationException("Not supported parquet as secondary data format yet.");
2420
}
2521
}

0 commit comments

Comments
 (0)