Skip to content

Commit eb755dd

Browse files
jbaieramridula-s109
authored andcommitted
[Streams] Add new ingest pipeline field access flag (elastic#129096)
This PR introduces a new flag to ingest pipeline configurations which will be used to control how fields are accessed from within that pipeline.
1 parent 1363afe commit eb755dd

File tree

15 files changed

+354
-28
lines changed

15 files changed

+354
-28
lines changed

server/src/main/java/module-info.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,8 @@
431431
org.elasticsearch.search.SearchFeatures,
432432
org.elasticsearch.script.ScriptFeatures,
433433
org.elasticsearch.search.retriever.RetrieversFeatures,
434-
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures;
434+
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures,
435+
org.elasticsearch.ingest.IngestFeatures;
435436

436437
uses org.elasticsearch.plugins.internal.SettingsExtension;
437438
uses RestExtension;

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ static void executeDocument(
4848
pipeline.getVersion(),
4949
pipeline.getMetadata(),
5050
verbosePipelineProcessor,
51+
pipeline.getFieldAccessPattern(),
5152
pipeline.getDeprecated()
5253
);
5354
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.xcontent.XContentHelper;
2222
import org.elasticsearch.core.RestApiVersion;
2323
import org.elasticsearch.core.UpdateForV10;
24+
import org.elasticsearch.features.NodeFeature;
2425
import org.elasticsearch.index.VersionType;
2526
import org.elasticsearch.ingest.ConfigurationUtils;
2627
import org.elasticsearch.ingest.IngestDocument;
@@ -38,6 +39,7 @@
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Objects;
42+
import java.util.function.Predicate;
4143

4244
public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject {
4345
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
@@ -154,15 +156,17 @@ static Parsed parse(
154156
Map<String, Object> config,
155157
boolean verbose,
156158
IngestService ingestService,
157-
RestApiVersion restApiVersion
159+
RestApiVersion restApiVersion,
160+
Predicate<NodeFeature> hasFeature
158161
) throws Exception {
159162
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
160163
Pipeline pipeline = Pipeline.create(
161164
SIMULATED_PIPELINE_ID,
162165
pipelineConfig,
163166
ingestService.getProcessorFactories(),
164167
ingestService.getScriptService(),
165-
projectId
168+
projectId,
169+
hasFeature
166170
);
167171
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
168172
return new Parsed(pipeline, ingestDocumentList, verbose);

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
1919
import org.elasticsearch.cluster.node.DiscoveryNodes;
2020
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.Randomness;
2223
import org.elasticsearch.common.settings.Setting;
2324
import org.elasticsearch.common.util.concurrent.EsExecutors;
2425
import org.elasticsearch.common.xcontent.XContentHelper;
2526
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.features.FeatureService;
2628
import org.elasticsearch.ingest.IngestService;
2729
import org.elasticsearch.injection.guice.Inject;
2830
import org.elasticsearch.tasks.Task;
@@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
5153
private final SimulateExecutionService executionService;
5254
private final TransportService transportService;
5355
private final ProjectResolver projectResolver;
56+
private final ClusterService clusterService;
57+
private final FeatureService featureService;
5458
private volatile TimeValue ingestNodeTransportActionTimeout;
5559
// ThreadLocal because our unit testing framework does not like sharing Randoms across threads
5660
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
@@ -61,7 +65,9 @@ public SimulatePipelineTransportAction(
6165
TransportService transportService,
6266
ActionFilters actionFilters,
6367
IngestService ingestService,
64-
ProjectResolver projectResolver
68+
ProjectResolver projectResolver,
69+
ClusterService clusterService,
70+
FeatureService featureService
6571
) {
6672
super(
6773
SimulatePipelineAction.NAME,
@@ -74,6 +80,8 @@ public SimulatePipelineTransportAction(
7480
this.executionService = new SimulateExecutionService(threadPool);
7581
this.transportService = transportService;
7682
this.projectResolver = projectResolver;
83+
this.clusterService = clusterService;
84+
this.featureService = featureService;
7785
this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
7886
ingestService.getClusterService()
7987
.getClusterSettings()
@@ -117,7 +125,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
117125
source,
118126
request.isVerbose(),
119127
ingestService,
120-
request.getRestApiVersion()
128+
request.getRestApiVersion(),
129+
(feature) -> featureService.clusterHasFeature(clusterService.state(), feature)
121130
);
122131
}
123132
executionService.execute(simulateRequest, listener);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.features.FeatureSpecification;
14+
import org.elasticsearch.features.NodeFeature;
15+
16+
import java.util.Set;
17+
18+
public class IngestFeatures implements FeatureSpecification {
19+
@Override
20+
public Set<NodeFeature> getFeatures() {
21+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
22+
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
23+
} else {
24+
return Set.of();
25+
}
26+
}
27+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import java.util.Map;
13+
14+
public enum IngestPipelineFieldAccessPattern {
15+
/**
16+
* Field names will be split on the `.` character into their contingent parts. Resolution will strictly check
17+
* for nested objects following the field path.
18+
*/
19+
CLASSIC("classic"),
20+
/**
21+
* Field names will be split on the `.` character into their contingent parts. Resolution will flexibly check
22+
* for nested objects following the field path. If nested objects are not found for a key, the access pattern
23+
* will fall back to joining subsequent path elements together until it finds the next object that matches the
24+
* concatenated path. Allows for simple resolution of dotted field names.
25+
*/
26+
FLEXIBLE("flexible");
27+
28+
private final String key;
29+
30+
IngestPipelineFieldAccessPattern(String key) {
31+
this.key = key;
32+
}
33+
34+
public String getKey() {
35+
return key;
36+
}
37+
38+
private static final Map<String, IngestPipelineFieldAccessPattern> NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE);
39+
40+
public static boolean isValidAccessPattern(String accessPatternName) {
41+
return NAME_REGISTRY.containsKey(accessPatternName);
42+
}
43+
44+
public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) {
45+
IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName);
46+
if (accessPattern == null) {
47+
throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given");
48+
}
49+
return accessPattern;
50+
}
51+
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.core.UpdateForV10;
6767
import org.elasticsearch.env.Environment;
6868
import org.elasticsearch.features.FeatureService;
69+
import org.elasticsearch.features.NodeFeature;
6970
import org.elasticsearch.gateway.GatewayService;
7071
import org.elasticsearch.grok.MatcherWatchdog;
7172
import org.elasticsearch.index.IndexSettings;
@@ -119,6 +120,25 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
119120
private static final Logger logger = LogManager.getLogger(IngestService.class);
120121
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IngestService.class);
121122

123+
public static final NodeFeature FIELD_ACCESS_PATTERN = new NodeFeature("ingest.field_access_pattern", true);
124+
125+
/**
126+
* Checks the locally supported node features without relying on cluster state or feature service.
127+
* This is primarily to support the Logstash elastic_integration plugin which uses the IngestService
128+
* internally and thus would not have access to cluster service or feature services. NodeFeatures that
129+
* are accepted here should be currently and generally available in Elasticsearch.
130+
* @param nodeFeature The node feature to check
131+
* @return true if the node feature can be supported in the local library code, false if it is not supported
132+
*/
133+
public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
134+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
135+
// logs_stream feature flag guard
136+
return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
137+
}
138+
// Default to unsupported if not contained here
139+
return false;
140+
}
141+
122142
private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
123143
private final ClusterService clusterService;
124144
private final ScriptService scriptService;
@@ -376,6 +396,10 @@ public ProjectResolver getProjectResolver() {
376396
return projectResolver;
377397
}
378398

399+
public FeatureService getFeatureService() {
400+
return featureService;
401+
}
402+
379403
/**
380404
* Deletes the pipeline specified by id in the request.
381405
*/
@@ -754,7 +778,14 @@ void validatePipeline(
754778
deprecationLogger.critical(DeprecationCategory.API, "pipeline_name_special_chars", e.getMessage());
755779
}
756780

757-
Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService, projectId);
781+
Pipeline pipeline = Pipeline.create(
782+
pipelineId,
783+
pipelineConfig,
784+
processorFactories,
785+
scriptService,
786+
projectId,
787+
(n) -> featureService.clusterHasFeature(state, n)
788+
);
758789
List<Exception> exceptions = new ArrayList<>();
759790
for (Processor processor : pipeline.flattenAllProcessors()) {
760791

@@ -1428,7 +1459,8 @@ synchronized void innerUpdatePipelines(ProjectId projectId, IngestMetadata newIn
14281459
newConfiguration.getConfig(false),
14291460
processorFactories,
14301461
scriptService,
1431-
projectId
1462+
projectId,
1463+
(nodeFeature) -> featureService.clusterHasFeature(clusterService.state(), nodeFeature)
14321464
);
14331465
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));
14341466

@@ -1557,7 +1589,14 @@ public <P extends Processor> Collection<String> getPipelineWithProcessorType(
15571589
public synchronized void reloadPipeline(ProjectId projectId, String id) throws Exception {
15581590
var originalPipelines = this.pipelines.getOrDefault(projectId, ImmutableOpenMap.of());
15591591
PipelineHolder holder = originalPipelines.get(id);
1560-
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService, projectId);
1592+
Pipeline updatedPipeline = Pipeline.create(
1593+
id,
1594+
holder.configuration.getConfig(false),
1595+
processorFactories,
1596+
scriptService,
1597+
projectId,
1598+
(nodeFeature) -> featureService.clusterHasFeature(state, nodeFeature)
1599+
);
15611600
ImmutableOpenMap<String, PipelineHolder> updatedPipelines = ImmutableOpenMap.builder(originalPipelines)
15621601
.fPut(id, new PipelineHolder(holder.configuration, updatedPipeline))
15631602
.build();

0 commit comments

Comments
 (0)