Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void execute(String tenantId, ActionListener<IndexInsight> listener) {
.whenComplete((r, throwable) -> {
if (throwable != null) {
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
listener.onFailure(cause);
handleError("Failed to search pattern matched documents for index: {}", cause, tenantId, listener, false);
} else {
SearchResponse searchResponse = r.searchResponse();
SearchHit[] hits = searchResponse.getHits().getHits();
Expand Down Expand Up @@ -203,9 +203,11 @@ protected void beginGeneration(String tenantId, ActionListener<IndexInsight> lis
.lastUpdatedTime(Instant.now())
.build();

writeIndexInsight(indexInsight, tenantId, ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, e -> {
saveFailedStatus(tenantId, e, listener);
}));
writeIndexInsight(
indexInsight,
tenantId,
ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we save the failure status here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if writing the initial GENERATING status fails, the insight will not be stored, so there is no need to update to FAILED status. Also, attempting to save a FAILED status would likely fail for the same reason.

);
}

protected void runWithPrerequisites(String tenantId, ActionListener<IndexInsight> listener) {
Expand Down Expand Up @@ -239,7 +241,7 @@ protected void saveResult(String content, String tenantId, ActionListener<IndexI
.build();

writeIndexInsight(insight, tenantId, ActionListener.wrap(r -> { listener.onResponse(insight); }, e -> {
saveFailedStatus(tenantId, e, listener);
handleError("Failed to save completed result for index: {}", e, tenantId, listener);
}));
}

Expand All @@ -250,6 +252,7 @@ protected void saveFailedStatus(String tenantId, Exception error, ActionListener
.index(sourceIndex)
.taskType(taskType)
.status(IndexInsightTaskStatus.FAILED)
.lastUpdatedTime(Instant.now())
.build();
writeIndexInsight(
indexInsight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ public void runTask(String tenantId, ActionListener<IndexInsight> listener) {
client,
tenantId,
ActionListener
.wrap(agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, listener::onFailure)
.wrap(
agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); },
e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener)
)
);
}, e -> handleError("Failed to get statistical content for index {}", e, tenantId, listener)));
}, e -> handleError("Failed to get statistical content for index: {}", e, tenantId, listener)));
} catch (Exception e) {
handleError("Failed to execute field description task for index {}", e, tenantId, listener);
handleError("Failed to execute field description task for index: {}", e, tenantId, listener);
}
}

Expand Down Expand Up @@ -125,17 +128,12 @@ protected void handlePatternResult(Map<String, Object> patternSource, String ten
listener.onResponse(insight);

} catch (Exception e) {
log.error("Failed to process current index mapping for index {}", sourceIndex, e);
listener.onFailure(e);
handleError("Failed to process current index mapping for index: {}", e, tenantId, listener, false);
}
}, e -> {
log.error("Failed to get current index mapping for index {}", sourceIndex, e);
listener.onFailure(e);
}));
}, e -> { handleError("Failed to get current index mapping for index: {}", e, tenantId, listener, false); }));

} catch (Exception e) {
log.error("Failed to filter field descriptions for index {}", sourceIndex, e);
listener.onFailure(e);
handleError("Failed to filter field descriptions for index: {}", e, tenantId, listener, false);
}
}

Expand Down Expand Up @@ -170,7 +168,7 @@ private void batchProcessFields(
saveResult("", tenantId, ActionListener.wrap(insight -> {
log.info("Empty field description completed for: {}", sourceIndex);
listener.onResponse(insight);
}, e -> handleError("Failed to save empty field description result for index {}", e, tenantId, listener)));
}, e -> handleError("Failed to save empty field description result for index: {}", e, tenantId, listener)));
return;
}

Expand All @@ -195,13 +193,12 @@ private void batchProcessFields(
saveResult(gson.toJson(resultsMap), tenantId, ActionListener.wrap(insight -> {
log.info("Field description completed for: {}", sourceIndex);
listener.onResponse(insight);
}, e -> handleError("Failed to save field description result for index {}", e, tenantId, listener)));
}, e -> handleError("Failed to save field description result for index: {}", e, tenantId, listener)));
} else {
handleError("Batch processing failed for index {}", new Exception("Batch processing failed"), tenantId, listener);
handleError("Batch processing failed for index: {}", new Exception("Batch processing failed"), tenantId, listener);
}
} catch (InterruptedException e) {
log.error("Batch processing interrupted for index: {}", sourceIndex);
handleError("Batch processing interrupted for index {}", e, tenantId, listener);
handleError("Batch processing interrupted for index: {}", e, tenantId, listener);
}
}

Expand Down Expand Up @@ -231,7 +228,10 @@ private void processBatch(
log.error("Error parsing response for batch in index {}: {}", sourceIndex, e.getMessage());
listener.onFailure(e);
}
}, e -> { listener.onFailure(e); }));
}, e -> {
log.error("Failed to call LLM for batch processing in index {}: {}", sourceIndex, e.getMessage());
listener.onFailure(e);
}));
}

private String generateBatchPrompt(List<String> batchFields, Map<String, Object> statisticalContentMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ public void runTask(String tenantId, ActionListener<IndexInsight> listener) {
getAgentIdToRun(
client,
tenantId,
ActionListener.wrap(agentId -> performLogAnalysis(agentId, tenantId, listener), listener::onFailure)
ActionListener
.wrap(
agentId -> performLogAnalysis(agentId, tenantId, listener),
e -> handleError("Failed to get agent ID from ML config", e, tenantId, listener)
)
);
}, listener::onFailure));
}, e -> handleError("Failed to collect sample documents for index: {}", e, tenantId, listener)));
} catch (Exception e) {
handleError("Failed log related check for {}", e, tenantId, listener);
handleError("Failed log related check for index: {}", e, tenantId, listener);
}
}

Expand Down Expand Up @@ -136,13 +140,13 @@ private void performLogAnalysis(String agentId, String tenantId, ActionListener<
try {
Map<String, Object> parsed = parseCheckResponse(response);
saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> {
log.info("Log related check completed for index {}", sourceIndex);
log.info("Log related check completed for index: {}", sourceIndex);
listener.onResponse(insight);
}, e -> handleError("Failed to save log related check result for index {}", e, tenantId, listener)));
}, e -> handleError("Failed to save log related check result for index: {}", e, tenantId, listener)));
} catch (Exception e) {
handleError("Error parsing response of log related check for {}", e, tenantId, listener);
handleError("Error parsing response of log related check for index: {}", e, tenantId, listener);
}
}, e -> handleError("Failed to call LLM for log related check: {}", e, tenantId, listener)));
}, e -> handleError("Failed to call LLM for log related check for index: {}", e, tenantId, listener)));
}

private Map<String, Object> parseCheckResponse(String resp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void runTask(String tenantId, ActionListener<IndexInsight> listener, bool
try {
collectStatisticalData(tenantId, shouldStore, listener);
} catch (Exception e) {
handleError("Failed to execute statistical data task for index {}", e, tenantId, listener, shouldStore);
handleError("Failed to execute statistical data task for index: {}", e, tenantId, listener, shouldStore);
}
}

Expand All @@ -122,7 +122,13 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action
client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> {
Map<String, MappingMetadata> mappings = getMappingsResponse.getMappings();
if (mappings.isEmpty()) {
listener.onFailure(new IllegalArgumentException("No matching mapping with index name: " + sourceIndex));
handleError(
"No matching mapping with index name: {}",
new IllegalArgumentException("No matching mapping with index name: " + sourceIndex),
tenantId,
listener,
shouldStore
);
return;
}

Expand Down Expand Up @@ -169,9 +175,9 @@ private void collectStatisticalData(String tenantId, boolean shouldStore, Action
.build();
listener.onResponse(insight);
}
}, listener::onFailure));
}, e -> handleError("Failed to filter important columns by LLM for index: {}", e, tenantId, listener, shouldStore)));
}, e -> handleError("Failed to collect statistical data for index: {}", e, tenantId, listener, shouldStore)));
}, listener::onFailure));
}, e -> handleError("Failed to get mappings for index: {}", e, tenantId, listener, shouldStore)));
}

@Override
Expand Down Expand Up @@ -364,7 +370,7 @@ private Map<String, Object> parseSearchResult(
} catch (Exception e) {
log
.error(
"Failed to parse aggregation result from DSL in statistical index insight for index name: {}",
"Failed to parse aggregation result from DSL in statistical index insight for index: {}",
sourceIndex,
e
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public void testRunTask_MLConfigRetrievalFailure() {
String statisticalContent = "{\"mapping\": {\"field1\": {\"type\": \"text\"}}}";

mockGetSuccess(sdkClient, statisticalContent);
mockUpdateSuccess(sdkClient);
mockMLConfigFailure(client, "Config not found");

task.runTask("tenant-id", listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void testRunTask_MLConfigRetrievalFailure() {
mockSearchResponse();
// Mock getAgentIdToRun
mockMLConfigFailure(client, "Config not found");
mockUpdateSuccess(sdkClient);

ActionListener<IndexInsight> listener = mock(ActionListener.class);
task.runTask("tenant-id", listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
Expand All @@ -52,6 +54,7 @@
import org.opensearch.search.aggregations.bucket.sampler.InternalSampler;
import org.opensearch.search.aggregations.metrics.InternalTopHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.AdminClient;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.client.IndicesAdminClient;
Expand All @@ -68,9 +71,14 @@ private Client setupBasicClientMocks() {
Client client = mock(Client.class);
AdminClient adminClient = mock(AdminClient.class);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
ThreadPool threadPool = mock(ThreadPool.class);
Settings settings = Settings.builder().build();
ThreadContext threadContext = new ThreadContext(settings);

when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(threadContext);

return client;
}
Expand Down Expand Up @@ -184,6 +192,8 @@ public void testRunTask_WithEmptyMappings() {

when(getMappingsResponse.getMappings()).thenReturn(new HashMap<>());
setupGetMappingsCall(client, getMappingsResponse);
sdkClient = mock(SdkClient.class);
mockUpdateSuccess(sdkClient);

StatisticalDataTask task = new StatisticalDataTask("test-index", client, sdkClient);
task.runTask("tenant-id", listener);
Expand Down