Skip to content

Commit af9c843

Browse files
authored
Add mapper_settings support and field_mapping mapper type for pull-based ingestion (opensearch-project#20722)
* Add mapper_settings support and field_mapping mapper type for pull-based ingestion Signed-off-by: Rishab Nahata <rishab.nahata@uber.com> * Add changelog Signed-off-by: Rishab Nahata <rishab.nahata@uber.com> * Add setting validation Signed-off-by: Rishab Nahata <rishab.nahata@uber.com> * Fix spotless check Signed-off-by: Rishab Nahata <rishab.nahata@uber.com> * Add mapper class Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com> --------- Signed-off-by: Rishab Nahata <rishab.nahata@uber.com> Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
1 parent 5c183b8 commit af9c843

File tree

9 files changed

+271
-7
lines changed

9 files changed

+271
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Support TLS cert hot-reload for Arrow Flight transport ([#20700](https://github.com/opensearch-project/OpenSearch/pull/20700))
1414
- [Workload Management] Enhance Scroll API support for autotagging ([#20151](https://github.com/opensearch-project/OpenSearch/pull/20151))
1515
- Add indices to search request slowlog ([#20588](https://github.com/opensearch-project/OpenSearch/pull/20588))
16+
- Add mapper_settings support and field_mapping mapper type for pull-based ingestion ([#20722](https://github.com/opensearch-project/OpenSearch/pull/20722))
1617

1718
### Changed
1819
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,6 +1003,18 @@ public Iterator<Setting<?>> settings() {
10031003
}, Property.IndexScope, Property.Dynamic)
10041004
);
10051005

1006+
/**
1007+
* Prefix setting for mapper-specific options. These settings are passed to the configured mapper type.
1008+
* For example, the {@code field_mapping} mapper type uses {@code id_field}, {@code version_field},
1009+
* and {@code op_type_field} settings.
1010+
*/
1011+
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_MAPPER_SETTINGS = Setting.prefixKeySetting(
1012+
"index.ingestion_source.mapper_settings.",
1013+
key -> new Setting<>(key, "", (value) -> {
1014+
return value;
1015+
}, Property.IndexScope, Property.Final)
1016+
);
1017+
10061018
/**
10071019
* an internal index format description, allowing us to find out if this index is upgraded or needs upgrading
10081020
*/
@@ -1261,6 +1273,7 @@ public IngestionSource getIngestionSource() {
12611273
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
12621274
final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
12631275
final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);
1276+
final Map<String, Object> mapperSettings = INGESTION_SOURCE_MAPPER_SETTINGS.getAsMap(settings);
12641277

12651278
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
12661279
.setPointerInitReset(pointerInitReset)
@@ -1272,6 +1285,7 @@ public IngestionSource getIngestionSource() {
12721285
.setAllActiveIngestion(allActiveIngestionEnabled)
12731286
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
12741287
.setMapperType(mapperType)
1288+
.setMapperSettings(mapperSettings)
12751289
.build();
12761290
}
12771291
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.indices.pollingingest.StreamPoller;
1616
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
1717

18+
import java.util.Collections;
1819
import java.util.HashMap;
1920
import java.util.Map;
2021
import java.util.Objects;
@@ -43,6 +44,7 @@ public class IngestionSource {
4344
private final boolean allActiveIngestion;
4445
private final TimeValue pointerBasedLagUpdateInterval;
4546
private final IngestionMessageMapper.MapperType mapperType;
47+
private final Map<String, Object> mapperSettings;
4648

4749
private IngestionSource(
4850
String type,
@@ -55,7 +57,8 @@ private IngestionSource(
5557
int blockingQueueSize,
5658
boolean allActiveIngestion,
5759
TimeValue pointerBasedLagUpdateInterval,
58-
IngestionMessageMapper.MapperType mapperType
60+
IngestionMessageMapper.MapperType mapperType,
61+
Map<String, Object> mapperSettings
5962
) {
6063
this.type = type;
6164
this.pointerInitReset = pointerInitReset;
@@ -68,6 +71,7 @@ private IngestionSource(
6871
this.allActiveIngestion = allActiveIngestion;
6972
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
7073
this.mapperType = mapperType;
74+
this.mapperSettings = mapperSettings != null ? Collections.unmodifiableMap(mapperSettings) : Collections.emptyMap();
7175
}
7276

7377
public String getType() {
@@ -114,6 +118,10 @@ public IngestionMessageMapper.MapperType getMapperType() {
114118
return mapperType;
115119
}
116120

121+
public Map<String, Object> getMapperSettings() {
122+
return mapperSettings;
123+
}
124+
117125
@Override
118126
public boolean equals(Object o) {
119127
if (this == o) return true;
@@ -129,7 +137,8 @@ public boolean equals(Object o) {
129137
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
130138
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
131139
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval)
132-
&& Objects.equals(mapperType, ingestionSource.mapperType);
140+
&& Objects.equals(mapperType, ingestionSource.mapperType)
141+
&& Objects.equals(mapperSettings, ingestionSource.mapperSettings);
133142
}
134143

135144
@Override
@@ -145,7 +154,8 @@ public int hashCode() {
145154
blockingQueueSize,
146155
allActiveIngestion,
147156
pointerBasedLagUpdateInterval,
148-
mapperType
157+
mapperType,
158+
mapperSettings
149159
);
150160
}
151161

@@ -178,6 +188,8 @@ public String toString() {
178188
+ ", mapperType='"
179189
+ mapperType
180190
+ '\''
191+
+ ", mapperSettings="
192+
+ mapperSettings
181193
+ '}';
182194
}
183195

@@ -240,6 +252,7 @@ public static class Builder {
240252
Settings.EMPTY
241253
);
242254
private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY);
255+
private Map<String, Object> mapperSettings = new HashMap<>();
243256

244257
public Builder(String type) {
245258
this.type = type;
@@ -255,6 +268,7 @@ public Builder(IngestionSource ingestionSource) {
255268
this.allActiveIngestion = ingestionSource.allActiveIngestion;
256269
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
257270
this.mapperType = ingestionSource.mapperType;
271+
this.mapperSettings = new HashMap<>(ingestionSource.mapperSettings);
258272
}
259273

260274
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -312,6 +326,11 @@ public Builder setMapperType(IngestionMessageMapper.MapperType mapperType) {
312326
return this;
313327
}
314328

329+
public Builder setMapperSettings(Map<String, Object> mapperSettings) {
330+
this.mapperSettings = mapperSettings;
331+
return this;
332+
}
333+
315334
public IngestionSource build() {
316335
return new IngestionSource(
317336
type,
@@ -324,7 +343,8 @@ public IngestionSource build() {
324343
blockingQueueSize,
325344
allActiveIngestion,
326345
pointerBasedLagUpdateInterval,
327-
mapperType
346+
mapperType,
347+
mapperSettings
328348
);
329349
}
330350

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@
107107
import org.opensearch.indices.RemoteStoreSettings;
108108
import org.opensearch.indices.ShardLimitValidator;
109109
import org.opensearch.indices.SystemIndices;
110+
import org.opensearch.indices.pollingingest.mappers.FieldMappingIngestionMessageMapper;
111+
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
110112
import org.opensearch.indices.replication.common.ReplicationType;
111113
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
112114
import org.opensearch.node.remotestore.RemoteStoreNodeService;
@@ -1269,6 +1271,54 @@ private static void validateSearchOnlyReplicasSettings(Settings indexSettings) {
12691271
}
12701272
}
12711273

1274+
/**
1275+
* Validates ingestion source settings for version compatibility and mapper settings correctness.
1276+
* In a mixed cluster, older nodes may not recognize newer mapper types (e.g., field_mapping),
1277+
* which would cause failures when those nodes try to initialize the ingestion engine.
1278+
* Also validates that mapper_settings keys are recognized for the configured mapper_type.
1279+
*/
1280+
static void validateIngestionSourceSettings(Settings settings, ClusterState state) {
1281+
if (IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING.exists(settings) == false) {
1282+
return;
1283+
}
1284+
1285+
IngestionMessageMapper.MapperType mapperType = IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);
1286+
Map<String, Object> mapperSettings = IndexMetadata.INGESTION_SOURCE_MAPPER_SETTINGS.getAsMap(settings);
1287+
1288+
switch (mapperType) {
1289+
case FIELD_MAPPING:
1290+
// Version check for mixed cluster compatibility
1291+
Version minNodeVersion = state.nodes().getMinNodeVersion();
1292+
if (minNodeVersion.before(Version.V_3_6_0)) {
1293+
throw new IllegalArgumentException(
1294+
"mapper_type [field_mapping] requires all nodes in the cluster to be on version ["
1295+
+ Version.V_3_6_0
1296+
+ "] or later, but the minimum node version is ["
1297+
+ minNodeVersion
1298+
+ "]"
1299+
);
1300+
}
1301+
// Validate mapper_settings keys
1302+
for (String key : mapperSettings.keySet()) {
1303+
if (FieldMappingIngestionMessageMapper.VALID_SETTINGS.contains(key) == false) {
1304+
throw new IllegalArgumentException(
1305+
"unknown mapper_settings key ["
1306+
+ key
1307+
+ "] for mapper_type [field_mapping]. Valid keys are: "
1308+
+ FieldMappingIngestionMessageMapper.VALID_SETTINGS
1309+
);
1310+
}
1311+
}
1312+
break;
1313+
default:
1314+
// default and raw_payload mappers don't use mapper_settings
1315+
if (mapperSettings.isEmpty() == false) {
1316+
throw new IllegalArgumentException("mapper_settings are not supported for mapper_type [" + mapperType.getName() + "]");
1317+
}
1318+
break;
1319+
}
1320+
}
1321+
12721322
/**
12731323
* Updates index settings to set replication strategy by default based on cluster level settings or remote store
12741324
* node attributes
@@ -1631,6 +1681,7 @@ private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState
16311681
validateIndexName(request.index(), state);
16321682
validateIndexSettings(request.index(), request.settings(), forbidPrivateIndexSettings);
16331683
validateContext(request);
1684+
validateIngestionSourceSettings(request.settings(), state);
16341685
}
16351686

16361687
public void validateIndexSettings(String indexName, final Settings settings, final boolean forbidPrivateIndexSettings)

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
290290
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
291291
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
292292
IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING,
293+
IndexMetadata.INGESTION_SOURCE_MAPPER_SETTINGS,
293294

294295
// Settings for search replica
295296
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,
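server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java

Lines changed: 47 additions & 0 deletions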
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest.mappers;
10+
11+
import org.opensearch.index.IngestionShardPointer;
12+
import org.opensearch.index.Message;
13+
import org.opensearch.indices.pollingingest.ShardUpdateMessage;
14+
15+
import java.util.Set;
16+
17+
/**
18+
* Mapper implementation that extracts document metadata ({@code _id}, {@code _version}, {@code _op_type})
19+
* from configurable top-level fields in the raw message payload. The remaining fields become the document
20+
* {@code _source}.
21+
*
22+
* <p>Mapper settings:
23+
* <ul>
24+
* <li>{@code id_field} — source field to use as document {@code _id}. If absent, ID is auto-generated.</li>
25+
* <li>{@code version_field} — source field to use as document {@code _version} with external versioning.</li>
26+
* <li>{@code op_type_field} — source field (boolean) to determine operation type: {@code true} → delete,
27+
* {@code false} → index.</li>
28+
* </ul>
29+
*/
30+
public class FieldMappingIngestionMessageMapper implements IngestionMessageMapper {
31+
32+
/** Mapper setting key: source field to use as document _id */
33+
public static final String ID_FIELD = "id_field";
34+
/** Mapper setting key: source field to use as document _version */
35+
public static final String VERSION_FIELD = "version_field";
36+
/** Mapper setting key: source field to determine operation type (index vs delete) */
37+
public static final String OP_TYPE_FIELD = "op_type_field";
38+
39+
/** Valid mapper_settings keys for this mapper type */
40+
public static final Set<String> VALID_SETTINGS = Set.of(ID_FIELD, VERSION_FIELD, OP_TYPE_FIELD);
41+
42+
@Override
43+
public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException {
44+
// TODO: pending implementation
45+
throw new UnsupportedOperationException("FieldMappingIngestionMessageMapper is not yet implemented");
46+
}
47+
}

server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public interface IngestionMessageMapper {
4040
@ExperimentalApi
4141
enum MapperType {
4242
DEFAULT("default"),
43-
RAW_PAYLOAD("raw_payload");
43+
RAW_PAYLOAD("raw_payload"),
44+
FIELD_MAPPING("field_mapping");
4445

4546
private final String name;
4647

@@ -59,7 +60,11 @@ public static MapperType fromString(String value) {
5960
}
6061
}
6162
throw new IllegalArgumentException(
62-
String.format(Locale.ROOT, "Unknown ingestion mapper type: %s. Valid values are: default, raw_payload", value)
63+
String.format(
64+
Locale.ROOT,
65+
"Unknown ingestion mapper type: %s. Valid values are: default, raw_payload, field_mapping",
66+
value
67+
)
6368
);
6469
}
6570
}

server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testToString() {
108108
.setErrorStrategy(DROP)
109109
.build();
110110
String expected =
111-
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s, mapperType='DEFAULT'}";
111+
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s, mapperType='DEFAULT', mapperSettings={}}";
112112
assertEquals(expected, source.toString());
113113
}
114114

@@ -137,4 +137,44 @@ public void testAllActiveIngestionConstructorAndGetter() {
137137
IngestionSource ingestionSourceClone = new IngestionSource.Builder(sourceEnabled).build();
138138
assertTrue(ingestionSourceClone.isAllActiveIngestionEnabled());
139139
}
140+
141+
public void testMapperSettings() {
142+
Map<String, Object> params = new HashMap<>();
143+
params.put("key", "value");
144+
Map<String, Object> mapperSettings = new HashMap<>();
145+
mapperSettings.put("id_field", "user_id");
146+
mapperSettings.put("version_field", "timestamp");
147+
mapperSettings.put("op_type_field", "is_deleted");
148+
149+
IngestionSource source = new IngestionSource.Builder("type").setParams(params)
150+
.setPointerInitReset(pointerInitReset)
151+
.setErrorStrategy(DROP)
152+
.setMapperSettings(mapperSettings)
153+
.build();
154+
155+
assertEquals(mapperSettings, source.getMapperSettings());
156+
assertEquals("user_id", source.getMapperSettings().get("id_field"));
157+
assertEquals("timestamp", source.getMapperSettings().get("version_field"));
158+
assertEquals("is_deleted", source.getMapperSettings().get("op_type_field"));
159+
160+
// Test copy constructor preserves mapper settings
161+
IngestionSource copy = new IngestionSource.Builder(source).build();
162+
assertEquals(mapperSettings, copy.getMapperSettings());
163+
164+
// Test equals with mapper settings
165+
IngestionSource source2 = new IngestionSource.Builder("type").setParams(params)
166+
.setPointerInitReset(pointerInitReset)
167+
.setErrorStrategy(DROP)
168+
.setMapperSettings(new HashMap<>(mapperSettings))
169+
.build();
170+
assertEquals(source, source2);
171+
assertEquals(source.hashCode(), source2.hashCode());
172+
173+
// Test empty mapper settings by default
174+
IngestionSource sourceNoMapperSettings = new IngestionSource.Builder("type").setParams(params)
175+
.setPointerInitReset(pointerInitReset)
176+
.setErrorStrategy(DROP)
177+
.build();
178+
assertTrue(sourceNoMapperSettings.getMapperSettings().isEmpty());
179+
}
140180
}

0 commit comments

Comments
 (0)