Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/129391.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 129391
summary: Ensure that anomaly detection job state update retries if master node is
temoporarily unavailable
area: Machine Learning
type: bug
issues:
- 126148
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -1002,13 +1004,17 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {

void setJobState(JobTask jobTask, JobState state, String reason) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
jobTask.updatePersistentTaskState(
// retry state update to ensure that cluster state stays consistent
new UpdateStateRetryableAction(
logger,
threadPool,
jobTask,
jobTaskState,
ActionListener.wrap(
persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()),
e -> logSetJobStateFailure(state, jobTask.getJobId(), e)
)
);
).run();
}

private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
Expand All @@ -1021,19 +1027,26 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio

void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> {
try {
handler.accept(null);
} catch (IOException e1) {
logger.warn("Error while delegating response", e1);
}
}, e -> {
try {
handler.accept(e);
} catch (IOException e1) {
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
}
}));
// retry state update to ensure that cluster state stays consistent
new UpdateStateRetryableAction(
logger,
threadPool,
jobTask,
jobTaskState,
ActionListener.wrap(persistentTask -> {
try {
handler.accept(null);
} catch (IOException e1) {
logger.warn("Error while delegating response", e1);
}
}, e -> {
try {
handler.accept(e);
} catch (IOException e1) {
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
}
})
).run();
}

public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
Expand Down Expand Up @@ -1082,4 +1095,50 @@ public ByteSizeValue getOpenProcessMemoryUsage() {
}
return ByteSizeValue.ofBytes(memoryUsedBytes);
}

private static class UpdateStateRetryableAction extends RetryableAction<PersistentTasksCustomMetadata.PersistentTask<?>> {

private static final int MIN_RETRY_SLEEP_MILLIS = 500;
private static final int RETRY_TIMEOUT_SECONDS = 30;
private final JobTask jobTask;
private final JobTaskState jobTaskState;

/**
* @param logger The logger (use AutodetectProcessManager.logger)
* @param threadPool The ThreadPool to schedule retries on
* @param jobTask The JobTask whose state we’re updating
* @param jobTaskState The new state to persist
*/
UpdateStateRetryableAction(
Logger logger,
ThreadPool threadPool,
JobTask jobTask,
JobTaskState jobTaskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> delegateListener
) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS),
delegateListener,
// executor for retries
threadPool.generic()
);
this.jobTask = Objects.requireNonNull(jobTask);
this.jobTaskState = Objects.requireNonNull(jobTaskState);
}

@Override
public void tryAction(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
// this will call back either onResponse(...) or onFailure(...)
jobTask.updatePersistentTaskState(jobTaskState, listener);
}

@Override
public boolean shouldRetry(Exception e) {
// retry everything *except* when the task truly no longer exists
return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -91,6 +93,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -262,6 +265,9 @@ public void setup() throws Exception {
handler.accept(buildAutodetectParams());
return null;
}).when(jobResultsProvider).getAutodetectParams(any(), any(), any());

// when running retry logic use the real executor service
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

public void testOpenJob() {
Expand Down Expand Up @@ -854,6 +860,141 @@ public void testGetOpenProcessMemoryUsage() {
assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes)));
}

public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() {
AutodetectProcessManager manager = createSpyManager();
JobTask jobTask = mock(JobTask.class);
when(jobTask.getAllocationId()).thenReturn(123L);
when(jobTask.getJobId()).thenReturn("job-123");

// call the no-handler overload
manager.setJobState(jobTask, JobState.CLOSING, "closing-reason");

// verify we called updatePersistentTaskState with the expected state
@SuppressWarnings("unchecked")
ArgumentCaptor<JobTaskState> stateCaptor = ArgumentCaptor.forClass(JobTaskState.class);
verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any());
JobTaskState captured = stateCaptor.getValue();
assertEquals(JobState.CLOSING, captured.getState());
assertEquals(123L, captured.getAllocationId());
assertEquals("closing-reason", captured.getReason());
}

public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException {
// This test verifies the “happy‐path” of the retryable overload—i.e. what happens when the very first call
// to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null)
// (because there was no error).
AutodetectProcessManager manager = createSpyManager();
JobTask jobTask = mock(JobTask.class);

// stub updatePersistentTaskState to call onResponse
doAnswer(invocation -> {
@SuppressWarnings("unchecked")
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
listener.onResponse(null);
return null;
}).when(jobTask).updatePersistentTaskState(any(), any());

AtomicReference<Exception> holder = new AtomicReference<>();
CheckedConsumer<Exception, IOException> handler = holder::set;

manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);

// onResponse should have driven handler.accept(null)
assertNull(holder.get());
verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
}

public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException {
// Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class)))
.thenAnswer(invocation -> {
Runnable r = invocation.getArgument(0);
r.run();
return mock(ThreadPool.Cancellable.class);
});
AutodetectProcessManager manager = createSpyManager();
JobTask jobTask = mock(JobTask.class);
ResourceNotFoundException boom = new ResourceNotFoundException("boom");
doAnswer(invocation -> {
@SuppressWarnings("unchecked")
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
listener.onFailure(boom);
return null;
}).when(jobTask).updatePersistentTaskState(any(), any());

AtomicReference<Exception> holder = new AtomicReference<>();
CheckedConsumer<Exception, IOException> handler = holder::set;

manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);

// onFailure should have driven handler.accept(boom)
assertSame(boom, holder.get());
verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
}

public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException {
// Verifies that transient failures are retried until eventual success, and the handler receives null on success

// ensure that all retries are executed on the same thread for determinism
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
Runnable r = invocation.getArgument(0);
r.run();
return mock(ThreadPool.Cancellable.class);
});
AutodetectProcessManager manager = createSpyManager();
JobTask jobTask = mock(JobTask.class);
AtomicInteger attempts = new AtomicInteger();
doAnswer(invocation -> {
// Simulate transient failures for the first two attempts, then succeed on the third
@SuppressWarnings("unchecked")
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
if (attempts.incrementAndGet() < 3) {
listener.onFailure(new RuntimeException("transient failure"));
} else {
listener.onResponse(null);
}
return null;
}).when(jobTask).updatePersistentTaskState(any(), any());

AtomicReference<Exception> holder = new AtomicReference<>();
CheckedConsumer<Exception, IOException> handler = holder::set;

manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler);

// confirms that the method was called exactly three times (two failures then one success).
verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any());
assertNull(holder.get());
}

public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException {
// Ensures that if the persistent‐state update fails with a ResourceNotFoundException, the retry loop does not retry
// again but immediately invokes the user’s handler with that exception.
AutodetectProcessManager manager = createSpyManager();
JobTask jobTask = mock(JobTask.class);
ResourceNotFoundException rnfe = new ResourceNotFoundException("not found");
doAnswer(invocation -> {
// Simulate a ResourceNotFoundException that should not be retried
@SuppressWarnings("unchecked")
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
listener.onFailure(rnfe);
return null;
}).when(jobTask).updatePersistentTaskState(any(), any());

AtomicReference<Exception> holder = new AtomicReference<>();
CheckedConsumer<Exception, IOException> handler = holder::set;

manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler);

// updatePersistentTaskState(...) was invoked exactly once (no retries).
verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any());
// The handler should have been invoked with the ResourceNotFoundException
assertSame(rnfe, holder.get());
}

private AutodetectProcessManager createNonSpyManager(String jobId) {
ExecutorService executorService = mock(ExecutorService.class);
when(threadPool.executor(anyString())).thenReturn(executorService);
Expand Down
Loading