Skip to content

Commit 12a57e0

Browse files
lukewhiting and szybia authored
Restrict Indexing To Child Streams When Streams Is Enabled (#132011)
* Update BulkRequestModifier to allow wrapping multiple times while preserving ingest time taken * Modify BulkResponse to have an equals method and update ingest test's to not depend on same instance assertions This prevents issues when wrapping responses during ingest * Add new StreamType enum along with logic to check if that stream type is enabled in the cluster * Modify IngestService to prevent documents being re-routed into child streams * Modify TransportAbstractBulkAction to prevent indexing into child streams * Additional tests for new indexing restrictions * Apply suggestion from @szybia Co-authored-by: Szymon Bialkowski <[email protected]> * Apply suggestions from code review Co-authored-by: Szymon Bialkowski <[email protected]> * Additional PR changes and cleanup * Additional PR changes to improve performance and readability further * Update docs/changelog/132011.yaml * Added additional documentation on bulk modifier wrap methods * PR Changes * Use of failure store is now wrapped in cluster feature check --------- Co-authored-by: Szymon Bialkowski <[email protected]>
1 parent 3d14a39 commit 12a57e0

File tree

11 files changed

+375
-35
lines changed

11 files changed

+375
-35
lines changed

docs/changelog/132011.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# Changelog entry for PR #132011 (new file, reconstructed from the diff rendering).
pr: 132011
summary: Restrict Indexing To Child Streams When Streams Is Enabled
area: Data streams
type: enhancement
issues: []

modules/streams/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ esplugin {
 restResources {
     restApi {
-        include '_common', 'streams'
+        include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
     }
 }

@@ -38,4 +38,6 @@ artifacts {
 dependencies {
     testImplementation project(path: ':test:test-clusters')
+    clusterModules project(':modules:ingest-common')
+    clusterModules project(':modules:reindex')
 }

modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ public static Iterable<Object[]> parameters() throws Exception {
 }

 @ClassRule
-public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+    .module("streams")
+    .module("ingest-common")
+    .module("reindex")
+    .feature(FeatureFlag.LOGS_STREAM)
+    .build();

@Override
protected String getTestRestCluster() {
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# YAML REST tests for PR #132011: direct writes (bulk, single-doc index, and
# pipeline reroute/script redirection) into child streams of an enabled logs
# stream must be rejected, while reads/deletes against the child stream work.
# New file, reconstructed with conventional indentation from the diff rendering.
---
"Check User Can't Write To Substream Directly":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      bulk:
        body: |
          { "index": { "_index": "logs.foo" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      catch: bad_request
      index:
        index: logs.foo
        id: "1"
        body:
          foo: bar
  - match: { error.type: "illegal_argument_exception" }
  - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "bad-index"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "script-to-logs-foo"
        body:
          processors:
            - script:
                source: "ctx._index = 'logs.foo'"
  - do:
      indices.create:
        index: "bad-index-script"
        body:
          settings:
            index.default_pipeline: "script-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index-script" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo-success"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "logs"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo-success"
  - do:
      bulk:
        refresh: true
        body: |
          { "index": { "_index": "logs" } }
          { "foo": "bar", "baz": "qux" }
  - match: { errors: false }
  - match: { items.0.index.status: 201 }

  - do:
      delete_by_query:
        index: logs.foo
        refresh: true
        body:
          query:
            match:
              foo: "bar"
  - match: { deleted: 1 }
  - match: { total: 1 }

  - do:
      search:
        index: logs.foo
        body:
          query:
            match_all: {}
  - match: { hits.total.value: 0 }

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

@@ -105,47 +106,87 @@ BulkRequest getBulkRequest() {
105106
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
106107
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
107108
* service with the results returned from running the remaining write operations.
109+
* <br>
110+
* Use this method when you want the ingest time to be taken from the actual {@link BulkResponse} such as if you are wrapping
111+
* a response multiple times and wish to preserve an already calculated ingest time.
108112
*
109-
* @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
110-
* @param actionListener The action listener that expects the final bulk response.
111-
* @return An action listener that combines ingest failure results with the results from writing the remaining documents.
113+
* @param actionListener the listener to wrap
114+
* @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
115+
*/
116+
ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
117+
if (itemResponses.isEmpty()) {
118+
return actionListener;
119+
} else {
120+
return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
121+
}
122+
}
123+
124+
/**
125+
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
126+
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
127+
* service with the results returned from running the remaining write operations.
128+
* <br>
129+
* This variant is used when the ingest time is already known and should be explicitly set in the final response,
130+
* rather than extracted from the {@link BulkResponse}.
131+
*
132+
* @param ingestTookInMillis the ingest time in milliseconds to use in the final response
133+
* @param actionListener the listener to wrap
134+
* @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
112135
*/
113136
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
114137
if (itemResponses.isEmpty()) {
115138
return actionListener.map(
116139
response -> new BulkResponse(
117140
response.getItems(),
118-
response.getTook().getMillis(),
141+
response.getTookInMillis(),
119142
ingestTookInMillis,
120143
response.getIncrementalState()
121144
)
122145
);
123146
} else {
124-
return actionListener.map(response -> {
125-
// these items are the responses from the subsequent bulk request, their 'slots'
126-
// are not correct for this response we're building
127-
final BulkItemResponse[] bulkResponses = response.getItems();
147+
return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
148+
}
149+
}
150+
151+
/**
152+
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
153+
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
154+
* service with the results returned from running the remaining write operations.
155+
*
156+
* @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
157+
* @param actionListener The action listener that expects the final bulk response.
158+
* @return An action listener that combines ingest failure results with the results from writing the remaining documents.
159+
*/
160+
private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
161+
Function<BulkResponse, Long> ingestTimeProviderFunction,
162+
ActionListener<BulkResponse> actionListener
163+
) {
164+
return actionListener.map(response -> {
165+
// these items are the responses from the subsequent bulk request, their 'slots'
166+
// are not correct for this response we're building
167+
final BulkItemResponse[] bulkResponses = response.getItems();
128168

129-
final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
169+
final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
130170

131-
// the item responses are from the original request, so their slots are correct.
132-
// these are the responses for requests that failed early and were not passed on to the subsequent bulk.
133-
for (BulkItemResponse item : itemResponses) {
134-
allResponses[item.getItemId()] = item;
135-
}
171+
// the item responses are from the original request, so their slots are correct.
172+
// these are the responses for requests that failed early and were not passed on to the subsequent bulk.
173+
for (BulkItemResponse item : itemResponses) {
174+
allResponses[item.getItemId()] = item;
175+
}
136176

137-
// use the original slots for the responses from the bulk
138-
for (int i = 0; i < bulkResponses.length; i++) {
139-
allResponses[originalSlots.get(i)] = bulkResponses[i];
140-
}
177+
// use the original slots for the responses from the bulk
178+
for (int i = 0; i < bulkResponses.length; i++) {
179+
allResponses[originalSlots.get(i)] = bulkResponses[i];
180+
}
141181

142-
if (Assertions.ENABLED) {
143-
assertResponsesAreCorrect(bulkResponses, allResponses);
144-
}
182+
if (Assertions.ENABLED) {
183+
assertResponsesAreCorrect(bulkResponses, allResponses);
184+
}
145185

146-
return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
147-
});
148-
}
186+
var ingestTookInMillis = ingestTimeProviderFunction.apply(response);
187+
188+
return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
189+
});
149190
}
150191

151192
private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {

0 commit comments

Comments
 (0)