Skip to content

Commit d06f7f2

Browse files
authored
Avoid implicit ML/transform master node timeouts (elastic#113536) (elastic#113559)
Today in the ML and Transform plugins we pass `null` for timeouts related to persistent tasks, which means that the implicit default timeout of 30s is used. As per elastic#107984 we want to eliminate all such uses of the implicit default timeout. This commit either moves to using the timeout from the associated transport request, if available, or else makes it explicit that we're using a hard-coded 30s timeout.
1 parent c644dbb commit d06f7f2

14 files changed

+249
-186
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,14 @@ public class MachineLearning extends Plugin
498498

499499
public static final String TRAINED_MODEL_CIRCUIT_BREAKER_NAME = "model_inference";
500500

501+
/**
502+
* Hard-coded timeout used for {@link org.elasticsearch.action.support.master.MasterNodeRequest#masterNodeTimeout()} for requests to
503+
* the master node from ML code. Wherever possible, prefer to use a user-controlled timeout instead of this.
504+
*
505+
* @see <a href="https://github.com/elastic/elasticsearch/issues/107984">#107984</a>
506+
*/
507+
public static final TimeValue HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT = TimeValue.THIRTY_SECONDS;
508+
501509
private static final long DEFAULT_MODEL_CIRCUIT_BREAKER_LIMIT = (long) ((0.50) * JvmInfo.jvmInfo().getMem().getHeapMax().getBytes());
502510
private static final double DEFAULT_MODEL_CIRCUIT_BREAKER_OVERHEAD = 1.0D;
503511

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.core.ml.job.config.Job;
3131
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams;
3232
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
33+
import org.elasticsearch.xpack.ml.MachineLearning;
3334
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3435

3536
import java.util.List;
@@ -103,47 +104,51 @@ private void removePersistentTasks(
103104
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfTasks);
104105

105106
for (PersistentTasksCustomMetadata.PersistentTask<?> task : upgradeTasksToCancel) {
106-
persistentTasksService.sendRemoveRequest(task.getId(), null, new ActionListener<>() {
107-
@Override
108-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
109-
if (counter.incrementAndGet() == numberOfTasks) {
110-
sendResponseOrFailure(listener, failures);
107+
persistentTasksService.sendRemoveRequest(
108+
task.getId(),
109+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
110+
new ActionListener<>() {
111+
@Override
112+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
113+
if (counter.incrementAndGet() == numberOfTasks) {
114+
sendResponseOrFailure(listener, failures);
115+
}
111116
}
112-
}
113117

114-
@Override
115-
public void onFailure(Exception e) {
116-
final int slot = counter.incrementAndGet();
117-
// Not found is not an error - it just means the upgrade completed before we could cancel it.
118-
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
119-
failures.set(slot - 1, e);
120-
}
121-
if (slot == numberOfTasks) {
122-
sendResponseOrFailure(listener, failures);
118+
@Override
119+
public void onFailure(Exception e) {
120+
final int slot = counter.incrementAndGet();
121+
// Not found is not an error - it just means the upgrade completed before we could cancel it.
122+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
123+
failures.set(slot - 1, e);
124+
}
125+
if (slot == numberOfTasks) {
126+
sendResponseOrFailure(listener, failures);
127+
}
123128
}
124-
}
125129

126-
private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArray<Exception> failures) {
127-
List<Exception> caughtExceptions = failures.asList();
128-
if (caughtExceptions.isEmpty()) {
129-
listener.onResponse(new Response(true));
130-
return;
130+
private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArray<Exception> failures) {
131+
List<Exception> caughtExceptions = failures.asList();
132+
if (caughtExceptions.isEmpty()) {
133+
listener.onResponse(new Response(true));
134+
return;
135+
}
136+
137+
String msg = "Failed to cancel model snapshot upgrade for ["
138+
+ request.getSnapshotId()
139+
+ "] on job ["
140+
+ request.getJobId()
141+
+ "]. Total failures ["
142+
+ caughtExceptions.size()
143+
+ "], rethrowing first. All Exceptions: ["
144+
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
145+
+ "]";
146+
147+
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
148+
listener.onFailure(e);
131149
}
132-
133-
String msg = "Failed to cancel model snapshot upgrade for ["
134-
+ request.getSnapshotId()
135-
+ "] on job ["
136-
+ request.getJobId()
137-
+ "]. Total failures ["
138-
+ caughtExceptions.size()
139-
+ "], rethrowing first. All Exceptions: ["
140-
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
141-
+ "]";
142-
143-
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
144-
listener.onFailure(e);
145150
}
146-
});
151+
);
147152
}
148153
}
149154
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
206206
// these persistent tasks to disappear.
207207
persistentTasksService.sendRemoveRequest(
208208
jobTask.getId(),
209-
null,
209+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
210210
ActionListener.wrap(
211211
r -> logger.trace(
212212
() -> format("[%s] removed task to close unassigned job", resolvedJobId)
@@ -517,48 +517,52 @@ private void forceCloseJob(
517517
PersistentTasksCustomMetadata.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
518518
if (jobTask != null) {
519519
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
520-
persistentTasksService.sendRemoveRequest(jobTask.getId(), null, new ActionListener<>() {
521-
@Override
522-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
523-
if (counter.incrementAndGet() == numberOfJobs) {
524-
sendResponseOrFailure(request.getJobId(), listener, failures);
520+
persistentTasksService.sendRemoveRequest(
521+
jobTask.getId(),
522+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
523+
new ActionListener<>() {
524+
@Override
525+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
526+
if (counter.incrementAndGet() == numberOfJobs) {
527+
sendResponseOrFailure(request.getJobId(), listener, failures);
528+
}
525529
}
526-
}
527530

528-
@Override
529-
public void onFailure(Exception e) {
530-
final int slot = counter.incrementAndGet();
531-
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
532-
failures.set(slot - 1, e);
531+
@Override
532+
public void onFailure(Exception e) {
533+
final int slot = counter.incrementAndGet();
534+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
535+
failures.set(slot - 1, e);
536+
}
537+
if (slot == numberOfJobs) {
538+
sendResponseOrFailure(request.getJobId(), listener, failures);
539+
}
533540
}
534-
if (slot == numberOfJobs) {
535-
sendResponseOrFailure(request.getJobId(), listener, failures);
536-
}
537-
}
538541

539-
private static void sendResponseOrFailure(
540-
String jobId,
541-
ActionListener<CloseJobAction.Response> listener,
542-
AtomicArray<Exception> failures
543-
) {
544-
List<Exception> caughtExceptions = failures.asList();
545-
if (caughtExceptions.isEmpty()) {
546-
listener.onResponse(new CloseJobAction.Response(true));
547-
return;
542+
private static void sendResponseOrFailure(
543+
String jobId,
544+
ActionListener<CloseJobAction.Response> listener,
545+
AtomicArray<Exception> failures
546+
) {
547+
List<Exception> caughtExceptions = failures.asList();
548+
if (caughtExceptions.isEmpty()) {
549+
listener.onResponse(new CloseJobAction.Response(true));
550+
return;
551+
}
552+
553+
String msg = "Failed to force close job ["
554+
+ jobId
555+
+ "] with ["
556+
+ caughtExceptions.size()
557+
+ "] failures, rethrowing first. All Exceptions: ["
558+
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
559+
+ "]";
560+
561+
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
562+
listener.onFailure(e);
548563
}
549-
550-
String msg = "Failed to force close job ["
551-
+ jobId
552-
+ "] with ["
553-
+ caughtExceptions.size()
554-
+ "] failures, rethrowing first. All Exceptions: ["
555-
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
556-
+ "]";
557-
558-
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
559-
listener.onFailure(e);
560564
}
561-
});
565+
);
562566
}
563567
}
564568
}
@@ -588,7 +592,7 @@ private void normalCloseJob(
588592
PersistentTasksCustomMetadata.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
589593
persistentTasksService.sendRemoveRequest(
590594
jobTask.getId(),
591-
null,
595+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
592596
ActionListener.wrap(r -> logger.trace("[{}] removed persistent task for relocated job", jobId), e -> {
593597
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
594598
logger.debug("[{}] relocated job task already removed", jobId);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
2929
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
3030
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
31+
import org.elasticsearch.xpack.ml.MachineLearning;
3132
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
3233

3334
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -103,22 +104,26 @@ private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterSta
103104
if (datafeedTask == null) {
104105
listener.onResponse(true);
105106
} else {
106-
persistentTasksService.sendRemoveRequest(datafeedTask.getId(), null, new ActionListener<>() {
107-
@Override
108-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
109-
listener.onResponse(Boolean.TRUE);
110-
}
107+
persistentTasksService.sendRemoveRequest(
108+
datafeedTask.getId(),
109+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
110+
new ActionListener<>() {
111+
@Override
112+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
113+
listener.onResponse(Boolean.TRUE);
114+
}
111115

112-
@Override
113-
public void onFailure(Exception e) {
114-
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
115-
// the task has been removed in between
116-
listener.onResponse(true);
117-
} else {
118-
listener.onFailure(e);
116+
@Override
117+
public void onFailure(Exception e) {
118+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
119+
// the task has been removed in between
120+
listener.onResponse(true);
121+
} else {
122+
listener.onFailure(e);
123+
}
119124
}
120125
}
121-
});
126+
);
122127
}
123128
}
124129

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
4646
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
4747
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
48+
import org.elasticsearch.xpack.ml.MachineLearning;
4849
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
4950
import org.elasticsearch.xpack.ml.job.JobManager;
5051
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -291,7 +292,11 @@ private void removePersistentTask(String jobId, ClusterState currentState, Actio
291292
if (jobTask == null) {
292293
listener.onResponse(null);
293294
} else {
294-
persistentTasksService.sendRemoveRequest(jobTask.getId(), null, listener.safeMap(task -> true));
295+
persistentTasksService.sendRemoveRequest(
296+
jobTask.getId(),
297+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
298+
listener.safeMap(task -> true)
299+
);
295300
}
296301
}
297302

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
4848
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
4949
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
50+
import org.elasticsearch.xpack.ml.MachineLearning;
5051
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
5152
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
5253
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -166,7 +167,7 @@ public void onFailure(Exception e) {
166167
MlTasks.jobTaskId(jobParams.getJobId()),
167168
MlTasks.JOB_TASK_NAME,
168169
jobParams,
169-
null,
170+
request.masterNodeTimeout(),
170171
waitForJobToStart
171172
),
172173
listener::onFailure
@@ -325,27 +326,31 @@ private void cancelJobStart(
325326
Exception exception,
326327
ActionListener<NodeAcknowledgedResponse> listener
327328
) {
328-
persistentTasksService.sendRemoveRequest(persistentTask.getId(), null, new ActionListener<>() {
329-
@Override
330-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
331-
// We succeeded in cancelling the persistent task, but the
332-
// problem that caused us to cancel it is the overall result
333-
listener.onFailure(exception);
334-
}
329+
persistentTasksService.sendRemoveRequest(
330+
persistentTask.getId(),
331+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
332+
new ActionListener<>() {
333+
@Override
334+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
335+
// We succeeded in cancelling the persistent task, but the
336+
// problem that caused us to cancel it is the overall result
337+
listener.onFailure(exception);
338+
}
335339

336-
@Override
337-
public void onFailure(Exception e) {
338-
logger.error(
339-
() -> format(
340-
"[%s] Failed to cancel persistent task that could not be assigned due to [%s]",
341-
persistentTask.getParams().getJobId(),
342-
exception.getMessage()
343-
),
344-
e
345-
);
346-
listener.onFailure(exception);
340+
@Override
341+
public void onFailure(Exception e) {
342+
logger.error(
343+
() -> format(
344+
"[%s] Failed to cancel persistent task that could not be assigned due to [%s]",
345+
persistentTask.getParams().getJobId(),
346+
exception.getMessage()
347+
),
348+
e
349+
);
350+
listener.onFailure(exception);
351+
}
347352
}
348-
});
353+
);
349354
}
350355

351356
/**

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void onFailure(Exception e) {
211211
MlTasks.dataFrameAnalyticsTaskId(request.getId()),
212212
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
213213
taskParams,
214-
null,
214+
request.masterNodeTimeout(),
215215
waitForAnalyticsToStart
216216
);
217217
}, listener::onFailure);
@@ -603,8 +603,8 @@ private void cancelAnalyticsStart(
603603
) {
604604
persistentTasksService.sendRemoveRequest(
605605
persistentTask.getId(),
606-
null,
607-
new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
606+
MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
607+
new ActionListener<>() {
608608
@Override
609609
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
610610
// We succeeded in cancelling the persistent task, but the

0 commit comments

Comments
 (0)