3 changes: 2 additions & 1 deletion server/src/main/java/module-info.java
@@ -431,7 +431,8 @@
org.elasticsearch.search.SearchFeatures,
org.elasticsearch.script.ScriptFeatures,
org.elasticsearch.search.retriever.RetrieversFeatures,
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures;
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures,
org.elasticsearch.ingest.IngestFeatures;

uses org.elasticsearch.plugins.internal.SettingsExtension;
uses RestExtension;
@@ -48,6 +48,7 @@ static void executeDocument(
pipeline.getVersion(),
pipeline.getMetadata(),
verbosePipelineProcessor,
pipeline.getFieldAccessPattern(),
pipeline.getDeprecated()
);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
@@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
@@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
@@ -154,15 +156,17 @@ static Parsed parse(
Map<String, Object> config,
boolean verbose,
IngestService ingestService,
RestApiVersion restApiVersion
RestApiVersion restApiVersion,
Predicate<NodeFeature> hasFeature
) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID,
pipelineConfig,
ingestService.getProcessorFactories(),
ingestService.getScriptService(),
projectId
projectId,
hasFeature
);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
@@ -18,11 +18,13 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
@@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
private final SimulateExecutionService executionService;
private final TransportService transportService;
private final ProjectResolver projectResolver;
private final ClusterService clusterService;
private final FeatureService featureService;
private volatile TimeValue ingestNodeTransportActionTimeout;
// ThreadLocal because our unit testing framework does not like sharing Randoms across threads
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
@@ -61,7 +65,9 @@ public SimulatePipelineTransportAction(
TransportService transportService,
ActionFilters actionFilters,
IngestService ingestService,
ProjectResolver projectResolver
ProjectResolver projectResolver,
ClusterService clusterService,
FeatureService featureService
) {
super(
SimulatePipelineAction.NAME,
@@ -74,6 +80,8 @@ public SimulatePipelineTransportAction(
this.executionService = new SimulateExecutionService(threadPool);
this.transportService = transportService;
this.projectResolver = projectResolver;
this.clusterService = clusterService;
this.featureService = featureService;
this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
ingestService.getClusterService()
.getClusterSettings()
@@ -117,7 +125,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
source,
request.isVerbose(),
ingestService,
request.getRestApiVersion()
request.getRestApiVersion(),
(feature) -> featureService.clusterHasFeature(clusterService.state(), feature)
);
}
executionService.execute(simulateRequest, listener);
27 changes: 27 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

import java.util.Set;

public class IngestFeatures implements FeatureSpecification {
@Override
public Set<NodeFeature> getFeatures() {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
} else {
return Set.of();
}
}
}
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest;

import java.util.Map;

public enum IngestPipelineFieldAccessPattern {
/**
* Field names will be split on the `.` character into their constituent parts. Resolution will strictly check
* for nested objects following the field path.
*/
CLASSIC("classic"),
/**
* Field names will be split on the `.` character into their constituent parts. Resolution will flexibly check
* for nested objects following the field path. If nested objects are not found for a key, the access pattern
* will fall back to joining subsequent path elements together until it finds the next object that matches the
* concatenated path. Allows for simple resolution of dotted field names.
*/
FLEXIBLE("flexible");

private final String key;

IngestPipelineFieldAccessPattern(String key) {
this.key = key;
}

public String getKey() {
return key;
}

private static final Map<String, IngestPipelineFieldAccessPattern> NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE);

public static boolean isValidAccessPattern(String accessPatternName) {
return NAME_REGISTRY.containsKey(accessPatternName);
}

public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) {
IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName);
if (accessPattern == null) {
throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given");
}
return accessPattern;
}
}
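To make the CLASSIC and FLEXIBLE Javadoc above concrete, here is a minimal standalone sketch of the two resolution strategies against a plain `Map` source. It is illustrative only; `AccessPatternSketch`, `resolveClassic`, and `resolveFlexible` are made-up names and are not part of this change or of `IngestDocument`.

```java
import java.util.Map;

// Illustrative sketch of the documented semantics; not Elasticsearch code.
public class AccessPatternSketch {

    // CLASSIC: split on '.', require a real nested object at every step.
    static Object resolveClassic(Map<String, Object> source, String path) {
        Object current = source;
        for (String part : path.split("\\.")) {
            if (current instanceof Map<?, ?> map && map.containsKey(part)) {
                current = map.get(part);
            } else {
                return null; // strict: a dotted key such as "foo.bar" is never consulted
            }
        }
        return current;
    }

    // FLEXIBLE: split on '.', but when a path element is missing, join it with the
    // following elements and retry, so dotted field names can still resolve.
    static Object resolveFlexible(Map<String, Object> source, String path) {
        String[] parts = path.split("\\.");
        Object current = source;
        int i = 0;
        while (i < parts.length) {
            if (current instanceof Map<?, ?> map) {
                String key = parts[i];
                int j = i;
                while (map.containsKey(key) == false && j + 1 < parts.length) {
                    j++;
                    key = key + "." + parts[j];
                }
                if (map.containsKey(key) == false) {
                    return null;
                }
                current = map.get(key);
                i = j + 1;
            } else {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = Map.of("foo", Map.of("bar", 1));
        Map<String, Object> dotted = Map.of("foo.bar", 2);

        System.out.println(resolveClassic(nested, "foo.bar"));  // 1
        System.out.println(resolveClassic(dotted, "foo.bar"));  // null
        System.out.println(resolveFlexible(nested, "foo.bar")); // 1
        System.out.println(resolveFlexible(dotted, "foo.bar")); // 2, via the dotted-key fallback
    }
}
```

Both patterns resolve `foo.bar` against a nested `{"foo": {"bar": 1}}` source; only the flexible pattern also finds the dotted top-level key `{"foo.bar": 2}`, which is the fallback behaviour the FLEXIBLE Javadoc describes.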
45 changes: 42 additions & 3 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -66,6 +66,7 @@
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
@@ -119,6 +120,25 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private static final Logger logger = LogManager.getLogger(IngestService.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IngestService.class);

public static final NodeFeature FIELD_ACCESS_PATTERN = new NodeFeature("ingest.field_access_pattern", true);

/**
* Checks the locally supported node features without relying on cluster state or feature service.
* This is primarily to support the Logstash elastic_integration plugin which uses the IngestService
* internally and thus would not have access to cluster service or feature services. NodeFeatures that
* are accepted here should be currently and generally available in Elasticsearch.
* @param nodeFeature The node feature to check
* @return true if the node feature can be supported in the local library code, false if it is not supported
*/
public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
// logs_stream feature flag guard
return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
}
// Default to unsupported if not contained here
return false;
}

private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
private final ClusterService clusterService;
private final ScriptService scriptService;
@@ -376,6 +396,10 @@ public ProjectResolver getProjectResolver() {
return projectResolver;
}

public FeatureService getFeatureService() {
return featureService;
}

/**
* Deletes the pipeline specified by id in the request.
*/
@@ -754,7 +778,14 @@ void validatePipeline(
deprecationLogger.critical(DeprecationCategory.API, "pipeline_name_special_chars", e.getMessage());
}

Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService, projectId);
Pipeline pipeline = Pipeline.create(
pipelineId,
pipelineConfig,
processorFactories,
scriptService,
projectId,
(n) -> featureService.clusterHasFeature(state, n)
);
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {

@@ -1428,7 +1459,8 @@ synchronized void innerUpdatePipelines(ProjectId projectId, IngestMetadata newIn
newConfiguration.getConfig(false),
processorFactories,
scriptService,
projectId
projectId,
(nodeFeature) -> featureService.clusterHasFeature(clusterService.state(), nodeFeature)
);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

@@ -1557,7 +1589,14 @@ public <P extends Processor> Collection<String> getPipelineWithProcessorType(
public synchronized void reloadPipeline(ProjectId projectId, String id) throws Exception {
var originalPipelines = this.pipelines.getOrDefault(projectId, ImmutableOpenMap.of());
PipelineHolder holder = originalPipelines.get(id);
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService, projectId);
Pipeline updatedPipeline = Pipeline.create(
id,
holder.configuration.getConfig(false),
processorFactories,
scriptService,
projectId,
(nodeFeature) -> featureService.clusterHasFeature(state, nodeFeature)
);
ImmutableOpenMap<String, PipelineHolder> updatedPipelines = ImmutableOpenMap.builder(originalPipelines)
.fPut(id, new PipelineHolder(holder.configuration, updatedPipeline))
.build();
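The Javadoc on `locallySupportedIngestFeature` above mentions the Logstash elastic_integration plugin, which drives this library without a cluster state or `FeatureService`. A rough sketch of how such an out-of-cluster caller could supply the new feature predicate to `Pipeline.create` follows; the surrounding variables are placeholders, and this exact call site is not part of the diff.

```java
// Hypothetical out-of-cluster caller: pipelineId, pipelineConfig, processorFactories,
// scriptService and projectId are assumed to already exist in the embedding code.
// Only the last argument differs from the in-cluster call sites shown in this diff.
Pipeline pipeline = Pipeline.create(
    pipelineId,
    pipelineConfig,
    processorFactories,
    scriptService,
    projectId,
    IngestService::locallySupportedIngestFeature // static check, no ClusterService/FeatureService required
);
```

Inside the cluster, the same parameter is instead backed by the feature service, e.g. `(feature) -> featureService.clusterHasFeature(clusterService.state(), feature)`, as the changes to `SimulatePipelineTransportAction`, `validatePipeline`, and `innerUpdatePipelines` above show.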