From 2f0dd61b6e516643629201cefdd8c0a87982c88a Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 14 Aug 2025 13:34:26 +0100 Subject: [PATCH 01/11] Add support for passing down if restricted params are used # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java --- .../org/elasticsearch/TransportVersions.java | 1 + .../action/bulk/BulkRequest.java | 18 ++++++++++++- .../action/bulk/IncrementalBulkService.java | 25 ++++++++++++++++--- .../rest/action/document/RestBulkAction.java | 13 +++++++++- .../action/document/RestBulkActionTests.java | 3 ++- 5 files changed, 53 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 5a026b6e1660b..c99ce67bc68dd 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -365,6 +365,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00); public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00); public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00); + public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_142_00_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 0acc6f955f5b6..883db3189ba00 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -86,6 +86,7 @@ public class BulkRequest extends LegacyActionRequest private Boolean globalRequireAlias; private Boolean globalRequireDatsStream; private boolean includeSourceOnError = true; + private boolean streamsRestrictedParamsUsed = false; private long sizeInBytes = 0; @@ -107,7 +108,10 @@ public BulkRequest(StreamInput in) throws IOException { } if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) { includeSourceOnError = in.readBoolean(); - } // else default value is true + } + if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { + streamsRestrictedParamsUsed = in.readBoolean(); + } } public BulkRequest(@Nullable String globalIndex) { @@ -474,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) { out.writeBoolean(includeSourceOnError); } + if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { + out.writeBoolean(streamsRestrictedParamsUsed); + } } @Override @@ -516,6 +523,14 @@ public boolean isSimulated() { return false; // Always false, but may be overridden by a subclass } + public boolean streamsRestrictedParamsUsed() { + return streamsRestrictedParamsUsed; + } + + public void streamsRestrictedParamsUsed(boolean streamsRestrictedParamsUsed) { + this.streamsRestrictedParamsUsed = streamsRestrictedParamsUsed; + } + /* * Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have * substitutions in the event of a simulated request. @@ -554,6 +569,7 @@ public BulkRequest shallowClone() { bulkRequest.routing(routing()); bulkRequest.requireAlias(requireAlias()); bulkRequest.requireDataStream(requireDataStream()); + bulkRequest.streamsRestrictedParamsUsed(streamsRestrictedParamsUsed()); return bulkRequest; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index b2b4404f48d47..778c97b675120 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -62,12 +62,25 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure, public Handler newBulkRequest() { ensureEnabled(); - return newBulkRequest(null, null, null); + return newBulkRequest(null, null, null, false); } - public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { + public Handler newBulkRequest( + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh, + boolean usesStreamsRestrictedParams + ) { ensureEnabled(); - return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram); + return new Handler( + client, + indexingPressure, + waitForActiveShards, + timeout, + refresh, + chunkWaitTimeMillisHistogram, + usesStreamsRestrictedParams + ); } private void ensureEnabled() { @@ -105,6 +118,7 @@ public static class Handler implements Releasable { private final Client client; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; + private final boolean usesStreamsRestrictedParams; private final String refresh; private final ArrayList releasables = new ArrayList<>(4); @@ -125,12 +139,14 @@ protected Handler( @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh, - LongHistogram chunkWaitTimeMillisHistogram + LongHistogram chunkWaitTimeMillisHistogram, + boolean usesStreamsRestrictedParams ) { this.client = client; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; + this.usesStreamsRestrictedParams = usesStreamsRestrictedParams; this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false); this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram; createNewBulkRequest(EMPTY_STATE); @@ -310,6 +326,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) if (refresh != null) { bulkRequest.setRefreshPolicy(refresh); } + bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams); } } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index f638367b85e76..1710a548a1140 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -62,6 +63,7 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; + public static final Set STREAMS_ALLOWED_PARAMS = Set.of("timeout"); private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; @@ -113,6 +115,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request)); + bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams(request)); ReleasableBytesReference content = request.requiredContent(); try { @@ -140,10 +143,18 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC String waitForActiveShards = request.param("wait_for_active_shards"); TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); String refresh = request.param("refresh"); - return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh)); + return new ChunkHandler( + allowExplicitIndex, + request, + () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, usesStreamsRestrictedParams(request)) + ); } } + private boolean usesStreamsRestrictedParams(RestRequest request) { + return Sets.difference(request.params().keySet(), STREAMS_ALLOWED_PARAMS).isEmpty() == false; + } + private static Exception parseFailureException(Exception e) { if (e instanceof IllegalArgumentException) { return e; diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index f4d601c7ad3b4..43fcd3d6344d3 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -240,7 +240,8 @@ public void next() { null, null, null, - MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME) + MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME), + false ) { @Override From 2925412695839d3563b192e197702cda2c4e41d2 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Fri, 15 Aug 2025 09:39:36 +0100 Subject: [PATCH 02/11] Add restricted param check to single doc endpoint --- .../elasticsearch/action/ActionModule.java | 8 ++-- .../bulk/TransportAbstractBulkAction.java | 42 ++++++++++++++---- .../rest/action/document/RestIndexAction.java | 43 ++++++++++++++++++- .../action/document/RestIndexActionTests.java | 8 ++-- 4 files changed, 83 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 7a3bbfdf2609d..211adffba5ec8 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -459,6 +459,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators indicesAliasesRequestRequestValidators; private final ReservedClusterStateService reservedClusterStateService; private final RestExtension restExtension; + private final ClusterService clusterService; public ActionModule( Settings settings, @@ -534,6 +535,7 @@ public ActionModule( reservedProjectStateHandlers ); this.restExtension = restExtension; + this.clusterService = clusterService; } private static T getRestServerComponent( @@ -927,9 +929,9 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestResolveClusterAction()); registerHandler.accept(new RestResolveIndexAction()); - registerHandler.accept(new RestIndexAction()); - registerHandler.accept(new CreateHandler()); - registerHandler.accept(new AutoIdHandler()); + registerHandler.accept(new RestIndexAction(clusterService, projectIdResolver)); + registerHandler.accept(new CreateHandler(clusterService, projectIdResolver)); + registerHandler.accept(new AutoIdHandler(clusterService, projectIdResolver)); registerHandler.accept(new RestGetAction()); registerHandler.accept(new RestGetSourceAction()); registerHandler.accept(new RestMultiGetAction(settings)); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 12a583251516a..6c111d168136a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -412,14 +413,43 @@ private void applyPipelinesAndDoInternalExecute( while (bulkRequestModifier.hasNext()) { req = bulkRequestModifier.next(); i++; + doStreamsChecks(bulkRequest, projectMetadata, req, bulkRequestModifier, i); + } + + var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); - for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { - if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) { - IllegalArgumentException e = new IllegalArgumentException( + if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { + doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); + } + } + + private void doStreamsChecks( + BulkRequest bulkRequest, + ProjectMetadata projectMetadata, + DocWriteRequest req, + BulkRequestModifier bulkRequestModifier, + int i + ) { + for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { + if (req instanceof IndexRequest ir && ir.isPipelineResolved() == false) { + IllegalArgumentException e = null; + if (streamType.matchesStreamPrefix(req.index())) { + e = new IllegalArgumentException( "Direct writes to child streams are prohibited. Index directly into the [" + streamType.getStreamName() + "] stream instead" ); + } + + if (e == null && bulkRequest.streamsRestrictedParamsUsed() && req.index().equals(streamType.getStreamName())) { + e = new IllegalArgumentException( + "When writing to a stream, only the following parameters are allowed: [" + + String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS) + + "]" + ); + } + + if (e != null) { Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) { @@ -438,12 +468,6 @@ private void applyPipelinesAndDoInternalExecute( } } } - - var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); - - if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { - doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); - } } /** diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index a157ab1b95065..dcf7aa9cbdee6 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -15,7 +15,12 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectIdResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.streams.StreamType; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -40,6 +45,13 @@ public class RestIndexAction extends BaseRestHandler { + "index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, " + "or /{index}/_create/{id})."; private final Set capabilities = Set.of(FAILURE_STORE_STATUS_CAPABILITY); + private final ClusterService clusterService; + private final ProjectIdResolver projectIdResolver; + + public RestIndexAction(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + this.clusterService = clusterService; + this.projectIdResolver = projectIdResolver; + } @Override public List routes() { @@ -54,6 +66,10 @@ public String getName() { @ServerlessScope(Scope.PUBLIC) public static final class CreateHandler extends RestIndexAction { + public CreateHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + super(clusterService, projectIdResolver); + } + @Override public String getName() { return "document_create_action"; @@ -81,7 +97,9 @@ static void validateOpType(String opType) { @ServerlessScope(Scope.PUBLIC) public static final class AutoIdHandler extends RestIndexAction { - public AutoIdHandler() {} + public AutoIdHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + super(clusterService, projectIdResolver); + } @Override public String getName() { @@ -105,7 +123,28 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ReleasableBytesReference source = request.requiredContent(); - IndexRequest indexRequest = new IndexRequest(request.param("index")); + + String index = request.param("index"); + ProjectMetadata projectMetadata = null; + + for (StreamType streamType : StreamType.values()) { + if (index.equals(streamType.getStreamName())) { + if (projectMetadata == null) { + projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata(); + } + + if (streamType.streamTypeIsEnabled(projectMetadata) + && Sets.difference(request.params().keySet(), RestBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) { + throw new IllegalArgumentException( + "When writing to a stream, only the following parameters are allowed: [" + + String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS) + + "]" + ); + } + } + } + + IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.setPipeline(request.param("pipeline")); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java index c97160427e59d..46d9fe7982041 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java @@ -37,13 +37,13 @@ public final class RestIndexActionTests extends RestActionTestCase { @Before public void setUpAction() { - controller().registerHandler(new RestIndexAction()); - controller().registerHandler(new CreateHandler()); - controller().registerHandler(new AutoIdHandler()); + controller().registerHandler(new RestIndexAction(null, null)); + controller().registerHandler(new CreateHandler(null, null)); + controller().registerHandler(new AutoIdHandler(null, null)); } public void testCreateOpTypeValidation() { - RestIndexAction.CreateHandler create = new CreateHandler(); + RestIndexAction.CreateHandler create = new CreateHandler(null, null); String opType = randomFrom("CREATE", null); CreateHandler.validateOpType(opType); From 09a137251c87c33a9e7c2a691f1e2a8643ca5100 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 18 Aug 2025 14:15:31 +0100 Subject: [PATCH 03/11] Fixup YAML test --- .../test/streams/logs/20_substream_restrictions.yml | 5 ++++- .../elasticsearch/rest/action/document/RestBulkAction.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index d11245a7e7a5e..cc19b71d5ed22 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -142,13 +142,16 @@ teardown: index.default_pipeline: "reroute-to-logs-foo-success" - do: bulk: - refresh: true body: | { "index": { "_index": "logs" } } { "foo": "bar", "baz": "qux" } - match: { errors: false } - match: { items.0.index.status: 201 } + - do: + indices.refresh: + index: logs.foo + - do: delete_by_query: index: logs.foo diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 1710a548a1140..d82c00b0f40ea 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -63,7 +63,7 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - public static final Set STREAMS_ALLOWED_PARAMS = Set.of("timeout"); + public static final Set STREAMS_ALLOWED_PARAMS = Set.of("timeout", "error_trace"); private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; From 33225ed0b522f9d12f67b3a75da4083f6487dfb5 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 18 Aug 2025 15:11:50 +0100 Subject: [PATCH 04/11] Add YAML tests for new function --- .../streams/logs/30_param_restrictions.yml | 45 +++++++++++++++++++ .../rest/action/document/RestBulkAction.java | 8 +++- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml new file mode 100644 index 0000000000000..e4b4162ea4d46 --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -0,0 +1,45 @@ +--- +teardown: + - do: + streams.logs_disable: { } + +--- +"Check User Can't Use Restricted Params On Logs Endpoint": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + refresh: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + { "index": { "_index": "not-logs" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [error_trace,timeout]" } + - match: { items.1.index.status: 201 } + +--- +"Check User Can't Use Restricted Params On Logs Endpoint - Single Doc": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + index: + refresh: true + index: logs + body: { "foo": "bar" } + catch: bad_request diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index d82c00b0f40ea..5a9aafb963e27 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -63,7 +64,12 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - public static final Set STREAMS_ALLOWED_PARAMS = Set.of("timeout", "error_trace"); + public static final LinkedHashSet STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) { + { + add("error_trace"); + add("timeout"); + } + }; private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; From e4fbe3800f2f9d63c209cc597049a5a4a23c1b61 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 19 Aug 2025 10:15:00 +0100 Subject: [PATCH 05/11] Switch to more descriptive catch check --- .../rest-api-spec/test/streams/logs/30_param_restrictions.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml index e4b4162ea4d46..d154c3869fa24 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -42,4 +42,4 @@ teardown: refresh: true index: logs body: { "foo": "bar" } - catch: bad_request + catch: '/When writing to a stream, only the following parameters are allowed: \[error_trace,timeout\]/' From 960097e2cf92f6b3411c6e98e6060c8732f67274 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 20 Aug 2025 13:38:55 +0100 Subject: [PATCH 06/11] Additional tests and fixes for those tests --- .../streams/logs/30_param_restrictions.yml | 134 +++++++++++++++++- .../rest/action/document/RestBulkAction.java | 4 +- 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml index d154c3869fa24..a215de168cf41 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -24,7 +24,7 @@ teardown: - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [error_trace,timeout]" } + - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } - match: { items.1.index.status: 201 } --- @@ -42,4 +42,134 @@ teardown: refresh: true index: logs body: { "foo": "bar" } - catch: '/When writing to a stream, only the following parameters are allowed: \[error_trace,timeout\]/' + catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' + +--- +"Allowed Params Only - Bulk Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + error_trace: true + timeout: "1m" + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + +--- +"Allowed Params Only - Single Doc Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + error_trace: true + timeout: "1m" + body: | + { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + +--- +"No Params - Bulk Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + +--- +"No Params - Single Doc Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + - + do: + index: + index: logs + body: { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + +--- +"Mixed Allowed and Disallowed Params - Bulk Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + error_trace: true + timeout: "1m" + routing: "custom-routing" + refresh: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } + +--- +"Mixed Allowed and Disallowed Params - Single Doc Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + error_trace: true + timeout: "1m" + routing: "custom-routing" + refresh: true + body: { "foo": "bar" } + catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' + +--- +"Multiple Disallowed Params - Bulk Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + routing: "custom-routing" + wait_for_active_shards: "2" + refresh: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } + +--- +"Multiple Disallowed Params - Single Doc Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + routing: "custom-routing" + pipeline: "my-pipeline" + wait_for_active_shards: "2" + refresh: true + body: { "foo": "bar" } + catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 5a9aafb963e27..b0bd3a84f13eb 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -64,8 +64,10 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - public static final LinkedHashSet STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) { + public static final Set STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) { { + add("index"); + add("op_type"); add("error_trace"); add("timeout"); } From 2429ec0294081e37a51eca6cdb962cb4112e61ea Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 20 Aug 2025 13:47:50 +0100 Subject: [PATCH 07/11] Update docs/changelog/132967.yaml --- docs/changelog/132967.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132967.yaml diff --git a/docs/changelog/132967.yaml b/docs/changelog/132967.yaml new file mode 100644 index 0000000000000..0e19f0070c6f4 --- /dev/null +++ b/docs/changelog/132967.yaml @@ -0,0 +1,5 @@ +pr: 132967 +summary: ES-11331 streams params restriction +area: Data streams +type: enhancement +issues: [] From 1aed1f6303738e6e3fbb3f0e6eb7697254db57ec Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 20 Aug 2025 13:52:25 +0100 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../org/elasticsearch/rest/action/document/RestBulkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index b0bd3a84f13eb..646cbdc0b3acc 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -64,7 +64,7 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - public static final Set STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) { + public static final Set STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(4) { { add("index"); add("op_type"); From 6aecacc55c5d83f3045f4fab435befd699fd09e4 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 21 Aug 2025 13:02:52 +0100 Subject: [PATCH 09/11] Move check lower down the stack and preserve param names for better error message Also whitelists ID and pretty as allowed params --- .../streams/logs/30_param_restrictions.yml | 27 +++++++++--- .../action/bulk/BulkRequest.java | 16 +++---- .../action/bulk/IncrementalBulkService.java | 24 ++++------- .../bulk/TransportAbstractBulkAction.java | 30 +++++++++++-- .../rest/action/document/RestBulkAction.java | 18 +------- .../rest/action/document/RestIndexAction.java | 42 +++++++++++-------- .../action/document/RestBulkActionTests.java | 3 +- 7 files changed, 92 insertions(+), 68 deletions(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml index a215de168cf41..55bff031d51a6 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -24,7 +24,7 @@ teardown: - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } - match: { items.1.index.status: 201 } --- @@ -42,7 +42,7 @@ teardown: refresh: true index: logs body: { "foo": "bar" } - catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' --- "Allowed Params Only - Bulk Should Succeed": @@ -76,6 +76,21 @@ teardown: - match: { _index: "logs" } - match: { result: "created" } +--- +"Allowed Params Only - Single Doc with ID Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + id: 123456 + body: | + { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + --- "No Params - Bulk Should Succeed": - do: @@ -121,7 +136,7 @@ teardown: - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } --- "Mixed Allowed and Disallowed Params - Single Doc Should Fail": @@ -137,7 +152,7 @@ teardown: routing: "custom-routing" refresh: true body: { "foo": "bar" } - catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' --- "Multiple Disallowed Params - Bulk Should Fail": @@ -156,7 +171,7 @@ teardown: - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } --- "Multiple Disallowed Params - Single Doc Should Fail": @@ -172,4 +187,4 @@ teardown: wait_for_active_shards: "2" refresh: true body: { "foo": "bar" } - catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/' + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 883db3189ba00..b80b526c543b2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -86,7 +86,7 @@ public class BulkRequest extends LegacyActionRequest private Boolean globalRequireAlias; private Boolean globalRequireDatsStream; private boolean includeSourceOnError = true; - private boolean streamsRestrictedParamsUsed = false; + private Set paramsUsed; private long sizeInBytes = 0; @@ -110,7 +110,7 @@ public BulkRequest(StreamInput in) throws IOException { includeSourceOnError = in.readBoolean(); } if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { - streamsRestrictedParamsUsed = in.readBoolean(); + paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString); } } @@ -479,7 +479,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(includeSourceOnError); } if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { - out.writeBoolean(streamsRestrictedParamsUsed); + out.writeCollection(paramsUsed, StreamOutput::writeString); } } @@ -523,12 +523,12 @@ public boolean isSimulated() { return false; // Always false, but may be overridden by a subclass } - public boolean streamsRestrictedParamsUsed() { - return streamsRestrictedParamsUsed; + public Set requestParamsUsed() { + return paramsUsed; } - public void streamsRestrictedParamsUsed(boolean streamsRestrictedParamsUsed) { - this.streamsRestrictedParamsUsed = streamsRestrictedParamsUsed; + public void requestParamsUsed(Set paramsUsed) { + this.paramsUsed = paramsUsed; } /* @@ -569,7 +569,7 @@ public BulkRequest shallowClone() { bulkRequest.routing(routing()); bulkRequest.requireAlias(requireAlias()); bulkRequest.requireDataStream(requireDataStream()); - bulkRequest.streamsRestrictedParamsUsed(streamsRestrictedParamsUsed()); + bulkRequest.requestParamsUsed(requestParamsUsed()); return bulkRequest; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 778c97b675120..b721a8f4f2b6b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -29,9 +29,11 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static java.util.Collections.emptySet; import static org.elasticsearch.common.settings.Setting.boolSetting; public class IncrementalBulkService { @@ -62,25 +64,17 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure, public Handler newBulkRequest() { ensureEnabled(); - return newBulkRequest(null, null, null, false); + return newBulkRequest(null, null, null, emptySet()); } public Handler newBulkRequest( @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh, - boolean usesStreamsRestrictedParams + Set paramsUsed ) { ensureEnabled(); - return new Handler( - client, - indexingPressure, - waitForActiveShards, - timeout, - refresh, - chunkWaitTimeMillisHistogram, - usesStreamsRestrictedParams - ); + return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram, paramsUsed); } private void ensureEnabled() { @@ -118,7 +112,7 @@ public static class Handler implements Releasable { private final Client client; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; - private final boolean usesStreamsRestrictedParams; + private final Set paramsUsed; private final String refresh; private final ArrayList releasables = new ArrayList<>(4); @@ -140,13 +134,13 @@ protected Handler( @Nullable TimeValue timeout, @Nullable String refresh, LongHistogram chunkWaitTimeMillisHistogram, - boolean usesStreamsRestrictedParams + Set paramsUsed ) { this.client = client; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; - this.usesStreamsRestrictedParams = usesStreamsRestrictedParams; + this.paramsUsed = paramsUsed; this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false); this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram; createNewBulkRequest(EMPTY_STATE); @@ -326,7 +320,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) if (refresh != null) { bulkRequest.setRefreshPolicy(refresh); } - bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams); + bulkRequest.requestParamsUsed(paramsUsed); } } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 6c111d168136a..31fcc465abfdb 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -42,15 +43,17 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -62,6 +65,17 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class); + public static final Set STREAMS_ALLOWED_PARAMS = new HashSet<>(4) { + { + add("id"); + add("index"); + add("op_type"); + add("pretty"); + add("error_trace"); + add("timeout"); + } + }; + protected final ThreadPool threadPool; protected final ClusterService clusterService; protected final IndexingPressure indexingPressure; @@ -441,11 +455,12 @@ private void doStreamsChecks( ); } - if (e == null && bulkRequest.streamsRestrictedParamsUsed() && req.index().equals(streamType.getStreamName())) { + if (e == null && streamsRestrictedParamsUsed(bulkRequest) && req.index().equals(streamType.getStreamName())) { e = new IllegalArgumentException( "When writing to a stream, only the following parameters are allowed: [" - + String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS) - + "]" + + String.join(", ", STREAMS_ALLOWED_PARAMS) + + "] however the following were used: " + + bulkRequest.requestParamsUsed() ); } @@ -470,6 +485,13 @@ private void doStreamsChecks( } } + private boolean streamsRestrictedParamsUsed(BulkRequest bulkRequest) { + return Sets.difference( + bulkRequest.requestParamsUsed() == null ? Collections.emptySet() : bulkRequest.requestParamsUsed(), + STREAMS_ALLOWED_PARAMS + ).isEmpty() == false; + } + /** * This method creates any missing resources and actually applies the BulkRequest to the relevant indices * @param task The task in which this work is being done diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index b0bd3a84f13eb..594ae380aab3b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -41,7 +40,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -64,14 +62,6 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - public static final Set STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) { - { - add("index"); - add("op_type"); - add("error_trace"); - add("timeout"); - } - }; private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; @@ -123,7 +113,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request)); - bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams(request)); + bulkRequest.requestParamsUsed(request.params().keySet()); ReleasableBytesReference content = request.requiredContent(); try { @@ -154,15 +144,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return new ChunkHandler( allowExplicitIndex, request, - () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, usesStreamsRestrictedParams(request)) + () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, request.params().keySet()) ); } } - private boolean usesStreamsRestrictedParams(RestRequest request) { - return Sets.difference(request.params().keySet(), STREAMS_ALLOWED_PARAMS).isEmpty() == false; - } - private static Exception parseFailureException(Exception e) { if (e instanceof IllegalArgumentException) { return e; diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index dcf7aa9cbdee6..f4405e6c912f4 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.TransportAbstractBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; @@ -123,26 +124,9 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ReleasableBytesReference source = request.requiredContent(); - String index = request.param("index"); - ProjectMetadata projectMetadata = null; - - for (StreamType streamType : StreamType.values()) { - if (index.equals(streamType.getStreamName())) { - if (projectMetadata == null) { - projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata(); - } - if (streamType.streamTypeIsEnabled(projectMetadata) - && Sets.difference(request.params().keySet(), RestBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) { - throw new IllegalArgumentException( - "When writing to a stream, only the following parameters are allowed: [" - + String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS) - + "]" - ); - } - } - } + validateStreamsParamRestrictions(request, index); IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(request.param("id")); @@ -179,6 +163,28 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC }; } + private void validateStreamsParamRestrictions(RestRequest request, String index) { + ProjectMetadata projectMetadata = null; + + for (StreamType streamType : StreamType.values()) { + if (index.equals(streamType.getStreamName())) { + if (projectMetadata == null) { + projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata(); + } + + if (streamType.streamTypeIsEnabled(projectMetadata) + && Sets.difference(request.params().keySet(), TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) { + throw new IllegalArgumentException( + "When writing to a stream, only the following parameters are allowed: [" + + String.join(", ", TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS) + + "] however the following were used: " + + request.params().keySet() + ); + } + } + } + } + @Override public Set supportedCapabilities() { return capabilities; diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 43fcd3d6344d3..95d463f7cb0cc 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -241,7 +242,7 @@ public void next() { null, null, MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME), - false + emptySet() ) { @Override From 01ffed6367afc2218d162b9795af0ebd7ed3e1d9 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 21 Aug 2025 13:29:16 +0100 Subject: [PATCH 10/11] Fix NPE due to missing params set during serialization --- .../java/org/elasticsearch/action/bulk/BulkRequest.java | 3 ++- .../action/bulk/TransportAbstractBulkAction.java | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index b80b526c543b2..648df8f7073b4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -48,6 +48,7 @@ import java.util.Objects; import java.util.Set; +import static java.util.Collections.emptySet; import static org.elasticsearch.action.ValidateActions.addValidationError; /** @@ -86,7 +87,7 @@ public class BulkRequest extends LegacyActionRequest private Boolean globalRequireAlias; private Boolean globalRequireDatsStream; private boolean includeSourceOnError = true; - private Set paramsUsed; + private Set paramsUsed = emptySet(); private long sizeInBytes = 0; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 31fcc465abfdb..d4c516c75fc35 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -486,10 +485,7 @@ private void doStreamsChecks( } private boolean streamsRestrictedParamsUsed(BulkRequest bulkRequest) { - return Sets.difference( - bulkRequest.requestParamsUsed() == null ? Collections.emptySet() : bulkRequest.requestParamsUsed(), - STREAMS_ALLOWED_PARAMS - ).isEmpty() == false; + return Sets.difference(bulkRequest.requestParamsUsed(), STREAMS_ALLOWED_PARAMS).isEmpty() == false; } /** From cdc482923e28be2bc1de26b32f83a23f9984a9d7 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 27 Aug 2025 13:12:54 +0100 Subject: [PATCH 11/11] Add additional allowed params following product feedback --- .../test/streams/logs/30_param_restrictions.yml | 4 ++-- .../java/org/elasticsearch/action/bulk/BulkRequest.java | 2 +- .../action/bulk/TransportAbstractBulkAction.java | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml index 55bff031d51a6..9417a1d8f376b 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -15,7 +15,7 @@ teardown: - do: bulk: - refresh: true + list_executed_pipelines: true body: | { "index": { "_index": "logs"} } { "foo": "bar" } @@ -39,7 +39,7 @@ teardown: - do: index: - refresh: true + require_alias: true index: logs body: { "foo": "bar" } catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 648df8f7073b4..93ac6f2a21143 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -109,7 +109,7 @@ public BulkRequest(StreamInput in) throws IOException { } if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) { includeSourceOnError = in.readBoolean(); - } + } // else default value is true if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index d4c516c75fc35..03029258cb6c4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -64,13 +64,15 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class); - public static final Set STREAMS_ALLOWED_PARAMS = new HashSet<>(4) { + public static final Set STREAMS_ALLOWED_PARAMS = new HashSet<>(8) { { + add("error_trace"); + add("filter_path"); add("id"); add("index"); add("op_type"); add("pretty"); - add("error_trace"); + add("refresh"); add("timeout"); } };