Skip to content

Commit 3e6aef2

Browse files
authored
Indexing changes for composite engine (#20790)
* Lucene + Parquet indexing changes
* Add changes for mappers
* Add secondary index setting
* Add test fix
1 parent 81d0c1d commit 3e6aef2

File tree

101 files changed

+3013
-641
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

101 files changed

+3013
-641
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
.claude
22
CLAUDE.md
33
.cursor*
4+
.kiro*
5+
examples/
46

57
# intellij files
68
.idea/

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@
2323
import org.opensearch.core.xcontent.NamedXContentRegistry;
2424
import org.opensearch.env.Environment;
2525
import org.opensearch.env.NodeEnvironment;
26+
import org.opensearch.index.engine.EngineConfig;
2627
import org.opensearch.index.engine.exec.DataFormat;
28+
import org.opensearch.index.engine.exec.FieldAssignments;
29+
import org.opensearch.index.engine.exec.FieldSupportRegistry;
2730
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
2831
import com.parquet.parquetdataformat.bridge.RustBridge;
2932
import com.parquet.parquetdataformat.engine.ParquetExecutionEngine;
33+
import org.opensearch.index.mapper.MapperService;
3034
import org.opensearch.index.shard.ShardPath;
3135
import org.opensearch.index.store.FormatStoreDirectory;
3236
import org.opensearch.index.store.GenericStoreDirectory;
3337
import org.opensearch.plugins.DataSourcePlugin;
34-
import org.opensearch.index.mapper.MapperService;
3538
import org.opensearch.plugins.Plugin;
3639
import org.opensearch.plugins.spi.vectorized.DataSourceCodec;
3740
import org.opensearch.repositories.RepositoriesService;
@@ -82,8 +85,15 @@ public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin
8285

8386
@Override
8487
@SuppressWarnings("unchecked")
85-
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
86-
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath, indexSettings);
88+
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(EngineConfig engineConfig, MapperService mapperService, boolean isPrimary, ShardPath shardPath, IndexSettings indexSettings, FieldAssignments fieldAssignments) {
89+
ParquetExecutionEngine engine = new ParquetExecutionEngine(
90+
settings,
91+
isPrimary,
92+
() -> ArrowSchemaBuilder.getSchema(mapperService, isPrimary),
93+
shardPath,
94+
indexSettings
95+
);
96+
return (IndexingExecutionEngine<T>) engine;
8797
}
8898

8999
@Override
@@ -136,6 +146,15 @@ public BlobContainer createBlobContainer(BlobStore blobStore, BlobPath baseBlobP
136146
return blobStore.blobContainer(formatPath);
137147
}
138148

149+
@Override
150+
public void registerFieldSupport(FieldSupportRegistry registry) {
151+
DataFormat parquet = getDataFormat();
152+
for (Map.Entry<String, com.parquet.parquetdataformat.fields.ParquetField> entry :
153+
com.parquet.parquetdataformat.fields.ArrowFieldRegistry.getRegisteredFields().entrySet()) {
154+
registry.register(entry.getKey(), parquet, entry.getValue().getFieldCapabilities());
155+
}
156+
}
157+
139158
@Override
140159
public List<Setting<?>> getSettings() {
141160
return List.of(

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetDataFormat.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ public void configureStore() {
5050

5151
@Override
5252
public boolean equals(Object obj) {
53-
return true;
53+
if (this == obj) return true;
54+
if (!(obj instanceof DataFormat)) return false;
55+
return name().equals(((DataFormat) obj).name());
5456
}
5557

5658
@Override
5759
public int hashCode() {
58-
return 0;
60+
return name().hashCode();
5961
}
6062

6163
@Override

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.apache.logging.log4j.Logger;
1515
import org.opensearch.common.settings.Settings;
1616
import org.opensearch.index.IndexSettings;
17+
import com.parquet.parquetdataformat.fields.ArrowFieldRegistry;
1718
import org.opensearch.index.engine.exec.DataFormat;
19+
import org.opensearch.index.engine.exec.EngineRole;
1820
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
1921
import org.opensearch.index.engine.exec.Merger;
2022
import org.opensearch.index.engine.exec.RefreshInput;
@@ -75,9 +77,11 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa
7577
private final ParquetMerger parquetMerger;
7678
private final ArrowBufferPool arrowBufferPool;
7779
private final IndexSettings indexSettings;
80+
private final boolean isPrimaryEngine;
7881

7982
public ParquetExecutionEngine(
8083
Settings settings,
84+
boolean isPrimaryEngine,
8185
Supplier<Schema> schema,
8286
ShardPath shardPath,
8387
IndexSettings indexSettings
@@ -87,7 +91,7 @@ public ParquetExecutionEngine(
8791
this.arrowBufferPool = new ArrowBufferPool(settings);
8892
this.indexSettings = indexSettings;
8993
this.parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH, indexSettings.getIndex().getName());
90-
94+
this.isPrimaryEngine = isPrimaryEngine;
9195
// Push current settings to Rust store once on construction, then keep in sync on updates
9296
pushSettingsToRust(indexSettings);
9397

@@ -143,14 +147,15 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) {
143147
}
144148

145149
@Override
146-
public List<String> supportedFieldTypes() {
147-
return List.of();
150+
public List<String> supportedFieldTypes(boolean isPrimaryEngine) {
151+
return new java.util.ArrayList<>(ArrowFieldRegistry.getRegisteredFieldNames());
148152
}
149153

150154
@Override
151155
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
152156
String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
153-
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings);
157+
EngineRole role = isPrimaryEngine ? EngineRole.PRIMARY : EngineRole.SECONDARY;
158+
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings, role);
154159
}
155160

156161
@Override

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowFieldRegistry.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private static void registerCorePlugins() {
6262
// Register core data fields
6363
registerPlugin(new CoreDataFieldPlugin(), "CoreDataFields");
6464

65-
// REgister metadata fields
65+
// Register metadata fields
6666
registerPlugin(new MetadataFieldPlugin(), "MetadataFields");
6767
}
6868
/**
@@ -141,6 +141,13 @@ public static ParquetField getParquetField(String fieldType) {
141141
return FIELD_REGISTRY.get(fieldType);
142142
}
143143

144+
/**
145+
* Returns an unmodifiable view of all registered field mappings.
146+
*/
147+
public static Map<String, ParquetField> getRegisteredFields() {
148+
return Collections.unmodifiableMap(FIELD_REGISTRY);
149+
}
150+
144151
public static class RegistryStats {
145152
private final int totalFields;
146153
private final Set<String> allFieldTypes;

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowSchemaBuilder.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ private ArrowSchemaBuilder() {
4646
* @throws IllegalArgumentException if mapperService is null
4747
* @throws IllegalStateException if no valid fields are found or if a field type is not supported
4848
*/
49-
public static Schema getSchema(final MapperService mapperService) {
49+
public static Schema getSchema(final MapperService mapperService, boolean isPrimary) {
5050
Objects.requireNonNull(mapperService, "MapperService cannot be null");
5151

52-
final List<Field> fields = extractFieldsFromMappers(mapperService);
52+
final List<Field> fields = extractFieldsFromMappers(mapperService, isPrimary);
5353

5454
if (fields.isEmpty()) {
5555
throw new IllegalStateException("No valid fields found in mapper service");
@@ -62,22 +62,26 @@ public static Schema getSchema(final MapperService mapperService) {
6262
* Extracts Arrow fields from the mapper service, filtering out metadata fields.
6363
*
6464
* @param mapperService the mapper service to extract fields from
65+
* @param isPrimary whether this is a primary engine context
6566
* @return a list of Arrow fields
6667
*/
67-
private static List<Field> extractFieldsFromMappers(final MapperService mapperService) {
68+
private static List<Field> extractFieldsFromMappers(final MapperService mapperService, boolean isPrimary) {
6869
final List<Field> fields = new ArrayList<>();
6970

7071
for (final Mapper mapper : mapperService.documentMapper().mappers()) {
7172
if (notSupportedMetadataField(mapper)) {
7273
continue;
7374
}
7475

75-
final Field arrowField = createArrowField(mapper);
76-
fields.add(arrowField);
76+
final Field arrowField = createArrowField(mapper, isPrimary);
77+
if (arrowField != null) {
78+
fields.add(arrowField);
79+
}
7780
}
7881

79-
fields.add(new Field(CompositeDataFormatWriter.ROW_ID, new LongParquetField().getFieldType(), null));
80-
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, new LongParquetField().getFieldType(), null));
82+
LongParquetField longField = new LongParquetField();
83+
fields.add(new Field(CompositeDataFormatWriter.ROW_ID, longField.getFieldType(), null));
84+
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, longField.getFieldType(), null));
8185

8286
return fields;
8387
}
@@ -98,20 +102,27 @@ private static boolean notSupportedMetadataField(final Mapper mapper) {
98102
}
99103

100104
/**
101-
* Creates an Arrow Field from an OpenSearch Mapper.
105+
* Creates an Arrow Field from an OpenSearch Mapper using the ArrowFieldRegistry.
106+
* For non-primary contexts, returns null if the field type has no eligible ParquetField,
107+
* allowing the caller to skip the field. For primary contexts, throws IllegalStateException
108+
* if no ParquetField is found.
102109
*
103110
* @param mapper the mapper to convert
104-
* @return a new Arrow Field
105-
* @throws IllegalStateException if the mapper type is not supported
111+
* @param isPrimary whether this is a primary engine context
112+
* @return a new Arrow Field, or null if the field is not eligible for the role
113+
* @throws IllegalStateException if the mapper type is not supported in primary context
106114
*/
107-
private static Field createArrowField(final Mapper mapper) {
115+
private static Field createArrowField(final Mapper mapper, boolean isPrimary) {
108116
final ParquetField parquetField = ArrowFieldRegistry.getParquetField(mapper.typeName());
109117

110118
if (parquetField == null) {
111-
throw new IllegalStateException(
112-
String.format("Unsupported field type '%s' for field '%s'",
113-
mapper.typeName(), mapper.name())
114-
);
119+
if (isPrimary) {
120+
throw new IllegalStateException(
121+
String.format("Unsupported field type '%s' for field '%s'",
122+
mapper.typeName(), mapper.name())
123+
);
124+
}
125+
return null;
115126
}
116127

117128
return new Field(mapper.name(), parquetField.getFieldType(), null);

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ParquetField.java

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
import com.parquet.parquetdataformat.vsr.ManagedVSR;
1212
import org.apache.arrow.vector.types.pojo.ArrowType;
1313
import org.apache.arrow.vector.types.pojo.FieldType;
14+
import org.opensearch.index.engine.exec.FieldCapability;
1415
import org.opensearch.index.mapper.MappedFieldType;
1516

1617
import java.util.Objects;
18+
import java.util.Set;
1719

1820
/**
1921
* Abstract base class for all Parquet field implementations that handle the conversion
@@ -37,49 +39,30 @@ public abstract class ParquetField {
3739

3840
/**
3941
* Adds the parsed field value to the appropriate vector group within the managed VSR.
40-
* This method is responsible for the actual data conversion and storage in the
41-
* columnar format specific to each field type.
4242
*
43-
* <p>Implementations must handle null values appropriately and ensure type safety
44-
* when casting the parseValue to the expected type.</p>
45-
*
46-
* @param mappedFieldType the OpenSearch field type metadata containing field configuration
43+
* @param fieldType the per-field MappedFieldType carrying field name, type name, and capability flags
4744
* @param managedVSR the managed vector schema root for columnar data storage
4845
* @param parseValue the parsed field value to be stored, may be null
49-
* @throws IllegalArgumentException if any parameter is invalid for this field type
50-
* @throws ClassCastException if parseValue cannot be cast to the expected type
5146
*/
52-
protected abstract void addToGroup(MappedFieldType mappedFieldType, ManagedVSR managedVSR, Object parseValue);
47+
protected abstract void addToGroup(MappedFieldType fieldType, ManagedVSR managedVSR, Object parseValue);
5348

5449
/**
5550
* Creates and processes a field entry if the field type supports columnar storage.
56-
* This method serves as the main entry point for field processing and includes
57-
* validation logic to ensure only columnar fields are processed.
58-
*
59-
* <p>The method performs the following operations:
60-
* <ol>
61-
* <li>Validates input parameters</li>
62-
* <li>Checks if the field supports columnar storage</li>
63-
* <li>Delegates to {@link #addToGroup} for actual data processing</li>
64-
* </ol>
6551
*
66-
* @param mappedFieldType the OpenSearch field type metadata, must not be null
52+
* @param fieldType the per-field MappedFieldType carrying field name, type name, and capability flags, must not be null
6753
* @param managedVSR the managed vector schema root, must not be null
6854
* @param parseValue the parsed field value to be processed, may be null
69-
* @throws IllegalArgumentException if mappedFieldType or managedVSR is null
7055
*/
71-
public final void createField(final MappedFieldType mappedFieldType,
56+
public final void createField(final MappedFieldType fieldType,
7257
final ManagedVSR managedVSR,
7358
final Object parseValue) {
74-
Objects.requireNonNull(mappedFieldType, "MappedFieldType cannot be null");
59+
Objects.requireNonNull(fieldType, "MappedFieldType cannot be null");
7560
Objects.requireNonNull(managedVSR, "ManagedVSR cannot be null");
7661

77-
if (mappedFieldType.isColumnar()) {
78-
// TODO: support dynamic mapping update
79-
// for now ignore the field
80-
if (managedVSR.getVector(mappedFieldType.name()) != null) {
81-
addToGroup(mappedFieldType, managedVSR, parseValue);
82-
}
62+
// TODO: support dynamic mapping update
63+
// for now ignore the field
64+
if (managedVSR.getVector(fieldType.name()) != null) {
65+
addToGroup(fieldType, managedVSR, parseValue);
8366
}
8467
}
8568

@@ -109,6 +92,12 @@ public final void createField(final MappedFieldType mappedFieldType,
10992
*/
11093
public abstract FieldType getFieldType();
11194

95+
/**
96+
* Returns the set of capabilities this field supports.
97+
* The engine uses this to populate the FieldSupportRegistry.
98+
*/
99+
public abstract Set<FieldCapability> getFieldCapabilities();
100+
112101
/**
113102
* Provides a string representation of this ParquetField for debugging purposes.
114103
* The default implementation includes the class name and Arrow type information.

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/core/data/BinaryParquetField.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@
88

99
package com.parquet.parquetdataformat.fields.core.data;
1010

11+
import org.opensearch.index.engine.exec.FieldCapability;
12+
import org.opensearch.index.mapper.MappedFieldType;
1113
import com.parquet.parquetdataformat.fields.ParquetField;
1214
import com.parquet.parquetdataformat.vsr.ManagedVSR;
1315
import org.apache.arrow.vector.VarBinaryVector;
1416
import org.apache.arrow.vector.types.pojo.ArrowType;
1517
import org.apache.arrow.vector.types.pojo.FieldType;
16-
import org.opensearch.index.mapper.MappedFieldType;
18+
19+
import java.util.EnumSet;
20+
import java.util.Set;
1721

1822
/**
1923
* Parquet field implementation for handling binary data types in OpenSearch documents.
@@ -55,4 +59,9 @@ public ArrowType getArrowType() {
5559
public FieldType getFieldType() {
5660
return FieldType.nullable(getArrowType());
5761
}
62+
63+
@Override
64+
public Set<FieldCapability> getFieldCapabilities() {
65+
return EnumSet.of(FieldCapability.DOC_VALUES, FieldCapability.STORE);
66+
}
5867
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/core/data/BooleanParquetField.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88

99
package com.parquet.parquetdataformat.fields.core.data;
1010

11+
import org.opensearch.index.engine.exec.FieldCapability;
12+
import org.opensearch.index.mapper.MappedFieldType;
1113
import com.parquet.parquetdataformat.fields.ArrowFieldRegistry;
1214
import com.parquet.parquetdataformat.fields.ParquetField;
1315
import com.parquet.parquetdataformat.vsr.ManagedVSR;
1416
import org.apache.arrow.vector.BitVector;
1517
import org.apache.arrow.vector.types.pojo.ArrowType;
1618
import org.apache.arrow.vector.types.pojo.FieldType;
17-
import org.opensearch.index.mapper.MappedFieldType;
19+
20+
import java.util.EnumSet;
21+
import java.util.Set;
1822

1923
/**
2024
* Parquet field implementation for handling boolean data types in OpenSearch documents.
@@ -56,4 +60,9 @@ public ArrowType getArrowType() {
5660
public FieldType getFieldType() {
5761
return FieldType.nullable(getArrowType());
5862
}
63+
64+
@Override
65+
public Set<FieldCapability> getFieldCapabilities() {
66+
return EnumSet.of(FieldCapability.DOC_VALUES, FieldCapability.STORE);
67+
}
5968
}

0 commit comments

Comments
 (0)