Skip to content

Commit 17d5398

Browse files
author
Tanvir Alam
committed
Merge upstream/feature/datafusion into feature/datafusion-52-liquid-cache
2 parents 13e10d5 + 81d0c1d commit 17d5398

File tree

77 files changed

+2443
-305
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+2443
-305
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.vectorized.execution.search.spi;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
/**
15+
* Service Provider Interface for query execution results.
16+
* Implementations provide access to columnar query results from different execution engines.
17+
*
18+
* @opensearch.experimental
19+
*/
20+
public interface QueryResult {
21+
22+
/**
23+
* Returns the columnar result data where each entry maps a column name to its list of values.
24+
*
25+
* @return Map of column names to their corresponding value lists
26+
*/
27+
Map<String, List<Object>> getColumns();
28+
}

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterCloseBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void setup() throws IOException {
6060
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
6161
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
6262
filePath = generateTempFilePath();
63-
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
63+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
6464
RustBridge.write(filePath, writerWriteBenchmarkData.getArrowArray().memoryAddress(), writerWriteBenchmarkData.getArrowSchema().memoryAddress());
6565
}
6666

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterCreateBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void tearDown() throws IOException {
8181
@Benchmark
8282
public void benchmarkCreate() throws IOException {
8383
// This is what we're benchmarking - just writer creation
84-
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
84+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
8585
}
8686

8787
private String generateTempFilePath() {

modules/parquet-data-format/benchmarks/src/main/java/com/parquet/parquetdataformat/benchmark/ParquetWriterWriteBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void setup() throws IOException {
3939
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
4040
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
4141
filePath = generateTempFilePath();
42-
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
42+
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
4343
}
4444

4545
@Benchmark

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.opensearch.core.xcontent.NamedXContentRegistry;
2424
import org.opensearch.env.Environment;
2525
import org.opensearch.env.NodeEnvironment;
26-
import org.opensearch.index.engine.DataFormatPlugin;
2726
import org.opensearch.index.engine.exec.DataFormat;
2827
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
2928
import com.parquet.parquetdataformat.bridge.RustBridge;
@@ -78,22 +77,13 @@
7877
* <li>Memory management via {@link com.parquet.parquetdataformat.memory} package</li>
7978
* </ul>
8079
*/
81-
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {
80+
public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin {
8281
private Settings settings;
8382

84-
public static String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
85-
86-
public static final Setting<String> INDEX_MAX_NATIVE_ALLOCATION = Setting.simpleString(
87-
"index.parquet.max_native_allocation",
88-
DEFAULT_MAX_NATIVE_ALLOCATION,
89-
Setting.Property.NodeScope,
90-
Setting.Property.Dynamic
91-
);
92-
9383
@Override
9484
@SuppressWarnings("unchecked")
95-
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
96-
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
85+
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
86+
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath, indexSettings);
9787
}
9888

9989
@Override
@@ -148,7 +138,16 @@ public BlobContainer createBlobContainer(BlobStore blobStore, BlobPath baseBlobP
148138

149139
@Override
150140
public List<Setting<?>> getSettings() {
151-
return List.of(INDEX_MAX_NATIVE_ALLOCATION);
141+
return List.of(
142+
ParquetSettings.MAX_NATIVE_ALLOCATION,
143+
ParquetSettings.PARQUET_SETTINGS,
144+
ParquetSettings.ROW_GROUP_SIZE_BYTES,
145+
ParquetSettings.PAGE_SIZE_BYTES,
146+
ParquetSettings.PAGE_ROW_LIMIT,
147+
ParquetSettings.DICT_SIZE_BYTES,
148+
ParquetSettings.COMPRESSION_TYPE,
149+
ParquetSettings.COMPRESSION_LEVEL
150+
);
152151
}
153152

154153
// for testing locally only
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package com.parquet.parquetdataformat;
10+
11+
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.core.common.unit.ByteSizeUnit;
14+
import org.opensearch.core.common.unit.ByteSizeValue;
15+
16+
import static org.opensearch.common.settings.WriteableSetting.SettingType.ByteSizeValue;
17+
18+
/**
19+
* Settings for Parquet data format.
20+
*/
21+
public final class ParquetSettings {
22+
23+
private ParquetSettings() {}
24+
25+
public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
26+
27+
public static final Setting<Settings> PARQUET_SETTINGS = Setting.groupSetting(
28+
"index.parquet.",
29+
Setting.Property.IndexScope
30+
);
31+
32+
public static final Setting<ByteSizeValue> ROW_GROUP_SIZE_BYTES = Setting.byteSizeSetting(
33+
"index.parquet.row_group_size_bytes",
34+
new ByteSizeValue(128, ByteSizeUnit.MB),
35+
Setting.Property.IndexScope
36+
);
37+
38+
public static final Setting<ByteSizeValue> PAGE_SIZE_BYTES = Setting.byteSizeSetting(
39+
"index.parquet.page_size_bytes",
40+
new ByteSizeValue(1, ByteSizeUnit.MB),
41+
Setting.Property.IndexScope
42+
);
43+
44+
public static final Setting<Integer> PAGE_ROW_LIMIT = Setting.intSetting(
45+
"index.parquet.page_row_limit",
46+
20000,
47+
1,
48+
Setting.Property.IndexScope
49+
);
50+
51+
public static final Setting<ByteSizeValue> DICT_SIZE_BYTES = Setting.byteSizeSetting(
52+
"index.parquet.dict_size_bytes",
53+
new ByteSizeValue(2, ByteSizeUnit.MB),
54+
Setting.Property.IndexScope
55+
);
56+
57+
public static final Setting<String> COMPRESSION_TYPE = Setting.simpleString(
58+
"index.parquet.compression_type",
59+
"ZSTD",
60+
Setting.Property.IndexScope
61+
);
62+
63+
public static final Setting<Integer> COMPRESSION_LEVEL = Setting.intSetting(
64+
"index.parquet.compression_level",
65+
2,
66+
1,
67+
9,
68+
Setting.Property.IndexScope
69+
);
70+
71+
public static final Setting<String> MAX_NATIVE_ALLOCATION = Setting.simpleString(
72+
"index.parquet.max_native_allocation",
73+
DEFAULT_MAX_NATIVE_ALLOCATION,
74+
Setting.Property.NodeScope
75+
);
76+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package com.parquet.parquetdataformat.bridge;
10+
11+
/**
12+
* Field-level configuration that overrides index-level defaults.
13+
* Null values inherit from index-level configuration.
14+
*/
15+
public class FieldConfig {
16+
17+
private Integer compressionLevel;
18+
private String compressionType;
19+
20+
public Integer getCompressionLevel() { return compressionLevel; }
21+
public void setCompressionLevel(Integer compressionLevel) { this.compressionLevel = compressionLevel; }
22+
23+
public String getCompressionType() { return compressionType; }
24+
public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
25+
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public class NativeParquetWriter implements Closeable {
2626
* @param schemaAddress Arrow C Data Interface schema pointer
2727
* @throws IOException if writer creation fails
2828
*/
29-
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
29+
public NativeParquetWriter(String filePath, String indexName, long schemaAddress) throws IOException {
3030
this.filePath = filePath;
31-
RustBridge.createWriter(filePath, schemaAddress);
31+
RustBridge.createWriter(filePath, indexName, schemaAddress);
3232
}
3333

3434
/**
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package com.parquet.parquetdataformat.bridge;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
/**
15+
* Native settings passed to the Rust layer, covering writer and future merge configurations.
16+
* Field-level configs override these defaults.
17+
*/
18+
public class NativeSettings {
19+
20+
// Index identifier
21+
private String indexName;
22+
23+
// Index-level defaults
24+
private String compressionType;
25+
private Integer compressionLevel;
26+
private Long pageSizeBytes;
27+
private Integer pageRowLimit;
28+
private Long dictSizeBytes;
29+
private Long rowGroupSizeBytes;
30+
31+
// Field-level overrides
32+
private Map<String, FieldConfig> fieldConfigs;
33+
private Map<String, Object> customSettings;
34+
35+
public String getIndexName() { return indexName; }
36+
public void setIndexName(String indexName) { this.indexName = indexName; }
37+
38+
public String getCompressionType() { return compressionType; }
39+
public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
40+
41+
public Integer getCompressionLevel() { return compressionLevel; }
42+
public void setCompressionLevel(Integer compressionLevel) { this.compressionLevel = compressionLevel; }
43+
44+
public Long getPageSizeBytes() { return pageSizeBytes; }
45+
public void setPageSizeBytes(Long pageSizeBytes) { this.pageSizeBytes = pageSizeBytes; }
46+
47+
public Integer getPageRowLimit() { return pageRowLimit; }
48+
public void setPageRowLimit(Integer pageRowLimit) { this.pageRowLimit = pageRowLimit; }
49+
50+
public Long getDictSizeBytes() { return dictSizeBytes; }
51+
public void setDictSizeBytes(Long dictSizeBytes) { this.dictSizeBytes = dictSizeBytes; }
52+
53+
public Long getRowGroupSizeBytes() { return rowGroupSizeBytes; }
54+
public void setRowGroupSizeBytes(Long rowGroupSizeBytes) { this.rowGroupSizeBytes = rowGroupSizeBytes; }
55+
56+
public Map<String, FieldConfig> getFieldConfigs() { return fieldConfigs; }
57+
public void setFieldConfigs(Map<String, FieldConfig> fieldConfigs) { this.fieldConfigs = fieldConfigs; }
58+
59+
public void addFieldConfig(String fieldName, FieldConfig config) {
60+
if (fieldConfigs == null) fieldConfigs = new HashMap<>();
61+
fieldConfigs.put(fieldName, config);
62+
}
63+
64+
public Map<String, Object> getCustomSettings() { return customSettings; }
65+
public void setCustomSettings(Map<String, Object> customSettings) { this.customSettings = customSettings; }
66+
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ public class RustBridge {
2929
public static native void initLogger();
3030

3131
// Enhanced native methods that handle validation and provide better error reporting
32-
public static native void createWriter(String file, long schemaAddress) throws IOException;
32+
public static native void createWriter(String file, String indexName, long schemaAddress) throws IOException;
3333
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
3434
public static native ParquetFileMetadata closeWriter(String file) throws IOException;
3535
public static native void flushToDisk(String file) throws IOException;
3636
public static native ParquetFileMetadata getFileMetadata(String file) throws IOException;
37+
public static native void onSettingsUpdate(NativeSettings nativeSettings) throws IOException;
38+
39+
public static native void removeSettings(String indexName);
3740

3841
public static native long getFilteredNativeBytesUsed(String pathPrefix);
3942

4043

4144
// Native method declarations - these will be implemented in the JNI library
42-
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile);
45+
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile, String indexName);
4346
}

0 commit comments

Comments
 (0)