
Commit 00cf7c1

Using cached StoredFieldsReader for scroll query optimizations (#20112)
* Scroll query optimizations
* spotless fix
* Scroll query optimizations
* spotless fix
* spotless fix
* Fix assertions
* Fix assertions
* Update tests
* update changelog
* Fix SearchSliceIT
* Fix RetryTests

Signed-off-by: Prudhvi Godithi <[email protected]>
1 parent b7f013f commit 00cf7c1
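
The hunks on this page cover tests and the changelog entry; the caching change itself lives in org.opensearch.search.internal.ScrollContext, which every modified test references via @see getCachedSequentialReader(Object). For orientation, here is a minimal sketch of that caching pattern; the per-segment key, the two-argument signature, and the class name are assumptions for illustration, not the actual OpenSearch implementation:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.lucene.codecs.StoredFieldsReader;

    // Hypothetical sketch only: the real method is ScrollContext#getCachedSequentialReader(Object).
    class ScrollReaderCacheSketch {
        // One cached reader per segment; a scroll context handles one batch at a
        // time (batches never run concurrently), so no locking is assumed here.
        private final Map<Object, StoredFieldsReader> cache = new HashMap<>();

        StoredFieldsReader getCachedSequentialReader(Object segmentKey, StoredFieldsReader codecReader) {
            // getMergeInstance() yields a clone tuned for sequential docid access,
            // which is the access pattern of a sorted scroll batch. Caching it lets
            // consecutive batches reuse the clone instead of re-creating it.
            return cache.computeIfAbsent(segmentKey, k -> codecReader.getMergeInstance());
        }
    }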

File tree

12 files changed (+447, -2 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765))
 - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601))
 - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
+- Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112))
 
 ### Changed
 - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))

modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java

Lines changed: 9 additions & 0 deletions
@@ -32,6 +32,7 @@
 
 package org.opensearch.index.reindex;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.admin.cluster.node.info.NodeInfo;
 import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.opensearch.action.bulk.BackoffPolicy;
@@ -68,7 +69,15 @@
 /**
  * Integration test for bulk retry behavior. Useful because retrying relies on the way that the
  * rest of OpenSearch throws exceptions and unit tests won't verify that.
+ *
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
+@LuceneTestCase.SuppressCodecs("*")
 public class RetryTests extends OpenSearchIntegTestCase {
 
     private static final int DOC_COUNT = 20;
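
The javadoc above explains why the annotation is needed; to make the failure concrete, this is a rough sketch of the kind of thread-affinity check Lucene's asserting codec performs (class and member names here are illustrative, not AssertingStoredFieldsFormat's actual source):

    // Illustrative only: an asserting reader pins itself to its creating thread.
    class ThreadAffinitySketch {
        private final Thread owner = Thread.currentThread();

        void visitDocument(int docID) {
            // A scroll batch resumed on a different pooled search thread trips this
            // assertion even though batches access the reader strictly one at a time.
            assert owner == Thread.currentThread() : "stored fields reader used from a different thread";
            // ... delegate to the wrapped StoredFieldsReader ...
        }
    }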

server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java

Lines changed: 10 additions & 0 deletions
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.search.MultiSearchResponse;
 import org.opensearch.action.search.SearchPhaseExecutionException;
@@ -80,6 +81,15 @@
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 
+/**
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
+ */
+@LuceneTestCase.SuppressCodecs("*")
 public class TransportTwoNodesSearchIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
 
     public TransportTwoNodesSearchIT(Settings staticSettings) {
server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java

Lines changed: 151 additions & 0 deletions

@@ -0,0 +1,151 @@ (new file; every line is an addition)

/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.search.scroll;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHit;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;

/**
 * Integration tests for the scroll query StoredFieldsReader caching optimization.
 *
 * Tests verify that scroll queries correctly return all documents when using
 * the sequential reader cache optimization for stored fields.
 */
@LuceneTestCase.SuppressCodecs("*")
public class ScrollStoredFieldsCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

    public ScrollStoredFieldsCacheIT(Settings settings) {
        super(settings);
    }

    @ParametersFactory
    public static Collection<Object[]> parameters() {
        return Arrays.asList(
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
        );
    }

    /**
     * Tests that scroll queries with sequential document access correctly utilize
     * the StoredFieldsReader cache and return all documents without data corruption.
     */
    public void testScrollWithSequentialReaderCache() throws Exception {
        int numDocs = randomIntBetween(100, 500);
        int scrollSize = randomIntBetween(10, 50);
        createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
        ensureGreen("test");
        // Index documents
        for (int i = 0; i < numDocs; i++) {
            client().prepareIndex("test")
                .setId(Integer.toString(i))
                .setSource(jsonBuilder().startObject().field("field", i).field("text", "document " + i).endObject())
                .get();
        }
        refresh("test");
        indexRandomForConcurrentSearch("test");
        Set<String> retrievedIds = new HashSet<>();
        SearchResponse searchResponse = client().prepareSearch("test")
            .setQuery(matchAllQuery())
            .setSize(scrollSize)
            .setScroll(TimeValue.timeValueMinutes(2))
            .addSort("field", SortOrder.ASC)
            .get();

        try {
            assertNoFailures(searchResponse);
            assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs));
            do {
                for (SearchHit hit : searchResponse.getHits().getHits()) {
                    // Verify no duplicate documents
                    assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId()));
                    // Verify the document carries the correct _source content
                    assertNotNull(hit.getSourceAsMap());
                    assertEquals(Integer.parseInt(hit.getId()), hit.getSourceAsMap().get("field"));
                }
                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
                assertNoFailures(searchResponse);
            } while (searchResponse.getHits().getHits().length > 0);
            // Verify all documents were retrieved
            assertThat(retrievedIds.size(), equalTo(numDocs));
        } finally {
            clearScroll(searchResponse.getScrollId());
        }
    }

    /**
     * Tests scroll queries across multiple segments with batch sizes that
     * trigger the sequential reader optimization (>= 10 docs).
     */
    public void testScrollAcrossMultipleSegments() throws Exception {
        int docsPerSegment = randomIntBetween(20, 50);
        int numSegments = randomIntBetween(3, 5);
        int totalDocs = docsPerSegment * numSegments;
        int scrollSize = randomIntBetween(10, 50);
        int expectedBatches = (totalDocs + scrollSize - 1) / scrollSize;
        createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
        ensureGreen("test");
        // Index documents in batches to create multiple segments
        for (int seg = 0; seg < numSegments; seg++) {
            for (int i = 0; i < docsPerSegment; i++) {
                int docId = seg * docsPerSegment + i;
                client().prepareIndex("test")
                    .setId(Integer.toString(docId))
                    .setSource(jsonBuilder().startObject().field("field", docId).field("segment", seg).endObject())
                    .get();
            }
            refresh("test");
        }
        indexRandomForConcurrentSearch("test");
        Set<String> retrievedIds = new HashSet<>();
        SearchResponse searchResponse = client().prepareSearch("test")
            .setQuery(matchAllQuery())
            .setSize(scrollSize)
            .setScroll(TimeValue.timeValueMinutes(2))
            .addSort("field", SortOrder.ASC)
            .get();
        try {
            assertNoFailures(searchResponse);
            assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) totalDocs));
            int batchCount = 0;
            do {
                batchCount++;
                for (SearchHit hit : searchResponse.getHits().getHits()) {
                    assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId()));
                }
                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
                assertNoFailures(searchResponse);
            } while (searchResponse.getHits().getHits().length > 0);
            // Verify all documents retrieved
            assertThat(retrievedIds.size(), equalTo(totalDocs));
            // Verify exact batch count
            assertThat(batchCount, equalTo(expectedBatches));
        } finally {
            clearScroll(searchResponse.getScrollId());
        }
    }
}
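
The test javadoc above notes that the sequential reader path only kicks in for batches of at least 10 documents, which is why scrollSize is always drawn from [10, 50]. A hedged sketch of that gate as described (the helper name is hypothetical; the real check sits in the fetch phase):

    // Sketch of the gating described in the javadoc: a batch qualifies when it
    // is large enough to amortize the cached reader and its sorted docids form
    // a dense, strictly consecutive run.
    static boolean useSequentialReader(int[] sortedDocIds) {
        if (sortedDocIds.length < 10) {
            return false;
        }
        int first = sortedDocIds[0];
        int last = sortedDocIds[sortedDocIds.length - 1];
        return last - first == sortedDocIds.length - 1;
    }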

server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java

Lines changed: 81 additions & 0 deletions
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.search.ClearScrollResponse;
 import org.opensearch.action.search.SearchPhaseExecutionException;
@@ -64,10 +65,14 @@
 import org.junit.After;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -90,7 +95,15 @@
 
 /**
  * Tests for scrolling.
+ *
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
+@LuceneTestCase.SuppressCodecs("*")
 public class SearchScrollIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
     public SearchScrollIT(Settings settings) {
         super(settings);
@@ -822,6 +835,74 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
         client().prepareSearchScroll(respFromProdIndex.getScrollId()).get();
     }
 
+    /**
+     * Tests that scroll queries with StoredFieldsReader caching return correct results
+     * across multiple batches. Verifies document order, content integrity, no duplicates,
+     * and no missing documents when using the sequential reader optimization.
+     */
+    public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Exception {
+        int numDocs = randomIntBetween(100, 300);
+        int scrollSize = randomIntBetween(10, 35);
+        client().admin()
+            .indices()
+            .prepareCreate("test")
+            .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
+            .get();
+        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get();
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex("test")
+                .setId(Integer.toString(i))
+                .setSource(jsonBuilder().startObject().field("field", i).field("text", "document number " + i).endObject())
+                .get();
+        }
+        client().admin().indices().prepareRefresh().get();
+        indexRandomForConcurrentSearch("test");
+        Set<Integer> retrievedIds = new HashSet<>();
+        List<Integer> retrievedOrder = new ArrayList<>();
+        SearchResponse searchResponse = client().prepareSearch("test")
+            .setQuery(matchAllQuery())
+            .setSize(scrollSize)
+            .setScroll(TimeValue.timeValueMinutes(2))
+            .addSort("field", SortOrder.ASC)
+            .get();
+        try {
+            assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs));
+            int expectedValue = 0;
+            int batchCount = 0;
+            do {
+                batchCount++;
+                for (SearchHit hit : searchResponse.getHits().getHits()) {
+                    int docId = Integer.parseInt(hit.getId());
+                    // Verify no duplicates
+                    assertTrue("Duplicate document id: " + docId, retrievedIds.add(docId));
+                    retrievedOrder.add(docId);
+                    // Verify sort order
+                    assertThat(
+                        "Document out of order at position " + retrievedOrder.size(),
+                        ((Number) hit.getSortValues()[0]).intValue(),
+                        equalTo(expectedValue)
+                    );
+                    // Verify stored field content matches document id
+                    Map<String, Object> source = hit.getSourceAsMap();
+                    assertThat("Field value mismatch for doc " + docId, source.get("field"), equalTo(docId));
+                    assertThat("Text field mismatch for doc " + docId, source.get("text"), equalTo("document number " + docId));
+                    expectedValue++;
+                }
+                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
+                assertNoFailures(searchResponse);
+            } while (searchResponse.getHits().getHits().length > 0);
+            // Verify all documents retrieved
+            assertThat("Not all documents retrieved", retrievedIds.size(), equalTo(numDocs));
+            assertThat("Multiple batches should have been used", batchCount, greaterThan(1));
+            // Verify complete sequence
+            for (int i = 0; i < numDocs; i++) {
+                assertTrue("Missing document: " + i, retrievedIds.contains(i));
+            }
+        } finally {
+            clearScroll(searchResponse.getScrollId());
+        }
+    }
+
     private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException {
         XContentBuilder builder = XContentFactory.jsonBuilder();
         response.toXContent(builder, ToXContent.EMPTY_PARAMS);
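
Each new test clears its scroll in a finally block; that hygiene matters slightly more with this commit, since a live scroll context presumably also references the cached per-segment readers until it is freed. A minimal consumer-side loop mirroring the tests (hedged sketch; client() is the integration-test helper used throughout these files):

    // Minimal scroll-consumption pattern: always clear the scroll when done so
    // the server can release the search context and whatever it holds.
    SearchResponse resp = client().prepareSearch("test")
        .setQuery(matchAllQuery())
        .setSize(50)
        .setScroll(TimeValue.timeValueMinutes(2))
        .get();
    try {
        while (resp.getHits().getHits().length > 0) {
            // ... process hits ...
            resp = client().prepareSearchScroll(resp.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
        }
    } finally {
        client().prepareClearScroll().addScrollId(resp.getScrollId()).get();
    }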

server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java

Lines changed: 10 additions & 0 deletions
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@@ -56,6 +57,15 @@
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
+/**
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
+ */
+@LuceneTestCase.SuppressCodecs("*")
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 0)
 public class SearchScrollWithFailingNodesIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
     public SearchScrollWithFailingNodesIT(Settings settings) {

server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java

Lines changed: 10 additions & 0 deletions
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.search.CreatePitAction;
@@ -69,6 +70,15 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.startsWith;
 
+/**
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
+ */
+@LuceneTestCase.SuppressCodecs("*")
 public class SearchSliceIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
     public SearchSliceIT(Settings staticSettings) {
         super(staticSettings);

server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java

Lines changed: 10 additions & 0 deletions
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.admin.cluster.node.stats.NodeStats;
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -79,6 +80,15 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
+/**
+ * <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
+ */
+@LuceneTestCase.SuppressCodecs("*")
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
 public class SearchStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
 
