Skip to content

Commit dc62810

Browse files
authored
[ML] Handle parsing ingest processors where the definition is not a object (elastic#113697)
1 parent b2a69b5 commit dc62810

File tree

3 files changed

+60
-20
lines changed

3 files changed

+60
-20
lines changed

docs/changelog/113697.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 113697
2+
summary: Handle parsing ingest processors where definition is not a object
3+
area: Machine Learning
4+
type: bug
5+
issues:
6+
- 113615

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,7 @@ public static Set<String> getModelIdsFromInferenceProcessors(IngestMetadata inge
8585
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
8686
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
8787
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
88-
addModelsAndPipelines(
89-
entry.getKey(),
90-
pipelineId,
91-
(Map<String, Object>) entry.getValue(),
92-
pam -> modelIds.add(pam.modelIdOrAlias()),
93-
0
94-
);
88+
addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> modelIds.add(pam.modelIdOrAlias()), 0);
9589
}
9690
}
9791
});
@@ -119,7 +113,7 @@ public static Map<String, Set<String>> pipelineIdsByResource(ClusterState state,
119113
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
120114
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
121115
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
122-
addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), pam -> {
116+
addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> {
123117
if (ids.contains(pam.modelIdOrAlias)) {
124118
pipelineIdsByModelIds.computeIfAbsent(pam.modelIdOrAlias, m -> new LinkedHashSet<>()).add(pipelineId);
125119
}
@@ -151,7 +145,7 @@ public static Set<String> pipelineIdsForResource(ClusterState state, Set<String>
151145
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
152146
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
153147
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
154-
addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), pam -> {
148+
addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> {
155149
if (ids.contains(pam.modelIdOrAlias)) {
156150
pipelineIds.add(pipelineId);
157151
}
@@ -166,7 +160,7 @@ public static Set<String> pipelineIdsForResource(ClusterState state, Set<String>
166160
private static void addModelsAndPipelines(
167161
String processorType,
168162
String pipelineId,
169-
Map<String, Object> processorDefinition,
163+
Object processorDefinition,
170164
Consumer<PipelineAndModel> handler,
171165
int level
172166
) {
@@ -178,14 +172,16 @@ private static void addModelsAndPipelines(
178172
return;
179173
}
180174
if (InferenceProcessorConstants.TYPE.equals(processorType)) {
181-
String modelId = (String) processorDefinition.get(MODEL_ID_RESULTS_FIELD);
182-
if (modelId != null) {
183-
handler.accept(new PipelineAndModel(pipelineId, modelId));
175+
if (processorDefinition instanceof Map<?, ?> definitionMap) {
176+
String modelId = (String) definitionMap.get(MODEL_ID_RESULTS_FIELD);
177+
if (modelId != null) {
178+
handler.accept(new PipelineAndModel(pipelineId, modelId));
179+
}
184180
}
185181
return;
186182
}
187-
if (FOREACH_PROCESSOR_NAME.equals(processorType)) {
188-
Map<String, Object> innerProcessor = (Map<String, Object>) processorDefinition.get("processor");
183+
if (FOREACH_PROCESSOR_NAME.equals(processorType) && processorDefinition instanceof Map<?, ?> definitionMap) {
184+
Map<String, Object> innerProcessor = (Map<String, Object>) definitionMap.get("processor");
189185
if (innerProcessor != null) {
190186
// a foreach processor should only have a SINGLE nested processor. Iteration is for simplicity's sake.
191187
for (Map.Entry<String, Object> innerProcessorWithName : innerProcessor.entrySet()) {
@@ -200,18 +196,16 @@ private static void addModelsAndPipelines(
200196
}
201197
return;
202198
}
203-
if (processorDefinition.containsKey(Pipeline.ON_FAILURE_KEY)) {
199+
if (processorDefinition instanceof Map<?, ?> definitionMap && definitionMap.containsKey(Pipeline.ON_FAILURE_KEY)) {
204200
List<Map<String, Object>> onFailureConfigs = ConfigurationUtils.readList(
205201
null,
206202
null,
207-
processorDefinition,
203+
(Map<String, Object>) definitionMap,
208204
Pipeline.ON_FAILURE_KEY
209205
);
210206
onFailureConfigs.stream()
211207
.flatMap(map -> map.entrySet().stream())
212-
.forEach(
213-
entry -> addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), handler, level + 1)
214-
);
208+
.forEach(entry -> addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), handler, level + 1));
215209
}
216210
}
217211

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractorTests.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Collections;
3131
import java.util.HashMap;
3232
import java.util.HashSet;
33+
import java.util.List;
3334
import java.util.Map;
3435
import java.util.Set;
3536

@@ -140,6 +141,18 @@ public void testNumInferenceProcessorsRecursivelyDefined() throws IOException {
140141
assertThat(InferenceProcessorInfoExtractor.countInferenceProcessors(cs), equalTo(3));
141142
}
142143

144+
public void testScriptProcessorStringConfig() throws IOException {
145+
Set<String> expectedModelIds = Set.of("foo");
146+
147+
ClusterState clusterState = buildClusterStateWithPipelineConfigurations(
148+
Map.of("processor_does_not_have_a_definition_object", newConfigurationWithScriptProcessor("foo"))
149+
);
150+
IngestMetadata ingestMetadata = clusterState.metadata().custom(IngestMetadata.TYPE);
151+
Set<String> actualModelIds = InferenceProcessorInfoExtractor.getModelIdsFromInferenceProcessors(ingestMetadata);
152+
153+
assertThat(actualModelIds, equalTo(expectedModelIds));
154+
}
155+
143156
private static PipelineConfiguration newConfigurationWithOutInferenceProcessor(int i) throws IOException {
144157
try (
145158
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
@@ -173,6 +186,12 @@ private static ClusterState buildClusterStateWithModelReferences(int numPipeline
173186
for (int i = 0; i < numPipelinesWithoutModel; i++) {
174187
configurations.put("pipeline_without_model_" + i, newConfigurationWithOutInferenceProcessor(i));
175188
}
189+
190+
return buildClusterStateWithPipelineConfigurations(configurations);
191+
}
192+
193+
private static ClusterState buildClusterStateWithPipelineConfigurations(Map<String, PipelineConfiguration> configurations)
194+
throws IOException {
176195
IngestMetadata ingestMetadata = new IngestMetadata(configurations);
177196

178197
return ClusterState.builder(new ClusterName("_name"))
@@ -206,6 +225,24 @@ private static PipelineConfiguration newConfigurationWithForeachProcessorProcess
206225
}
207226
}
208227

228+
private static PipelineConfiguration newConfigurationWithScriptProcessor(String modelId) throws IOException {
229+
try (
230+
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
231+
.map(
232+
Collections.singletonMap(
233+
"processors",
234+
List.of(forScriptProcessorWithStringConfig(), forEachProcessorWithInference(modelId))
235+
)
236+
)
237+
) {
238+
return new PipelineConfiguration(
239+
"pipeline_with_script_and_model_" + modelId,
240+
BytesReference.bytes(xContentBuilder),
241+
XContentType.JSON
242+
);
243+
}
244+
}
245+
209246
private static Map<String, Object> forEachProcessorWithInference(String modelId) {
210247
return Collections.singletonMap("foreach", new HashMap<>() {
211248
{
@@ -229,4 +266,7 @@ private static Map<String, Object> inferenceProcessorForModel(String modelId) {
229266
});
230267
}
231268

269+
private static Map<String, Object> forScriptProcessorWithStringConfig() {
270+
return Collections.singletonMap("script", "ctx.test=2;");
271+
}
232272
}

0 commit comments

Comments
 (0)