From 221f622269be857c6e691b8f5b66e8e8b6c8173f Mon Sep 17 00:00:00 2001 From: shuangli-z Date: Tue, 14 Oct 2025 15:07:00 +0800 Subject: [PATCH 1/4] Add lastUpdatedTime when saving failed status Signed-off-by: shuangli-z --- .../ml/common/indexInsight/AbstractIndexInsightTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java index a266f95313..c728c641fb 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java @@ -250,6 +250,7 @@ protected void saveFailedStatus(String tenantId, Exception error, ActionListener .index(sourceIndex) .taskType(taskType) .status(IndexInsightTaskStatus.FAILED) + .lastUpdatedTime(Instant.now()) .build(); writeIndexInsight( indexInsight, From 58c2b183c8d6a61a859bf5122b6a874123ab974d Mon Sep 17 00:00:00 2001 From: shuangli-z Date: Tue, 14 Oct 2025 15:16:39 +0800 Subject: [PATCH 2/4] Refine error messages and remove redundant logging Signed-off-by: shuangli-z --- .../indexInsight/FieldDescriptionTask.java | 18 ++++++++++-------- .../indexInsight/LogRelatedIndexCheckTask.java | 16 ++++++++++------ .../indexInsight/StatisticalDataTask.java | 4 ++-- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java index a965160d28..27d28b03e4 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java @@ -52,11 +52,14 @@ public void runTask(String tenantId, ActionListener listener) { client, tenantId, ActionListener - .wrap(agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, listener::onFailure) + .wrap( + agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, + e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener) + ) ); - }, e -> handleError("Failed to get statistical content for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to get statistical content for index: {}", e, tenantId, listener))); } catch (Exception e) { - handleError("Failed to execute field description task for index {}", e, tenantId, listener); + handleError("Failed to execute field description task for index: {}", e, tenantId, listener); } } @@ -170,7 +173,7 @@ private void batchProcessFields( saveResult("", tenantId, ActionListener.wrap(insight -> { log.info("Empty field description completed for: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save empty field description result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save empty field description result for index: {}", e, tenantId, listener))); return; } @@ -195,13 +198,12 @@ private void batchProcessFields( saveResult(gson.toJson(resultsMap), tenantId, ActionListener.wrap(insight -> { log.info("Field description completed for: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save field description result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save field description result for index: {}", e, tenantId, listener))); } else { - handleError("Batch processing failed for index {}", new Exception("Batch processing failed"), tenantId, listener); + handleError("Batch processing failed for index: {}", new Exception("Batch processing failed"), tenantId, listener); } } catch (InterruptedException e) { - log.error("Batch processing interrupted for index: {}", sourceIndex); - handleError("Batch processing interrupted for index {}", e, tenantId, listener); + handleError("Batch processing interrupted for index: {}", e, tenantId, listener); } } diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java index 23fbce76e0..d62a6de9de 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java @@ -92,11 +92,15 @@ public void runTask(String tenantId, ActionListener listener) { getAgentIdToRun( client, tenantId, - ActionListener.wrap(agentId -> performLogAnalysis(agentId, tenantId, listener), listener::onFailure) + ActionListener + .wrap( + agentId -> performLogAnalysis(agentId, tenantId, listener), + e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener) + ) ); }, listener::onFailure)); } catch (Exception e) { - handleError("Failed log related check for {}", e, tenantId, listener); + handleError("Failed log related check for index: {}", e, tenantId, listener); } } @@ -136,13 +140,13 @@ private void performLogAnalysis(String agentId, String tenantId, ActionListener< try { Map parsed = parseCheckResponse(response); saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> { - log.info("Log related check completed for index {}", sourceIndex); + log.info("Log related check completed for index: {}", sourceIndex); listener.onResponse(insight); - }, e -> handleError("Failed to save log related check result for index {}", e, tenantId, listener))); + }, e -> handleError("Failed to save log related check result for index: {}", e, tenantId, listener))); } catch (Exception e) { - handleError("Error parsing response of log related check for {}", e, tenantId, listener); + handleError("Error parsing response of log related check for index: {}", e, tenantId, listener); } - }, e -> handleError("Failed to call LLM for log related check: {}", e, tenantId, listener))); + }, e -> handleError("Failed to call LLM for log related check for index: {}", e, tenantId, listener))); } private Map parseCheckResponse(String resp) { diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java index 215e2646c4..ed491c152a 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java @@ -101,7 +101,7 @@ public void runTask(String tenantId, ActionListener listener, bool try { collectStatisticalData(tenantId, shouldStore, listener); } catch (Exception e) { - handleError("Failed to execute statistical data task for index {}", e, tenantId, listener, shouldStore); + handleError("Failed to execute statistical data task for index: {}", e, tenantId, listener, shouldStore); } } @@ -364,7 +364,7 @@ private Map parseSearchResult( } catch (Exception e) { log .error( - "Failed to parse aggregation result from DSL in statistical index insight for index name: {}", + "Failed to parse aggregation result from DSL in statistical index insight for index: {}", sourceIndex, e ); From b4dbf8e5b2e081e7e5490beff29eefdc55ad2ca6 Mon Sep 17 00:00:00 2001 From: shuangli-z Date: Wed, 15 Oct 2025 13:44:19 +0800 Subject: [PATCH 3/4] Replace listener::onFailure with handleError after GENERATING status is written Signed-off-by: shuangli-z --- .../indexInsight/AbstractIndexInsightTask.java | 12 +++++++----- .../indexInsight/FieldDescriptionTask.java | 16 +++++++--------- .../indexInsight/LogRelatedIndexCheckTask.java | 2 +- .../common/indexInsight/StatisticalDataTask.java | 12 +++++++++--- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java index c728c641fb..20d14983dd 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java @@ -105,7 +105,7 @@ public void execute(String tenantId, ActionListener listener) { .whenComplete((r, throwable) -> { if (throwable != null) { Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - listener.onFailure(cause); + handleError("Failed to search pattern matched documents for index: {}", cause, tenantId, listener, false); } else { SearchResponse searchResponse = r.searchResponse(); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -203,9 +203,11 @@ protected void beginGeneration(String tenantId, ActionListener lis .lastUpdatedTime(Instant.now()) .build(); - writeIndexInsight(indexInsight, tenantId, ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, e -> { - saveFailedStatus(tenantId, e, listener); - })); + writeIndexInsight( + indexInsight, + tenantId, + ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure) + ); } protected void runWithPrerequisites(String tenantId, ActionListener listener) { @@ -239,7 +241,7 @@ protected void saveResult(String content, String tenantId, ActionListener { listener.onResponse(insight); }, e -> { - saveFailedStatus(tenantId, e, listener); + handleError("Failed to save completed result for index: {}", e, tenantId, listener); })); } diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java index 27d28b03e4..d1ef8a4b24 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java @@ -128,17 +128,12 @@ protected void handlePatternResult(Map patternSource, String ten listener.onResponse(insight); } catch (Exception e) { - log.error("Failed to process current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); + handleError("Failed to process current index mapping for index: {}", e, tenantId, listener, false); } - }, e -> { - log.error("Failed to get current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); - })); + }, e -> { handleError("Failed to get current index mapping for index: {}", e, tenantId, listener, false); })); } catch (Exception e) { - log.error("Failed to filter field descriptions for index {}", sourceIndex, e); - listener.onFailure(e); + handleError("Failed to filter field descriptions for index: {}", e, tenantId, listener, false); } } @@ -233,7 +228,10 @@ private void processBatch( log.error("Error parsing response for batch in index {}: {}", sourceIndex, e.getMessage()); listener.onFailure(e); } - }, e -> { listener.onFailure(e); })); + }, e -> { + log.error("Failed to call LLM for batch processing in index {}: {}", sourceIndex, e.getMessage()); + listener.onFailure(e); + })); } private String generateBatchPrompt(List batchFields, Map statisticalContentMap) { diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java index d62a6de9de..48366415f9 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java @@ -98,7 +98,7 @@ public void runTask(String tenantId, ActionListener listener) { e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener) ) ); - }, listener::onFailure)); + }, e -> handleError("Failed to collect sample documents for index: {}", e, tenantId, listener))); } catch (Exception e) { handleError("Failed log related check for index: {}", e, tenantId, listener); } diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java index ed491c152a..046ff15d97 100644 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java @@ -122,7 +122,13 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { Map mappings = getMappingsResponse.getMappings(); if (mappings.isEmpty()) { - listener.onFailure(new IllegalArgumentException("No matching mapping with index name: " + sourceIndex)); + handleError( + "No matching mapping with index name: {}", + new IllegalArgumentException("No matching mapping with index name: " + sourceIndex), + tenantId, + listener, + shouldStore + ); return; } @@ -169,9 +175,9 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action .build(); listener.onResponse(insight); } - }, listener::onFailure)); + }, e -> handleError("Failed to filter important columns by LLM for index: {}", e, tenantId, listener, shouldStore))); }, e -> handleError("Failed to collect statistical data for index: {}", e, tenantId, listener, shouldStore))); - }, listener::onFailure)); + }, e -> handleError("Failed to get mappings for index: {}", e, tenantId, listener, shouldStore))); } @Override From 04c76bda41a7f15ba48b01de0e75696da51113c8 Mon Sep 17 00:00:00 2001 From: shuangli-z Date: Wed, 15 Oct 2025 15:05:34 +0800 Subject: [PATCH 4/4] fix UT Signed-off-by: shuangli-z --- .../common/indexInsight/FieldDescriptionTaskTests.java | 1 + .../indexInsight/LogRelatedIndexCheckTaskTests.java | 1 + .../common/indexInsight/StatisticalDataTaskTests.java | 10 ++++++++++ 3 files changed, 12 insertions(+) diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java index c0a0118733..a0eb059c50 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTaskTests.java @@ -142,6 +142,7 @@ public void testRunTask_MLConfigRetrievalFailure() { String statisticalContent = "{\"mapping\": {\"field1\": {\"type\": \"text\"}}}"; mockGetSuccess(sdkClient, statisticalContent); + mockUpdateSuccess(sdkClient); mockMLConfigFailure(client, "Config not found"); task.runTask("tenant-id", listener); diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java index 992454abfc..202ced0f2d 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTaskTests.java @@ -277,6 +277,7 @@ public void testRunTask_MLConfigRetrievalFailure() { mockSearchResponse(); // Mock getAgentIdToRun mockMLConfigFailure(client, "Config not found"); + mockUpdateSuccess(sdkClient); ActionListener listener = mock(ActionListener.class); task.runTask("tenant-id", listener); diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java index 131bf4799e..b1b70cd3ba 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java @@ -38,6 +38,8 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -52,6 +54,7 @@ import org.opensearch.search.aggregations.bucket.sampler.InternalSampler; import org.opensearch.search.aggregations.metrics.InternalTopHits; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.AdminClient; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.IndicesAdminClient; @@ -68,9 +71,14 @@ private Client setupBasicClientMocks() { Client client = mock(Client.class); AdminClient adminClient = mock(AdminClient.class); IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = Settings.builder().build(); + ThreadContext threadContext = new ThreadContext(settings); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); return client; } @@ -184,6 +192,8 @@ public void testRunTask_WithEmptyMappings() { when(getMappingsResponse.getMappings()).thenReturn(new HashMap<>()); setupGetMappingsCall(client, getMappingsResponse); + sdkClient = mock(SdkClient.class); + mockUpdateSuccess(sdkClient); StatisticalDataTask task = new StatisticalDataTask("test-index", client, sdkClient); task.runTask("tenant-id", listener);