Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@
org.elasticsearch.search.SearchFeatures,
org.elasticsearch.script.ScriptFeatures,
org.elasticsearch.search.retriever.RetrieversFeatures,
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures;
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures,
org.elasticsearch.ingest.IngestFeatures;

uses org.elasticsearch.plugins.internal.SettingsExtension;
uses RestExtension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ static void executeDocument(
pipeline.getVersion(),
pipeline.getMetadata(),
verbosePipelineProcessor,
pipeline.getFieldAccessPattern(),
pipeline.getDeprecated()
);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
Expand Down Expand Up @@ -154,15 +156,17 @@ static Parsed parse(
Map<String, Object> config,
boolean verbose,
IngestService ingestService,
RestApiVersion restApiVersion
RestApiVersion restApiVersion,
Predicate<NodeFeature> hasFeature
) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID,
pipelineConfig,
ingestService.getProcessorFactories(),
ingestService.getScriptService(),
projectId
projectId,
hasFeature
);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
private final SimulateExecutionService executionService;
private final TransportService transportService;
private final ProjectResolver projectResolver;
private final ClusterService clusterService;
private final FeatureService featureService;
private volatile TimeValue ingestNodeTransportActionTimeout;
// ThreadLocal because our unit testing framework does not like sharing Randoms across threads
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
Expand All @@ -61,7 +65,9 @@ public SimulatePipelineTransportAction(
TransportService transportService,
ActionFilters actionFilters,
IngestService ingestService,
ProjectResolver projectResolver
ProjectResolver projectResolver,
ClusterService clusterService,
FeatureService featureService
) {
super(
SimulatePipelineAction.NAME,
Expand All @@ -74,6 +80,8 @@ public SimulatePipelineTransportAction(
this.executionService = new SimulateExecutionService(threadPool);
this.transportService = transportService;
this.projectResolver = projectResolver;
this.clusterService = clusterService;
this.featureService = featureService;
this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
ingestService.getClusterService()
.getClusterSettings()
Expand Down Expand Up @@ -117,7 +125,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
source,
request.isVerbose(),
ingestService,
request.getRestApiVersion()
request.getRestApiVersion(),
(feature) -> featureService.clusterHasFeature(clusterService.state(), feature)
);
}
executionService.execute(simulateRequest, listener);
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

import java.util.Set;

public class IngestFeatures implements FeatureSpecification {
@Override
public Set<NodeFeature> getFeatures() {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
} else {
return Set.of();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest;

import java.util.Map;

public enum IngestPipelineFieldAccessPattern {
/**
* Field names will be split on the `.` character into their contingent parts. Resolution will strictly check
* for nested objects following the field path.
*/
CLASSIC("classic"),
/**
* Field names will be split on the `.` character into their contingent parts. Resolution will flexibly check
* for nested objects following the field path. If nested objects are not found for a key, the access pattern
* will fall back to joining subsequent path elements together until it finds the next object that matches the
* concatenated path. Allows for simple resolution of dotted field names.
*/
FLEXIBLE("flexible");

private final String key;

IngestPipelineFieldAccessPattern(String key) {
this.key = key;
}

public String getKey() {
return key;
}

private static final Map<String, IngestPipelineFieldAccessPattern> NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE);

public static boolean isValidAccessPattern(String accessPatternName) {
return NAME_REGISTRY.containsKey(accessPatternName);
}

public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) {
IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName);
if (accessPattern == null) {
throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given");
}
return accessPattern;
}
}
45 changes: 42 additions & 3 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -119,6 +120,25 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private static final Logger logger = LogManager.getLogger(IngestService.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IngestService.class);

public static final NodeFeature FIELD_ACCESS_PATTERN = new NodeFeature("ingest.field_access_pattern", true);

/**
* Checks the locally supported node features without relying on cluster state or feature service.
* This is primarily to support the Logstash elastic_integration plugin which uses the IngestService
* internally and thus would not have access to cluster service or feature services. NodeFeatures that
* are accepted here should be currently and generally available in Elasticsearch.
* @param nodeFeature The node feature to check
* @return true if the node feature can be supported in the local library code, false if it is not supported
*/
public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
// logs_stream feature flag guard
return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
}
// Default to unsupported if not contained here
return false;
}

private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
private final ClusterService clusterService;
private final ScriptService scriptService;
Expand Down Expand Up @@ -376,6 +396,10 @@ public ProjectResolver getProjectResolver() {
return projectResolver;
}

public FeatureService getFeatureService() {
return featureService;
}

/**
* Deletes the pipeline specified by id in the request.
*/
Expand Down Expand Up @@ -754,7 +778,14 @@ void validatePipeline(
deprecationLogger.critical(DeprecationCategory.API, "pipeline_name_special_chars", e.getMessage());
}

Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService, projectId);
Pipeline pipeline = Pipeline.create(
pipelineId,
pipelineConfig,
processorFactories,
scriptService,
projectId,
(n) -> featureService.clusterHasFeature(state, n)
);
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {

Expand Down Expand Up @@ -1428,7 +1459,8 @@ synchronized void innerUpdatePipelines(ProjectId projectId, IngestMetadata newIn
newConfiguration.getConfig(false),
processorFactories,
scriptService,
projectId
projectId,
(nodeFeature) -> featureService.clusterHasFeature(clusterService.state(), nodeFeature)
);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

Expand Down Expand Up @@ -1557,7 +1589,14 @@ public <P extends Processor> Collection<String> getPipelineWithProcessorType(
public synchronized void reloadPipeline(ProjectId projectId, String id) throws Exception {
var originalPipelines = this.pipelines.getOrDefault(projectId, ImmutableOpenMap.of());
PipelineHolder holder = originalPipelines.get(id);
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService, projectId);
Pipeline updatedPipeline = Pipeline.create(
id,
holder.configuration.getConfig(false),
processorFactories,
scriptService,
projectId,
(nodeFeature) -> featureService.clusterHasFeature(state, nodeFeature)
);
ImmutableOpenMap<String, PipelineHolder> updatedPipelines = ImmutableOpenMap.builder(originalPipelines)
.fPut(id, new PipelineHolder(holder.configuration, updatedPipeline))
.build();
Expand Down
Loading