Skip to content

Commit 6c04589

Browse files
committed
Add support for passing down if restricted params are used
# Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java
1 parent 15768f6 commit 6c04589

File tree

5 files changed

+53
-7
lines changed

5 files changed

+53
-7
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ static TransportVersion def(int id) {
365365
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
366366
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
367367
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
368+
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_141_00_00);
368369

369370
/*
370371
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class BulkRequest extends LegacyActionRequest
8686
private Boolean globalRequireAlias;
8787
private Boolean globalRequireDatsStream;
8888
private boolean includeSourceOnError = true;
89+
private boolean streamsRestrictedParamsUsed = false;
8990

9091
private long sizeInBytes = 0;
9192

@@ -107,7 +108,10 @@ public BulkRequest(StreamInput in) throws IOException {
107108
}
108109
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
109110
includeSourceOnError = in.readBoolean();
110-
} // else default value is true
111+
}
112+
if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
113+
streamsRestrictedParamsUsed = in.readBoolean();
114+
}
111115
}
112116

113117
public BulkRequest(@Nullable String globalIndex) {
@@ -474,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException {
474478
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
475479
out.writeBoolean(includeSourceOnError);
476480
}
481+
if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
482+
out.writeBoolean(streamsRestrictedParamsUsed);
483+
}
477484
}
478485

479486
@Override
@@ -516,6 +523,14 @@ public boolean isSimulated() {
516523
return false; // Always false, but may be overridden by a subclass
517524
}
518525

526+
public boolean streamsRestrictedParamsUsed() {
527+
return streamsRestrictedParamsUsed;
528+
}
529+
530+
public void streamsRestrictedParamsUsed(boolean streamsRestrictedParamsUsed) {
531+
this.streamsRestrictedParamsUsed = streamsRestrictedParamsUsed;
532+
}
533+
519534
/*
520535
* Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have
521536
* substitutions in the event of a simulated request.
@@ -554,6 +569,7 @@ public BulkRequest shallowClone() {
554569
bulkRequest.routing(routing());
555570
bulkRequest.requireAlias(requireAlias());
556571
bulkRequest.requireDataStream(requireDataStream());
572+
bulkRequest.streamsRestrictedParamsUsed(streamsRestrictedParamsUsed());
557573
return bulkRequest;
558574
}
559575
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,25 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure,
6262

6363
public Handler newBulkRequest() {
6464
ensureEnabled();
65-
return newBulkRequest(null, null, null);
65+
return newBulkRequest(null, null, null, false);
6666
}
6767

68-
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
68+
public Handler newBulkRequest(
69+
@Nullable String waitForActiveShards,
70+
@Nullable TimeValue timeout,
71+
@Nullable String refresh,
72+
boolean usesStreamsRestrictedParams
73+
) {
6974
ensureEnabled();
70-
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
75+
return new Handler(
76+
client,
77+
indexingPressure,
78+
waitForActiveShards,
79+
timeout,
80+
refresh,
81+
chunkWaitTimeMillisHistogram,
82+
usesStreamsRestrictedParams
83+
);
7184
}
7285

7386
private void ensureEnabled() {
@@ -105,6 +118,7 @@ public static class Handler implements Releasable {
105118
private final Client client;
106119
private final ActiveShardCount waitForActiveShards;
107120
private final TimeValue timeout;
121+
private final boolean usesStreamsRestrictedParams;
108122
private final String refresh;
109123

110124
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
@@ -125,12 +139,14 @@ protected Handler(
125139
@Nullable String waitForActiveShards,
126140
@Nullable TimeValue timeout,
127141
@Nullable String refresh,
128-
LongHistogram chunkWaitTimeMillisHistogram
142+
LongHistogram chunkWaitTimeMillisHistogram,
143+
boolean usesStreamsRestrictedParams
129144
) {
130145
this.client = client;
131146
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
132147
this.timeout = timeout;
133148
this.refresh = refresh;
149+
this.usesStreamsRestrictedParams = usesStreamsRestrictedParams;
134150
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
135151
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
136152
createNewBulkRequest(EMPTY_STATE);
@@ -310,6 +326,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
310326
if (refresh != null) {
311327
bulkRequest.setRefreshPolicy(refresh);
312328
}
329+
bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams);
313330
}
314331
}
315332
}

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2424
import org.elasticsearch.common.settings.ClusterSettings;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.util.set.Sets;
2627
import org.elasticsearch.core.Releasable;
2728
import org.elasticsearch.core.Releasables;
2829
import org.elasticsearch.core.TimeValue;
@@ -62,6 +63,7 @@ public class RestBulkAction extends BaseRestHandler {
6263

6364
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
6465
public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status";
66+
public static final Set<String> STREAMS_ALLOWED_PARAMS = Set.of("timeout");
6567

6668
private final boolean allowExplicitIndex;
6769
private final IncrementalBulkService bulkHandler;
@@ -113,6 +115,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
113115
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
114116
bulkRequest.setRefreshPolicy(request.param("refresh"));
115117
bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request));
118+
bulkRequest.streamsRestrictedParamsUsed(usesStreamsRestrictedParams(request));
116119
ReleasableBytesReference content = request.requiredContent();
117120

118121
try {
@@ -140,10 +143,18 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
140143
String waitForActiveShards = request.param("wait_for_active_shards");
141144
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
142145
String refresh = request.param("refresh");
143-
return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh));
146+
return new ChunkHandler(
147+
allowExplicitIndex,
148+
request,
149+
() -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh, usesStreamsRestrictedParams(request))
150+
);
144151
}
145152
}
146153

154+
private boolean usesStreamsRestrictedParams(RestRequest request) {
155+
return Sets.difference(request.params().keySet(), STREAMS_ALLOWED_PARAMS).isEmpty() == false;
156+
}
157+
147158
private static Exception parseFailureException(Exception e) {
148159
if (e instanceof IllegalArgumentException) {
149160
return e;

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ public void next() {
240240
null,
241241
null,
242242
null,
243-
MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME)
243+
MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME),
244+
false
244245
) {
245246

246247
@Override

0 commit comments

Comments
 (0)