Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
2f0dd61
Add support for passing down if restricted params are used
lukewhiting Aug 14, 2025
2925412
Add restricted param check to single doc endpoint
lukewhiting Aug 15, 2025
09a1372
Fixup YAML test
lukewhiting Aug 18, 2025
33225ed
Add YAML tests for new function
lukewhiting Aug 18, 2025
e4fbe38
Switch to more descriptive catch check
lukewhiting Aug 19, 2025
960097e
Additional tests and fixes for those tests
lukewhiting Aug 20, 2025
2429ec0
Update docs/changelog/132967.yaml
lukewhiting Aug 20, 2025
1aed1f6
Apply suggestions from code review
lukewhiting Aug 20, 2025
6aecacc
Move check lower down the stack and preserve param names for better e…
lukewhiting Aug 21, 2025
44b1a67
Merge remote-tracking branch 'origin/es-11331-streams-params-restrict…
lukewhiting Aug 21, 2025
01ffed6
Fix NPE due to missing params set during serialization
lukewhiting Aug 21, 2025
6d5fa51
Merge branch 'main' of github.com:elastic/elasticsearch into es-11331…
lukewhiting Aug 21, 2025
cdc4829
Add additional allowed params following product feedback
lukewhiting Aug 27, 2025
2642075
Merge branch 'main' of github.com:elastic/elasticsearch into es-11331…
lukewhiting Aug 27, 2025
a98f1ea
Merge branch 'main' of github.com:elastic/elasticsearch into es-11331…
lukewhiting Aug 29, 2025
1174387
Merge branch 'main' into es-11331-streams-params-restriction
masseyke Aug 29, 2025
1803656
Merge branch 'main' of github.com:elastic/elasticsearch into es-11331…
lukewhiting Sep 1, 2025
08b088a
Merge branch 'main' of github.com:elastic/elasticsearch into es-11331…
lukewhiting Sep 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132967.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132967
summary: ES-11331 streams params restriction
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,16 @@ teardown:
index.default_pipeline: "reroute-to-logs-foo-success"
- do:
bulk:
refresh: true
body: |
{ "index": { "_index": "logs" } }
{ "foo": "bar", "baz": "qux" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

- do:
indices.refresh:
index: logs.foo

- do:
delete_by_query:
index: logs.foo
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
---
teardown:
- do:
streams.logs_disable: { }

---
"Check User Can't Use Restricted Params On Logs Endpoint":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
bulk:
refresh: true
body: |
{ "index": { "_index": "logs"} }
{ "foo": "bar" }
{ "index": { "_index": "not-logs" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }
- match: { items.1.index.status: 201 }

---
"Check User Can't Use Restricted Params On Logs Endpoint - Single Doc":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
index:
refresh: true
index: logs
body: { "foo": "bar" }
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'

---
"Allowed Params Only - Bulk Should Succeed":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
bulk:
error_trace: true
timeout: "1m"
body: |
{ "index": { "_index": "logs"} }
{ "foo": "bar" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

---
"Allowed Params Only - Single Doc Should Succeed":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
index:
index: logs
error_trace: true
timeout: "1m"
body: |
{ "foo": "bar" }
- match: { _index: "logs" }
- match: { result: "created" }

---
"No Params - Bulk Should Succeed":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
bulk:
body: |
{ "index": { "_index": "logs"} }
{ "foo": "bar" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

---
"No Params - Single Doc Should Succeed":
- do:
streams.logs_enable: { }
- is_true: acknowledged
-
do:
index:
index: logs
body: { "foo": "bar" }
- match: { _index: "logs" }
- match: { result: "created" }

---
"Mixed Allowed and Disallowed Params - Bulk Should Fail":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
bulk:
error_trace: true
timeout: "1m"
routing: "custom-routing"
refresh: true
body: |
{ "index": { "_index": "logs"} }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }

---
"Mixed Allowed and Disallowed Params - Single Doc Should Fail":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
index:
index: logs
error_trace: true
timeout: "1m"
routing: "custom-routing"
refresh: true
body: { "foo": "bar" }
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'

---
"Multiple Disallowed Params - Bulk Should Fail":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
bulk:
routing: "custom-routing"
wait_for_active_shards: "2"
refresh: true
body: |
{ "index": { "_index": "logs"} }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }

---
"Multiple Disallowed Params - Single Doc Should Fail":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
index:
index: logs
routing: "custom-routing"
pipeline: "my-pipeline"
wait_for_active_shards: "2"
refresh: true
body: { "foo": "bar" }
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_142_00_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ReservedClusterStateService reservedClusterStateService;
private final RestExtension restExtension;
private final ClusterService clusterService;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -534,6 +535,7 @@ public ActionModule(
reservedProjectStateHandlers
);
this.restExtension = restExtension;
this.clusterService = clusterService;
}

private static <T> T getRestServerComponent(
Expand Down Expand Up @@ -927,9 +929,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
registerHandler.accept(new RestResolveClusterAction());
registerHandler.accept(new RestResolveIndexAction());

registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler());
registerHandler.accept(new RestIndexAction(clusterService, projectIdResolver));
registerHandler.accept(new CreateHandler(clusterService, projectIdResolver));
registerHandler.accept(new AutoIdHandler(clusterService, projectIdResolver));
registerHandler.accept(new RestGetAction());
registerHandler.accept(new RestGetSourceAction());
registerHandler.accept(new RestMultiGetAction(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class BulkRequest extends LegacyActionRequest
private Boolean globalRequireAlias;
private Boolean globalRequireDatsStream;
private boolean includeSourceOnError = true;
private boolean streamsRestrictedParamsUsed = false;

private long sizeInBytes = 0;

Expand All @@ -107,7 +108,10 @@ public BulkRequest(StreamInput in) throws IOException {
}
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
includeSourceOnError = in.readBoolean();
} // else default value is true
}
if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
streamsRestrictedParamsUsed = in.readBoolean();
}
}

public BulkRequest(@Nullable String globalIndex) {
Expand Down Expand Up @@ -474,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
out.writeBoolean(includeSourceOnError);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
out.writeBoolean(streamsRestrictedParamsUsed);
}
}

@Override
Expand Down Expand Up @@ -516,6 +523,14 @@ public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}

public boolean streamsRestrictedParamsUsed() {
return streamsRestrictedParamsUsed;
}

public void streamsRestrictedParamsUsed(boolean streamsRestrictedParamsUsed) {
this.streamsRestrictedParamsUsed = streamsRestrictedParamsUsed;
}

/*
* Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have
* substitutions in the event of a simulated request.
Expand Down Expand Up @@ -554,6 +569,7 @@ public BulkRequest shallowClone() {
bulkRequest.routing(routing());
bulkRequest.requireAlias(requireAlias());
bulkRequest.requireDataStream(requireDataStream());
bulkRequest.streamsRestrictedParamsUsed(streamsRestrictedParamsUsed());
return bulkRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,25 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure,

public Handler newBulkRequest() {
ensureEnabled();
return newBulkRequest(null, null, null);
return newBulkRequest(null, null, null, false);
}

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
public Handler newBulkRequest(
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh,
boolean usesStreamsRestrictedParams
) {
ensureEnabled();
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
return new Handler(
client,
indexingPressure,
waitForActiveShards,
timeout,
refresh,
chunkWaitTimeMillisHistogram,
usesStreamsRestrictedParams
);
}

private void ensureEnabled() {
Expand Down Expand Up @@ -105,6 +118,7 @@ public static class Handler implements Releasable {
private final Client client;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
private final boolean usesStreamsRestrictedParams;
private final String refresh;

private final ArrayList<Releasable> releasables = new ArrayList<>(4);
Expand All @@ -125,12 +139,14 @@ protected Handler(
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh,
LongHistogram chunkWaitTimeMillisHistogram
LongHistogram chunkWaitTimeMillisHistogram,
boolean usesStreamsRestrictedParams
) {
this.client = client;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
this.usesStreamsRestrictedParams = usesStreamsRestrictedParams;
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
createNewBulkRequest(EMPTY_STATE);
Expand Down Expand Up @@ -310,6 +326,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
if (refresh != null) {
bulkRequest.setRefreshPolicy(refresh);
}
bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams);
}
}
}
Loading