diff --git a/docs/changelog/129391.yaml b/docs/changelog/129391.yaml
new file mode 100644
index 0000000000000..42f81a51e3c85
--- /dev/null
+++ b/docs/changelog/129391.yaml
@@ -0,0 +1,7 @@
+pr: 129391
+summary: Ensure that anomaly detection job state update retries if master node is
+  temporarily unavailable
+area: Machine Learning
+type: bug
+issues:
+ - 126148
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
index d003578158f48..e4e3adb471d48 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -29,6 +30,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.InvalidAliasNameException;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -1002,13 +1004,17 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {
 
     void setJobState(JobTask jobTask, JobState state, String reason) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(
+            logger,
+            threadPool,
+            jobTask,
             jobTaskState,
             ActionListener.wrap(
                 persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()),
                 e -> logSetJobStateFailure(state, jobTask.getJobId(), e)
             )
-        );
+        ).run();
     }
 
     private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
@@ -1021,7 +1027,8 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
 
     void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> {
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(logger, threadPool, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> {
             try {
                 handler.accept(null);
             } catch (IOException e1) {
@@ -1033,7 +1040,7 @@ void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
             } catch (IOException e1) {
                 logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
             }
-        }));
+        })).run();
     }
 
     public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
@@ -1082,4 +1089,50 @@ public ByteSizeValue getOpenProcessMemoryUsage() {
         }
         return ByteSizeValue.ofBytes(memoryUsedBytes);
     }
+
+    private static class UpdateStateRetryableAction extends RetryableAction<PersistentTasksCustomMetadata.PersistentTask<?>> {
+
+        private static final int MIN_RETRY_SLEEP_MILLIS = 500;
+        private static final int RETRY_TIMEOUT_SECONDS = 30;
+
+        private final JobTask jobTask;
+        private final JobTaskState jobTaskState;
+
+        /**
+         * @param logger           The logger (use AutodetectProcessManager.logger)
+         * @param threadPool       The ThreadPool to schedule retries on
+         * @param jobTask          The JobTask whose state we're updating
+         * @param jobTaskState     The new state to persist
+         * @param delegateListener The listener notified with the final result once the action succeeds or gives up
+         */
+        UpdateStateRetryableAction(
+            Logger logger,
+            ThreadPool threadPool,
+            JobTask jobTask,
+            JobTaskState jobTaskState,
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> delegateListener
+        ) {
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
+                TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS),
+                delegateListener,
+                // executor for retries
+                threadPool.generic()
+            );
+            this.jobTask = Objects.requireNonNull(jobTask);
+            this.jobTaskState = Objects.requireNonNull(jobTaskState);
+        }
+
+        @Override
+        public void tryAction(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
+            // this will call back either onResponse(...) or onFailure(...)
+            jobTask.updatePersistentTaskState(jobTaskState, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            // retry everything *except* when the task truly no longer exists
+            return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false;
+        }
+    }
 }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
index cd3ef65377a57..968f3ea0c3a6f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -33,6 +34,7 @@
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
@@ -91,6 +93,7 @@
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -253,6 +256,9 @@ public void setup() throws Exception {
             handler.accept(buildAutodetectParams());
             return null;
         }).when(jobResultsProvider).getAutodetectParams(any(), any(), any());
+
+        // when running retry logic, use a direct executor so retries run synchronously on the calling thread
+        when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
     }
 
     public void testOpenJob() {
@@ -845,6 +851,141 @@ public void testGetOpenProcessMemoryUsage() {
         assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes)));
     }
 
+    public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() {
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        when(jobTask.getAllocationId()).thenReturn(123L);
+        when(jobTask.getJobId()).thenReturn("job-123");
+
+        // call the no-handler overload
+        manager.setJobState(jobTask, JobState.CLOSING, "closing-reason");
+
+        // verify we called updatePersistentTaskState with the expected state
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<JobTaskState> stateCaptor = ArgumentCaptor.forClass(JobTaskState.class);
+        verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any());
+        JobTaskState captured = stateCaptor.getValue();
+        assertEquals(JobState.CLOSING, captured.getState());
+        assertEquals(123L, captured.getAllocationId());
+        assertEquals("closing-reason", captured.getReason());
+    }
+
+    public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException {
+        // This test verifies the "happy path" of the retryable overload, i.e. what happens when the very first call
+        // to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null)
+        // (because there was no error).
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+
+        // stub updatePersistentTaskState to call onResponse
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onResponse(null);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onResponse should have driven handler.accept(null)
+        assertNull(holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException {
+        // Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return mock(ThreadPool.Cancellable.class);
+        });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException boom = new ResourceNotFoundException("boom");
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(boom);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onFailure should have driven handler.accept(boom)
+        assertSame(boom, holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException {
+        // Verifies that transient failures are retried until eventual success, and the handler receives null on success
+
+        // ensure that all retries are executed on the same thread for determinism
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return mock(ThreadPool.Cancellable.class);
+        });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        AtomicInteger attempts = new AtomicInteger();
+        doAnswer(invocation -> {
+            // Simulate transient failures for the first two attempts, then succeed on the third
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            if (attempts.incrementAndGet() < 3) {
+                listener.onFailure(new RuntimeException("transient failure"));
+            } else {
+                listener.onResponse(null);
+            }
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler);
+
+        // confirm that the method was called exactly three times (two failures, then one success)
+        verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any());
+        assertNull(holder.get());
+    }
+
+    public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException {
+        // Ensures that if the persistent state update fails with a ResourceNotFoundException, the retry loop does not
+        // retry but immediately invokes the user's handler with that exception.
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException rnfe = new ResourceNotFoundException("not found");
+        doAnswer(invocation -> {
+            // Simulate a ResourceNotFoundException that should not be retried
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(rnfe);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler);
+
+        // updatePersistentTaskState(...) was invoked exactly once (no retries)
+        verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any());
+        // The handler should have been invoked with the ResourceNotFoundException
+        assertSame(rnfe, holder.get());
+    }
+
     private AutodetectProcessManager createNonSpyManager(String jobId) {
         ExecutorService executorService = mock(ExecutorService.class);
         when(threadPool.executor(anyString())).thenReturn(executorService);