diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java index a266f95313..20d14983dd 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java @@ -105,7 +105,7 @@ public void execute(String tenantId, ActionListener listener) { .whenComplete((r, throwable) -> { if (throwable != null) { Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - listener.onFailure(cause); + handleError("Failed to search pattern matched documents for index: {}", cause, tenantId, listener, false); } else { SearchResponse searchResponse = r.searchResponse(); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -203,9 +203,11 @@ protected void beginGeneration(String tenantId, ActionListener lis .lastUpdatedTime(Instant.now()) .build(); - writeIndexInsight(indexInsight, tenantId, ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, e -> { - saveFailedStatus(tenantId, e, listener); - })); + writeIndexInsight( + indexInsight, + tenantId, + ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure) + ); } protected void runWithPrerequisites(String tenantId, ActionListener listener) { @@ -239,7 +241,7 @@ protected void saveResult(String content, String tenantId, ActionListener { listener.onResponse(insight); }, e -> { - saveFailedStatus(tenantId, e, listener); + handleError("Failed to save completed result for index: {}", e, tenantId, listener); })); } @@ -250,6 +252,7 @@ protected void saveFailedStatus(String tenantId, Exception error, ActionListener .index(sourceIndex) .taskType(taskType) .status(IndexInsightTaskStatus.FAILED) + .lastUpdatedTime(Instant.now()) .build(); writeIndexInsight( indexInsight, diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java index a965160d28..d1ef8a4b24 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java @@ -52,11 +52,14 @@ public void runTask(String tenantId, ActionListener listener) { client, tenantId, ActionListener - .wrap(agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, listener::onFailure) + .wrap( + agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, + e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener) + ) ); - }, e -> handleError("Failed to get statistical content for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to get statistical content for index: {}", e, tenantId, listener))); } catch (Exception e) { - handleError("Failed to execute field description task for index {}", e, tenantId, listener); + handleError("Failed to execute field description task for index: {}", e, tenantId, listener); } } @@ -125,17 +128,12 @@ protected void handlePatternResult(Map patternSource, String ten listener.onResponse(insight); } catch (Exception e) { - log.error("Failed to process current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); + handleError("Failed to process current index mapping for index: {}", e, tenantId, listener, false); } - }, e -> { - log.error("Failed to get current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); - })); + }, e -> { handleError("Failed to get current index mapping for index: {}", e, tenantId, listener, false); })); } catch (Exception e) { - log.error("Failed to filter field descriptions for index {}", sourceIndex, e); - listener.onFailure(e); + handleError("Failed to filter field descriptions for index: {}", e, tenantId, listener, false); } } @@ -170,7 +168,7 @@ private void batchProcessFields( saveResult("", tenantId, ActionListener.wrap(insight -> { log.info("Empty field description completed for: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save empty field description result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save empty field description result for index: {}", e, tenantId, listener))); return; } @@ -195,13 +193,12 @@ private void batchProcessFields( saveResult(gson.toJson(resultsMap), tenantId, ActionListener.wrap(insight -> { log.info("Field description completed for: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save field description result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save field description result for index: {}", e, tenantId, listener))); } else { - handleError("Batch processing failed for index {}", new Exception("Batch processing failed"), tenantId, listener); + handleError("Batch processing failed for index: {}", new Exception("Batch processing failed"), tenantId, listener); } } catch (InterruptedException e) { - log.error("Batch processing interrupted for index: {}", sourceIndex); - handleError("Batch processing interrupted for index {}", e, tenantId, listener); + handleError("Batch processing interrupted for index: {}", e, tenantId, listener); } } @@ -231,7 +228,10 @@ private void processBatch( log.error("Error parsing response for batch in index {}: {}", sourceIndex, e.getMessage()); listener.onFailure(e); } - }, e -> { listener.onFailure(e); })); + }, e -> { + log.error("Failed to call LLM for batch processing in index {}: {}", sourceIndex, e.getMessage()); + listener.onFailure(e); + })); } private String generateBatchPrompt(List batchFields, Map statisticalContentMap) { diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java index 23fbce76e0..48366415f9 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java @@ -92,11 +92,15 @@ public void runTask(String tenantId, ActionListener listener) { getAgentIdToRun( client, tenantId, - ActionListener.wrap(agentId -> performLogAnalysis(agentId, tenantId, listener), listener::onFailure) + ActionListener + .wrap( + agentId -> performLogAnalysis(agentId, tenantId, listener), + e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener) + ) ); - }, listener::onFailure)); + }, e -> handleError("Failed to collect sample documents for index: {}", e, tenantId, listener))); } catch (Exception e) { - handleError("Failed log related check for {}", e, tenantId, listener); + handleError("Failed log related check for index: {}", e, tenantId, listener); } } @@ -136,13 +140,13 @@ private void performLogAnalysis(String agentId, String tenantId, ActionListener< try { Map parsed = parseCheckResponse(response); saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> { - log.info("Log related check completed for index {}", sourceIndex); + log.info("Log related check completed for index: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save log related check result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save log related check result for index: {}", e, tenantId, listener))); } catch (Exception e) { - handleError("Error parsing response of log related check for {}", e, tenantId, listener); + handleError("Error parsing response of log related check for index: {}", e, tenantId, listener); } - }, e -> handleError("Failed to call LLM for log related check: {}", e, tenantId, listener))); + }, e -> handleError("Failed to call LLM for log related check for index: {}", e, tenantId, listener))); } private Map parseCheckResponse(String resp) { diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java index 215e2646c4..046ff15d97 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java @@ -101,7 +101,7 @@ public void runTask(String tenantId, ActionListener listener, bool try { collectStatisticalData(tenantId, shouldStore, listener); } catch (Exception e) { - handleError("Failed to execute statistical data task for index {}", e, tenantId, listener, shouldStore); + handleError("Failed to execute statistical data task for index: {}", e, tenantId, listener, shouldStore); } } @@ -122,7 +122,13 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { Map mappings = getMappingsResponse.getMappings(); if (mappings.isEmpty()) { - listener.onFailure(new IllegalArgumentException("No matching mapping with index name: " + sourceIndex)); + handleError( + "No matching mapping with index name: {}", + new IllegalArgumentException("No matching mapping with index name: " + sourceIndex), + tenantId, + listener, + shouldStore + ); return; } @@ -169,9 +175,9 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action .build(); listener.onResponse(insight); } - }, listener::onFailure)); + }, e -> handleError("Failed to filter important columns by LLM for index: {}", e, tenantId, listener, shouldStore))); }, e -> handleError("Failed to collect statistical data for index: {}", e, tenantId, listener, shouldStore))); - }, listener::onFailure)); + }, e -> handleError("Failed to get mappings for index: {}", e, tenantId, listener, shouldStore))); } @Override @@ -364,7 +370,7 @@ private Map parseSearchResult( } catch (Exception e) { log .error( - "Failed to parse aggregation result from DSL in statistical index insight for index name: {}", + "Failed to parse aggregation result from DSL in statistical index insight for index: {}", sourceIndex, e ); diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java index c0a0118733..a0eb059c50 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java @@ -142,6 +142,7 @@ public void testRunTask_MLConfigRetrievalFailure() { String statisticalContent = "{\"mapping\": {\"field1\": {\"type\": \"text\"}}}"; mockGetSuccess(sdkClient, statisticalContent); + mockUpdateSuccess(sdkClient); mockMLConfigFailure(client, "Config not found"); task.runTask("tenant-id", listener); diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java index 992454abfc..202ced0f2d 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java @@ -277,6 +277,7 @@ public void testRunTask_MLConfigRetrievalFailure() { mockSearchResponse(); // Mock getAgentIdToRun mockMLConfigFailure(client, "Config not found"); + mockUpdateSuccess(sdkClient); ActionListener listener = mock(ActionListener.class); task.runTask("tenant-id", listener); diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java index 131bf4799e..b1b70cd3ba 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java @@ -38,6 +38,8 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -52,6 +54,7 @@ import org.opensearch.search.aggregations.bucket.sampler.InternalSampler; import org.opensearch.search.aggregations.metrics.InternalTopHits; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.AdminClient; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.IndicesAdminClient; @@ -68,9 +71,14 @@ private Client setupBasicClientMocks() { Client client = mock(Client.class); AdminClient adminClient = mock(AdminClient.class); IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = Settings.builder().build(); + ThreadContext threadContext = new ThreadContext(settings); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); return client; } @@ -184,6 +192,8 @@ public void testRunTask_WithEmptyMappings() { when(getMappingsResponse.getMappings()).thenReturn(new HashMap<>()); setupGetMappingsCall(client, getMappingsResponse); + sdkClient = mock(SdkClient.class); + mockUpdateSuccess(sdkClient); StatisticalDataTask task = new StatisticalDataTask("test-index", client, sdkClient); task.runTask("tenant-id", listener);