Skip to content

Commit 617dc51

Browse files
committed
Add additional tests for delete by query
1 parent 8c33962 commit 617dc51

File tree

6 files changed

+91
-5
lines changed

6 files changed

+91
-5
lines changed

modules/streams/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ esplugin {
2020

2121
restResources {
2222
restApi {
23-
include '_common', 'streams', "bulk", "index", "ingest", "indices"
23+
include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
2424
}
2525
}
2626

@@ -39,4 +39,5 @@ artifacts {
3939
dependencies {
4040
testImplementation project(path: ':test:test-clusters')
4141
clusterModules project(':modules:ingest-common')
42+
clusterModules project(':modules:reindex')
4243
}

modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static Iterable<Object[]> parameters() throws Exception {
3232
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
3333
.module("streams")
3434
.module("ingest-common")
35+
.module("reindex")
3536
.feature(FeatureFlag.LOGS_STREAM)
3637
.build();
3738

modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,87 @@
7070
- match: { items.0.index.status: 400 }
7171
- match: { items.0.index.error.type: "illegal_argument_exception" }
7272
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" }
73+
74+
---
75+
"Check Bulk Index With Script Processor To Substream Is Rejected":
76+
- do:
77+
streams.logs_enable: { }
78+
- is_true: acknowledged
79+
80+
- do:
81+
streams.status: { }
82+
- is_true: logs.enabled
83+
84+
- do:
85+
ingest.put_pipeline:
86+
id: "script-to-logs-foo"
87+
body:
88+
processors:
89+
- script:
90+
source: "ctx._index = 'logs.foo'"
91+
- do:
92+
indices.create:
93+
index: "bad-index-script"
94+
body:
95+
settings:
96+
index.default_pipeline: "script-to-logs-foo"
97+
- do:
98+
bulk:
99+
body: |
100+
{ "index": { "_index": "bad-index-script" } }
101+
{ "foo": "bar" }
102+
- match: { errors: true }
103+
- match: { items.0.index.status: 400 }
104+
- match: { items.0.index.error.type: "illegal_argument_exception" }
105+
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" }
106+
107+
---
108+
"Check Delete By Query Directly On Substream After Reroute Succeeds":
109+
- do:
110+
streams.logs_enable: { }
111+
- is_true: acknowledged
112+
113+
- do:
114+
streams.status: { }
115+
- is_true: logs.enabled
116+
117+
- do:
118+
ingest.put_pipeline:
119+
id: "reroute-to-logs-foo-success"
120+
body:
121+
processors:
122+
- reroute:
123+
destination: "logs.foo"
124+
- do:
125+
indices.create:
126+
index: "logs"
127+
body:
128+
settings:
129+
index.default_pipeline: "reroute-to-logs-foo-success"
130+
- do:
131+
bulk:
132+
refresh: true
133+
body: |
134+
{ "index": { "_index": "logs" } }
135+
{ "foo": "bar", "baz": "qux" }
136+
- match: { errors: false }
137+
- match: { items.0.index.status: 201 }
138+
139+
- do:
140+
delete_by_query:
141+
index: logs.foo
142+
refresh: true
143+
body:
144+
query:
145+
match:
146+
foo: "bar"
147+
- match: { deleted: 1 }
148+
- match: { total: 1 }
149+
150+
- do:
151+
search:
152+
index: logs.foo
153+
body:
154+
query:
155+
match_all: {}
156+
- match: { hits.total.value: 0 }

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ private void applyPipelinesAndDoInternalExecute(
405405
// Validate child stream writes before processing pipelines
406406
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
407407
Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
408-
.filter(t -> StreamType.streamTypeIsEnabled(t, projectMetadata))
408+
.filter(t -> t.streamTypeIsEnabled(projectMetadata))
409409
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
410410

411411
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

server/src/main/java/org/elasticsearch/common/streams/StreamType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public String getStreamName() {
2626
return streamName;
2727
}
2828

29-
public static boolean streamTypeIsEnabled(StreamType streamType, ProjectMetadata projectMetadata) {
29+
public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
3030
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
31-
return switch (streamType) {
31+
return switch (this) {
3232
case LOGS -> metadata.isLogsEnabled();
3333
};
3434
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1200,7 +1200,7 @@ private void executePipelines(
12001200
}
12011201

12021202
for (StreamType streamType : StreamType.values()) {
1203-
if (StreamType.streamTypeIsEnabled(streamType, project)) {
1203+
if (streamType.streamTypeIsEnabled(project)) {
12041204
if (newIndex.startsWith(streamType.getStreamName() + ".")
12051205
&& ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) {
12061206
exceptionHandler.accept(

0 commit comments

Comments
 (0)