Commit d0b52df
Pre-shard-distribution blocking of child stream writes and universal pipeline reroute blocking
1 parent d2854f5 commit d0b52df

File tree: 8 files changed (+202, -7 lines)

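The change set adds two guards: the bulk transport action rejects documents that directly address a child stream before shard distribution, and the ingest service rejects pipeline reroutes into a child stream unless the document already passed through the parent stream. Both key on the same naming rule, sketched here for orientation (illustrative only, not code from this commit; targetsChildStream is a made-up helper):

// Illustrative sketch, not part of the commit: an index targets a child
// stream when its name is an enabled stream's name followed by a dot.
static boolean targetsChildStream(String indexName, String streamName) {
    return indexName != null && indexName.startsWith(streamName + ".");
}

// targetsChildStream("logs.foo", "logs") -> true  (direct writes rejected; reroutes policed)
// targetsChildStream("logs", "logs")     -> false (write to the parent stream as usual)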

modules/streams/build.gradle

Lines changed: 2 additions & 1 deletion
@@ -20,7 +20,7 @@ esplugin {
 
   restResources {
     restApi {
-      include '_common', 'streams'
+      include '_common', 'streams', "bulk", "index", "ingest", "indices"
    }
  }
 
@@ -38,4 +38,5 @@ artifacts {
 
 dependencies {
   testImplementation project(path: ':test:test-clusters')
+  clusterModules project(':modules:ingest-common')
 }

modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

Lines changed: 5 additions & 1 deletion
@@ -29,7 +29,11 @@ public static Iterable<Object[]> parameters() throws Exception {
     }
 
     @ClassRule
-    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .module("streams")
+        .module("ingest-common")
+        .feature(FeatureFlag.LOGS_STREAM)
+        .build();
 
     @Override
     protected String getTestRestCluster() {
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+---
+"Check User Can't Write To Substream Directly":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "logs.foo" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Writes to child stream [logs.foo] are not allowed, use the parent stream instead: [logs]" }
+
+---
+"Check User Can't Write To Substream Directly With Single Doc":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      catch: bad_request
+      index:
+        index: logs.foo
+        id: "1"
+        body:
+          foo: bar
+  - match: { error.type: "illegal_argument_exception" }
+  - match: { error.reason: "Writes to child stream [logs.foo] are not allowed, use the parent stream instead: [logs]" }
+
+---
+"Check Bulk Index With Reroute Processor To Substream Is Rejected":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "reroute-to-logs-foo"
+        body:
+          processors:
+            - reroute:
+                destination: "logs.foo"
+  - do:
+      indices.create:
+        index: "bad-index"
+        body:
+          settings:
+            index.default_pipeline: "reroute-to-logs-foo"
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "bad-index" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Cannot reroute to substream [logs.foo] as only the stream itself can reroute to substreams. Please reroute to the stream [logs] instead." }

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
@@ -213,6 +213,7 @@
     exports org.elasticsearch.common.regex;
     exports org.elasticsearch.common.scheduler;
     exports org.elasticsearch.common.settings;
+    exports org.elasticsearch.common.streams;
     exports org.elasticsearch.common.text;
     exports org.elasticsearch.common.time;
     exports org.elasticsearch.common.transport;

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 17 additions & 3 deletions
@@ -30,6 +30,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -101,22 +102,33 @@ BulkRequest getBulkRequest() {
         }
     }
 
+    ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
+        return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
+    }
+
+    ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
+        return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
+    }
+
     /**
      * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
      * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
      * service with the results returned from running the remaining write operations.
     *
-     * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
+     * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
     * @param actionListener The action listener that expects the final bulk response.
     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
     */
-    ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
+    private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
+        Function<BulkResponse, Long> ingestTimeProviderFunction,
+        ActionListener<BulkResponse> actionListener
+    ) {
         if (itemResponses.isEmpty()) {
             return actionListener.map(
                 response -> new BulkResponse(
                     response.getItems(),
                     response.getTook().getMillis(),
-                    ingestTookInMillis,
+                    ingestTimeProviderFunction.apply(response),
                     response.getIncrementalState()
                 )
             );
@@ -143,6 +155,8 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
                 assertResponsesAreCorrect(bulkResponses, allResponses);
             }
 
+            var ingestTookInMillis = ingestTimeProviderFunction.apply(response);
+
             return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
         });
     }
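
The refactor keeps the existing overload that takes a precomputed ingest time and adds one that reads it from the final BulkResponse, so a caller that wraps the listener before ingest has run does not need to know the ingest duration up front. A simplified view of how the new overload is used by the pre-pipeline validation added below (a sketch, not the exact call site; bulkRequest and listener stand in for values available in TransportAbstractBulkAction):

// Sketch only: failed items are folded back into the final response, with
// the ingest time taken from that response rather than passed in.
BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest);
// ... markItemAsFailed(...) for any requests that target a child stream ...
ActionListener<BulkResponse> wrapped = modifier.wrapActionListenerIfNeeded(listener);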

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 44 additions & 2 deletions
@@ -31,6 +31,7 @@
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
@@ -44,12 +45,16 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable
@@ -396,8 +401,45 @@ private void applyPipelinesAndDoInternalExecute(
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
-        if (applyPipelines(task, bulkRequest, executor, listener) == false) {
-            doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+            .filter(t -> StreamType.streamTypeIsEnabled(t, projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        for (StreamType streamType : enabledStreamTypes) {
+            for (int i = 0; i < bulkRequest.requests.size(); i++) {
+                DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i);
+                String prefix = streamType.getStreamName() + ".";
+
+                boolean prePipeline = true;
+                if (req instanceof IndexRequest ir) {
+                    prePipeline = ir.isPipelineResolved() == false;
+                }
+
+                if (req != null && req.index() != null && req.index().startsWith(prefix) && prePipeline) {
+                    IllegalArgumentException e = new IllegalArgumentException(
+                        "Direct writes to child streams are prohibited. Index directly into the ["
+                            + streamType.getStreamName()
+                            + "] stream instead"
+                    );
+                    Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+                    if (failureStore != null && failureStore) {
+                        bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
+                    }
+                }
+            }
+        }
+
+        var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
+
+        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
+            doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
         }
     }
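
Note that this guard only fires while the request's pipeline is still unresolved (prePipeline); once ingest has processed a document, a child-stream target can be legitimate, since the parent stream's own pipeline is allowed to fan documents out to its substreams, and those reroutes are validated in IngestService instead (see below). A condensed restatement of the per-item condition (illustrative, not the commit's code; types as imported in the file above):

// Illustrative only: reject direct, pre-pipeline writes to a child stream.
static boolean rejectBeforePipelines(DocWriteRequest<?> req, String streamName) {
    boolean prePipeline = (req instanceof IndexRequest ir) ? ir.isPipelineResolved() == false : true;
    return req != null && req.index() != null && req.index().startsWith(streamName + ".") && prePipeline;
}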

server/src/main/java/org/elasticsearch/common/streams/StreamType.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.streams;
+
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.metadata.StreamsMetadata;
+
+public enum StreamType {
+
+    LOGS("logs");
+
+    private final String streamName;
+
+    StreamType(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public static boolean streamTypeIsEnabled(StreamType streamType, ProjectMetadata projectMetadata) {
+        StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
+        return switch (streamType) {
+            case LOGS -> metadata.isLogsEnabled();
+        };
+    }
+
+}
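
StreamType centralizes the stream name prefix and the per-project enablement check that both new call sites rely on. A minimal, hypothetical caller (assumes a ProjectMetadata instance is in scope):

// Hypothetical usage, not from the commit: apply child-stream rules only for
// stream types enabled in the current project.
for (StreamType type : StreamType.values()) {
    if (StreamType.streamTypeIsEnabled(type, projectMetadata)) {
        String childPrefix = type.getStreamName() + ".";   // e.g. "logs."
        // ... reject writes/reroutes whose target starts with childPrefix ...
    }
}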

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 25 additions & 0 deletions
@@ -55,6 +55,7 @@
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -1198,6 +1199,30 @@ private void executePipelines(
                 return; // document failed!
             }
 
+            for (StreamType streamType : StreamType.values()) {
+                if (StreamType.streamTypeIsEnabled(streamType, project)) {
+                    if (newIndex.startsWith(streamType.getStreamName() + ".")
+                        && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) {
+                        exceptionHandler.accept(
+                            new IngestPipelineException(
+                                pipelineId,
+                                new IllegalArgumentException(
+                                    format(
+                                        "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
+                                            + "this document from index [%s] to index [%s]. Reroute history: %s",
+                                        pipelineId,
+                                        originalIndex,
+                                        newIndex,
+                                        String.join(" -> ", ingestDocument.getIndexHistory())
+                                    )
+                                )
+                            )
+                        );
+                        return; // document failed!
+                    }
+                }
+            }
+
             // add the index to the document's index history, and check for cycles in the visited indices
             boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
             if (cycle) {
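
The ingest-side check permits a reroute into a child stream only when the parent stream already appears in the document's index history, i.e. only the stream itself may reroute documents to its substreams; any other pipeline reroute fails the document. Reduced to the condition for the logs stream (illustrative, not the commit's code):

// Illustrative only: a reroute to "logs.foo" is rejected unless "logs" is
// already in the document's index history.
static boolean rerouteRejected(String newIndex, java.util.List<String> indexHistory) {
    return newIndex.startsWith("logs.") && indexHistory.stream().noneMatch("logs"::equals);
}

// rerouteRejected("logs.foo", java.util.List.of("bad-index")) -> true  (document fails)
// rerouteRejected("logs.foo", java.util.List.of("logs"))      -> false (parent stream rerouted it)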
