Commit 41c303e

committed: initial implementation
1 parent f02a3c4

1 file changed: +74 -13

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 74 additions & 13 deletions
@@ -11,6 +11,7 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -23,6 +24,7 @@
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
@@ -1021,19 +1023,28 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
 
     void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> {
-            try {
-                handler.accept(null);
-            } catch (IOException e1) {
-                logger.warn("Error while delegating response", e1);
-            }
-        }, e -> {
-            try {
-                handler.accept(e);
-            } catch (IOException e1) {
-                logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
-            }
-        }));
+        // retry with a small initial backoff of 50ms
+        new UpdateStateRetryableAction(
+            logger,
+            threadPool,
+            TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
+            TimeValue.MAX_VALUE,
+            jobTask,
+            jobTaskState,
+            ActionListener.wrap(persistentTask -> {
+                try {
+                    handler.accept(null);
+                } catch (IOException e1) {
+                    logger.warn("Error while delegating response", e1);
+                }
+            }, e -> {
+                try {
+                    handler.accept(e);
+                } catch (IOException e1) {
+                    logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
+                }
+            })
+        ).run();
     }
 
     public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
@@ -1082,4 +1093,54 @@ public ByteSizeValue getOpenProcessMemoryUsage() {
         }
         return ByteSizeValue.ofBytes(memoryUsedBytes);
     }
+
+    private static class UpdateStateRetryableAction extends RetryableAction<PersistentTasksCustomMetadata.PersistentTask<?>> {
+
+        private static final int MIN_RETRY_SLEEP_MILLIS = 50;
+        private final JobTask jobTask;
+        private final JobTaskState jobTaskState;
+
+        /**
+         * @param logger       The logger (use AutodetectProcessManager.logger)
+         * @param threadPool   The ThreadPool to schedule retries on
+         * @param initialDelay How long to wait before the *first* retry
+         * @param timeout      Overall timeout for all retries
+         * @param jobTask      The JobTask whose state we're updating
+         * @param jobTaskState The new state to persist
+         */
+        UpdateStateRetryableAction(
+            Logger logger,
+            ThreadPool threadPool,
+            TimeValue initialDelay,
+            TimeValue timeout,
+            JobTask jobTask,
+            JobTaskState jobTaskState,
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> delegateListener
+        ) {
+            super(
+                logger,
+                threadPool,
+                initialDelay,
+                timeout,
+                delegateListener,
+                // executor for retries
+                threadPool.generic()
+            );
+            this.jobTask = Objects.requireNonNull(jobTask);
+            this.jobTaskState = Objects.requireNonNull(jobTaskState);
+        }
+
+        @Override
+        public void tryAction(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
+            // this will call back either onResponse(...) or onFailure(...)
+            jobTask.updatePersistentTaskState(jobTaskState, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            // retry everything *except* when the task truly no longer exists
+            // TODO valeriy: is this the only exception we should not retry on?
+            return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false;
+        }
+    }
 }
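
For context on the change: setJobState previously called jobTask.updatePersistentTaskState once and reported whatever happened to the handler. After this commit the update is wrapped in a RetryableAction, so failures are retried with an increasing backoff (starting at MIN_RETRY_SLEEP_MILLIS, 50 ms) until the update succeeds or the configured timeout elapses (the commit passes TimeValue.MAX_VALUE, so it keeps retrying indefinitely); only a ResourceNotFoundException, meaning the persistent task no longer exists, stops the retries. The sketch below is a minimal, self-contained illustration of that tryAction/shouldRetry/backoff contract. It is not the real org.elasticsearch.action.support.RetryableAction; the class name SimpleRetryableAction, the ScheduledExecutorService, and the doubling backoff are assumptions made purely for illustration.

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Minimal stand-in for the retry contract used by the commit: tryAction()
// attempts the operation once, shouldRetry() classifies failures, and failed
// attempts are re-scheduled with exponential backoff until a deadline passes.
abstract class SimpleRetryableAction<T> {

    private final ScheduledExecutorService scheduler;
    private final long initialDelayMillis; // delay before the first retry
    private final long timeoutMillis;      // overall budget for all retries
    private final Consumer<T> onSuccess;
    private final Consumer<Exception> onFailure;
    private final long startNanos = System.nanoTime();

    SimpleRetryableAction(ScheduledExecutorService scheduler, long initialDelayMillis, long timeoutMillis,
                          Consumer<T> onSuccess, Consumer<Exception> onFailure) {
        this.scheduler = scheduler;
        this.initialDelayMillis = initialDelayMillis;
        this.timeoutMillis = timeoutMillis;
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    /** Attempt the operation once and report the outcome on exactly one of the callbacks. */
    protected abstract void tryAction(Consumer<T> success, Consumer<Exception> failure);

    /** Return true if the failure is transient and worth another attempt. */
    protected abstract boolean shouldRetry(Exception e);

    public void run() {
        attempt(initialDelayMillis);
    }

    private void attempt(long nextDelayMillis) {
        tryAction(onSuccess, e -> {
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            if (shouldRetry(e) && elapsedMillis + nextDelayMillis < timeoutMillis) {
                // exponential backoff: double the delay after every failed attempt
                scheduler.schedule(() -> attempt(nextDelayMillis * 2), nextDelayMillis, TimeUnit.MILLISECONDS);
            } else {
                onFailure.accept(e); // give up: non-retryable error or retry budget exhausted
            }
        });
    }
}

A subclass in the spirit of this commit would implement tryAction to call jobTask.updatePersistentTaskState(jobTaskState, listener) and shouldRetry to return false only when the unwrapped cause is a ResourceNotFoundException, which is exactly what UpdateStateRetryableAction does in the diff above.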
