Skip to content

Commit d798c86

Browse files
authored
[8.x] Remove timeouts from CreateIndexClusterStateUpdateRequest (#113366) (#113479)
* Remove timeouts from `CreateIndexClusterStateUpdateRequest` (#113366) This class is mostly used to carry information during the process of computing a cluster state update which creates an index, for which the `masterNodeTimeout` and `ackTimeout` fields are not meaningful. Setting these fields in those contexts is pointless, but leaving them as `null` makes it harder to reason about null-propagation. This commit removes these fields and replaces them with method arguments in the few places where they actually make sense. * Fix compile
1 parent b74e592 commit d798c86

File tree

8 files changed

+57
-37
lines changed

8 files changed

+57
-37
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName
350350
request.cause(),
351351
indexName,
352352
request.index()
353-
).ackTimeout(request.ackTimeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout());
353+
).performReroute(false);
354354
logger.debug("Auto-creating index {}", indexName);
355355
return updateRequest;
356356
}
@@ -367,7 +367,7 @@ private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(Strin
367367
request.cause(),
368368
concreteIndexName,
369369
request.index()
370-
).ackTimeout(request.ackTimeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false);
370+
).performReroute(false);
371371

372372
updateRequest.waitForActiveShards(ActiveShardCount.ALL);
373373

server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.admin.indices.alias.Alias;
1313
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
1414
import org.elasticsearch.action.support.ActiveShardCount;
15-
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
1615
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
1716
import org.elasticsearch.cluster.metadata.IndexMetadata;
1817
import org.elasticsearch.common.settings.Settings;
@@ -25,7 +24,7 @@
2524
/**
2625
* Cluster state update request that allows to create an index
2726
*/
28-
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {
27+
public class CreateIndexClusterStateUpdateRequest {
2928

3029
private final String cause;
3130
private final String index;

server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ protected void masterOperation(
150150
}
151151

152152
createIndexService.createIndex(
153+
request.masterNodeTimeout(),
154+
request.ackTimeout(),
155+
request.ackTimeout(),
153156
updateRequest,
154157
listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
155158
);
@@ -166,9 +169,7 @@ private CreateIndexClusterStateUpdateRequest buildUpdateRequest(
166169
alias.isHidden(true);
167170
}
168171
}).collect(Collectors.toSet());
169-
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.ackTimeout())
170-
.masterNodeTimeout(request.masterNodeTimeout())
171-
.settings(request.settings())
172+
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).settings(request.settings())
172173
.mappings(request.mappings())
173174
.aliases(aliases)
174175
.nameResolvedInstant(nameResolvedAt)
@@ -196,15 +197,7 @@ private static CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateReques
196197
);
197198
}
198199

199-
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
200-
cause,
201-
descriptor.getPrimaryIndex(),
202-
request.index()
203-
);
204-
205-
return updateRequest.ackTimeout(request.ackTimeout())
206-
.masterNodeTimeout(request.masterNodeTimeout())
207-
.aliases(aliases)
200+
return new CreateIndexClusterStateUpdateRequest(cause, descriptor.getPrimaryIndex(), request.index()).aliases(aliases)
208201
.waitForActiveShards(ActiveShardCount.ALL)
209202
.mappings(descriptor.getMappings())
210203
.settings(settings);

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
548548
if (settings != null) {
549549
b.put(settings);
550550
}
551-
return new CreateIndexClusterStateUpdateRequest(cause, targetIndexName, providedIndexName).ackTimeout(
552-
createIndexRequest.ackTimeout()
553-
)
554-
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
555-
.settings(b.build())
551+
return new CreateIndexClusterStateUpdateRequest(cause, targetIndexName, providedIndexName).settings(b.build())
556552
.aliases(createIndexRequest.aliases())
557553
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
558554
.mappings(createIndexRequest.mappings())

server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ protected void masterOperation(
136136
return;
137137
}
138138
createIndexService.createIndex(
139+
resizeRequest.masterNodeTimeout(),
140+
resizeRequest.ackTimeout(),
141+
resizeRequest.ackTimeout(),
139142
updateRequest,
140143
delegatedListener.map(
141144
response -> new CreateIndexResponse(
@@ -234,8 +237,6 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
234237
// mappings are updated on the node when creating in the shards, this prevents race-conditions since all mapping must be
235238
// applied once we took the snapshot and if somebody messes things up and switches the index read/write and adds docs we
236239
// miss the mappings for everything is corrupted and hard to debug
237-
.ackTimeout(targetIndex.ackTimeout())
238-
.masterNodeTimeout(targetIndex.masterNodeTimeout())
239240
.settings(targetIndex.settings())
240241
.aliases(targetIndex.aliases())
241242
.waitForActiveShards(targetIndex.waitForActiveShards())

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.core.Nullable;
5353
import org.elasticsearch.core.PathUtils;
5454
import org.elasticsearch.core.SuppressForbidden;
55+
import org.elasticsearch.core.TimeValue;
5556
import org.elasticsearch.env.Environment;
5657
import org.elasticsearch.index.Index;
5758
import org.elasticsearch.index.IndexMode;
@@ -254,12 +255,23 @@ public static void validateIndexOrAliasName(String index, BiFunction<String, Str
254255
* the timeout, then {@link ShardsAcknowledgedResponse#isShardsAcknowledged()} will
255256
* return true, otherwise if the operation timed out, then it will return false.
256257
*
258+
* @param masterNodeTimeout timeout on cluster state update in pending task queue
259+
* @param ackTimeout timeout on waiting for all nodes to ack the cluster state update
260+
* @param waitForActiveShardsTimeout timeout for waiting for the {@link ActiveShardCount} specified in
261+
* {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()} to be satisfied.
262+
* May also be {@code null}, in which case it waits forever.
257263
* @param request the index creation cluster state update request
258264
* @param listener the listener on which to send the index creation cluster state update response
259265
*/
260-
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ShardsAcknowledgedResponse> listener) {
266+
public void createIndex(
267+
final TimeValue masterNodeTimeout,
268+
final TimeValue ackTimeout,
269+
@Nullable final TimeValue waitForActiveShardsTimeout,
270+
final CreateIndexClusterStateUpdateRequest request,
271+
final ActionListener<ShardsAcknowledgedResponse> listener
272+
) {
261273
logger.trace("createIndex[{}]", request);
262-
onlyCreateIndex(request, listener.delegateFailureAndWrap((delegate, response) -> {
274+
onlyCreateIndex(masterNodeTimeout, ackTimeout, request, listener.delegateFailureAndWrap((delegate, response) -> {
263275
if (response.isAcknowledged()) {
264276
logger.trace(
265277
"[{}] index creation acknowledged, waiting for active shards [{}]",
@@ -270,7 +282,7 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request, fina
270282
clusterService,
271283
new String[] { request.index() },
272284
request.waitForActiveShards(),
273-
request.ackTimeout(),
285+
waitForActiveShardsTimeout,
274286
delegate.map(shardsAcknowledged -> {
275287
if (shardsAcknowledged == false) {
276288
logger.debug(
@@ -290,18 +302,18 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request, fina
290302
}));
291303
}
292304

293-
private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
305+
private void onlyCreateIndex(
306+
final TimeValue masterNodeTimeout,
307+
final TimeValue ackTimeout,
308+
final CreateIndexClusterStateUpdateRequest request,
309+
final ActionListener<AcknowledgedResponse> listener
310+
) {
294311
normalizeRequestSetting(request);
295312

296313
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
297314
submitUnbatchedTask(
298315
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
299-
new AckedClusterStateUpdateTask(
300-
Priority.URGENT,
301-
request.masterNodeTimeout(),
302-
request.ackTimeout(),
303-
delegate.clusterStateUpdate()
304-
) {
316+
new AckedClusterStateUpdateTask(Priority.URGENT, masterNodeTimeout, ackTimeout, delegate.clusterStateUpdate()) {
305317

306318
@Override
307319
public ClusterState execute(ClusterState currentState) throws Exception {

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
500500
createRequest.waitForActiveShards(ActiveShardCount.ALL)
501501
.mappings(migrationInfo.getMappings())
502502
.settings(Objects.requireNonNullElse(settingsBuilder.build(), Settings.EMPTY));
503-
metadataCreateIndexService.createIndex(createRequest, listener);
503+
metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
504504
}
505505

506506
private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(

server/src/test/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexActionTests.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.transport.TransportAddress;
2929
import org.elasticsearch.common.util.concurrent.ThreadContext;
30+
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.indices.SystemIndexDescriptor;
3132
import org.elasticsearch.indices.SystemIndices;
3233
import org.elasticsearch.tasks.Task;
@@ -147,7 +148,13 @@ public void testSystemIndicesCannotBeCreatedUnhidden() {
147148

148149
ArgumentCaptor<Exception> exceptionArgumentCaptor = ArgumentCaptor.forClass(Exception.class);
149150
verify(mockListener, times(0)).onResponse(any());
150-
verify(metadataCreateIndexService, times(0)).createIndex(any(), any());
151+
verify(metadataCreateIndexService, times(0)).createIndex(
152+
any(TimeValue.class),
153+
any(TimeValue.class),
154+
any(TimeValue.class),
155+
any(),
156+
any()
157+
);
151158
verify(mockListener, times(1)).onFailure(exceptionArgumentCaptor.capture());
152159

153160
Exception e = exceptionArgumentCaptor.getValue();
@@ -167,7 +174,13 @@ public void testSystemIndicesCreatedHiddenByDefault() {
167174
CreateIndexClusterStateUpdateRequest.class
168175
);
169176
verify(mockListener, times(0)).onFailure(any());
170-
verify(metadataCreateIndexService, times(1)).createIndex(createRequestArgumentCaptor.capture(), any());
177+
verify(metadataCreateIndexService, times(1)).createIndex(
178+
any(TimeValue.class),
179+
any(TimeValue.class),
180+
any(TimeValue.class),
181+
createRequestArgumentCaptor.capture(),
182+
any()
183+
);
171184

172185
CreateIndexClusterStateUpdateRequest processedRequest = createRequestArgumentCaptor.getValue();
173186
assertTrue(processedRequest.settings().getAsBoolean(SETTING_INDEX_HIDDEN, false));
@@ -187,7 +200,13 @@ public void testSystemAliasCreatedHiddenByDefault() {
187200
CreateIndexClusterStateUpdateRequest.class
188201
);
189202
verify(mockListener, times(0)).onFailure(any());
190-
verify(metadataCreateIndexService, times(1)).createIndex(createRequestArgumentCaptor.capture(), any());
203+
verify(metadataCreateIndexService, times(1)).createIndex(
204+
any(TimeValue.class),
205+
any(TimeValue.class),
206+
any(TimeValue.class),
207+
createRequestArgumentCaptor.capture(),
208+
any()
209+
);
191210

192211
CreateIndexClusterStateUpdateRequest processedRequest = createRequestArgumentCaptor.getValue();
193212
assertTrue(processedRequest.aliases().contains(new Alias(SYSTEM_ALIAS_NAME).isHidden(true)));

0 commit comments

Comments
 (0)