diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e2be80e8201c..8b5654a9e0647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)) - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601)) - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963)) +- Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112)) ### Changed - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java index 82371eff2fb13..6d5dfc293afdd 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java @@ -32,6 +32,7 @@ package org.opensearch.index.reindex; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.opensearch.action.bulk.BackoffPolicy; @@ -68,7 +69,15 @@ /** * Integration test for bulk retry behavior. Useful because retrying relies on the way that the * rest of OpenSearch throws exceptions and unit tests won't verify that. + * + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) */ +@LuceneTestCase.SuppressCodecs("*") public class RetryTests extends OpenSearchIntegTestCase { private static final int DOC_COUNT = 20; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java index cc88d399932c8..d4cd5d63edf0c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.OpenSearchException; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchPhaseExecutionException; @@ -80,6 +81,15 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; +/** + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") public class TransportTwoNodesSearchIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public TransportTwoNodesSearchIT(Settings staticSettings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java new file mode 100644 index 0000000000000..448e96d8322c4 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.scroll; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.SearchHit; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; + +/** + * Integration tests for scroll query StoredFieldsReader caching optimization. + * + * Tests verify that scroll queries correctly return all documents when using + * the sequential reader cache optimization for stored fields. + */ +@LuceneTestCase.SuppressCodecs("*") +public class ScrollStoredFieldsCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public ScrollStoredFieldsCacheIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + /** + * Tests that scroll queries with sequential document access correctly utilize + * the StoredFieldsReader cache and return all documents without data corruption. 
+ */ + public void testScrollWithSequentialReaderCache() throws Exception { + int numDocs = randomIntBetween(100, 500); + int scrollSize = randomIntBetween(10, 50); + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen("test"); + // Index documents + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test") + .setId(Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("field", i).field("text", "document " + i).endObject()) + .get(); + } + refresh("test"); + indexRandomForConcurrentSearch("test"); + Set retrievedIds = new HashSet<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + + try { + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs)); + do { + for (SearchHit hit : searchResponse.getHits().getHits()) { + // Verify no duplicate documents + assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId())); + // Verify document content is correct _source field + assertNotNull(hit.getSourceAsMap()); + assertEquals(Integer.parseInt(hit.getId()), hit.getSourceAsMap().get("field")); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents were retrieved + assertThat(retrievedIds.size(), equalTo(numDocs)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + + /** + * Tests scroll queries across multiple segments with batch sizes that + * trigger the sequential reader optimization (>= 10 docs). 
+ */ + public void testScrollAcrossMultipleSegments() throws Exception { + int docsPerSegment = randomIntBetween(20, 50); + int numSegments = randomIntBetween(3, 5); + int totalDocs = docsPerSegment * numSegments; + int scrollSize = randomIntBetween(10, 50); + int expectedBatches = (totalDocs + scrollSize - 1) / scrollSize; + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen("test"); + // Index documents in batches to create multiple segments + for (int seg = 0; seg < numSegments; seg++) { + for (int i = 0; i < docsPerSegment; i++) { + int docId = seg * docsPerSegment + i; + client().prepareIndex("test") + .setId(Integer.toString(docId)) + .setSource(jsonBuilder().startObject().field("field", docId).field("segment", seg).endObject()) + .get(); + } + refresh("test"); + } + indexRandomForConcurrentSearch("test"); + Set retrievedIds = new HashSet<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + try { + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) totalDocs)); + int batchCount = 0; + do { + batchCount++; + for (SearchHit hit : searchResponse.getHits().getHits()) { + assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId())); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents retrieved + assertThat(retrievedIds.size(), equalTo(totalDocs)); + // Verify exact batch count + assertThat(batchCount, equalTo(expectedBatches)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java index a99ccae1ffdd1..ef97f61a8f069 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.ClearScrollResponse; import org.opensearch.action.search.SearchPhaseExecutionException; @@ -64,10 +65,14 @@ import org.junit.After; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -90,7 +95,15 @@ /** * Tests for scrolling. + * + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) */ +@LuceneTestCase.SuppressCodecs("*") public class SearchScrollIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollIT(Settings settings) { super(settings); @@ -822,6 +835,74 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { client().prepareSearchScroll(respFromProdIndex.getScrollId()).get(); } + /** + * Tests that scroll queries with StoredFieldsReader caching return correct results + * across multiple batches. Verifies document order, content integrity, no duplicates, + * and no missing documents when using the sequential reader optimization. + */ + public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Exception { + int numDocs = randomIntBetween(100, 300); + int scrollSize = randomIntBetween(10, 35); + client().admin() + .indices() + .prepareCreate("test") + .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) + .get(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get(); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test") + .setId(Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("field", i).field("text", "document number " + i).endObject()) + .get(); + } + client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); + Set retrievedIds = new HashSet<>(); + List retrievedOrder = new ArrayList<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + try { + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs)); + int expectedValue = 0; + int batchCount = 0; + do { + batchCount++; + for (SearchHit hit : searchResponse.getHits().getHits()) { + int docId = Integer.parseInt(hit.getId()); + // Verify no duplicates + assertTrue("Duplicate document id: " + docId, retrievedIds.add(docId)); + retrievedOrder.add(docId); + // Verify sort order + assertThat( + "Document out of order at position " + retrievedOrder.size(), + ((Number) hit.getSortValues()[0]).intValue(), + equalTo(expectedValue) + ); + // Verify stored field content matches document id + Map source = hit.getSourceAsMap(); + assertThat("Field value mismatch for doc " + docId, source.get("field"), equalTo(docId)); + assertThat("Text field mismatch for doc " + docId, source.get("text"), equalTo("document number " + docId)); + expectedValue++; + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents retrieved + assertThat("Not all documents retrieved", retrievedIds.size(), equalTo(numDocs)); + assertThat("Multiple batches should have been used", batchCount, greaterThan(1)); + // Verify complete sequence + for (int i = 0; i < numDocs; i++) { + assertTrue("Missing document: " + i, 
retrievedIds.contains(i)); + } + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); response.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java index 38f65c8c2d0da..5206d3269b68a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -56,6 +57,15 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; +/** + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 0) public class SearchScrollWithFailingNodesIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollWithFailingNodesIT(Settings settings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java index b381d7cbc18f8..ee3ab01fcdf32 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.CreatePitAction; @@ -69,6 +70,15 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; +/** + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") public class SearchSliceIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchSliceIT(Settings staticSettings) { super(staticSettings); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 99cb3a4e8ca20..428d9f4afc143 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; @@ -79,6 +80,15 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +/** + *
<p>
{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2) public class SearchStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 22756432bd6b4..67c5d509c5d09 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.search.DocIdSetIterator; @@ -196,7 +197,20 @@ public void execute(SearchContext context, String profileDescription) { // for random access and don't optimize for sequential access - except for merging. // So we do a little hack here and pretend we're going to do merges in order to // get better sequential access. - fieldReader = lf.getSequentialStoredFieldsReader()::document; + StoredFieldsReader sequentialReader; + // For scroll queries, try to get cached reader + if (context.scrollContext() != null) { + // To get a unique identifier for this segment to use as a cache key + Object segmentKey = lf.getCoreCacheHelper() != null ? 
lf.getCoreCacheHelper().getKey() : currentReaderContext; + sequentialReader = context.scrollContext().getCachedSequentialReader(segmentKey); + if (sequentialReader == null) { + sequentialReader = lf.getSequentialStoredFieldsReader(); + context.scrollContext().cacheSequentialReader(segmentKey, sequentialReader); + } + } else { + sequentialReader = lf.getSequentialStoredFieldsReader(); + } + fieldReader = sequentialReader::document; } else { fieldReader = currentReaderContext.reader().storedFields()::document; } diff --git a/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java index 05ab12d5ae809..0df1e46aa1bd5 100644 --- a/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java @@ -82,6 +82,7 @@ public LegacyReaderContext( () -> {} ); this.scrollContext = new ScrollContext(); + addOnClose(this.scrollContext); } else { this.scrollContext = null; this.searcher = null; diff --git a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java index e3517756ced6e..1b8056f7247c9 100644 --- a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java @@ -32,20 +32,60 @@ package org.opensearch.search.internal; +import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lease.Releasable; import org.opensearch.search.Scroll; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + /** * Wrapper around information that needs to stay around when scrolling. * * @opensearch.api */ @PublicApi(since = "1.0.0") -public final class ScrollContext { +public final class ScrollContext implements Releasable { public TotalHits totalHits = null; public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; + + /** + * Cache for sequential stored field readers per segment. + * These readers are optimized for sequential access and cache decompressed blocks. + * + * Thread-safety note: Scroll requests are serialized (client waits for response before + * sending next request), so while different threads may use this cache, they won't + * access it concurrently. The underlying StoredFieldsReader has mutable state (BlockState) + * but is safe for sequential single-threaded access across different threads. + */ + private Map sequentialReaderCache; + + public StoredFieldsReader getCachedSequentialReader(Object segmentKey) { + return sequentialReaderCache != null ? 
sequentialReaderCache.get(segmentKey) : null; + } + + public void cacheSequentialReader(Object segmentKey, StoredFieldsReader reader) { + if (sequentialReaderCache == null) { + sequentialReaderCache = new HashMap<>(); + } + sequentialReaderCache.put(segmentKey, reader); + } + + @Override + public void close() { + if (sequentialReaderCache != null) { + for (StoredFieldsReader reader : sequentialReaderCache.values()) { + try { + reader.close(); + } catch (IOException e) {} + } + sequentialReaderCache = null; + } + } } diff --git a/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java new file mode 100644 index 0000000000000..b9d94b1b9ab29 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.codecs.StoredFieldsReader; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link ScrollContext}'s StoredFieldsReader caching functionality. + * + * The cache stores merge instances of StoredFieldsReader per segment to optimize + * sequential document access during scroll queries by avoiding repeated decompression + * of stored field blocks. + */ +public class ScrollContextReaderCacheTests extends OpenSearchTestCase { + + public void testCachePutAndGet() throws IOException { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey1 = new Object(); + Object segmentKey2 = new Object(); + StoredFieldsReader reader1 = mock(StoredFieldsReader.class); + StoredFieldsReader reader2 = mock(StoredFieldsReader.class); + // Initially no cached readers + assertNull(scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + // Cache reader for segment 1 + scrollContext.cacheSequentialReader(segmentKey1, reader1); + assertSame(reader1, scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + // Cache reader for segment 2 + scrollContext.cacheSequentialReader(segmentKey2, reader2); + assertSame(reader1, scrollContext.getCachedSequentialReader(segmentKey1)); + assertSame(reader2, scrollContext.getCachedSequentialReader(segmentKey2)); + scrollContext.close(); + // Verify both readers were closed + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + // After close, cache should be cleared + assertNull(scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + } + + public void testCloseWithEmptyCache() { + ScrollContext scrollContext = new ScrollContext(); + // Should not throw when closing with no cached readers + scrollContext.close(); + assertNull(scrollContext.getCachedSequentialReader(new Object())); + } + + public void testCloseHandlesReaderException() throws IOException { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey1 = new Object(); + Object segmentKey2 = new 
Object(); + StoredFieldsReader reader1 = mock(StoredFieldsReader.class); + StoredFieldsReader reader2 = mock(StoredFieldsReader.class); + // Make reader1 throw on close + doThrow(new IOException("test exception")).when(reader1).close(); + scrollContext.cacheSequentialReader(segmentKey1, reader1); + scrollContext.cacheSequentialReader(segmentKey2, reader2); + // Should not throw, should continue closing other readers + scrollContext.close(); + // Both readers should have been attempted to close + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + } + + public void testMultipleSegmentsCached() throws IOException { + ScrollContext scrollContext = new ScrollContext(); + int numSegments = randomIntBetween(3, 10); + Object[] segmentKeys = new Object[numSegments]; + StoredFieldsReader[] readers = new StoredFieldsReader[numSegments]; + // Cache readers for all segments (simulates first batch per segment) + for (int i = 0; i < numSegments; i++) { + segmentKeys[i] = new Object(); + readers[i] = mock(StoredFieldsReader.class); + assertNull(scrollContext.getCachedSequentialReader(segmentKeys[i])); + scrollContext.cacheSequentialReader(segmentKeys[i], readers[i]); + } + // Simulate multiple scroll batches - all should hit cache + for (int batch = 2; batch <= 5; batch++) { + for (int i = 0; i < numSegments; i++) { + assertSame( + "Batch " + batch + ", segment " + i + " should hit cache", + readers[i], + scrollContext.getCachedSequentialReader(segmentKeys[i]) + ); + } + } + scrollContext.close(); + // Verify all readers were closed + for (int i = 0; i < numSegments; i++) { + verify(readers[i], times(1)).close(); + } + } +}
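Taken together, the three production changes above wire the optimization end to end: FetchPhase reuses one merge-style StoredFieldsReader per segment whenever a scrollContext() is present, ScrollContext owns those readers and closes them in close(), and LegacyReaderContext registers the ScrollContext via addOnClose so the readers are released exactly when the scroll ends. Below is an illustrative sketch of the fetch-side flow only, not part of the diff; the SequentialStoredFieldsLeafReader type, its package, and the exact throws clauses are assumptions inferred from the hunks above.

import java.io.IOException;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.common.lucene.index.SequentialStoredFieldsLeafReader; // assumed type/package for 'lf' in FetchPhase
import org.opensearch.search.internal.ScrollContext;
import org.opensearch.search.internal.SearchContext;

class ScrollReaderReuseSketch {
    /**
     * Picks the StoredFieldsReader for sequential stored-field access in one fetch batch.
     * Scroll requests for the same context are serialized, so the cached reader is never
     * used by two threads at once even though batches may land on different threads.
     */
    static StoredFieldsReader sequentialReader(SearchContext context, SequentialStoredFieldsLeafReader lf, LeafReaderContext leaf)
        throws IOException {
        ScrollContext scroll = context.scrollContext();
        if (scroll == null) {
            // Non-scroll searches keep the previous behavior: a fresh merge-style reader per fetch.
            return lf.getSequentialStoredFieldsReader();
        }
        // Cache per segment: the core cache key is stable for the lifetime of the segment reader.
        Object segmentKey = lf.getCoreCacheHelper() != null ? lf.getCoreCacheHelper().getKey() : leaf;
        StoredFieldsReader reader = scroll.getCachedSequentialReader(segmentKey);
        if (reader == null) {
            reader = lf.getSequentialStoredFieldsReader();
            // Released later by ScrollContext.close(), which LegacyReaderContext registers via addOnClose.
            scroll.cacheSequentialReader(segmentKey, reader);
        }
        return reader;
    }
}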