Skip to content

Commit 47bd452

Browse files
committed
Unit tests commented and fixed
1 parent 0313724 commit 47bd452

File tree

2 files changed

+36
-14
lines changed

2 files changed

+36
-14
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,13 +1004,19 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {
10041004

10051005
void setJobState(JobTask jobTask, JobState state, String reason) {
10061006
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
1007-
jobTask.updatePersistentTaskState(
1007+
// retry state update to ensure that cluster state stays consistent
1008+
new UpdateStateRetryableAction(
1009+
logger,
1010+
threadPool,
1011+
TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
1012+
TimeValue.MAX_VALUE,
1013+
jobTask,
10081014
jobTaskState,
10091015
ActionListener.wrap(
10101016
persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()),
10111017
e -> logSetJobStateFailure(state, jobTask.getJobId(), e)
10121018
)
1013-
);
1019+
).run();
10141020
}
10151021

10161022
private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
@@ -1023,7 +1029,7 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio
10231029

10241030
void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
10251031
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
1026-
// retry with a small initial backoff of 10ms
1032+
// retry state update to ensure that cluster state stays consistent
10271033
new UpdateStateRetryableAction(
10281034
logger,
10291035
threadPool,

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -869,27 +869,28 @@ public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() {
869869
// call the no-handler overload
870870
manager.setJobState(jobTask, JobState.CLOSING, "closing-reason");
871871

872-
// verify we created the correct JobTaskState and passed some listener
872+
// verify we called updatePersistentTaskState with the expected state
873873
@SuppressWarnings("unchecked")
874-
ArgumentCaptor<JobTaskState> stateCaptor =
875-
ArgumentCaptor.forClass(JobTaskState.class);
874+
ArgumentCaptor<JobTaskState> stateCaptor = ArgumentCaptor.forClass(JobTaskState.class);
876875
verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any());
877876
JobTaskState captured = stateCaptor.getValue();
878877
assertEquals(JobState.CLOSING, captured.getState());
879878
assertEquals(123L, captured.getAllocationId());
880879
assertEquals("closing-reason", captured.getReason());
881-
// assertNotNull(captured.getTimestamp());
882880
}
883881

884882
public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException {
883+
// This test verifies the “happy‐path” of the retryable overload—i.e. what happens when the very first call
884+
// to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null)
885+
// (because there was no error).
885886
AutodetectProcessManager manager = createSpyManager();
886887
JobTask jobTask = mock(JobTask.class);
887888

888889
// stub updatePersistentTaskState to call onResponse
889890
doAnswer(invocation -> {
890891
@SuppressWarnings("unchecked")
891-
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
892-
(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
892+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
893+
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
893894
listener.onResponse(null);
894895
return null;
895896
}).when(jobTask).updatePersistentTaskState(any(), any());
@@ -905,11 +906,16 @@ public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws
905906
}
906907

907908
public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException {
909+
// Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception
910+
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class)))
911+
.thenAnswer(invocation -> {
912+
Runnable r = invocation.getArgument(0);
913+
r.run();
914+
return mock(ThreadPool.Cancellable.class);
915+
});
908916
AutodetectProcessManager manager = createSpyManager();
909917
JobTask jobTask = mock(JobTask.class);
910-
Exception boom = new RuntimeException("boom");
911-
912-
// stub updatePersistentTaskState to call onFailure
918+
ResourceNotFoundException boom = new ResourceNotFoundException("boom");
913919
doAnswer(invocation -> {
914920
@SuppressWarnings("unchecked")
915921
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
@@ -929,6 +935,9 @@ public void testSetJobState_withHandler_onFailure_triggersHandlerException() thr
929935
}
930936

931937
public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException {
938+
// Verifies that transient failures are retried until eventual success, and the handler receives null on success
939+
940+
// ensure that all retries are executed on the same thread for determinism
932941
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
933942
Runnable r = invocation.getArgument(0);
934943
r.run();
@@ -938,6 +947,7 @@ public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException
938947
JobTask jobTask = mock(JobTask.class);
939948
AtomicInteger attempts = new AtomicInteger();
940949
doAnswer(invocation -> {
950+
// Simulate transient failures for the first two attempts, then succeed on the third
941951
@SuppressWarnings("unchecked")
942952
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
943953
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
@@ -954,18 +964,22 @@ public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException
954964

955965
manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler);
956966

967+
// confirms that the method was called exactly three times (two failures then one success).
957968
verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any());
958969
assertNull(holder.get());
959970
}
960971

961972
public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException {
973+
// Ensures that if the persistent‐state update fails with a ResourceNotFoundException, the retry loop does not retry
974+
// again but immediately invokes the user’s handler with that exception.
962975
AutodetectProcessManager manager = createSpyManager();
963976
JobTask jobTask = mock(JobTask.class);
964977
ResourceNotFoundException rnfe = new ResourceNotFoundException("not found");
965978
doAnswer(invocation -> {
979+
// Simulate a ResourceNotFoundException that should not be retried
966980
@SuppressWarnings("unchecked")
967-
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
968-
(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
981+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
982+
PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
969983
listener.onFailure(rnfe);
970984
return null;
971985
}).when(jobTask).updatePersistentTaskState(any(), any());
@@ -975,7 +989,9 @@ public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOExc
975989

976990
manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler);
977991

992+
// updatePersistentTaskState(...) was invoked exactly once (no retries).
978993
verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any());
994+
// The handler should have been invoked with the ResourceNotFoundException
979995
assertSame(rnfe, holder.get());
980996
}
981997

0 commit comments

Comments
 (0)