diff --git a/docs/changelog/132967.yaml b/docs/changelog/132967.yaml new file mode 100644 index 0000000000000..0e19f0070c6f4 --- /dev/null +++ b/docs/changelog/132967.yaml @@ -0,0 +1,5 @@ +pr: 132967 +summary: ES-11331 streams params restriction +area: Data streams +type: enhancement +issues: [] diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index d11245a7e7a5e..cc19b71d5ed22 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -142,13 +142,16 @@ teardown: index.default_pipeline: "reroute-to-logs-foo-success" - do: bulk: - refresh: true body: | { "index": { "_index": "logs" } } { "foo": "bar", "baz": "qux" } - match: { errors: false } - match: { items.0.index.status: 201 } + - do: + indices.refresh: + index: logs.foo + - do: delete_by_query: index: logs.foo diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml new file mode 100644 index 0000000000000..9417a1d8f376b --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml @@ -0,0 +1,190 @@ +--- +teardown: + - do: + streams.logs_disable: { } + +--- +"Check User Can't Use Restricted Params On Logs Endpoint": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + list_executed_pipelines: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + { "index": { "_index": "not-logs" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } + - match: { items.1.index.status: 201 } + +--- +"Check User Can't Use Restricted Params On Logs Endpoint - Single Doc": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + index: + require_alias: true + index: logs + body: { "foo": "bar" } + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' + +--- +"Allowed Params Only - Bulk Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + error_trace: true + timeout: "1m" + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + +--- +"Allowed Params Only - Single Doc Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + error_trace: true + timeout: "1m" + body: | + { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + +--- +"Allowed Params Only - Single Doc with ID Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + id: 123456 + body: | + { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + +--- +"No Params - Bulk Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + +--- +"No Params - Single Doc Should Succeed": + - do: + streams.logs_enable: { } + - is_true: acknowledged + - + do: + index: + index: logs + body: { "foo": "bar" } + - match: { _index: "logs" } + - match: { result: "created" } + +--- +"Mixed Allowed and Disallowed Params - Bulk Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + error_trace: true + timeout: "1m" + routing: "custom-routing" + refresh: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } + +--- +"Mixed Allowed and Disallowed Params - Single Doc Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + error_trace: true + timeout: "1m" + routing: "custom-routing" + refresh: true + body: { "foo": "bar" } + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' + +--- +"Multiple Disallowed Params - Bulk Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + bulk: + routing: "custom-routing" + wait_for_active_shards: "2" + refresh: true + body: | + { "index": { "_index": "logs"} } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' } + +--- +"Multiple Disallowed Params - Single Doc Should Fail": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + index: + index: logs + routing: "custom-routing" + pipeline: "my-pipeline" + wait_for_active_shards: "2" + refresh: true + body: { "foo": "bar" } + catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/' diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 1abb8177e5d74..c8198fc9e720b 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -354,6 +354,7 @@ static TransportVersion def(int id) { public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00); public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00); public static final TransportVersion PROJECT_RESERVED_STATE_MOVE_TO_REGISTRY = def(9_147_0_00); + public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_00_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 7a3bbfdf2609d..211adffba5ec8 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -459,6 +459,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators indicesAliasesRequestRequestValidators; private final ReservedClusterStateService reservedClusterStateService; private final RestExtension restExtension; + private final ClusterService clusterService; public ActionModule( Settings settings, @@ -534,6 +535,7 @@ public ActionModule( reservedProjectStateHandlers ); this.restExtension = restExtension; + this.clusterService = clusterService; } private static T getRestServerComponent( @@ -927,9 +929,9 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestResolveClusterAction()); registerHandler.accept(new RestResolveIndexAction()); - registerHandler.accept(new RestIndexAction()); - registerHandler.accept(new CreateHandler()); - registerHandler.accept(new AutoIdHandler()); + registerHandler.accept(new RestIndexAction(clusterService, projectIdResolver)); + registerHandler.accept(new CreateHandler(clusterService, projectIdResolver)); + registerHandler.accept(new AutoIdHandler(clusterService, projectIdResolver)); registerHandler.accept(new RestGetAction()); registerHandler.accept(new RestGetSourceAction()); registerHandler.accept(new RestMultiGetAction(settings)); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 0acc6f955f5b6..93ac6f2a21143 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -48,6 +48,7 @@ import java.util.Objects; import java.util.Set; +import static java.util.Collections.emptySet; import static org.elasticsearch.action.ValidateActions.addValidationError; /** @@ -86,6 +87,7 @@ public class BulkRequest extends LegacyActionRequest private Boolean globalRequireAlias; private Boolean globalRequireDatsStream; private boolean includeSourceOnError = true; + private Set paramsUsed = emptySet(); private long sizeInBytes = 0; @@ -108,6 +110,9 @@ public BulkRequest(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) { includeSourceOnError = in.readBoolean(); } // else default value is true + if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { + paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString); + } } public BulkRequest(@Nullable String globalIndex) { @@ -474,6 +479,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) { out.writeBoolean(includeSourceOnError); } + if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) { + out.writeCollection(paramsUsed, StreamOutput::writeString); + } } @Override @@ -516,6 +524,14 @@ public boolean isSimulated() { return false; // Always false, but may be overridden by a subclass } + public Set requestParamsUsed() { + return paramsUsed; + } + + public void requestParamsUsed(Set paramsUsed) { + this.paramsUsed = paramsUsed; + } + /* * Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have * substitutions in the event of a simulated request. @@ -554,6 +570,7 @@ public BulkRequest shallowClone() { bulkRequest.routing(routing()); bulkRequest.requireAlias(requireAlias()); bulkRequest.requireDataStream(requireDataStream()); + bulkRequest.requestParamsUsed(requestParamsUsed()); return bulkRequest; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index b2b4404f48d47..b721a8f4f2b6b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -29,9 +29,11 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static java.util.Collections.emptySet; import static org.elasticsearch.common.settings.Setting.boolSetting; public class IncrementalBulkService { @@ -62,12 +64,17 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure, public Handler newBulkRequest() { ensureEnabled(); - return newBulkRequest(null, null, null); + return newBulkRequest(null, null, null, emptySet()); } - public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { + public Handler newBulkRequest( + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh, + Set paramsUsed + ) { ensureEnabled(); - return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram); + return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram, paramsUsed); } private void ensureEnabled() { @@ -105,6 +112,7 @@ public static class Handler implements Releasable { private final Client client; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; + private final Set paramsUsed; private final String refresh; private final ArrayList releasables = new ArrayList<>(4); @@ -125,12 +133,14 @@ protected Handler( @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh, - LongHistogram chunkWaitTimeMillisHistogram + LongHistogram chunkWaitTimeMillisHistogram, + Set paramsUsed ) { this.client = client; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; + this.paramsUsed = paramsUsed; this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false); this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram; createNewBulkRequest(EMPTY_STATE); @@ -310,6 +320,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) if (refresh != null) { bulkRequest.setRefreshPolicy(refresh); } + bulkRequest.requestParamsUsed(paramsUsed); } } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 12a583251516a..03029258cb6c4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -48,8 +49,10 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -61,6 +64,19 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class); + public static final Set STREAMS_ALLOWED_PARAMS = new HashSet<>(8) { + { + add("error_trace"); + add("filter_path"); + add("id"); + add("index"); + add("op_type"); + add("pretty"); + add("refresh"); + add("timeout"); + } + }; + protected final ThreadPool threadPool; protected final ClusterService clusterService; protected final IndexingPressure indexingPressure; @@ -412,14 +428,44 @@ private void applyPipelinesAndDoInternalExecute( while (bulkRequestModifier.hasNext()) { req = bulkRequestModifier.next(); i++; + doStreamsChecks(bulkRequest, projectMetadata, req, bulkRequestModifier, i); + } + + var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); + + if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { + doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); + } + } - for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { - if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) { - IllegalArgumentException e = new IllegalArgumentException( + private void doStreamsChecks( + BulkRequest bulkRequest, + ProjectMetadata projectMetadata, + DocWriteRequest req, + BulkRequestModifier bulkRequestModifier, + int i + ) { + for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { + if (req instanceof IndexRequest ir && ir.isPipelineResolved() == false) { + IllegalArgumentException e = null; + if (streamType.matchesStreamPrefix(req.index())) { + e = new IllegalArgumentException( "Direct writes to child streams are prohibited. Index directly into the [" + streamType.getStreamName() + "] stream instead" ); + } + + if (e == null && streamsRestrictedParamsUsed(bulkRequest) && req.index().equals(streamType.getStreamName())) { + e = new IllegalArgumentException( + "When writing to a stream, only the following parameters are allowed: [" + + String.join(", ", STREAMS_ALLOWED_PARAMS) + + "] however the following were used: " + + bulkRequest.requestParamsUsed() + ); + } + + if (e != null) { Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) { @@ -438,12 +484,10 @@ private void applyPipelinesAndDoInternalExecute( } } } + } - var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); - - if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { - doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); - } + private boolean streamsRestrictedParamsUsed(BulkRequest bulkRequest) { + return Sets.difference(bulkRequest.requestParamsUsed(), STREAMS_ALLOWED_PARAMS).isEmpty() == false; } /** diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 293c8128dc8d9..20a9fb7ed23aa 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -62,7 +62,6 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; - private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; private final IncrementalBulkService.Enabled incrementalEnabled; @@ -113,6 +112,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request)); + bulkRequest.requestParamsUsed(request.params().keySet()); ReleasableBytesReference content = request.requiredContent(); try { @@ -141,7 +141,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC String waitForActiveShards = request.param("wait_for_active_shards"); TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); String refresh = request.param("refresh"); - return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh)); + return new ChunkHandler( + allowExplicitIndex, + request, + () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, request.params().keySet()) + ); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index a157ab1b95065..f4405e6c912f4 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -12,10 +12,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.TransportAbstractBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectIdResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.streams.StreamType; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -40,6 +46,13 @@ public class RestIndexAction extends BaseRestHandler { + "index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, " + "or /{index}/_create/{id})."; private final Set capabilities = Set.of(FAILURE_STORE_STATUS_CAPABILITY); + private final ClusterService clusterService; + private final ProjectIdResolver projectIdResolver; + + public RestIndexAction(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + this.clusterService = clusterService; + this.projectIdResolver = projectIdResolver; + } @Override public List routes() { @@ -54,6 +67,10 @@ public String getName() { @ServerlessScope(Scope.PUBLIC) public static final class CreateHandler extends RestIndexAction { + public CreateHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + super(clusterService, projectIdResolver); + } + @Override public String getName() { return "document_create_action"; @@ -81,7 +98,9 @@ static void validateOpType(String opType) { @ServerlessScope(Scope.PUBLIC) public static final class AutoIdHandler extends RestIndexAction { - public AutoIdHandler() {} + public AutoIdHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) { + super(clusterService, projectIdResolver); + } @Override public String getName() { @@ -105,7 +124,11 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ReleasableBytesReference source = request.requiredContent(); - IndexRequest indexRequest = new IndexRequest(request.param("index")); + String index = request.param("index"); + + validateStreamsParamRestrictions(request, index); + + IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.setPipeline(request.param("pipeline")); @@ -140,6 +163,28 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC }; } + private void validateStreamsParamRestrictions(RestRequest request, String index) { + ProjectMetadata projectMetadata = null; + + for (StreamType streamType : StreamType.values()) { + if (index.equals(streamType.getStreamName())) { + if (projectMetadata == null) { + projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata(); + } + + if (streamType.streamTypeIsEnabled(projectMetadata) + && Sets.difference(request.params().keySet(), TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) { + throw new IllegalArgumentException( + "When writing to a stream, only the following parameters are allowed: [" + + String.join(", ", TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS) + + "] however the following were used: " + + request.params().keySet() + ); + } + } + } + } + @Override public Set supportedCapabilities() { return capabilities; diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 0b3e50974a827..6a1adbea5e7b6 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -246,7 +247,8 @@ public void next() { null, null, null, - MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME) + MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME), + emptySet() ) { @Override diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java index c97160427e59d..46d9fe7982041 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java @@ -37,13 +37,13 @@ public final class RestIndexActionTests extends RestActionTestCase { @Before public void setUpAction() { - controller().registerHandler(new RestIndexAction()); - controller().registerHandler(new CreateHandler()); - controller().registerHandler(new AutoIdHandler()); + controller().registerHandler(new RestIndexAction(null, null)); + controller().registerHandler(new CreateHandler(null, null)); + controller().registerHandler(new AutoIdHandler(null, null)); } public void testCreateOpTypeValidation() { - RestIndexAction.CreateHandler create = new CreateHandler(); + RestIndexAction.CreateHandler create = new CreateHandler(null, null); String opType = randomFrom("CREATE", null); CreateHandler.validateOpType(opType);