Skip to content

Commit 6aecacc

Browse files
committed
Move check lower down the stack and preserve param names for better error message
Also whitelists ID and pretty as allowed params
1 parent 960097e commit 6aecacc

File tree

7 files changed

+92
-68
lines changed

7 files changed

+92
-68
lines changed

modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/30_param_restrictions.yml

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ teardown:
2424
- match: { errors: true }
2525
- match: { items.0.index.status: 400 }
2626
- match: { items.0.index.error.type: "illegal_argument_exception" }
27-
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }
27+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
2828
- match: { items.1.index.status: 201 }
2929

3030
---
@@ -42,7 +42,7 @@ teardown:
4242
refresh: true
4343
index: logs
4444
body: { "foo": "bar" }
45-
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'
45+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'
4646

4747
---
4848
"Allowed Params Only - Bulk Should Succeed":
@@ -76,6 +76,21 @@ teardown:
7676
- match: { _index: "logs" }
7777
- match: { result: "created" }
7878

79+
---
80+
"Allowed Params Only - Single Doc with ID Should Succeed":
81+
- do:
82+
streams.logs_enable: { }
83+
- is_true: acknowledged
84+
85+
- do:
86+
index:
87+
index: logs
88+
id: 123456
89+
body: |
90+
{ "foo": "bar" }
91+
- match: { _index: "logs" }
92+
- match: { result: "created" }
93+
7994
---
8095
"No Params - Bulk Should Succeed":
8196
- do:
@@ -121,7 +136,7 @@ teardown:
121136
- match: { errors: true }
122137
- match: { items.0.index.status: 400 }
123138
- match: { items.0.index.error.type: "illegal_argument_exception" }
124-
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }
139+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
125140

126141
---
127142
"Mixed Allowed and Disallowed Params - Single Doc Should Fail":
@@ -137,7 +152,7 @@ teardown:
137152
routing: "custom-routing"
138153
refresh: true
139154
body: { "foo": "bar" }
140-
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'
155+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'
141156

142157
---
143158
"Multiple Disallowed Params - Bulk Should Fail":
@@ -156,7 +171,7 @@ teardown:
156171
- match: { errors: true }
157172
- match: { items.0.index.status: 400 }
158173
- match: { items.0.index.error.type: "illegal_argument_exception" }
159-
- match: { items.0.index.error.reason: "When writing to a stream, only the following parameters are allowed: [index,op_type,error_trace,timeout]" }
174+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
160175

161176
---
162177
"Multiple Disallowed Params - Single Doc Should Fail":
@@ -172,4 +187,4 @@ teardown:
172187
wait_for_active_shards: "2"
173188
refresh: true
174189
body: { "foo": "bar" }
175-
catch: '/When writing to a stream, only the following parameters are allowed: \[index,op_type,error_trace,timeout\]/'
190+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class BulkRequest extends LegacyActionRequest
8686
private Boolean globalRequireAlias;
8787
private Boolean globalRequireDatsStream;
8888
private boolean includeSourceOnError = true;
89-
private boolean streamsRestrictedParamsUsed = false;
89+
private Set<String> paramsUsed;
9090

9191
private long sizeInBytes = 0;
9292

@@ -110,7 +110,7 @@ public BulkRequest(StreamInput in) throws IOException {
110110
includeSourceOnError = in.readBoolean();
111111
}
112112
if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
113-
streamsRestrictedParamsUsed = in.readBoolean();
113+
paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString);
114114
}
115115
}
116116

@@ -479,7 +479,7 @@ public void writeTo(StreamOutput out) throws IOException {
479479
out.writeBoolean(includeSourceOnError);
480480
}
481481
if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
482-
out.writeBoolean(streamsRestrictedParamsUsed);
482+
out.writeCollection(paramsUsed, StreamOutput::writeString);
483483
}
484484
}
485485

@@ -523,12 +523,12 @@ public boolean isSimulated() {
523523
return false; // Always false, but may be overridden by a subclass
524524
}
525525

526-
public boolean streamsRestrictedParamsUsed() {
527-
return streamsRestrictedParamsUsed;
526+
public Set<String> requestParamsUsed() {
527+
return paramsUsed;
528528
}
529529

530-
public void streamsRestrictedParamsUsed(boolean streamsRestrictedParamsUsed) {
531-
this.streamsRestrictedParamsUsed = streamsRestrictedParamsUsed;
530+
public void requestParamsUsed(Set<String> paramsUsed) {
531+
this.paramsUsed = paramsUsed;
532532
}
533533

534534
/*
@@ -569,7 +569,7 @@ public BulkRequest shallowClone() {
569569
bulkRequest.routing(routing());
570570
bulkRequest.requireAlias(requireAlias());
571571
bulkRequest.requireDataStream(requireDataStream());
572-
bulkRequest.streamsRestrictedParamsUsed(streamsRestrictedParamsUsed());
572+
bulkRequest.requestParamsUsed(requestParamsUsed());
573573
return bulkRequest;
574574
}
575575
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Optional;
32+
import java.util.Set;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Supplier;
3435

36+
import static java.util.Collections.emptySet;
3537
import static org.elasticsearch.common.settings.Setting.boolSetting;
3638

3739
public class IncrementalBulkService {
@@ -62,25 +64,17 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure,
6264

6365
public Handler newBulkRequest() {
6466
ensureEnabled();
65-
return newBulkRequest(null, null, null, false);
67+
return newBulkRequest(null, null, null, emptySet());
6668
}
6769

6870
public Handler newBulkRequest(
6971
@Nullable String waitForActiveShards,
7072
@Nullable TimeValue timeout,
7173
@Nullable String refresh,
72-
boolean usesStreamsRestrictedParams
74+
Set<String> paramsUsed
7375
) {
7476
ensureEnabled();
75-
return new Handler(
76-
client,
77-
indexingPressure,
78-
waitForActiveShards,
79-
timeout,
80-
refresh,
81-
chunkWaitTimeMillisHistogram,
82-
usesStreamsRestrictedParams
83-
);
77+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram, paramsUsed);
8478
}
8579

8680
private void ensureEnabled() {
@@ -118,7 +112,7 @@ public static class Handler implements Releasable {
118112
private final Client client;
119113
private final ActiveShardCount waitForActiveShards;
120114
private final TimeValue timeout;
121-
private final boolean usesStreamsRestrictedParams;
115+
private final Set<String> paramsUsed;
122116
private final String refresh;
123117

124118
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
@@ -140,13 +134,13 @@ protected Handler(
140134
@Nullable TimeValue timeout,
141135
@Nullable String refresh,
142136
LongHistogram chunkWaitTimeMillisHistogram,
143-
boolean usesStreamsRestrictedParams
137+
Set<String> paramsUsed
144138
) {
145139
this.client = client;
146140
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
147141
this.timeout = timeout;
148142
this.refresh = refresh;
149-
this.usesStreamsRestrictedParams = usesStreamsRestrictedParams;
143+
this.paramsUsed = paramsUsed;
150144
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
151145
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
152146
createNewBulkRequest(EMPTY_STATE);
@@ -326,7 +320,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
326320
if (refresh != null) {
327321
bulkRequest.setRefreshPolicy(refresh);
328322
}
329-
bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams);
323+
bulkRequest.requestParamsUsed(paramsUsed);
330324
}
331325
}
332326
}

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.io.stream.Writeable;
3535
import org.elasticsearch.common.streams.StreamType;
3636
import org.elasticsearch.common.util.concurrent.EsExecutors;
37+
import org.elasticsearch.common.util.set.Sets;
3738
import org.elasticsearch.core.Assertions;
3839
import org.elasticsearch.core.Releasable;
3940
import org.elasticsearch.core.TimeValue;
@@ -42,15 +43,17 @@
4243
import org.elasticsearch.indices.SystemIndices;
4344
import org.elasticsearch.ingest.IngestService;
4445
import org.elasticsearch.node.NodeClosedException;
45-
import org.elasticsearch.rest.action.document.RestBulkAction;
4646
import org.elasticsearch.tasks.Task;
4747
import org.elasticsearch.threadpool.ThreadPool;
4848
import org.elasticsearch.transport.TransportService;
4949

5050
import java.io.IOException;
51+
import java.util.Collections;
5152
import java.util.HashMap;
53+
import java.util.HashSet;
5254
import java.util.Map;
5355
import java.util.Objects;
56+
import java.util.Set;
5457
import java.util.concurrent.Executor;
5558
import java.util.concurrent.TimeUnit;
5659
import java.util.function.LongSupplier;
@@ -62,6 +65,17 @@
6265
public abstract class TransportAbstractBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
6366
private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class);
6467

68+
public static final Set<String> STREAMS_ALLOWED_PARAMS = new HashSet<>(4) {
69+
{
70+
add("id");
71+
add("index");
72+
add("op_type");
73+
add("pretty");
74+
add("error_trace");
75+
add("timeout");
76+
}
77+
};
78+
6579
protected final ThreadPool threadPool;
6680
protected final ClusterService clusterService;
6781
protected final IndexingPressure indexingPressure;
@@ -441,11 +455,12 @@ private void doStreamsChecks(
441455
);
442456
}
443457

444-
if (e == null && bulkRequest.streamsRestrictedParamsUsed() && req.index().equals(streamType.getStreamName())) {
458+
if (e == null && streamsRestrictedParamsUsed(bulkRequest) && req.index().equals(streamType.getStreamName())) {
445459
e = new IllegalArgumentException(
446460
"When writing to a stream, only the following parameters are allowed: ["
447-
+ String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS)
448-
+ "]"
461+
+ String.join(", ", STREAMS_ALLOWED_PARAMS)
462+
+ "] however the following were used: "
463+
+ bulkRequest.requestParamsUsed()
449464
);
450465
}
451466

@@ -470,6 +485,13 @@ private void doStreamsChecks(
470485
}
471486
}
472487

488+
private boolean streamsRestrictedParamsUsed(BulkRequest bulkRequest) {
489+
return Sets.difference(
490+
bulkRequest.requestParamsUsed() == null ? Collections.emptySet() : bulkRequest.requestParamsUsed(),
491+
STREAMS_ALLOWED_PARAMS
492+
).isEmpty() == false;
493+
}
494+
473495
/**
474496
* This method creates any missing resources and actually applies the BulkRequest to the relevant indices
475497
* @param task The task in which this work is being done

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2424
import org.elasticsearch.common.settings.ClusterSettings;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.common.util.set.Sets;
2726
import org.elasticsearch.core.Releasable;
2827
import org.elasticsearch.core.Releasables;
2928
import org.elasticsearch.core.TimeValue;
@@ -41,7 +40,6 @@
4140
import java.io.IOException;
4241
import java.util.ArrayDeque;
4342
import java.util.ArrayList;
44-
import java.util.LinkedHashSet;
4543
import java.util.List;
4644
import java.util.Set;
4745
import java.util.concurrent.TimeUnit;
@@ -64,14 +62,6 @@ public class RestBulkAction extends BaseRestHandler {
6462

6563
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
6664
public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status";
67-
public static final Set<String> STREAMS_ALLOWED_PARAMS = new LinkedHashSet<>(2) {
68-
{
69-
add("index");
70-
add("op_type");
71-
add("error_trace");
72-
add("timeout");
73-
}
74-
};
7565

7666
private final boolean allowExplicitIndex;
7767
private final IncrementalBulkService bulkHandler;
@@ -123,7 +113,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
123113
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
124114
bulkRequest.setRefreshPolicy(request.param("refresh"));
125115
bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request));
126-
bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams(request));
116+
bulkRequest.requestParamsUsed(request.params().keySet());
127117
ReleasableBytesReference content = request.requiredContent();
128118

129119
try {
@@ -154,15 +144,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
154144
return new ChunkHandler(
155145
allowExplicitIndex,
156146
request,
157-
() -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, usesStreamsRestrictedParams(request))
147+
() -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, request.params().keySet())
158148
);
159149
}
160150
}
161151

162-
private boolean usesStreamsRestrictedParams(RestRequest request) {
163-
return Sets.difference(request.params().keySet(), STREAMS_ALLOWED_PARAMS).isEmpty() == false;
164-
}
165-
166152
private static Exception parseFailureException(Exception e) {
167153
if (e instanceof IllegalArgumentException) {
168154
return e;

server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.DocWriteRequest;
1414
import org.elasticsearch.action.DocWriteResponse;
15+
import org.elasticsearch.action.bulk.TransportAbstractBulkAction;
1516
import org.elasticsearch.action.index.IndexRequest;
1617
import org.elasticsearch.action.support.ActiveShardCount;
1718
import org.elasticsearch.client.internal.node.NodeClient;
@@ -123,26 +124,9 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient
123124
@Override
124125
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
125126
ReleasableBytesReference source = request.requiredContent();
126-
127127
String index = request.param("index");
128-
ProjectMetadata projectMetadata = null;
129-
130-
for (StreamType streamType : StreamType.values()) {
131-
if (index.equals(streamType.getStreamName())) {
132-
if (projectMetadata == null) {
133-
projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata();
134-
}
135128

136-
if (streamType.streamTypeIsEnabled(projectMetadata)
137-
&& Sets.difference(request.params().keySet(), RestBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) {
138-
throw new IllegalArgumentException(
139-
"When writing to a stream, only the following parameters are allowed: ["
140-
+ String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS)
141-
+ "]"
142-
);
143-
}
144-
}
145-
}
129+
validateStreamsParamRestrictions(request, index);
146130

147131
IndexRequest indexRequest = new IndexRequest(index);
148132
indexRequest.id(request.param("id"));
@@ -179,6 +163,28 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
179163
};
180164
}
181165

166+
private void validateStreamsParamRestrictions(RestRequest request, String index) {
167+
ProjectMetadata projectMetadata = null;
168+
169+
for (StreamType streamType : StreamType.values()) {
170+
if (index.equals(streamType.getStreamName())) {
171+
if (projectMetadata == null) {
172+
projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata();
173+
}
174+
175+
if (streamType.streamTypeIsEnabled(projectMetadata)
176+
&& Sets.difference(request.params().keySet(), TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) {
177+
throw new IllegalArgumentException(
178+
"When writing to a stream, only the following parameters are allowed: ["
179+
+ String.join(", ", TransportAbstractBulkAction.STREAMS_ALLOWED_PARAMS)
180+
+ "] however the following were used: "
181+
+ request.params().keySet()
182+
);
183+
}
184+
}
185+
}
186+
}
187+
182188
@Override
183189
public Set<String> supportedCapabilities() {
184190
return capabilities;

0 commit comments

Comments
 (0)