diff --git a/CHANGELOG.md b/CHANGELOG.md index a782df67a7c8e..d9db0161e0f74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650)) - WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536)) - Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844)) +- Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503)) ### Changed - Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/AvgIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/AvgIT.java new file mode 100644 index 0000000000000..ee0131e30e5ce --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/AvgIT.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; +import static org.opensearch.search.aggregations.AggregationBuilders.avg; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration tests for avg aggregation with concurrent segment search partition strategies. + */ +public class AvgIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public AvgIT(Settings staticSettings) { + super(staticSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } + ); + } + + public void testAvgAggregation() throws Exception { + createIndex("test_avg_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + builders.add(client().prepareIndex("test_avg_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_avg_agg"); + SearchResponse response = client().prepareSearch("test_avg_agg").addAggregation(avg("avg_agg").field("value")).get(); + assertSearchResponse(response); + Avg avgAgg = response.getAggregations().get("avg_agg"); + assertThat(avgAgg, notNullValue()); + assertThat(avgAgg.getValue(), closeTo(2500.5, 0.1)); + } finally { + internalCluster().wipeIndices("test_avg_agg"); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityIT.java index b2ed689622e7d..d3d0372a6d098 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityIT.java @@ -50,10 +50,12 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -62,6 +64,8 @@ import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; import static org.opensearch.search.aggregations.AggregationBuilders.cardinality; import static org.opensearch.search.aggregations.AggregationBuilders.global; import static org.opensearch.search.aggregations.AggregationBuilders.terms; @@ -82,7 +86,13 @@ public CardinalityIT(Settings staticSettings) { public static Collection parameters() { return Arrays.asList( new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } ); } @@ -666,4 +676,24 @@ public void testScriptCaching() throws Exception { ); internalCluster().wipeIndices("cache_test_idx"); } + + public void testCardinalityWithIntraSegmentPartitioning() throws Exception { + createIndex("test_cardinality_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + builders.add(client().prepareIndex("test_cardinality_agg").setSource("category", i % 100)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_cardinality_agg"); + SearchResponse response = client().prepareSearch("test_cardinality_agg") + .addAggregation(cardinality("cardinality").field("category")) + .get(); + Cardinality cardinalityAgg = response.getAggregations().get("cardinality"); + assertThat(cardinalityAgg, notNullValue()); + assertThat(cardinalityAgg.getValue(), equalTo(100L)); + } finally { + internalCluster().wipeIndices("test_cardinality_agg"); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MaxIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MaxIT.java new file mode 100644 index 0000000000000..3f849253fd996 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MaxIT.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; +import static org.opensearch.search.aggregations.AggregationBuilders.max; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration tests for max aggregation with concurrent segment search partition strategies. + */ +public class MaxIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public MaxIT(Settings staticSettings) { + super(staticSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } + ); + } + + public void testMaxAggregation() throws Exception { + createIndex("test_max_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + builders.add(client().prepareIndex("test_max_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_max_agg"); + SearchResponse response = client().prepareSearch("test_max_agg").addAggregation(max("max_agg").field("value")).get(); + assertSearchResponse(response); + Max maxAgg = response.getAggregations().get("max_agg"); + assertThat(maxAgg, notNullValue()); + assertThat(maxAgg.getValue(), closeTo(5000.0, 0.1)); + } finally { + internalCluster().wipeIndices("test_max_agg"); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MinIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MinIT.java new file mode 100644 index 0000000000000..3cb3cf8581247 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/MinIT.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; +import static org.opensearch.search.aggregations.AggregationBuilders.min; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration tests for min aggregation with concurrent segment search partition strategies. + */ +public class MinIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public MinIT(Settings staticSettings) { + super(staticSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } + ); + } + + public void testMinAggregation() throws Exception { + createIndex("test_min_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + builders.add(client().prepareIndex("test_min_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_min_agg"); + SearchResponse response = client().prepareSearch("test_min_agg").addAggregation(min("min_agg").field("value")).get(); + assertSearchResponse(response); + Min minAgg = response.getAggregations().get("min_agg"); + assertThat(minAgg, notNullValue()); + assertThat(minAgg.getValue(), closeTo(1.0, 0.1)); + } finally { + internalCluster().wipeIndices("test_min_agg"); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/StatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/StatsIT.java index c7df3efd6ac1e..88b09fe7dffe2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/StatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/StatsIT.java @@ -31,7 +31,10 @@ package org.opensearch.search.aggregations.metrics; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.common.settings.Settings; @@ -46,12 +49,17 @@ import org.opensearch.search.aggregations.bucket.histogram.Histogram; import org.opensearch.search.aggregations.bucket.terms.Terms; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; import static org.opensearch.search.aggregations.AggregationBuilders.filter; import static org.opensearch.search.aggregations.AggregationBuilders.global; import static org.opensearch.search.aggregations.AggregationBuilders.histogram; @@ -60,6 +68,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -70,6 +79,20 @@ public StatsIT(Settings staticSettings) { super(staticSettings); } + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } + ); + } + @Override protected Collection> nodePlugins() { return Collections.singleton(AggregationTestScriptsPlugin.class); @@ -390,4 +413,28 @@ public void testScriptCaching() throws Exception { ); internalCluster().wipeIndices("cache_test_idx"); } + + public void testStatsWithIntraSegmentPartitioning() throws Exception { + createIndex("test_stats_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + long expectedSum = 0; + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + expectedSum += (i + 1); + builders.add(client().prepareIndex("test_stats_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_stats_agg"); + SearchResponse response = client().prepareSearch("test_stats_agg").addAggregation(stats("stats").field("value")).get(); + Stats statsAgg = response.getAggregations().get("stats"); + assertThat(statsAgg, notNullValue()); + assertThat(statsAgg.getCount(), equalTo(5000L)); + assertThat(statsAgg.getMin(), closeTo(1.0, 0.1)); + assertThat(statsAgg.getMax(), closeTo(5000.0, 0.1)); + assertThat(statsAgg.getSum(), closeTo((double) expectedSum, 0.1)); + assertThat(statsAgg.getAvg(), closeTo((double) expectedSum / 5000, 0.1)); + } finally { + internalCluster().wipeIndices("test_stats_agg"); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/SumIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/SumIT.java index 8886713caa42b..abeeeebdc6195 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/SumIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/SumIT.java @@ -31,6 +31,8 @@ package org.opensearch.search.aggregations.metrics; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; @@ -46,12 +48,16 @@ import org.hamcrest.core.IsNull; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; import static org.opensearch.search.aggregations.AggregationBuilders.filter; import static org.opensearch.search.aggregations.AggregationBuilders.global; import static org.opensearch.search.aggregations.AggregationBuilders.histogram; @@ -63,6 +69,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -72,6 +79,20 @@ public SumIT(Settings staticSettings) { super(staticSettings); } + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } + ); + } + @Override protected Collection> nodePlugins() { return Collections.singleton(MetricAggScriptPlugin.class); @@ -411,4 +432,24 @@ public void testFieldAliasInSubAggregation() { assertThat(sum, notNullValue()); assertThat(sum.getValue(), equalTo(50.5)); } + + public void testSumWithIntraSegmentPartitioning() throws Exception { + createIndex("test_sum_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + long expectedSum = 0; + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + expectedSum += (i + 1); + builders.add(client().prepareIndex("test_sum_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_sum_agg"); + SearchResponse response = client().prepareSearch("test_sum_agg").addAggregation(sum("sum").field("value")).get(); + Sum sumAgg = response.getAggregations().get("sum"); + assertThat(sumAgg, notNullValue()); + assertThat(sumAgg.getValue(), closeTo((double) expectedSum, 0.1)); + } finally { + internalCluster().wipeIndices("test_sum_agg"); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/ValueCountIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/ValueCountIT.java index c17b2108d4077..5ab3a9bb1c01e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/ValueCountIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/ValueCountIT.java @@ -33,6 +33,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; @@ -46,6 +47,7 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -56,6 +58,8 @@ import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY; import static org.opensearch.search.aggregations.AggregationBuilders.count; import static org.opensearch.search.aggregations.AggregationBuilders.filter; import static org.opensearch.search.aggregations.AggregationBuilders.global; @@ -82,7 +86,13 @@ public ValueCountIT(Settings staticSettings) { public static Collection parameters() { return Arrays.asList( new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() }, + new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() }, + new Object[] { + Settings.builder() + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced") + .put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000) + .build() } ); } @@ -416,4 +426,22 @@ public void testOrderByEmptyAggregation() throws Exception { } } + + public void testValueCountWithIntraSegmentPartitioning() throws Exception { + createIndex("test_value_count_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + try { + List builders = new ArrayList<>(5000); + for (int i = 0; i < 5000; i++) { + builders.add(client().prepareIndex("test_value_count_agg").setSource("value", i + 1)); + } + indexBulkWithSegments(builders, 2); + indexRandomForConcurrentSearch("test_value_count_agg"); + SearchResponse response = client().prepareSearch("test_value_count_agg").addAggregation(count("count").field("value")).get(); + ValueCount countAgg = response.getAggregations().get("count"); + assertThat(countAgg, notNullValue()); + assertThat(countAgg.getValue(), equalTo(5000L)); + } finally { + internalCluster().wipeIndices("test_value_count_agg"); + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java index 57389f19b4577..212090d35a5ed 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java @@ -100,4 +100,9 @@ protected Aggregator doCreateInternal( protected boolean supportsConcurrentSegmentSearch() { return true; } + + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java index 977d5202fa569..dbccaa2c1b7d0 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java @@ -162,6 +162,11 @@ protected boolean supportsConcurrentSegmentSearch() { return true; } + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } + private int precision() { return precisionThreshold == null ? HyperLogLogPlusPlus.DEFAULT_PRECISION diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 90b61fb0d0866..1aa5b88426957 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -103,6 +103,11 @@ protected boolean supportsConcurrentSegmentSearch() { return true; } + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } + @Override public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) { return StreamingCostMetrics.neutral(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index 7f553d94ccd38..3659ece739166 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -103,6 +103,11 @@ protected boolean supportsConcurrentSegmentSearch() { return true; } + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } + @Override public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) { return StreamingCostMetrics.neutral(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java index 0e96e631044dd..f3f6c29584ab9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java @@ -95,4 +95,9 @@ protected Aggregator doCreateInternal( protected boolean supportsConcurrentSegmentSearch() { return true; } + + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index eb1e35687e0f4..cf9179e65deb6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -103,6 +103,11 @@ protected boolean supportsConcurrentSegmentSearch() { return true; } + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } + @Override public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) { return StreamingCostMetrics.neutral(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java index 0c82279484461..d865a038ea881 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java @@ -94,4 +94,9 @@ protected Aggregator doCreateInternal( protected boolean supportsConcurrentSegmentSearch() { return true; } + + @Override + protected boolean supportsIntraSegmentSearch() { + return true; + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/AvgAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/AvgAggregatorTests.java index bfe93286a564a..9d6e4458d32ff 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/AvgAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/AvgAggregatorTests.java @@ -58,6 +58,7 @@ import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.filter.Filter; @@ -716,4 +717,22 @@ protected List getSupportedValuesSourceTypes() { protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { return new AvgAggregationBuilder("foo").field(fieldName); } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new AvgAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java index 4b364883d8d5a..eab6184000d0a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java @@ -65,6 +65,7 @@ import org.opensearch.index.mapper.RangeFieldMapper; import org.opensearch.index.mapper.RangeType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.LeafBucketCollector; @@ -860,4 +861,22 @@ public void testMemoryLimitExceptionSingleton() { // Test that it has no stack trace for performance assertEquals(0, ex1.getStackTrace().length); } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new CardinalityAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java index ddd43866f55d4..d10395be1c5c3 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java @@ -71,6 +71,7 @@ import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.aggregations.BucketOrder; @@ -1027,4 +1028,22 @@ public void testDoReset() throws IOException { indexReader.close(); directory.close(); } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new MaxAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java index 225831ada671a..3a1231b5a6e5d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java @@ -77,6 +77,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.filter.Filter; @@ -808,4 +809,22 @@ public void testDoReset() throws IOException { indexReader.close(); directory.close(); } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new MinAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/StatsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/StatsAggregatorTests.java index f8a6cf1ef07af..13ace3efe5785 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/StatsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/StatsAggregatorTests.java @@ -55,6 +55,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; @@ -518,4 +519,22 @@ public void testCollectRange() throws IOException { } } } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new StatsAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java index 1eecd43aaee2f..db07c422174a1 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java @@ -63,6 +63,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; import org.opensearch.search.aggregations.support.CoreValuesSourceType; @@ -440,4 +441,21 @@ private static MappedFieldType defaultFieldType() { private static MappedFieldType defaultFieldType(NumberType numberType) { return new NumberFieldMapper.NumberFieldType(FIELD_NAME, numberType); } + + public void testSupportsIntraSegmentSearch() throws IOException { + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField(FIELD_NAME, 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new SumAggregationBuilder("test").field(FIELD_NAME)) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, defaultFieldType()) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorTests.java index 7fbd1430a0c2a..7bc06d2f01fa8 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorTests.java @@ -41,9 +41,12 @@ import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.FieldExistsQuery; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedConsumer; @@ -66,6 +69,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; import org.opensearch.search.aggregations.support.CoreValuesSourceType; @@ -448,4 +452,22 @@ private static MappedFieldType createMappedFieldType(String name, ValueType valu throw new IllegalArgumentException("Test does not support value type [" + valueType + "]"); } } + + public void testSupportsIntraSegmentSearch() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1))); + try (IndexReader reader = indexWriter.getReader()) { + IndexSearcher searcher = newIndexSearcher(reader); + AggregatorFactories factories = AggregatorFactories.builder() + .addAggregator(new ValueCountAggregationBuilder("test").field("value")) + .build( + createSearchContext(searcher, createIndexSettings(), new MatchAllDocsQuery(), null, fieldType) + .getQueryShardContext(), + null + ); + assertTrue(factories.allFactoriesSupportIntraSegmentSearch()); + } + } + } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index a2bbc39a878cb..6c213ed775417 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1646,6 +1646,24 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } + /** + * Indexes documents in bulk across the specified number of segments. Documents are evenly distributed + * across segments with a refresh between each batch to create segment boundaries. This is useful for + * tests that need multiple predefined segments for testing. + * + * @param builders the documents to index + * @param numSegments the number of segments to create + */ + public void indexBulkWithSegments(List builders, int numSegments) throws InterruptedException { + assert numSegments > 0 && numSegments <= builders.size() : "numSegments must be between 1 and builders.size()"; + int batchSize = builders.size() / numSegments; + for (int seg = 0; seg < numSegments; seg++) { + int from = seg * batchSize; + int to = (seg == numSegments - 1) ? builders.size() : from + batchSize; + indexRandom(true, false, false, builders.subList(from, to)); + } + } + /* * This method ingests bogus documents for the given indices such that multiple slices * are formed. This is useful for testing with the concurrent search use-case as it creates