Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c749beb
intra segment support for query_string
prudhvigodithi Jan 29, 2026
0f62870
update changelog
prudhvigodithi Jan 29, 2026
0d5eb3c
Intra segment support for single-value metric aggregations
prudhvigodithi Feb 5, 2026
c8a7992
upstream fetch
prudhvigodithi Feb 5, 2026
42be1ed
update CHANGELOG.md
prudhvigodithi Feb 5, 2026
d654cc2
Upstream fetch
prudhvigodithi Feb 10, 2026
f0db14e
Add tests
prudhvigodithi Feb 11, 2026
aafbc28
Add tests
prudhvigodithi Feb 11, 2026
57af42e
Fix tests
prudhvigodithi Feb 17, 2026
1ea8af5
Temp revert intra for cardinality agg
prudhvigodithi Feb 19, 2026
7187fc3
Enable intra for cardinality agg
prudhvigodithi Feb 19, 2026
0e823af
test: Add unit tests for intra-segment search support
prudhvigodithi Feb 20, 2026
b6954da
Fix commit issue
prudhvigodithi Feb 20, 2026
398a5a3
Address github comments
prudhvigodithi Feb 20, 2026
32c81a6
Address github comments
prudhvigodithi Feb 20, 2026
bd366f3
Add wipeIndices to tests
prudhvigodithi Feb 20, 2026
e047a11
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 9, 2026
5b63598
Update the IT tests
prudhvigodithi Mar 9, 2026
3eb0864
Update the IT tests
prudhvigodithi Mar 9, 2026
b14376d
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 9, 2026
8b306ac
Fetch upstream
prudhvigodithi Mar 9, 2026
26d8a98
Edge case bug with indexBulkWithSegments
prudhvigodithi Mar 9, 2026
0ba1942
Address comments
prudhvigodithi Mar 9, 2026
9d380ad
Address comments
prudhvigodithi Mar 9, 2026
8767987
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 12, 2026
e441c2a
Upstream fetch, fix conflicts
prudhvigodithi Mar 12, 2026
31440a7
Upstream fetch, fix conflicts
prudhvigodithi Mar 12, 2026
874fb79
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 12, 2026
8923433
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 12, 2026
bbe5ed1
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 12, 2026
526f41f
Upstream fetch
prudhvigodithi Mar 12, 2026
fa13dd6
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Mar 12, 2026
5a88b05
Upstream fetch, fix conflicts
prudhvigodithi Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support TLS cert hot-reload for Arrow Flight transport ([#20700](https://github.com/opensearch-project/OpenSearch/pull/20700))
- [Workload Management] Enhance Scroll API support for autotagging ([#20151](https://github.com/opensearch-project/OpenSearch/pull/20151))
- Add indices to search request slowlog ([#20588](https://github.com/opensearch-project/OpenSearch/pull/20588))
- Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503))
- Add mapper_settings support and field_mapping mapper type for pull-based ingestion([#20722](https://github.com/opensearch-project/OpenSearch/pull/20722))
- Add support for fields containing dots in their name as literals ([#19958](https://github.com/opensearch-project/OpenSearch/pull/19958))
- Add support for forward translog reading ([#20163](https://github.com/opensearch-project/OpenSearch/pull/20163))
Expand All @@ -33,12 +34,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update to `almalinux:10` ([#20482](https://github.com/opensearch-project/OpenSearch/pull/20482))
- Add X-Request-Id to uniquely identify a search request ([#19798](https://github.com/opensearch-project/OpenSearch/pull/19798))
- Added TopN selection logic for streaming terms aggregations ([#20481](https://github.com/opensearch-project/OpenSearch/pull/20481))
- Added support for Intra Segment Search ([#19704](https://github.com/opensearch-project/OpenSearch/pull/19704))
- Introduce AdditionalCodecs and EnginePlugin::getAdditionalCodecs hook to allow additional Codec registration ([#20411](https://github.com/opensearch-project/OpenSearch/pull/20411))
- Introduced strategy planner interfaces for indexing and deletion ([#20585](https://github.com/opensearch-project/OpenSearch/pull/20585))
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
- Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
- WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536))
- Added support for Intra Segment Search ([#19704](https://github.com/opensearch-project/OpenSearch/pull/19704))

### Changed
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.aggregations.AggregationBuilders.avg;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* Integration tests for avg aggregation with concurrent segment search partition strategies.
*/
public class AvgIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public AvgIT(Settings staticSettings) {
super(staticSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

public void testAvgAggregation() throws Exception {
createIndex("test_avg_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
try {
List<IndexRequestBuilder> builders = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
builders.add(client().prepareIndex("test_avg_agg").setSource("value", i + 1));
}
indexBulkWithSegments(builders, 2);
indexRandomForConcurrentSearch("test_avg_agg");
SearchResponse response = client().prepareSearch("test_avg_agg").addAggregation(avg("avg_agg").field("value")).get();
assertSearchResponse(response);
Avg avgAgg = response.getAggregations().get("avg_agg");
assertThat(avgAgg, notNullValue());
assertThat(avgAgg.getValue(), closeTo(2500.5, 0.1));
} finally {
internalCluster().wipeIndices("test_avg_agg");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -62,6 +64,8 @@
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.aggregations.AggregationBuilders.cardinality;
import static org.opensearch.search.aggregations.AggregationBuilders.global;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
Expand All @@ -82,7 +86,13 @@ public CardinalityIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

Expand Down Expand Up @@ -666,4 +676,24 @@ public void testScriptCaching() throws Exception {
);
internalCluster().wipeIndices("cache_test_idx");
}

public void testCardinalityWithIntraSegmentPartitioning() throws Exception {
createIndex("test_cardinality_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
try {
List<IndexRequestBuilder> builders = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
builders.add(client().prepareIndex("test_cardinality_agg").setSource("category", i % 100));
}
indexBulkWithSegments(builders, 2);
indexRandomForConcurrentSearch("test_cardinality_agg");
SearchResponse response = client().prepareSearch("test_cardinality_agg")
.addAggregation(cardinality("cardinality").field("category"))
.get();
Cardinality cardinalityAgg = response.getAggregations().get("cardinality");
assertThat(cardinalityAgg, notNullValue());
assertThat(cardinalityAgg.getValue(), equalTo(100L));
} finally {
internalCluster().wipeIndices("test_cardinality_agg");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.aggregations.AggregationBuilders.max;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* Integration tests for max aggregation with concurrent segment search partition strategies.
*/
public class MaxIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public MaxIT(Settings staticSettings) {
super(staticSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

public void testMaxAggregation() throws Exception {
createIndex("test_max_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
try {
List<IndexRequestBuilder> builders = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
builders.add(client().prepareIndex("test_max_agg").setSource("value", i + 1));
}
indexBulkWithSegments(builders, 2);
indexRandomForConcurrentSearch("test_max_agg");
SearchResponse response = client().prepareSearch("test_max_agg").addAggregation(max("max_agg").field("value")).get();
assertSearchResponse(response);
Max maxAgg = response.getAggregations().get("max_agg");
assertThat(maxAgg, notNullValue());
assertThat(maxAgg.getValue(), closeTo(5000.0, 0.1));
} finally {
internalCluster().wipeIndices("test_max_agg");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.aggregations.AggregationBuilders.min;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* Integration tests for min aggregation with concurrent segment search partition strategies.
*/
public class MinIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public MinIT(Settings staticSettings) {
super(staticSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

public void testMinAggregation() throws Exception {
createIndex("test_min_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
try {
List<IndexRequestBuilder> builders = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
builders.add(client().prepareIndex("test_min_agg").setSource("value", i + 1));
}
indexBulkWithSegments(builders, 2);
indexRandomForConcurrentSearch("test_min_agg");
SearchResponse response = client().prepareSearch("test_min_agg").addAggregation(min("min_agg").field("value")).get();
assertSearchResponse(response);
Min minAgg = response.getAggregations().get("min_agg");
assertThat(minAgg, notNullValue());
assertThat(minAgg.getValue(), closeTo(1.0, 0.1));
} finally {
internalCluster().wipeIndices("test_min_agg");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@

package org.opensearch.search.aggregations.metrics;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
Expand All @@ -46,12 +49,17 @@
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
import org.opensearch.search.aggregations.bucket.terms.Terms;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.aggregations.AggregationBuilders.filter;
import static org.opensearch.search.aggregations.AggregationBuilders.global;
import static org.opensearch.search.aggregations.AggregationBuilders.histogram;
Expand All @@ -60,6 +68,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -70,6 +79,20 @@ public StatsIT(Settings staticSettings) {
super(staticSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force").build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(AggregationTestScriptsPlugin.class);
Expand Down Expand Up @@ -390,4 +413,28 @@ public void testScriptCaching() throws Exception {
);
internalCluster().wipeIndices("cache_test_idx");
}

public void testStatsWithIntraSegmentPartitioning() throws Exception {
createIndex("test_stats_agg", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
try {
long expectedSum = 0;
List<IndexRequestBuilder> builders = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
expectedSum += (i + 1);
builders.add(client().prepareIndex("test_stats_agg").setSource("value", i + 1));
}
indexBulkWithSegments(builders, 2);
indexRandomForConcurrentSearch("test_stats_agg");
SearchResponse response = client().prepareSearch("test_stats_agg").addAggregation(stats("stats").field("value")).get();
Stats statsAgg = response.getAggregations().get("stats");
assertThat(statsAgg, notNullValue());
assertThat(statsAgg.getCount(), equalTo(5000L));
assertThat(statsAgg.getMin(), closeTo(1.0, 0.1));
assertThat(statsAgg.getMax(), closeTo(5000.0, 0.1));
assertThat(statsAgg.getSum(), closeTo((double) expectedSum, 0.1));
assertThat(statsAgg.getAvg(), closeTo((double) expectedSum / 5000, 0.1));
} finally {
internalCluster().wipeIndices("test_stats_agg");
}
}
}
Loading
Loading