From ff69cfa60e49d17f093dddcbb31a7a36005c7927 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 12 Jun 2025 10:58:57 +0200 Subject: [PATCH 1/8] initial implementation --- .../autodetect/AutodetectProcessManager.java | 87 ++++++++++++++++--- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d003578158f48..0d2a5f4f2a7e7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -23,6 +24,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; @@ -1021,19 +1023,28 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer handler) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); - 
jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> { - try { - handler.accept(null); - } catch (IOException e1) { - logger.warn("Error while delegating response", e1); - } - }, e -> { - try { - handler.accept(e); - } catch (IOException e1) { - logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); - } - })); + // retry with a small initial backoff of 10ms + new UpdateStateRetryableAction( + logger, + threadPool, + TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), + TimeValue.MAX_VALUE, + jobTask, + jobTaskState, + ActionListener.wrap(persistentTask -> { + try { + handler.accept(null); + } catch (IOException e1) { + logger.warn("Error while delegating response", e1); + } + }, e -> { + try { + handler.accept(e); + } catch (IOException e1) { + logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); + } + }) + ).run(); } public Optional>> getStatistics(JobTask jobTask) { @@ -1082,4 +1093,54 @@ public ByteSizeValue getOpenProcessMemoryUsage() { } return ByteSizeValue.ofBytes(memoryUsedBytes); } + + private static class UpdateStateRetryableAction extends RetryableAction> { + + private static final int MIN_RETRY_SLEEP_MILLIS = 50; + private final JobTask jobTask; + private final JobTaskState jobTaskState; + + /** + * @param logger The logger (use AutodetectProcessManager.logger) + * @param threadPool The ThreadPool to schedule retries on + * @param initialDelay How long to wait before the *first* retry + * @param timeout Overall timeout for all retries + * @param jobTask The JobTask whose state we’re updating + * @param jobTaskState The new state to persist + */ + UpdateStateRetryableAction( + Logger logger, + ThreadPool threadPool, + TimeValue initialDelay, + TimeValue timeout, + JobTask jobTask, + JobTaskState jobTaskState, + ActionListener> delegateListener + ) { + super( + logger, + threadPool, + initialDelay, + timeout, + delegateListener, + // executor for 
retries + threadPool.generic() + ); + this.jobTask = Objects.requireNonNull(jobTask); + this.jobTaskState = Objects.requireNonNull(jobTaskState); + } + + @Override + public void tryAction(ActionListener> listener) { + // this will call back either onResponse(...) or onFailure(...) + jobTask.updatePersistentTaskState(jobTaskState, listener); + } + + @Override + public boolean shouldRetry(Exception e) { + // retry everything *except* when the task truly no longer exists + // TODO valeriy: is this the only exception we should not retry on? + return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false; + } + } } From c7c8f3dd969cccb21070a79a25088371dca754ec Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 12 Jun 2025 11:23:31 +0200 Subject: [PATCH 2/8] Unit tests implemented --- .../autodetect/AutodetectProcessManager.java | 2 +- .../AutodetectProcessManagerTests.java | 125 ++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 0d2a5f4f2a7e7..b43fc81a202a6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -24,13 +24,13 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import 
org.elasticsearch.core.Tuple; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index fcf2f2e32f16b..6f0d713b5c7a9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -33,6 +34,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -91,6 +93,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -262,6 +265,9 @@ public void setup() throws Exception { 
handler.accept(buildAutodetectParams()); return null; }).when(jobResultsProvider).getAutodetectParams(any(), any(), any()); + + // when running retry logic use the real executor service + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); } public void testOpenJob() { @@ -854,6 +860,125 @@ public void testGetOpenProcessMemoryUsage() { assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes))); } + public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() { + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + when(jobTask.getAllocationId()).thenReturn(123L); + when(jobTask.getJobId()).thenReturn("job-123"); + + // call the no-handler overload + manager.setJobState(jobTask, JobState.CLOSING, "closing-reason"); + + // verify we created the correct JobTaskState and passed some listener + @SuppressWarnings("unchecked") + ArgumentCaptor stateCaptor = + ArgumentCaptor.forClass(JobTaskState.class); + verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any()); + JobTaskState captured = stateCaptor.getValue(); + assertEquals(JobState.CLOSING, captured.getState()); + assertEquals(123L, captured.getAllocationId()); + assertEquals("closing-reason", captured.getReason()); +// assertNotNull(captured.getTimestamp()); + } + + public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException { + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + + // stub updatePersistentTaskState to call onResponse + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = + (ActionListener>) invocation.getArguments()[1]; + listener.onResponse(null); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, 
JobState.FAILED, "fail-reason", handler); + + // onResponse should have driven handler.accept(null) + assertNull(holder.get()); + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); + } + + public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException { + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + Exception boom = new RuntimeException("boom"); + + // stub updatePersistentTaskState to call onFailure + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = + (ActionListener>) invocation.getArguments()[1]; + listener.onFailure(boom); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler); + + // onFailure should have driven handler.accept(boom) + assertSame(boom, holder.get()); + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); + } + + public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException { + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> { + Runnable r = invocation.getArgument(0); + r.run(); + return mock(ThreadPool.Cancellable.class); + }); + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + AtomicInteger attempts = new AtomicInteger(); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; + if (attempts.incrementAndGet() < 3) { + listener.onFailure(new RuntimeException("transient failure")); + } else { + listener.onResponse(null); + } + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new 
AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler); + + verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any()); + assertNull(holder.get()); + } + + public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException { + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + ResourceNotFoundException rnfe = new ResourceNotFoundException("not found"); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = + (ActionListener>) invocation.getArguments()[1]; + listener.onFailure(rnfe); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler); + + verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any()); + assertSame(rnfe, holder.get()); + } + private AutodetectProcessManager createNonSpyManager(String jobId) { ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(anyString())).thenReturn(executorService); From c8a2e7667e4d06f5eba5fd410b95cd08a3a1e388 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 12 Jun 2025 13:07:27 +0200 Subject: [PATCH 3/8] Unit tests commented and fixed --- .../autodetect/AutodetectProcessManager.java | 12 ++++-- .../AutodetectProcessManagerTests.java | 38 +++++++++++++------ 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index b43fc81a202a6..d2a42e19ae999 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -1004,13 +1004,19 @@ public Optional jobOpenTime(JobTask jobTask) { void setJobState(JobTask jobTask, JobState state, String reason) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); - jobTask.updatePersistentTaskState( + // retry state update to ensure that cluster state stays consistent + new UpdateStateRetryableAction( + logger, + threadPool, + TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), + TimeValue.MAX_VALUE, + jobTask, jobTaskState, ActionListener.wrap( persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()), e -> logSetJobStateFailure(state, jobTask.getJobId(), e) ) - ); + ).run(); } private static void logSetJobStateFailure(JobState state, String jobId, Exception e) { @@ -1023,7 +1029,7 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer handler) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); - // retry with a small initial backoff of 10ms + // retry state update to ensure that cluster state stays consistent new UpdateStateRetryableAction( logger, threadPool, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 6f0d713b5c7a9..d6d4ef84387d8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -869,27 +869,28 @@ public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() { // call the no-handler overload manager.setJobState(jobTask, JobState.CLOSING, "closing-reason"); - // verify we created the correct JobTaskState and passed some listener + // verify we called updatePersistentTaskState with the expected state @SuppressWarnings("unchecked") - ArgumentCaptor stateCaptor = - ArgumentCaptor.forClass(JobTaskState.class); + ArgumentCaptor stateCaptor = ArgumentCaptor.forClass(JobTaskState.class); verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any()); JobTaskState captured = stateCaptor.getValue(); assertEquals(JobState.CLOSING, captured.getState()); assertEquals(123L, captured.getAllocationId()); assertEquals("closing-reason", captured.getReason()); -// assertNotNull(captured.getTimestamp()); } public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException { + // This test verifies the “happy‐path” of the retryable overload—i.e. what happens when the very first call + // to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null) + // (because there was no error). 
AutodetectProcessManager manager = createSpyManager(); JobTask jobTask = mock(JobTask.class); // stub updatePersistentTaskState to call onResponse doAnswer(invocation -> { @SuppressWarnings("unchecked") - ActionListener> listener = - (ActionListener>) invocation.getArguments()[1]; + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; listener.onResponse(null); return null; }).when(jobTask).updatePersistentTaskState(any(), any()); @@ -905,11 +906,16 @@ public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws } public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException { + // Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))) + .thenAnswer(invocation -> { + Runnable r = invocation.getArgument(0); + r.run(); + return mock(ThreadPool.Cancellable.class); + }); AutodetectProcessManager manager = createSpyManager(); JobTask jobTask = mock(JobTask.class); - Exception boom = new RuntimeException("boom"); - - // stub updatePersistentTaskState to call onFailure + ResourceNotFoundException boom = new ResourceNotFoundException("boom"); doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener> listener = @@ -929,6 +935,9 @@ public void testSetJobState_withHandler_onFailure_triggersHandlerException() thr } public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException { + // Verifies that transient failures are retried until eventual success, and the handler receives null on success + + // ensure that all retries are executed on the same thread for determinism when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> { Runnable r = invocation.getArgument(0); r.run(); @@ -938,6 +947,7 @@ public void 
testSetJobState_withHandler_retriesUntilSuccess() throws IOException JobTask jobTask = mock(JobTask.class); AtomicInteger attempts = new AtomicInteger(); doAnswer(invocation -> { + // Simulate transient failures for the first two attempts, then succeed on the third @SuppressWarnings("unchecked") ActionListener> listener = (ActionListener< PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; @@ -954,18 +964,22 @@ public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler); + // confirms that the method was called exactly three times (two failures then one success). verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any()); assertNull(holder.get()); } public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException { + // Ensures that if the persistent‐state update fails with a ResourceNotFoundException, the retry loop does not retry + // again but immediately invokes the user’s handler with that exception. AutodetectProcessManager manager = createSpyManager(); JobTask jobTask = mock(JobTask.class); ResourceNotFoundException rnfe = new ResourceNotFoundException("not found"); doAnswer(invocation -> { + // Simulate a ResourceNotFoundException that should not be retried @SuppressWarnings("unchecked") - ActionListener> listener = - (ActionListener>) invocation.getArguments()[1]; + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; listener.onFailure(rnfe); return null; }).when(jobTask).updatePersistentTaskState(any(), any()); @@ -975,7 +989,9 @@ public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOExc manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler); + // updatePersistentTaskState(...) was invoked exactly once (no retries). 
verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any()); + // The handler should have been invoked with the ResourceNotFoundException assertSame(rnfe, holder.get()); } From 78cd7dd957d7b00239173560ff013bb09a0eb602 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 13 Jun 2025 09:45:42 +0200 Subject: [PATCH 4/8] Update docs/changelog/129391.yaml --- docs/changelog/129391.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 docs/changelog/129391.yaml diff --git a/docs/changelog/129391.yaml b/docs/changelog/129391.yaml new file mode 100644 index 0000000000000..42f81a51e3c85 --- /dev/null +++ b/docs/changelog/129391.yaml @@ -0,0 +1,7 @@ +pr: 129391 +summary: Ensure that anomaly detection job state update retries if master node is + temporarily unavailable +area: Machine Learning +type: bug +issues: + - 126148 From 11844ea1b464e3a78e1e0e69bd38e3b496ca1b55 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 13 Jun 2025 09:49:40 +0200 Subject: [PATCH 5/8] update min delay and set timeout --- .../ml/job/process/autodetect/AutodetectProcessManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d2a42e19ae999..d02c9116f508d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -1009,7 +1009,7 @@ void setJobState(JobTask jobTask, JobState state, String reason) { logger, threadPool, TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), - TimeValue.MAX_VALUE, + 
TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS), jobTask, jobTaskState, ActionListener.wrap( @@ -1102,7 +1102,8 @@ public ByteSizeValue getOpenProcessMemoryUsage() { private static class UpdateStateRetryableAction extends RetryableAction> { - private static final int MIN_RETRY_SLEEP_MILLIS = 50; + private static final int MIN_RETRY_SLEEP_MILLIS = 500; + private static final int RETRY_TIMEOUT_SECONDS = 30; private final JobTask jobTask; private final JobTaskState jobTaskState; From 04571d6ebdd7f3d240cf4ece790e0cebd4fb4559 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 13 Jun 2025 09:52:52 +0200 Subject: [PATCH 6/8] remove obsolete todo --- .../ml/job/process/autodetect/AutodetectProcessManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d02c9116f508d..9230cc6cee4f0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -1146,7 +1146,6 @@ public void tryAction(ActionListener Date: Fri, 13 Jun 2025 10:02:31 +0200 Subject: [PATCH 7/8] removed delay and timeout from the constructor --- .../process/autodetect/AutodetectProcessManager.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 9230cc6cee4f0..5ec333833c2f9 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -1008,8 +1008,6 @@ void setJobState(JobTask jobTask, JobState state, String reason) { new UpdateStateRetryableAction( logger, threadPool, - TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), - TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS), jobTask, jobTaskState, ActionListener.wrap( @@ -1033,8 +1031,6 @@ void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer new UpdateStateRetryableAction( logger, threadPool, - TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), - TimeValue.MAX_VALUE, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> { @@ -1110,16 +1106,12 @@ private static class UpdateStateRetryableAction extends RetryableAction> delegateListener @@ -1127,8 +1119,8 @@ private static class UpdateStateRetryableAction extends RetryableAction Date: Fri, 13 Jun 2025 08:10:51 +0000 Subject: [PATCH 8/8] [CI] Auto commit changes from spotless --- .../autodetect/AutodetectProcessManager.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 5ec333833c2f9..e4e3adb471d48 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -1028,25 +1028,19 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio void setJobState(JobTask jobTask, JobState state, 
String reason, CheckedConsumer handler) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); // retry state update to ensure that cluster state stays consistent - new UpdateStateRetryableAction( - logger, - threadPool, - jobTask, - jobTaskState, - ActionListener.wrap(persistentTask -> { - try { - handler.accept(null); - } catch (IOException e1) { - logger.warn("Error while delegating response", e1); - } - }, e -> { - try { - handler.accept(e); - } catch (IOException e1) { - logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); - } - }) - ).run(); + new UpdateStateRetryableAction(logger, threadPool, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> { + try { + handler.accept(null); + } catch (IOException e1) { + logger.warn("Error while delegating response", e1); + } + }, e -> { + try { + handler.accept(e); + } catch (IOException e1) { + logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); + } + })).run(); } public Optional>> getStatistics(JobTask jobTask) {