1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765))
- Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601))
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
- Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112))

### Changed
- Combining filter rewrite and skip list to optimize sub aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
@@ -80,6 +81,15 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

/**
* <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
* across scroll batches for sequential access. Different batches may run on different threads
* (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
* that rejects this valid sequential cross-thread usage.
*
* @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
*/
@LuceneTestCase.SuppressCodecs("*")
public class TransportTwoNodesSearchIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public TransportTwoNodesSearchIT(Settings staticSettings) {
@@ -0,0 +1,151 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.scroll;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHit;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;

/**
* Integration tests for scroll query StoredFieldsReader caching optimization.
*
* Tests verify that scroll queries correctly return all documents when using
* the sequential reader cache optimization for stored fields.
*/
@LuceneTestCase.SuppressCodecs("*")
public class ScrollStoredFieldsCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public ScrollStoredFieldsCacheIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

/**
* Tests that scroll queries with sequential document access correctly utilize
* the StoredFieldsReader cache and return all documents without data corruption.
*/
public void testScrollWithSequentialReaderCache() throws Exception {
int numDocs = randomIntBetween(100, 500);
int scrollSize = randomIntBetween(10, 50);
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
ensureGreen("test");
// Index documents
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test")
.setId(Integer.toString(i))
.setSource(jsonBuilder().startObject().field("field", i).field("text", "document " + i).endObject())
.get();
}
refresh("test");
indexRandomForConcurrentSearch("test");
Set<String> retrievedIds = new HashSet<>();
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(scrollSize)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.get();

try {
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs));
do {
for (SearchHit hit : searchResponse.getHits().getHits()) {
// Verify no duplicate documents
assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId()));
// Verify the document's _source content is correct
assertNotNull(hit.getSourceAsMap());
assertEquals(Integer.parseInt(hit.getId()), hit.getSourceAsMap().get("field"));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
assertNoFailures(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);
// Verify all documents were retrieved
assertThat(retrievedIds.size(), equalTo(numDocs));
} finally {
clearScroll(searchResponse.getScrollId());
}
}

/**
* Tests scroll queries across multiple segments with batch sizes that
* trigger the sequential reader optimization (>= 10 docs).
*/
public void testScrollAcrossMultipleSegments() throws Exception {
int docsPerSegment = randomIntBetween(20, 50);
int numSegments = randomIntBetween(3, 5);
int totalDocs = docsPerSegment * numSegments;
int scrollSize = randomIntBetween(10, 50);
int expectedBatches = (totalDocs + scrollSize - 1) / scrollSize;
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
ensureGreen("test");
// Index documents in batches to create multiple segments
for (int seg = 0; seg < numSegments; seg++) {
for (int i = 0; i < docsPerSegment; i++) {
int docId = seg * docsPerSegment + i;
client().prepareIndex("test")
.setId(Integer.toString(docId))
.setSource(jsonBuilder().startObject().field("field", docId).field("segment", seg).endObject())
.get();
}
refresh("test");
}
indexRandomForConcurrentSearch("test");
Set<String> retrievedIds = new HashSet<>();
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(scrollSize)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.get();
try {
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) totalDocs));
int batchCount = 0;
do {
batchCount++;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId()));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
assertNoFailures(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);
// Verify all documents retrieved
assertThat(retrievedIds.size(), equalTo(totalDocs));
// Verify exact batch count
assertThat(batchCount, equalTo(expectedBatches));
} finally {
clearScroll(searchResponse.getScrollId());
}
}
}
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
@@ -64,10 +65,14 @@
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -90,7 +95,15 @@

/**
* Tests for scrolling.
*
* <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
* across scroll batches for sequential access. Different batches may run on different threads
* (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
* that rejects this valid sequential cross-thread usage.
*
* @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
*/
@LuceneTestCase.SuppressCodecs("*")
public class SearchScrollIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
public SearchScrollIT(Settings settings) {
super(settings);
@@ -822,6 +835,74 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
client().prepareSearchScroll(respFromProdIndex.getScrollId()).get();
}

/**
* Tests that scroll queries with StoredFieldsReader caching return correct results
* across multiple batches. Verifies document order, content integrity, no duplicates,
* and no missing documents when using the sequential reader optimization.
*/
public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Exception {
int numDocs = randomIntBetween(100, 300);
int scrollSize = randomIntBetween(10, 35);
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.get();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get();
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test")
.setId(Integer.toString(i))
.setSource(jsonBuilder().startObject().field("field", i).field("text", "document number " + i).endObject())
.get();
}
client().admin().indices().prepareRefresh().get();
indexRandomForConcurrentSearch("test");
Set<Integer> retrievedIds = new HashSet<>();
List<Integer> retrievedOrder = new ArrayList<>();
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(scrollSize)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs));
int expectedValue = 0;
int batchCount = 0;
do {
batchCount++;
for (SearchHit hit : searchResponse.getHits().getHits()) {
int docId = Integer.parseInt(hit.getId());
// Verify no duplicates
assertTrue("Duplicate document id: " + docId, retrievedIds.add(docId));
retrievedOrder.add(docId);
// Verify sort order
assertThat(
"Document out of order at position " + retrievedOrder.size(),
((Number) hit.getSortValues()[0]).intValue(),
equalTo(expectedValue)
);
// Verify stored field content matches document id
Map<String, Object> source = hit.getSourceAsMap();
assertThat("Field value mismatch for doc " + docId, source.get("field"), equalTo(docId));
assertThat("Text field mismatch for doc " + docId, source.get("text"), equalTo("document number " + docId));
expectedValue++;
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get();
assertNoFailures(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);
// Verify all documents retrieved
assertThat("Not all documents retrieved", retrievedIds.size(), equalTo(numDocs));
assertThat("Multiple batches should have been used", batchCount, greaterThan(1));
// Verify complete sequence
for (int i = 0; i < numDocs; i++) {
assertTrue("Missing document: " + i, retrievedIds.contains(i));
}
} finally {
clearScroll(searchResponse.getScrollId());
}
}

private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@@ -56,6 +57,15 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

/**
* <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
* across scroll batches for sequential access. Different batches may run on different threads
* (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
* that rejects this valid sequential cross-thread usage.
*
* @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
*/
@LuceneTestCase.SuppressCodecs("*")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 0)
public class SearchScrollWithFailingNodesIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
public SearchScrollWithFailingNodesIT(Settings settings) {
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.CreatePitAction;
@@ -69,6 +70,15 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

/**
* <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
* across scroll batches for sequential access. Different batches may run on different threads
* (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
* that rejects this valid sequential cross-thread usage.
*
* @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
*/
@LuceneTestCase.SuppressCodecs("*")
public class SearchSliceIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
public SearchSliceIT(Settings staticSettings) {
super(staticSettings);
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -79,6 +80,15 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* <p>{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
* across scroll batches for sequential access. Different batches may run on different threads
* (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
* that rejects this valid sequential cross-thread usage.
*
* @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
*/
@LuceneTestCase.SuppressCodecs("*")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

@@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.DocIdSetIterator;
@@ -196,7 +197,20 @@ public void execute(SearchContext context, String profileDescription) {
// for random access and don't optimize for sequential access - except for merging.
// So we do a little hack here and pretend we're going to do merges in order to
// get better sequential access.
fieldReader = lf.getSequentialStoredFieldsReader()::document;
StoredFieldsReader sequentialReader;
// For scroll queries, try to reuse a previously cached reader
if (context.scrollContext() != null) {
// Use the segment's core cache key as a unique identifier for the cache entry
Object segmentKey = lf.getCoreCacheHelper() != null ? lf.getCoreCacheHelper().getKey() : currentReaderContext;
sequentialReader = context.scrollContext().getCachedSequentialReader(segmentKey);
if (sequentialReader == null) {
sequentialReader = lf.getSequentialStoredFieldsReader();
context.scrollContext().cacheSequentialReader(segmentKey, sequentialReader);
}
} else {
sequentialReader = lf.getSequentialStoredFieldsReader();
}
fieldReader = sequentialReader::document;
} else {
fieldReader = currentReaderContext.reader().storedFields()::document;
}
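The hunk above replaces the direct `lf.getSequentialStoredFieldsReader()::document` binding with a per-segment cache held on the `ScrollContext`, keyed by the segment's core cache key (falling back to the `LeafReaderContext` when no cache helper is exposed). The `ScrollContext` methods themselves are not part of this excerpt, so the following is only a minimal sketch of what that side of the cache might look like; the accessor names mirror the calls made in the diff, but the class name, field name, and `clearOnRelease` handling are illustrative assumptions, not code from this PR.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.util.IOUtils;

// Sketch only: getCachedSequentialReader / cacheSequentialReader mirror the calls
// made in FetchPhase above; everything else here is an assumption.
final class SequentialReaderCacheSketch {
    // Scroll batches may run on different threads but never concurrently,
    // so a plain HashMap without synchronization is sufficient.
    private final Map<Object, StoredFieldsReader> readersBySegment = new HashMap<>();

    /** Returns the reader cached for this segment key, or null if none has been cached yet. */
    StoredFieldsReader getCachedSequentialReader(Object segmentKey) {
        return readersBySegment.get(segmentKey);
    }

    /** Remembers the sequential reader so later scroll batches on the same segment reuse it. */
    void cacheSequentialReader(Object segmentKey, StoredFieldsReader reader) {
        readersBySegment.put(segmentKey, reader);
    }

    /** StoredFieldsReader is Closeable; if the cached instances own resources, release them with the scroll. */
    void clearOnRelease() throws IOException {
        IOUtils.close(readersBySegment.values());
        readersBySegment.clear();
    }
}

Keying the cache by the core cache key rather than the `LeafReaderContext` keeps the entry stable across reader wrappers that share the same underlying segment, which is presumably why the FetchPhase change prefers `getCoreCacheHelper().getKey()` and only falls back to the context object when no helper is available.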