|
7 | 7 | package org.elasticsearch.xpack.ml.job.process.autodetect; |
8 | 8 |
|
9 | 9 | import org.elasticsearch.ElasticsearchException; |
| 10 | +import org.elasticsearch.ResourceNotFoundException; |
10 | 11 | import org.elasticsearch.action.ActionListener; |
11 | 12 | import org.elasticsearch.action.ActionType; |
12 | 13 | import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; |
|
33 | 34 | import org.elasticsearch.index.analysis.AnalysisRegistry; |
34 | 35 | import org.elasticsearch.indices.TestIndexNameExpressionResolver; |
35 | 36 | import org.elasticsearch.license.XPackLicenseState; |
| 37 | +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; |
36 | 38 | import org.elasticsearch.persistent.PersistentTasksService; |
37 | 39 | import org.elasticsearch.tasks.TaskId; |
38 | 40 | import org.elasticsearch.tasks.TaskManager; |
|
91 | 93 | import java.util.Optional; |
92 | 94 | import java.util.concurrent.Callable; |
93 | 95 | import java.util.concurrent.CountDownLatch; |
| 96 | +import java.util.concurrent.Executor; |
94 | 97 | import java.util.concurrent.ExecutorService; |
95 | 98 | import java.util.concurrent.Future; |
96 | 99 | import java.util.concurrent.TimeUnit; |
@@ -262,6 +265,9 @@ public void setup() throws Exception { |
262 | 265 | handler.accept(buildAutodetectParams()); |
263 | 266 | return null; |
264 | 267 | }).when(jobResultsProvider).getAutodetectParams(any(), any(), any()); |
| 268 | + |
| 269 | + // when running retry logic use the real executor service |
| 270 | + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); |
265 | 271 | } |
266 | 272 |
|
267 | 273 | public void testOpenJob() { |
@@ -854,6 +860,125 @@ public void testGetOpenProcessMemoryUsage() { |
854 | 860 | assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes))); |
855 | 861 | } |
856 | 862 |
|
| 863 | + public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() { |
| 864 | + AutodetectProcessManager manager = createSpyManager(); |
| 865 | + JobTask jobTask = mock(JobTask.class); |
| 866 | + when(jobTask.getAllocationId()).thenReturn(123L); |
| 867 | + when(jobTask.getJobId()).thenReturn("job-123"); |
| 868 | + |
| 869 | + // call the no-handler overload |
| 870 | + manager.setJobState(jobTask, JobState.CLOSING, "closing-reason"); |
| 871 | + |
| 872 | + // verify we created the correct JobTaskState and passed some listener |
| 873 | + @SuppressWarnings("unchecked") |
| 874 | + ArgumentCaptor<JobTaskState> stateCaptor = |
| 875 | + ArgumentCaptor.forClass(JobTaskState.class); |
| 876 | + verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any()); |
| 877 | + JobTaskState captured = stateCaptor.getValue(); |
| 878 | + assertEquals(JobState.CLOSING, captured.getState()); |
| 879 | + assertEquals(123L, captured.getAllocationId()); |
| 880 | + assertEquals("closing-reason", captured.getReason()); |
| 881 | +// assertNotNull(captured.getTimestamp()); |
| 882 | + } |
| 883 | + |
| 884 | + public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException { |
| 885 | + AutodetectProcessManager manager = createSpyManager(); |
| 886 | + JobTask jobTask = mock(JobTask.class); |
| 887 | + |
| 888 | + // stub updatePersistentTaskState to call onResponse |
| 889 | + doAnswer(invocation -> { |
| 890 | + @SuppressWarnings("unchecked") |
| 891 | + ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = |
| 892 | + (ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1]; |
| 893 | + listener.onResponse(null); |
| 894 | + return null; |
| 895 | + }).when(jobTask).updatePersistentTaskState(any(), any()); |
| 896 | + |
| 897 | + AtomicReference<Exception> holder = new AtomicReference<>(); |
| 898 | + CheckedConsumer<Exception, IOException> handler = holder::set; |
| 899 | + |
| 900 | + manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler); |
| 901 | + |
| 902 | + // onResponse should have driven handler.accept(null) |
| 903 | + assertNull(holder.get()); |
| 904 | + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); |
| 905 | + } |
| 906 | + |
| 907 | + public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException { |
| 908 | + AutodetectProcessManager manager = createSpyManager(); |
| 909 | + JobTask jobTask = mock(JobTask.class); |
| 910 | + Exception boom = new RuntimeException("boom"); |
| 911 | + |
| 912 | + // stub updatePersistentTaskState to call onFailure |
| 913 | + doAnswer(invocation -> { |
| 914 | + @SuppressWarnings("unchecked") |
| 915 | + ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = |
| 916 | + (ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1]; |
| 917 | + listener.onFailure(boom); |
| 918 | + return null; |
| 919 | + }).when(jobTask).updatePersistentTaskState(any(), any()); |
| 920 | + |
| 921 | + AtomicReference<Exception> holder = new AtomicReference<>(); |
| 922 | + CheckedConsumer<Exception, IOException> handler = holder::set; |
| 923 | + |
| 924 | + manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler); |
| 925 | + |
| 926 | + // onFailure should have driven handler.accept(boom) |
| 927 | + assertSame(boom, holder.get()); |
| 928 | + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); |
| 929 | + } |
| 930 | + |
| 931 | + public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException { |
| 932 | + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> { |
| 933 | + Runnable r = invocation.getArgument(0); |
| 934 | + r.run(); |
| 935 | + return mock(ThreadPool.Cancellable.class); |
| 936 | + }); |
| 937 | + AutodetectProcessManager manager = createSpyManager(); |
| 938 | + JobTask jobTask = mock(JobTask.class); |
| 939 | + AtomicInteger attempts = new AtomicInteger(); |
| 940 | + doAnswer(invocation -> { |
| 941 | + @SuppressWarnings("unchecked") |
| 942 | + ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener< |
| 943 | + PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1]; |
| 944 | + if (attempts.incrementAndGet() < 3) { |
| 945 | + listener.onFailure(new RuntimeException("transient failure")); |
| 946 | + } else { |
| 947 | + listener.onResponse(null); |
| 948 | + } |
| 949 | + return null; |
| 950 | + }).when(jobTask).updatePersistentTaskState(any(), any()); |
| 951 | + |
| 952 | + AtomicReference<Exception> holder = new AtomicReference<>(); |
| 953 | + CheckedConsumer<Exception, IOException> handler = holder::set; |
| 954 | + |
| 955 | + manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler); |
| 956 | + |
| 957 | + verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any()); |
| 958 | + assertNull(holder.get()); |
| 959 | + } |
| 960 | + |
| 961 | + public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException { |
| 962 | + AutodetectProcessManager manager = createSpyManager(); |
| 963 | + JobTask jobTask = mock(JobTask.class); |
| 964 | + ResourceNotFoundException rnfe = new ResourceNotFoundException("not found"); |
| 965 | + doAnswer(invocation -> { |
| 966 | + @SuppressWarnings("unchecked") |
| 967 | + ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = |
| 968 | + (ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1]; |
| 969 | + listener.onFailure(rnfe); |
| 970 | + return null; |
| 971 | + }).when(jobTask).updatePersistentTaskState(any(), any()); |
| 972 | + |
| 973 | + AtomicReference<Exception> holder = new AtomicReference<>(); |
| 974 | + CheckedConsumer<Exception, IOException> handler = holder::set; |
| 975 | + |
| 976 | + manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler); |
| 977 | + |
| 978 | + verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any()); |
| 979 | + assertSame(rnfe, holder.get()); |
| 980 | + } |
| 981 | + |
857 | 982 | private AutodetectProcessManager createNonSpyManager(String jobId) { |
858 | 983 | ExecutorService executorService = mock(ExecutorService.class); |
859 | 984 | when(threadPool.executor(anyString())).thenReturn(executorService); |
|
0 commit comments