Skip to content

Commit a1fd7bc

Browse files
authored
Fix trappy timeouts in persistent tasks requests (#120514)
Ensure that callers constructing these master-node requests pass in an explicit timeout. Relates to #107984.
1 parent 7563e71 commit a1fd7bc

File tree

30 files changed

+188
-111
lines changed

30 files changed

+188
-111
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,7 @@ public void onFailure(Exception e) {
113113
UUIDs.base64UUID(),
114114
FailingCreationPersistentTaskExecutor.TASK_NAME,
115115
new FailingCreationTaskParams(),
116-
null,
116+
TEST_REQUEST_TIMEOUT,
117117
l.map(ignored -> null)
118118
)
119119
);

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskInitializationFailureIT.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ public void testPersistentTasksThatFailDuringInitializationAreRemovedFromCluster
5050
UUIDs.base64UUID(),
5151
FailingInitializationPersistentTaskExecutor.TASK_NAME,
5252
new FailingInitializationTaskParams(),
53-
null,
53+
TEST_REQUEST_TIMEOUT,
5454
startPersistentTaskFuture
5555
);
5656
startPersistentTaskFuture.actionGet();

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ public void testFullClusterRestart() throws Exception {
4545
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
4646
futures.add(future);
4747
taskIds[i] = UUIDs.base64UUID();
48-
service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
48+
service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), TEST_REQUEST_TIMEOUT, future);
4949
}
5050

5151
for (int i = 0; i < numberOfTasks; i++) {

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 74 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,13 @@ public static class WaitForPersistentTaskFuture<Params extends PersistentTaskPar
6969
public void testPersistentActionFailure() throws Exception {
7070
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
7171
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
72-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
72+
persistentTasksService.sendStartRequest(
73+
UUIDs.base64UUID(),
74+
TestPersistentTasksExecutor.NAME,
75+
new TestParams("Blah"),
76+
TEST_REQUEST_TIMEOUT,
77+
future
78+
);
7379
long allocationId = future.get().getAllocationId();
7480
waitForTaskToStart();
7581
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
@@ -100,7 +106,13 @@ public void testPersistentActionCompletion() throws Exception {
100106
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
101107
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
102108
String taskId = UUIDs.base64UUID();
103-
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
109+
persistentTasksService.sendStartRequest(
110+
taskId,
111+
TestPersistentTasksExecutor.NAME,
112+
new TestParams("Blah"),
113+
TEST_REQUEST_TIMEOUT,
114+
future
115+
);
104116
long allocationId = future.get().getAllocationId();
105117
waitForTaskToStart();
106118
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
@@ -119,7 +131,14 @@ public void testPersistentActionCompletion() throws Exception {
119131
logger.info("Simulating errant completion notification");
120132
// try sending completion request with incorrect allocation id
121133
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
122-
persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, null, null, failedCompletionNotificationFuture);
134+
persistentTasksService.sendCompletionRequest(
135+
taskId,
136+
Long.MAX_VALUE,
137+
null,
138+
null,
139+
TEST_REQUEST_TIMEOUT,
140+
failedCompletionNotificationFuture
141+
);
123142
assertFutureThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
124143
// Make sure that the task is still running
125144
assertThat(
@@ -141,7 +160,13 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
141160
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
142161
TestParams testParams = new TestParams("Blah");
143162
testParams.setExecutorNodeAttr("test");
144-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
163+
persistentTasksService.sendStartRequest(
164+
UUIDs.base64UUID(),
165+
TestPersistentTasksExecutor.NAME,
166+
testParams,
167+
TEST_REQUEST_TIMEOUT,
168+
future
169+
);
145170
String taskId = future.get().getId();
146171

147172
Settings nodeSettings = Settings.builder().put(nodeSettings(0, Settings.EMPTY)).put("node.attr.test_attr", "test").build();
@@ -165,7 +190,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
165190

166191
// Remove the persistent task
167192
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
168-
persistentTasksService.sendRemoveRequest(taskId, null, removeFuture);
193+
persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture);
169194
assertEquals(removeFuture.get().getId(), taskId);
170195
}
171196

@@ -182,7 +207,13 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception
182207
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
183208
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
184209
TestParams testParams = new TestParams("Blah");
185-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
210+
persistentTasksService.sendStartRequest(
211+
UUIDs.base64UUID(),
212+
TestPersistentTasksExecutor.NAME,
213+
testParams,
214+
TEST_REQUEST_TIMEOUT,
215+
future
216+
);
186217
String taskId = future.get().getId();
187218

188219
assertThat(clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), empty());
@@ -197,14 +228,20 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception
197228

198229
// Remove the persistent task
199230
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
200-
persistentTasksService.sendRemoveRequest(taskId, null, removeFuture);
231+
persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture);
201232
assertEquals(removeFuture.get().getId(), taskId);
202233
}
203234

204235
public void testPersistentActionStatusUpdate() throws Exception {
205236
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
206237
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
207-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
238+
persistentTasksService.sendStartRequest(
239+
UUIDs.base64UUID(),
240+
TestPersistentTasksExecutor.NAME,
241+
new TestParams("Blah"),
242+
TEST_REQUEST_TIMEOUT,
243+
future
244+
);
208245
String taskId = future.get().getId();
209246
waitForTaskToStart();
210247
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
@@ -250,7 +287,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
250287
assertFutureThrows(future1, IllegalStateException.class, "timed out after 10ms");
251288

252289
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
253-
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), null, failedUpdateFuture);
290+
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), TEST_REQUEST_TIMEOUT, failedUpdateFuture);
254291
assertFutureThrows(
255292
failedUpdateFuture,
256293
ResourceNotFoundException.class,
@@ -275,11 +312,23 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
275312
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
276313
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
277314
String taskId = UUIDs.base64UUID();
278-
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
315+
persistentTasksService.sendStartRequest(
316+
taskId,
317+
TestPersistentTasksExecutor.NAME,
318+
new TestParams("Blah"),
319+
TEST_REQUEST_TIMEOUT,
320+
future
321+
);
279322
future.get();
280323

281324
PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
282-
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future2);
325+
persistentTasksService.sendStartRequest(
326+
taskId,
327+
TestPersistentTasksExecutor.NAME,
328+
new TestParams("Blah"),
329+
TEST_REQUEST_TIMEOUT,
330+
future2
331+
);
283332
assertFutureThrows(future2, ResourceAlreadyExistsException.class);
284333

285334
waitForTaskToStart();
@@ -315,7 +364,13 @@ public void testUnassignRunningPersistentTask() throws Exception {
315364
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
316365
TestParams testParams = new TestParams("Blah");
317366
testParams.setExecutorNodeAttr("test");
318-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
367+
persistentTasksService.sendStartRequest(
368+
UUIDs.base64UUID(),
369+
TestPersistentTasksExecutor.NAME,
370+
testParams,
371+
TEST_REQUEST_TIMEOUT,
372+
future
373+
);
319374
PersistentTask<TestParams> task = future.get();
320375
String taskId = task.getId();
321376

@@ -366,7 +421,13 @@ public void testAbortLocally() throws Exception {
366421
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
367422
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
368423
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
369-
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
424+
persistentTasksService.sendStartRequest(
425+
UUIDs.base64UUID(),
426+
TestPersistentTasksExecutor.NAME,
427+
new TestParams("Blah"),
428+
TEST_REQUEST_TIMEOUT,
429+
future
430+
);
370431
String taskId = future.get().getId();
371432
long allocationId = future.get().getAllocationId();
372433
waitForTaskToStart();

server/src/internalClusterTest/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ public void testEnableAssignmentAfterRestart() throws Exception {
5252
"task_" + i,
5353
TestPersistentTasksExecutor.NAME,
5454
new TestParams(randomAlphaOfLength(10)),
55-
null,
55+
TEST_REQUEST_TIMEOUT,
5656
ActionListener.running(latch::countDown)
5757
);
5858
}

server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.indices.SystemIndices;
2526
import org.elasticsearch.injection.guice.Inject;
2627
import org.elasticsearch.persistent.PersistentTasksService;
@@ -95,7 +96,7 @@ protected void masterOperation(
9596
SYSTEM_INDEX_UPGRADE_TASK_NAME,
9697
SYSTEM_INDEX_UPGRADE_TASK_NAME,
9798
new SystemIndexMigrationTaskParams(),
98-
null,
99+
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
99100
ActionListener.wrap(startedTask -> {
100101
listener.onResponse(new PostFeatureUpgradeResponse(true, featuresToMigrate, null, null));
101102
}, ex -> {

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.settings.ClusterSettings;
2323
import org.elasticsearch.common.settings.Setting;
2424
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.core.TimeValue;
2526
import org.elasticsearch.features.FeatureService;
2627
import org.elasticsearch.node.NodeClosedException;
2728
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -162,7 +163,7 @@ void startTask(ClusterChangedEvent event) {
162163
TASK_NAME,
163164
TASK_NAME,
164165
new HealthNodeTaskParams(),
165-
null,
166+
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
166167
ActionListener.wrap(r -> logger.debug("Created the health node task"), e -> {
167168
if (e instanceof NodeClosedException) {
168169
logger.debug("Failed to create health node task because node is shutting down", e);

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,13 @@ public void updatePersistentTaskState(
6565
final PersistentTaskState state,
6666
final ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener
6767
) {
68-
persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, null, listener);
68+
persistentTasksService.sendUpdateStateRequest(
69+
persistentTaskId,
70+
allocationId,
71+
state,
72+
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
73+
listener
74+
);
6975
}
7076

7177
public String getPersistentTaskId() {
@@ -201,7 +207,7 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure, @Nullable St
201207
getAllocationId(),
202208
failure,
203209
localAbortReason,
204-
null,
210+
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
205211
new ActionListener<>() {
206212
@Override
207213
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {

server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.service.ClusterService;
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.injection.guice.Inject;
2526
import org.elasticsearch.tasks.Task;
2627
import org.elasticsearch.threadpool.ThreadPool;
@@ -56,8 +57,8 @@ public Request(StreamInput in) throws IOException {
5657
localAbortReason = in.readOptionalString();
5758
}
5859

59-
public Request(String taskId, long allocationId, Exception exception, String localAbortReason) {
60-
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
60+
public Request(TimeValue masterNodeTimeout, String taskId, long allocationId, Exception exception, String localAbortReason) {
61+
super(masterNodeTimeout);
6162
this.taskId = taskId;
6263
this.exception = exception;
6364
this.allocationId = allocationId;

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.io.stream.StreamInput;
1919
import org.elasticsearch.common.io.stream.StreamOutput;
2020
import org.elasticsearch.common.util.concurrent.EsExecutors;
21+
import org.elasticsearch.core.TimeValue;
2122
import org.elasticsearch.gateway.GatewayService;
2223
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
2324
import org.elasticsearch.tasks.Task;
@@ -310,7 +311,7 @@ private <Params extends PersistentTaskParams> void notifyMasterOfFailedTask(
310311
taskInProgress.getAllocationId(),
311312
originalException,
312313
null,
313-
null,
314+
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
314315
new ActionListener<>() {
315316
@Override
316317
public void onResponse(PersistentTask<?> persistentTask) {
@@ -346,7 +347,7 @@ private void cancelTask(Long allocationId) {
346347
if (task.markAsCancelled()) {
347348
// Cancel the local task using the task manager
348349
String reason = "task has been removed, cancelling locally";
349-
persistentTasksService.sendCancelRequest(task.getId(), reason, null, new ActionListener<>() {
350+
persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<>() {
350351
@Override
351352
public void onResponse(ListTasksResponse cancelTasksResponse) {
352353
logger.trace(

0 commit comments

Comments
 (0)