Skip to content

Commit d4bc4fd

Browse files
committed
SNAPSHOT - Refactor for ingest service to handle reroute path checks
1 parent 8d46fdc commit d4bc4fd

File tree

11 files changed

+92
-97
lines changed

11 files changed

+92
-97
lines changed

.idea/runConfigurations/Debug_Elasticsearch.xml

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6565
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
6666
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
6767
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
68-
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory(parameters.ingestService)),
68+
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
6969
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
7070
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
7171
entry(SortProcessor.TYPE, new SortProcessor.Factory()),

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,10 @@
1010
package org.elasticsearch.ingest.common;
1111

1212
import org.elasticsearch.cluster.metadata.ProjectId;
13-
import org.elasticsearch.cluster.metadata.ProjectMetadata;
14-
import org.elasticsearch.cluster.service.ClusterService;
15-
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
1613
import org.elasticsearch.core.Nullable;
1714
import org.elasticsearch.ingest.AbstractProcessor;
1815
import org.elasticsearch.ingest.ConfigurationUtils;
1916
import org.elasticsearch.ingest.IngestDocument;
20-
import org.elasticsearch.ingest.IngestService;
2117
import org.elasticsearch.ingest.Processor;
2218

2319
import java.util.List;
@@ -49,20 +45,14 @@ public final class RerouteProcessor extends AbstractProcessor {
4945
private final List<DataStreamValueSource> dataset;
5046
private final List<DataStreamValueSource> namespace;
5147
private final String destination;
52-
private final ClusterService clusterService;
53-
private final ProjectId projectId;
54-
private final StreamsPermissionsUtils streamsPermissionsUtils;
5548

5649
RerouteProcessor(
5750
String tag,
5851
String description,
5952
List<DataStreamValueSource> type,
6053
List<DataStreamValueSource> dataset,
6154
List<DataStreamValueSource> namespace,
62-
String destination,
63-
ClusterService clusterService,
64-
ProjectId projectId,
65-
StreamsPermissionsUtils streamsPermissionsUtils
55+
String destination
6656
) {
6757
super(tag, description);
6858
if (type.isEmpty()) {
@@ -81,16 +71,11 @@ public final class RerouteProcessor extends AbstractProcessor {
8171
this.namespace = namespace;
8272
}
8373
this.destination = destination;
84-
this.clusterService = clusterService;
85-
this.projectId = projectId;
86-
this.streamsPermissionsUtils = streamsPermissionsUtils;
8774
}
8875

8976
@Override
9077
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
9178
if (destination != null) {
92-
ProjectMetadata projectMetadata = clusterService.state().projectState(projectId).metadata();
93-
streamsPermissionsUtils.throwIfRerouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination);
9479
ingestDocument.reroute(destination);
9580
return ingestDocument;
9681
}
@@ -186,12 +171,6 @@ String getDestination() {
186171

187172
public static final class Factory implements Processor.Factory {
188173

189-
private final IngestService ingestService;
190-
191-
public Factory(IngestService ingestService) {
192-
this.ingestService = ingestService;
193-
}
194-
195174
@Override
196175
public RerouteProcessor create(
197176
Map<String, Processor.Factory> processorFactories,
@@ -233,17 +212,7 @@ public RerouteProcessor create(
233212
throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set");
234213
}
235214

236-
return new RerouteProcessor(
237-
tag,
238-
description,
239-
type,
240-
dataset,
241-
namespace,
242-
destination,
243-
ingestService.getClusterService(),
244-
projectId,
245-
StreamsPermissionsUtils.getInstance()
246-
);
215+
return new RerouteProcessor(tag, description, type, dataset, namespace, destination);
247216
}
248217
}
249218

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import org.elasticsearch.ElasticsearchParseException;
1313
import org.elasticsearch.cluster.metadata.ProjectId;
14-
import org.elasticsearch.ingest.IngestService;
1514
import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource;
1615
import org.elasticsearch.test.ESTestCase;
1716

@@ -20,8 +19,6 @@
2019
import java.util.Map;
2120

2221
import static org.hamcrest.Matchers.equalTo;
23-
import static org.mockito.Answers.RETURNS_SMART_NULLS;
24-
import static org.mockito.Mockito.mock;
2522

2623
public class RerouteProcessorFactoryTests extends ESTestCase {
2724

@@ -78,7 +75,6 @@ private static RerouteProcessor create(String dataset, String namespace) throws
7875
}
7976

8077
private static RerouteProcessor create(Map<String, Object> config) throws Exception {
81-
IngestService ingestService = mock(IngestService.class, RETURNS_SMART_NULLS);
82-
return new RerouteProcessor.Factory(ingestService).create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT);
78+
return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT);
8379
}
8480
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,45 +9,21 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12-
import org.elasticsearch.cluster.ClusterState;
13-
import org.elasticsearch.cluster.metadata.ProjectId;
14-
import org.elasticsearch.cluster.metadata.ProjectMetadata;
15-
import org.elasticsearch.cluster.service.ClusterService;
16-
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
1712
import org.elasticsearch.ingest.CompoundProcessor;
1813
import org.elasticsearch.ingest.IngestDocument;
1914
import org.elasticsearch.ingest.Processor;
2015
import org.elasticsearch.ingest.RandomDocumentPicks;
2116
import org.elasticsearch.ingest.TestProcessor;
2217
import org.elasticsearch.ingest.WrappingProcessor;
2318
import org.elasticsearch.test.ESTestCase;
24-
import org.junit.Before;
2519

2620
import java.util.List;
2721
import java.util.Map;
2822

2923
import static org.hamcrest.Matchers.equalTo;
30-
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
31-
import static org.mockito.Mockito.any;
32-
import static org.mockito.Mockito.anyString;
33-
import static org.mockito.Mockito.doNothing;
34-
import static org.mockito.Mockito.mock;
35-
import static org.mockito.Mockito.when;
3624

3725
public class RerouteProcessorTests extends ESTestCase {
3826

39-
private final StreamsPermissionsUtils streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class);
40-
private final ClusterService clusterServiceMock = mock(ClusterService.class, RETURNS_DEEP_STUBS);
41-
42-
@Before
43-
public void setUpStreamsPermissionsUtils() {
44-
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
45-
.putProjectMetadata(ProjectMetadata.builder(ProjectId.DEFAULT).build())
46-
.build();
47-
when(clusterServiceMock.state()).thenReturn(clusterState);
48-
doNothing().when(streamsPermissionsUtilsMock).throwIfRerouteToSubstreamNotAllowed(any(), any(), anyString());
49-
}
50-
5127
public void testDefaults() throws Exception {
5228
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
5329

@@ -315,25 +291,12 @@ private RerouteProcessor createRerouteProcessor(List<String> type, List<String>
315291
type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(),
316292
dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(),
317293
namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(),
318-
null,
319-
clusterServiceMock,
320-
ProjectId.DEFAULT,
321-
streamsPermissionsUtilsMock
294+
null
322295
);
323296
}
324297

325298
private RerouteProcessor createRerouteProcessor(String destination) {
326-
return new RerouteProcessor(
327-
null,
328-
null,
329-
List.of(),
330-
List.of(),
331-
List.of(),
332-
destination,
333-
clusterServiceMock,
334-
ProjectId.DEFAULT,
335-
streamsPermissionsUtilsMock
336-
);
299+
return new RerouteProcessor(null, null, List.of(), List.of(), List.of(), destination);
337300
}
338301

339302
private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.index.IndexRequest;
2727
import org.elasticsearch.action.ingest.DeletePipelineRequest;
2828
import org.elasticsearch.action.ingest.PutPipelineRequest;
29+
import org.elasticsearch.action.ingest.ReservedPipelineAction;
2930
import org.elasticsearch.action.support.RefCountingRunnable;
3031
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3132
import org.elasticsearch.client.internal.Client;
@@ -55,6 +56,8 @@
5556
import org.elasticsearch.common.logging.DeprecationLogger;
5657
import org.elasticsearch.common.regex.Regex;
5758
import org.elasticsearch.common.settings.Settings;
59+
import org.elasticsearch.common.streams.StreamType;
60+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
5861
import org.elasticsearch.common.util.CollectionUtils;
5962
import org.elasticsearch.common.util.Maps;
6063
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -75,6 +78,7 @@
7578
import org.elasticsearch.node.ReportingService;
7679
import org.elasticsearch.plugins.IngestPlugin;
7780
import org.elasticsearch.plugins.internal.XContentParserDecorator;
81+
import org.elasticsearch.script.Metadata;
7882
import org.elasticsearch.script.ScriptService;
7983
import org.elasticsearch.threadpool.Scheduler;
8084
import org.elasticsearch.threadpool.ThreadPool;
@@ -154,6 +158,7 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
154158
private volatile ClusterState state;
155159
private final ProjectResolver projectResolver;
156160
private final FeatureService featureService;
161+
private final StreamsPermissionsUtils streamsPermissionsUtils;
157162

158163
private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
159164
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
@@ -241,7 +246,8 @@ public IngestService(
241246
MatcherWatchdog matcherWatchdog,
242247
FailureStoreMetrics failureStoreMetrics,
243248
ProjectResolver projectResolver,
244-
FeatureService featureService
249+
FeatureService featureService,
250+
StreamsPermissionsUtils streamsPermissionsUtils
245251
) {
246252
this.clusterService = clusterService;
247253
this.scriptService = scriptService;
@@ -265,6 +271,7 @@ public IngestService(
265271
this.failureStoreMetrics = failureStoreMetrics;
266272
this.projectResolver = projectResolver;
267273
this.featureService = featureService;
274+
this.streamsPermissionsUtils = streamsPermissionsUtils;
268275
}
269276

270277
/**
@@ -283,6 +290,7 @@ public IngestService(
283290
this.failureStoreMetrics = ingestService.failureStoreMetrics;
284291
this.projectResolver = ingestService.projectResolver;
285292
this.featureService = ingestService.featureService;
293+
streamsPermissionsUtils = ingestService.streamsPermissionsUtils;
286294
}
287295

288296
private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -301,15 +309,15 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
301309

302310
/**
303311
* Resolves the potential pipelines (default and final) from the requests or templates associated to the index and then **mutates**
304-
* the {@link org.elasticsearch.action.index.IndexRequest} passed object with the pipeline information.
312+
* the {@link IndexRequest} passed object with the pipeline information.
305313
* <p>
306314
* Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node
307315
* to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline
308316
* comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the
309317
* pipeline was set by a required pipeline **and** the request also has a pipeline request too.
310318
*
311319
* @param originalRequest Original write request received.
312-
* @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update.
320+
* @param indexRequest The {@link IndexRequest} object to update.
313321
* @param projectMetadata Project metadata from the cluster state from where the pipeline information is derived.
314322
*/
315323
public static void resolvePipelinesAndUpdateIndexRequest(
@@ -411,7 +419,7 @@ public void delete(ProjectId projectId, DeletePipelineRequest request, ActionLis
411419
}
412420

413421
/**
414-
* Used by this class and {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
422+
* Used by this class and {@link ReservedPipelineAction}
415423
*/
416424
public static class DeletePipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
417425
private final DeletePipelineRequest request;
@@ -672,7 +680,7 @@ private static void collectProcessorMetrics(
672680
}
673681

674682
/**
675-
* Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
683+
* Used in this class and externally by the {@link ReservedPipelineAction}
676684
*/
677685
public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
678686
private final PutPipelineRequest request;
@@ -683,7 +691,7 @@ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterSta
683691
}
684692

685693
/**
686-
* Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
694+
* Used by {@link ReservedPipelineAction}
687695
*/
688696
public PutPipelineClusterStateUpdateTask(ProjectId projectId, PutPipelineRequest request) {
689697
this(projectId, null, request);
@@ -904,7 +912,7 @@ protected void doRun() {
904912
final int slot = i;
905913
final Releasable ref = refs.acquire();
906914
final IngestDocument ingestDocument = newIngestDocument(indexRequest);
907-
final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
915+
final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
908916
// the document listener gives us three-way logic: a document can fail processing (1), or it can
909917
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
910918
final ActionListener<IngestPipelinesExecutionResult> documentListener = ActionListener.runAfter(
@@ -1198,6 +1206,31 @@ private void executePipelines(
11981206
return; // document failed!
11991207
}
12001208

1209+
StreamsPermissionsUtils permissionsUtils = StreamsPermissionsUtils.getInstance();
1210+
for (StreamType streamType : StreamType.values()) {
1211+
if (permissionsUtils.streamTypeIsEnabled(streamType, project)) {
1212+
if (newIndex.startsWith(streamType.getStreamName() + ".")
1213+
&& ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) {
1214+
exceptionHandler.accept(
1215+
new IngestPipelineException(
1216+
pipelineId,
1217+
new IllegalArgumentException(
1218+
format(
1219+
"Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
1220+
+ "this document from index [%s] to index [%s]. Reroute history: %s",
1221+
pipelineId,
1222+
originalIndex,
1223+
newIndex,
1224+
String.join(" -> ", ingestDocument.getIndexHistory())
1225+
)
1226+
)
1227+
)
1228+
);
1229+
return; // document failed!
1230+
}
1231+
}
1232+
}
1233+
12011234
// add the index to the document's index history, and check for cycles in the visited indices
12021235
boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
12031236
if (cycle) {
@@ -1352,7 +1385,7 @@ private static IngestDocument newIngestDocument(final IndexRequest request) {
13521385
/**
13531386
* Updates an index request based on the metadata of an ingest document.
13541387
*/
1355-
private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) {
1388+
private static void updateIndexRequestMetadata(final IndexRequest request, final Metadata metadata) {
13561389
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
13571390
// before ingestion, which might also get modified during ingestion.
13581391
request.index(metadata.getIndex());

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.elasticsearch.common.settings.Setting;
7878
import org.elasticsearch.common.settings.Settings;
7979
import org.elasticsearch.common.settings.SettingsModule;
80+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
8081
import org.elasticsearch.common.util.BigArrays;
8182
import org.elasticsearch.common.util.PageCacheRecycler;
8283
import org.elasticsearch.common.util.set.Sets;
@@ -716,7 +717,8 @@ private void construct(
716717
IngestService.createGrokThreadWatchdog(environment, threadPool),
717718
failureStoreMetrics,
718719
projectResolver,
719-
featureService
720+
featureService,
721+
StreamsPermissionsUtils.getInstance()
720722
);
721723

722724
SystemIndices systemIndices = createSystemIndices(settings);

0 commit comments

Comments
 (0)