Merged
Changes from all commits
18 commits
c0f1a12
Update BulkRequestModifier to allow wrapping multiple times while pre…
lukewhiting Jul 28, 2025
f8fa32b
Modify BulkResponse to have an equals method and update ingest test's…
lukewhiting Jul 28, 2025
34082ef
Add new StreamType enum along with logic to check if that stream type…
lukewhiting Jul 28, 2025
2457383
Modify IngestService to prevent documents being re-routed into child …
lukewhiting Jul 28, 2025
a2eab2a
Modify TransportAbstractBulkAction to prevent indexing into child str…
lukewhiting Jul 28, 2025
1a861cd
Additional tests for new indexing restrictions
lukewhiting Jul 28, 2025
5faf7fb
Merge branch 'main' into es-11941-streams-logs-bulk-transport-changes
lukewhiting Jul 28, 2025
1c4225b
Apply suggestion from @szybia
lukewhiting Jul 29, 2025
fbfd61b
Apply suggestions from code review
lukewhiting Jul 29, 2025
78cf0ef
Additional PR changes and cleanup
lukewhiting Jul 29, 2025
5e1d615
Additional PR changes to improve performance and readability further
lukewhiting Jul 29, 2025
387b4e3
Update docs/changelog/132011.yaml
lukewhiting Jul 29, 2025
d54b7b9
Added additional documentation on bulk modifier wrap methods
lukewhiting Jul 31, 2025
e5581aa
Merge remote-tracking branch 'origin/es-11941-streams-logs-bulk-trans…
lukewhiting Jul 31, 2025
a7e6f7a
Merge branch 'main' into es-11941-streams-logs-bulk-transport-changes
lukewhiting Jul 31, 2025
3569947
PR Changes
lukewhiting Aug 1, 2025
34e944e
Use of failure store is now wrapped in cluster feature check
lukewhiting Aug 12, 2025
3e82b35
Merge branch 'main' of github.com:elastic/elasticsearch into es-11941…
lukewhiting Aug 12, 2025
5 changes: 5 additions & 0 deletions docs/changelog/132011.yaml
@@ -0,0 +1,5 @@
pr: 132011
summary: Restrict Indexing To Child Streams When Streams Is Enabled
area: Data streams
type: enhancement
issues: []
4 changes: 3 additions & 1 deletion modules/streams/build.gradle
@@ -20,7 +20,7 @@ esplugin {

restResources {
restApi {
-        include '_common', 'streams'
+        include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
}
}

Expand All @@ -38,4 +38,6 @@ artifacts {

dependencies {
testImplementation project(path: ':test:test-clusters')
+    clusterModules project(':modules:ingest-common')
+    clusterModules project(':modules:reindex')
}
@@ -29,7 +29,12 @@ public static Iterable<Object[]> parameters() throws Exception {
}

@ClassRule
-    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .module("streams")
+        .module("ingest-common")
+        .module("reindex")
+        .feature(FeatureFlag.LOGS_STREAM)
+        .build();

@Override
protected String getTestRestCluster() {
@@ -0,0 +1,156 @@
---
"Check User Can't Write To Substream Directly":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
bulk:
body: |
{ "index": { "_index": "logs.foo" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
catch: bad_request
index:
index: logs.foo
id: "1"
body:
foo: bar
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "bad-index"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "script-to-logs-foo"
body:
processors:
- script:
source: "ctx._index = 'logs.foo'"
- do:
indices.create:
index: "bad-index-script"
body:
settings:
index.default_pipeline: "script-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index-script" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo-success"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "logs"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo-success"
- do:
bulk:
refresh: true
body: |
{ "index": { "_index": "logs" } }
{ "foo": "bar", "baz": "qux" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

- do:
delete_by_query:
index: logs.foo
refresh: true
body:
query:
match:
foo: "bar"
- match: { deleted: 1 }
- match: { total: 1 }

- do:
search:
index: logs.foo
body:
query:
match_all: {}
- match: { hits.total.value: 0 }
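All five scenarios above exercise one server-side rule: once streams is enabled, a write whose resolved target is a child stream (here `logs.foo`) is rejected unless it arrived via a reroute from the parent — whether the client addressed the child directly or an ingest pipeline redirected the document there. As a rough mental model only — the PR's real checks live in TransportAbstractBulkAction and IngestService, which this excerpt does not show, and every name below is invented for illustration — the direct-write rejection might look like:

```java
import java.util.Set;

// Hypothetical sketch, not the PR's code: illustrates the "direct writes to child
// streams are prohibited" rule asserted by the YAML tests above.
final class ChildStreamWriteCheck {

    // Hypothetical stand-in for the set of enabled streams (e.g. after streams.logs_enable).
    private static final Set<String> ENABLED_STREAMS = Set.of("logs");

    static void validateDirectWrite(String targetIndex) {
        int dot = targetIndex.indexOf('.');
        if (dot <= 0) {
            return; // not a child-stream name like "logs.foo"
        }
        String parent = targetIndex.substring(0, dot);
        if (ENABLED_STREAMS.contains(parent)) {
            // Mirrors the error reason asserted in the first two YAML tests.
            throw new IllegalArgumentException(
                "Direct writes to child streams are prohibited. Index directly into the [" + parent + "] stream instead"
            );
        }
    }
}
```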
@@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -105,47 +106,87 @@ BulkRequest getBulkRequest() {
     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
     * service with the results returned from running the remaining write operations.
+    * <br>
+    * Use this method when you want the ingest time to be taken from the actual {@link BulkResponse} such as if you are wrapping
+    * a response multiple times and wish to preserve an already calculated ingest time.
     *
-    * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
-    * @param actionListener The action listener that expects the final bulk response.
-    * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+    * @param actionListener the listener to wrap
+    * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
     */
+   ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
+       if (itemResponses.isEmpty()) {
+           return actionListener;
+       } else {
+           return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
+       }
+   }
+
+   /**
+    * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+    * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+    * service with the results returned from running the remaining write operations.
+    * <br>
+    * This variant is used when the ingest time is already known and should be explicitly set in the final response,
+    * rather than extracted from the {@link BulkResponse}.
+    *
+    * @param ingestTookInMillis the ingest time in milliseconds to use in the final response
+    * @param actionListener the listener to wrap
+    * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
+    */
    ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
        if (itemResponses.isEmpty()) {
            return actionListener.map(
                response -> new BulkResponse(
                    response.getItems(),
-                   response.getTook().getMillis(),
+                   response.getTookInMillis(),
                    ingestTookInMillis,
                    response.getIncrementalState()
                )
            );
        } else {
-           return actionListener.map(response -> {
-               // these items are the responses from the subsequent bulk request, their 'slots'
-               // are not correct for this response we're building
-               final BulkItemResponse[] bulkResponses = response.getItems();
-
-               final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
-
-               // the item responses are from the original request, so their slots are correct.
-               // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
-               for (BulkItemResponse item : itemResponses) {
-                   allResponses[item.getItemId()] = item;
-               }
-
-               // use the original slots for the responses from the bulk
-               for (int i = 0; i < bulkResponses.length; i++) {
-                   allResponses[originalSlots.get(i)] = bulkResponses[i];
-               }
-
-               if (Assertions.ENABLED) {
-                   assertResponsesAreCorrect(bulkResponses, allResponses);
-               }
-
-               return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
-           });
+           return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
        }
    }
+
+   /**
+    * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+    * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+    * service with the results returned from running the remaining write operations.
+    *
+    * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
+    * @param actionListener The action listener that expects the final bulk response.
+    * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+    */
+   private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
+       Function<BulkResponse, Long> ingestTimeProviderFunction,
+       ActionListener<BulkResponse> actionListener
+   ) {
+       return actionListener.map(response -> {
+           // these items are the responses from the subsequent bulk request, their 'slots'
+           // are not correct for this response we're building
+           final BulkItemResponse[] bulkResponses = response.getItems();
+
+           final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
+
+           // the item responses are from the original request, so their slots are correct.
+           // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
+           for (BulkItemResponse item : itemResponses) {
+               allResponses[item.getItemId()] = item;
+           }
+
+           // use the original slots for the responses from the bulk
+           for (int i = 0; i < bulkResponses.length; i++) {
+               allResponses[originalSlots.get(i)] = bulkResponses[i];
+           }
+
+           if (Assertions.ENABLED) {
+               assertResponsesAreCorrect(bulkResponses, allResponses);
+           }
+
+           var ingestTookInMillis = ingestTimeProviderFunction.apply(response);
+
+           return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
+       });
+   }

private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {
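The diff above splits the wrap logic into two public variants over a private doWrapActionListenerIfNeeded, differing only in where the ingest time comes from. The following is a hedged usage sketch, not code from this PR: it assumes it lives in org.elasticsearch.action.bulk (BulkRequestModifier is package-private), and the class and method names around the two confirmed wrapActionListenerIfNeeded signatures are invented for illustration.

```java
// Hypothetical usage sketch, not code from this PR. Placed in this package so it can
// see the package-private BulkRequestModifier.
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionListener;

class WrapChainingExample {

    ActionListener<BulkResponse> wrapTwice(
        BulkRequestModifier modifier,
        long ingestTookInMillis,
        ActionListener<BulkResponse> original
    ) {
        // First wrap, at the end of the ingest phase: the ingest time is known here,
        // so the explicit-time variant stamps it onto the eventual BulkResponse.
        ActionListener<BulkResponse> once = modifier.wrapActionListenerIfNeeded(ingestTookInMillis, original);

        // A later wrap (e.g. around a follow-up bulk) must not clobber that value.
        // The no-argument variant reads the ingest time back out of the BulkResponse
        // itself (BulkResponse::getIngestTookInMillis), preserving the earlier figure.
        return modifier.wrapActionListenerIfNeeded(once);
    }
}
```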