diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java index 247a26b543..27ef37f6de 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; import java.io.IOException; import java.util.Collections; @@ -26,7 +27,6 @@ import java.util.Objects; import java.util.function.Function; import java.util.regex.Matcher; -import java.util.stream.Collectors; public class OpenSearchIndexPartitionCreationSupplier implements Function, List> { @@ -82,10 +82,18 @@ private List applyForOpenSearchClient(final Map shouldIndexBeProcessed(osIndicesRecord.index())) - .map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build()) - .collect(Collectors.toList()); + final List partitions = new java.util.ArrayList<>(); + int matchCount = 0; + for (final var esIndicesRecord : indicesResponse.valueBody()) { + if (shouldIndexBeProcessed(esIndicesRecord.index())) { + partitions.add(PartitionIdentifier.builder().withPartitionKey(esIndicesRecord.index()).build()); + matchCount++; + } + } + if (matchCount == 0) { + LOG.warn(NOISY, "No indices matched the configured regex patterns after applying include/exclude filters"); + } + return partitions; } private List applyForElasticSearchClient(final Map globalStateMap) { @@ -99,10 +107,18 @@ private List applyForElasticSearchClient(final Map shouldIndexBeProcessed(esIndicesRecord.index())) - .map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build()) - .collect(Collectors.toList()); + final List partitions = new java.util.ArrayList<>(); + int matchCount = 0; + for (final var esIndicesRecord : indicesResponse.valueBody()) { + if (shouldIndexBeProcessed(esIndicesRecord.index())) { + partitions.add(PartitionIdentifier.builder().withPartitionKey(esIndicesRecord.index()).build()); + matchCount++; + } + } + if (matchCount == 0) { + LOG.warn(NOISY, "No indices matched the configured regex patterns after applying include/exclude filters"); + } + return partitions; } private boolean shouldIndexBeProcessed(final String indexName) {