Skip to content

Commit 006753c

Browse files
authored
Configurable priority/max-timeout for auto-create (elastic#140770)
Similar to elastic#139898 and elastic#139699: users can submit auto-create tasks at an unreasonable rate too.
1 parent 528bfe9 commit 006753c

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/CreateIndexPriorityIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12+
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
1213
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1314
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
1415
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
1516
import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
1617
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
18+
import org.elasticsearch.action.bulk.BulkRequest;
19+
import org.elasticsearch.action.bulk.TransportBulkAction;
20+
import org.elasticsearch.action.index.IndexRequest;
1721
import org.elasticsearch.action.support.SubscribableListener;
1822
import org.elasticsearch.cluster.ClusterState;
1923
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -25,6 +29,7 @@
2529
import org.elasticsearch.plugins.Plugin;
2630
import org.elasticsearch.test.ClusterServiceUtils;
2731
import org.elasticsearch.test.ESIntegTestCase;
32+
import org.elasticsearch.xcontent.XContentType;
2833
import org.hamcrest.Matchers;
2934

3035
import java.util.Collection;
@@ -38,6 +43,7 @@ public static class TestPlugin extends Plugin {
3843
public List<Setting<?>> getSettings() {
3944
return CollectionUtils.appendToCopyNoNullElements(
4045
super.getSettings(),
46+
AutoCreateAction.AUTO_CREATE_INDEX_PRIORITY_SETTING,
4147
MetadataCreateIndexService.CREATE_INDEX_PRIORITY_SETTING,
4248
MetadataMappingService.PUT_MAPPING_PRIORITY_SETTING
4349
);
@@ -107,6 +113,7 @@ void awaitPendingAndStop(String taskSourcePrefix, SubscribableListener<?> listen
107113
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
108114
return Settings.builder()
109115
.put(otherSettings)
116+
.put(AutoCreateAction.AUTO_CREATE_INDEX_PRIORITY_SETTING.getKey(), "NORMAL")
110117
.put(MetadataCreateIndexService.CREATE_INDEX_PRIORITY_SETTING.getKey(), "NORMAL")
111118
.put(MetadataMappingService.PUT_MAPPING_PRIORITY_SETTING.getKey(), "NORMAL")
112119
.build();
@@ -131,6 +138,23 @@ public void testReducePriorities() {
131138
client().execute(TransportCreateIndexAction.TYPE, new CreateIndexRequest(indexName))
132139
);
133140

141+
// starve all tasks at NORMAL or below again
142+
final var autoCreateIndexBlockingTask = new RepeatingTask(masterClusterService);
143+
autoCreateIndexBlockingTask.submitTask();
144+
145+
// now submit an auto-create task, wait for it to be enqueued, verify its priority by the fact that it doesn't execute,
146+
// then remove the starvation to allow it through
147+
final var autoIndexName = randomIndexName();
148+
final var indexRequest = new IndexRequest().source("{}", XContentType.JSON);
149+
autoCreateIndexBlockingTask.awaitPendingAndStop(
150+
"auto create [" + autoIndexName + "]",
151+
ClusterServiceUtils.addTemporaryStateListener(
152+
masterClusterService,
153+
cs -> cs.metadata().getProject(ProjectId.DEFAULT).index(autoIndexName) != null
154+
),
155+
client().execute(TransportBulkAction.TYPE, new BulkRequest(autoIndexName).add(indexRequest))
156+
);
157+
134158
// starve all tasks at NORMAL or below again
135159
final var putMappingBlockingTask = new RepeatingTask(masterClusterService);
136160
putMappingBlockingTask.submitTask();

server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/CreateIndexTimeoutIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
2020
import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
2121
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
22+
import org.elasticsearch.action.support.PlainActionFuture;
2223
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2324
import org.elasticsearch.action.support.master.MasterNodeRequest;
2425
import org.elasticsearch.client.Request;
26+
import org.elasticsearch.client.Response;
27+
import org.elasticsearch.client.ResponseListener;
2528
import org.elasticsearch.cluster.ClusterState;
2629
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2730
import org.elasticsearch.cluster.service.ClusterService;
@@ -35,11 +38,14 @@
3538
import org.elasticsearch.rest.RestUtils;
3639
import org.elasticsearch.test.ESIntegTestCase;
3740
import org.elasticsearch.test.rest.ESRestTestCase;
41+
import org.elasticsearch.test.rest.ObjectPath;
3842

3943
import java.util.Collection;
4044
import java.util.List;
4145
import java.util.concurrent.CyclicBarrier;
46+
import java.util.function.Consumer;
4247

48+
import static org.elasticsearch.action.admin.indices.create.AutoCreateAction.AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING;
4349
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.CREATE_INDEX_MAX_TIMEOUT_SETTING;
4450
import static org.elasticsearch.cluster.metadata.MetadataMappingService.PUT_MAPPING_MAX_TIMEOUT_SETTING;
4551
import static org.hamcrest.Matchers.allOf;
@@ -53,6 +59,7 @@ public static class TestPlugin extends Plugin {
5359
public List<Setting<?>> getSettings() {
5460
return CollectionUtils.appendToCopyNoNullElements(
5561
super.getSettings(),
62+
AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING,
5663
CREATE_INDEX_MAX_TIMEOUT_SETTING,
5764
PUT_MAPPING_MAX_TIMEOUT_SETTING
5865
);
@@ -68,6 +75,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6875
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
6976
return Settings.builder()
7077
.put(otherSettings)
78+
.put(AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey(), "1ms")
7179
.put(CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey(), "1ms")
7280
.put(PUT_MAPPING_MAX_TIMEOUT_SETTING.getKey(), "1ms")
7381
.build();
@@ -97,6 +105,67 @@ protected boolean addMockHttpTransport() {
97105
return false; // enable HTTP
98106
}
99107

108+
public void testAutoCreateTimeoutLimit() throws Exception {
109+
final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
110+
final var restClient = getRestClient();
111+
final var indexName = randomIndexName();
112+
113+
final Consumer<Request> timeoutApplier;
114+
if (randomBoolean()) {
115+
timeoutApplier = r -> {};
116+
} else {
117+
var timeout = randomFrom(
118+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
119+
TimeValue.MINUS_ONE,
120+
TimeValue.THIRTY_SECONDS,
121+
TimeValue.MAX_VALUE
122+
).getStringRep();
123+
timeoutApplier = r -> r.addParameter("timeout", timeout);
124+
}
125+
126+
try (var ignored = withBlockedMasterService(masterClusterService)) {
127+
final var request = new Request("PUT", "/" + indexName + "/_bulk");
128+
request.setJsonEntity("""
129+
{"index":{}}
130+
{}
131+
""");
132+
timeoutApplier.accept(request);
133+
final var response = ObjectPath.createFromResponse(restClient.performRequest(request));
134+
logger.info("--> response={}", response);
135+
assertTrue(response.evaluate("errors"));
136+
assertEquals(RestStatus.TOO_MANY_REQUESTS.getStatus(), (int) response.evaluate("items.0.index.status"));
137+
assertThat(response.evaluate("items.0.index.error.type"), equalTo("process_cluster_event_timeout_exception"));
138+
assertThat(response.evaluate("items.0.index.error.reason"), containsString("failed to process cluster event"));
139+
}
140+
141+
updateClusterSettings(Settings.builder().put(AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey(), randomFrom("-1", "60s", "1h")));
142+
143+
final PlainActionFuture<Response> bulkResponseFuture = new PlainActionFuture<>();
144+
try (var ignored = withBlockedMasterService(masterClusterService)) {
145+
final var request = new Request("PUT", "/" + indexName + "/_bulk");
146+
request.setJsonEntity("""
147+
{"index":{}}
148+
{}
149+
""");
150+
timeoutApplier.accept(request);
151+
restClient.performRequestAsync(request, new ResponseListener() {
152+
@Override
153+
public void onSuccess(Response response) {
154+
bulkResponseFuture.onResponse(response);
155+
}
156+
157+
@Override
158+
public void onFailure(Exception exception) {
159+
bulkResponseFuture.onFailure(exception);
160+
}
161+
});
162+
awaitPendingTask(masterClusterService, "auto create [" + indexName + "]");
163+
}
164+
safeGet(bulkResponseFuture);
165+
166+
updateClusterSettings(Settings.builder().putNull(AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey()));
167+
}
168+
100169
public void testReducePriorities() throws Exception {
101170
final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
102171
final var restClient = getRestClient();

server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3939
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
4040
import org.elasticsearch.cluster.service.ClusterService;
41+
import org.elasticsearch.cluster.service.MasterService;
4142
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
4243
import org.elasticsearch.common.Priority;
44+
import org.elasticsearch.common.settings.Setting;
4345
import org.elasticsearch.common.settings.Settings;
4446
import org.elasticsearch.common.util.Maps;
4547
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -71,6 +73,22 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
7173
public static final AutoCreateAction INSTANCE = new AutoCreateAction();
7274
public static final String NAME = "indices:admin/auto_create";
7375

76+
// Deliberately not registered so it can only be set in tests/plugins.
77+
public static final Setting<Priority> AUTO_CREATE_INDEX_PRIORITY_SETTING = Setting.enumSetting(
78+
Priority.class,
79+
"cluster.service.auto_create_index.priority",
80+
Priority.URGENT,
81+
Setting.Property.NodeScope
82+
);
83+
84+
// Deliberately not registered so it can only be set in tests/plugins.
85+
public static final Setting<TimeValue> AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING = Setting.timeSetting(
86+
"cluster.service.auto_create_index.max_timeout",
87+
TimeValue.MINUS_ONE,
88+
Setting.Property.NodeScope,
89+
Setting.Property.Dynamic
90+
);
91+
7492
private AutoCreateAction() {
7593
super(NAME);
7694
}
@@ -84,6 +102,7 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
84102
private final SystemIndices systemIndices;
85103

86104
private final MasterServiceTaskQueue<CreateIndexTask> taskQueue;
105+
private volatile TimeValue maxMasterNodeTimeout;
87106

88107
@Inject
89108
public TransportAction(
@@ -113,7 +132,8 @@ public TransportAction(
113132
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
114133
this.autoCreateIndex = autoCreateIndex;
115134
this.projectResolver = projectResolver;
116-
this.taskQueue = clusterService.createTaskQueue("auto-create", Priority.URGENT, batchExecutionContext -> {
135+
final var priority = AUTO_CREATE_INDEX_PRIORITY_SETTING.get(clusterService.getSettings());
136+
this.taskQueue = clusterService.createTaskQueue("auto-create", priority, batchExecutionContext -> {
117137
final var listener = new AllocationActionMultiListener<CreateIndexResponse>(threadPool.getThreadContext());
118138
final var taskContexts = batchExecutionContext.taskContexts();
119139
final var successfulRequests = Maps.<CreateIndexRequest, List<String>>newMapWithExpectedSize(taskContexts.size());
@@ -136,6 +156,14 @@ public TransportAction(
136156
}
137157
return state;
138158
});
159+
160+
if (clusterService.getClusterSettings().isDynamicSetting(AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey())) {
161+
// setting only registered in some tests today
162+
clusterService.getClusterSettings()
163+
.initializeAndWatch(AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING, v -> maxMasterNodeTimeout = v);
164+
} else {
165+
maxMasterNodeTimeout = AUTO_CREATE_INDEX_MAX_TIMEOUT_SETTING.get(clusterService.getSettings());
166+
}
139167
}
140168

141169
@Override
@@ -148,7 +176,7 @@ protected void masterOperation(
148176
taskQueue.submitTask(
149177
"auto create [" + request.index() + "]",
150178
new CreateIndexTask(request, projectResolver.getProjectId(), listener),
151-
request.masterNodeTimeout()
179+
MasterService.maybeLimitMasterNodeTimeout(request.masterNodeTimeout(), maxMasterNodeTimeout)
152180
);
153181
}
154182

0 commit comments

Comments
 (0)