From 56dca5e806f4552f97cc9d9d37333c9c290074b2 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Thu, 27 Nov 2025 13:47:01 -0800 Subject: [PATCH 01/11] Scroll query optimizations Signed-off-by: Prudhvi Godithi --- .../SequentialStoredFieldsLeafReader.java | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java index 3e6630af074fd..1c0b25f751354 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java @@ -49,6 +49,8 @@ * @opensearch.internal */ public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader { + + private StoredFieldsReader cachedSequentialReader; /** *

Construct a StoredFieldsFilterLeafReader based on the specified base reader. *

Note that base reader is closed if this FilterLeafReader is closed.

@@ -69,14 +71,34 @@ public SequentialStoredFieldsLeafReader(LeafReader in) { * Returns a {@link StoredFieldsReader} optimized for sequential access (adjacent doc ids). */ public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException { - if (in instanceof CodecReader) { - CodecReader reader = (CodecReader) in; - return doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance()); - } else if (in instanceof SequentialStoredFieldsLeafReader) { - SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in; - return doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader()); - } else { - throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass()); + if (cachedSequentialReader == null) { + if (in instanceof CodecReader) { + CodecReader reader = (CodecReader) in; + cachedSequentialReader = doGetSequentialStoredFieldsReader( + reader.getFieldsReader().getMergeInstance() + ); + } else if (in instanceof SequentialStoredFieldsLeafReader) { + SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in; + cachedSequentialReader = doGetSequentialStoredFieldsReader( + reader.getSequentialStoredFieldsReader() + ); + } else { + throw new IOException( + "requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass() + ); + } + } + return cachedSequentialReader; + } + + @Override + protected void doClose() throws IOException { + try { + if (cachedSequentialReader != null) { + cachedSequentialReader.close(); + } + } finally { + super.doClose(); } } From c5e1e1913002a1d3bfd1d7c1aa2ac03e15d57ce6 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Thu, 27 Nov 2025 22:21:56 -0800 Subject: [PATCH 02/11] spotless fix Signed-off-by: Prudhvi Godithi --- .../index/SequentialStoredFieldsLeafReader.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java index 1c0b25f751354..db135de959a96 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java @@ -51,6 +51,7 @@ public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader { private StoredFieldsReader cachedSequentialReader; + /** *

Construct a StoredFieldsFilterLeafReader based on the specified base reader. *

Note that base reader is closed if this FilterLeafReader is closed.

@@ -74,18 +75,12 @@ public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException { if (cachedSequentialReader == null) { if (in instanceof CodecReader) { CodecReader reader = (CodecReader) in; - cachedSequentialReader = doGetSequentialStoredFieldsReader( - reader.getFieldsReader().getMergeInstance() - ); + cachedSequentialReader = doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance()); } else if (in instanceof SequentialStoredFieldsLeafReader) { SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in; - cachedSequentialReader = doGetSequentialStoredFieldsReader( - reader.getSequentialStoredFieldsReader() - ); + cachedSequentialReader = doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader()); } else { - throw new IOException( - "requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass() - ); + throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass()); } } return cachedSequentialReader; From 9a047cd0e3bcc75a518aeea01d47d121c9d605a6 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Sat, 29 Nov 2025 19:51:27 -0800 Subject: [PATCH 03/11] Scroll query optimizations Signed-off-by: Prudhvi Godithi --- .../SequentialStoredFieldsLeafReader.java | 33 +++++------------- .../opensearch/search/fetch/FetchPhase.java | 20 ++++++++--- .../search/internal/LegacyReaderContext.java | 1 + .../search/internal/ScrollContext.java | 34 +++++++++++++++++-- 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java index db135de959a96..3e6630af074fd 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/SequentialStoredFieldsLeafReader.java @@ -49,9 +49,6 @@ * @opensearch.internal */ public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader { - - private StoredFieldsReader cachedSequentialReader; - /** *

Construct a StoredFieldsFilterLeafReader based on the specified base reader. *

Note that base reader is closed if this FilterLeafReader is closed.

@@ -72,28 +69,14 @@ public SequentialStoredFieldsLeafReader(LeafReader in) { * Returns a {@link StoredFieldsReader} optimized for sequential access (adjacent doc ids). */ public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException { - if (cachedSequentialReader == null) { - if (in instanceof CodecReader) { - CodecReader reader = (CodecReader) in; - cachedSequentialReader = doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance()); - } else if (in instanceof SequentialStoredFieldsLeafReader) { - SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in; - cachedSequentialReader = doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader()); - } else { - throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass()); - } - } - return cachedSequentialReader; - } - - @Override - protected void doClose() throws IOException { - try { - if (cachedSequentialReader != null) { - cachedSequentialReader.close(); - } - } finally { - super.doClose(); + if (in instanceof CodecReader) { + CodecReader reader = (CodecReader) in; + return doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance()); + } else if (in instanceof SequentialStoredFieldsLeafReader) { + SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in; + return doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader()); + } else { + throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass()); } } diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 22756432bd6b4..903f261601d3b 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.search.DocIdSetIterator; @@ -192,11 +193,20 @@ public void execute(SearchContext context, String profileDescription) { if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader lf && hasSequentialDocs && docs.length >= 10) { - // All the docs to fetch are adjacent but Lucene stored fields are optimized - // for random access and don't optimize for sequential access - except for merging. - // So we do a little hack here and pretend we're going to do merges in order to - // get better sequential access. - fieldReader = lf.getSequentialStoredFieldsReader()::document; + StoredFieldsReader sequentialReader; + // For scroll queries, try to get cached reader + if (context.scrollContext() != null) { + Object segmentKey = lf.getCoreCacheHelper() != null ? 
lf.getCoreCacheHelper().getKey() : currentReaderContext; + sequentialReader = context.scrollContext().getCachedSequentialReader(segmentKey); + if (sequentialReader == null) { + sequentialReader = lf.getSequentialStoredFieldsReader(); + context.scrollContext().cacheSequentialReader(segmentKey, sequentialReader); + } + } else { + sequentialReader = lf.getSequentialStoredFieldsReader(); + } + + fieldReader = sequentialReader::document; } else { fieldReader = currentReaderContext.reader().storedFields()::document; } diff --git a/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java index 05ab12d5ae809..0df1e46aa1bd5 100644 --- a/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/LegacyReaderContext.java @@ -82,6 +82,7 @@ public LegacyReaderContext( () -> {} ); this.scrollContext = new ScrollContext(); + addOnClose(this.scrollContext); } else { this.scrollContext = null; this.searcher = null; diff --git a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java index e3517756ced6e..0333d2f37b290 100644 --- a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java @@ -32,20 +32,50 @@ package org.opensearch.search.internal; +import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lease.Releasable; import org.opensearch.search.Scroll; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + /** * Wrapper around information that needs to stay around when scrolling. * * @opensearch.api */ @PublicApi(since = "1.0.0") -public final class ScrollContext { - public TotalHits totalHits = null; +public final class ScrollContext implements Releasable { + public TotalHits totalHits; public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; + private Map sequentialReaderCache; + + public StoredFieldsReader getCachedSequentialReader(Object segmentKey) { + return sequentialReaderCache != null ? 
sequentialReaderCache.get(segmentKey) : null; + } + + public void cacheSequentialReader(Object segmentKey, StoredFieldsReader reader) { + if (sequentialReaderCache == null) { + sequentialReaderCache = new HashMap<>(); + } + sequentialReaderCache.put(segmentKey, reader); + } + + @Override + public void close() { + if (sequentialReaderCache != null) { + for (StoredFieldsReader reader : sequentialReaderCache.values()) { + try { + reader.close(); + } catch (IOException e) {} + } + sequentialReaderCache = null; + } + } } From 10a1340decf853b8ba8bf057881e427ef91f25b7 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Sat, 29 Nov 2025 19:55:16 -0800 Subject: [PATCH 04/11] spotless fix Signed-off-by: Prudhvi Godithi --- .../main/java/org/opensearch/search/fetch/FetchPhase.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 903f261601d3b..485e4480c58b9 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -193,6 +193,10 @@ public void execute(SearchContext context, String profileDescription) { if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader lf && hasSequentialDocs && docs.length >= 10) { + // All the docs to fetch are adjacent but Lucene stored fields are optimized + // for random access and don't optimize for sequential access - except for merging. + // So we do a little hack here and pretend we're going to do merges in order to + // get better sequential access. StoredFieldsReader sequentialReader; // For scroll queries, try to get cached reader if (context.scrollContext() != null) { @@ -205,7 +209,6 @@ public void execute(SearchContext context, String profileDescription) { } else { sequentialReader = lf.getSequentialStoredFieldsReader(); } - fieldReader = sequentialReader::document; } else { fieldReader = currentReaderContext.reader().storedFields()::document; From 7e57b5de4a39c4ab38151416650bf54415a7a035 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Sat, 29 Nov 2025 19:56:13 -0800 Subject: [PATCH 05/11] spotless fix Signed-off-by: Prudhvi Godithi --- .../main/java/org/opensearch/search/internal/ScrollContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java index 0333d2f37b290..eab266e457a8d 100644 --- a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java @@ -50,7 +50,7 @@ */ @PublicApi(since = "1.0.0") public final class ScrollContext implements Releasable { - public TotalHits totalHits; + public TotalHits totalHits = null; public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; From 8bd03d55c5fe8a0cf05b87cd625bb5ff0f6c8984 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Mon, 1 Dec 2025 13:55:51 -0800 Subject: [PATCH 06/11] Fix assertions Signed-off-by: Prudhvi Godithi --- .../search/basic/TransportTwoNodesSearchIT.java | 2 ++ .../org/opensearch/search/scroll/SearchScrollIT.java | 2 ++ .../search/scroll/SearchScrollWithFailingNodesIT.java | 2 ++ .../org/opensearch/search/stats/SearchStatsIT.java | 2 ++ .../org/opensearch/search/internal/ScrollContext.java | 10 ++++++++++ 
.../java/org/opensearch/test/OpenSearchTestCase.java | 2 -- 6 files changed, 18 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java index cc88d399932c8..7d91694c81ea4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.OpenSearchException; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchPhaseExecutionException; @@ -80,6 +81,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; +@LuceneTestCase.SuppressCodecs("Asserting") public class TransportTwoNodesSearchIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public TransportTwoNodesSearchIT(Settings staticSettings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java index a99ccae1ffdd1..2114be9ddfd4d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.ClearScrollResponse; import org.opensearch.action.search.SearchPhaseExecutionException; @@ -91,6 +92,7 @@ /** * Tests for scrolling. 
*/ +@LuceneTestCase.SuppressCodecs("Asserting") public class SearchScrollIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollIT(Settings settings) { super(settings); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java index 38f65c8c2d0da..9aa96347a80f1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -56,6 +57,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; +@LuceneTestCase.SuppressCodecs("Asserting") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 0) public class SearchScrollWithFailingNodesIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollWithFailingNodesIT(Settings settings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 99cb3a4e8ca20..40e8a6d447ba0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; @@ -79,6 +80,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +@LuceneTestCase.SuppressCodecs("Asserting") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2) public class SearchStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { diff --git a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java index eab266e457a8d..1b8056f7247c9 100644 --- a/server/src/main/java/org/opensearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ScrollContext.java @@ -54,6 +54,16 @@ public final class ScrollContext implements Releasable { public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; + + /** + * Cache for sequential stored field readers per segment. + * These readers are optimized for sequential access and cache decompressed blocks. + * + * Thread-safety note: Scroll requests are serialized (client waits for response before + * sending next request), so while different threads may use this cache, they won't + * access it concurrently. 
The underlying StoredFieldsReader has mutable state (BlockState) + * but is safe for sequential single-threaded access across different threads. + */ private Map sequentialReaderCache; public StoredFieldsReader getCachedSequentialReader(Object segmentKey) { diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index a36a028b75e01..352fb42e0acb7 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -63,7 +63,6 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.Version; -import org.opensearch.bootstrap.BootstrapForTesting; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -137,7 +136,6 @@ import org.opensearch.test.junit.listeners.LoggingListener; import org.opensearch.test.junit.listeners.ReproduceInfoPrinter; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; import org.opensearch.transport.nio.MockNioTransportPlugin; From dc6a4ae80f0e467a5ce0ab750c53911e4c7fe274 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Mon, 1 Dec 2025 13:56:40 -0800 Subject: [PATCH 07/11] Fix assertions Signed-off-by: Prudhvi Godithi --- .../src/main/java/org/opensearch/test/OpenSearchTestCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index 352fb42e0acb7..a36a028b75e01 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -63,6 +63,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.Version; +import org.opensearch.bootstrap.BootstrapForTesting; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -136,6 +137,7 @@ import org.opensearch.test.junit.listeners.LoggingListener; import org.opensearch.test.junit.listeners.ReproduceInfoPrinter; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; import org.opensearch.transport.nio.MockNioTransportPlugin; From 5bedf518159951f3a759a0c966b661cc29bdfff9 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Tue, 2 Dec 2025 10:41:19 -0800 Subject: [PATCH 08/11] Update tests Signed-off-by: Prudhvi Godithi --- .../basic/TransportTwoNodesSearchIT.java | 14 +- .../scroll/ScrollStoredFieldsCacheIT.java | 193 ++++++++++++++++++ .../search/scroll/SearchScrollIT.java | 91 ++++++++- .../SearchScrollWithFailingNodesIT.java | 12 +- .../search/stats/SearchStatsIT.java | 12 +- .../ScrollContextReaderCacheTests.java | 126 ++++++++++++ .../opensearch/test/OpenSearchTestCase.java | 6 +- 7 files changed, 446 insertions(+), 8 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java create mode 100644 
server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java index 7d91694c81ea4..20ea8dbca5282 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java @@ -81,7 +81,19 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; -@LuceneTestCase.SuppressCodecs("Asserting") +/** + *

The {@code @SuppressCodecs("*")} annotation is required because scroll queries + * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document + * access across batches. Different scroll batches may execute on different threads from the + * search thread pool, but access is always sequential (never concurrent). Lucene's + * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this + * legitimate sequential cross-thread usage pattern. The underlying Lucene implementation + * (Lucene90CompressingStoredFieldsReader) is safe for sequential access from different threads + * since there's no concurrent modification of internal state. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") public class TransportTwoNodesSearchIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public TransportTwoNodesSearchIT(Settings staticSettings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java new file mode 100644 index 0000000000000..9e8b793863205 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.scroll; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.SearchHit; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; + +/** + * Integration tests for scroll query StoredFieldsReader caching optimization. + * + * Tests verify that scroll queries correctly return all documents when using + * the sequential reader cache optimization for stored fields. + */ +@LuceneTestCase.SuppressCodecs("*") +public class ScrollStoredFieldsCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public ScrollStoredFieldsCacheIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + /** + * Tests that scroll queries with sequential document access correctly utilize + * the StoredFieldsReader cache and return all documents without data corruption. 
+ */ + public void testScrollWithSequentialReaderCache() throws Exception { + int numDocs = randomIntBetween(100, 500); + int scrollSize = randomIntBetween(10, 50); + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen("test"); + // Index documents + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test") + .setId(Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("field", i).field("text", "document " + i).endObject()) + .get(); + } + refresh("test"); + indexRandomForConcurrentSearch("test"); + // Execute scroll query + Set retrievedIds = new HashSet<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + + try { + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs)); + do { + for (SearchHit hit : searchResponse.getHits().getHits()) { + // Verify no duplicate documents + assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId())); + // Verify document content is correct + assertNotNull(hit.getSourceAsMap()); + assertEquals(Integer.parseInt(hit.getId()), hit.getSourceAsMap().get("field")); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents were retrieved + assertThat(retrievedIds.size(), equalTo(numDocs)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + + /** + * Tests scroll queries across multiple segments to verify cache works + * correctly when switching between segments. 
+ */ + public void testScrollAcrossMultipleSegments() throws Exception { + int docsPerSegment = randomIntBetween(20, 50); + int numSegments = randomIntBetween(3, 5); + int totalDocs = docsPerSegment * numSegments; + int scrollSize = randomIntBetween(5, 15); + int expectedBatches = (totalDocs + scrollSize - 1) / scrollSize; + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen("test"); + // Index documents in batches to create multiple segments + for (int seg = 0; seg < numSegments; seg++) { + for (int i = 0; i < docsPerSegment; i++) { + int docId = seg * docsPerSegment + i; + client().prepareIndex("test") + .setId(Integer.toString(docId)) + .setSource(jsonBuilder().startObject().field("field", docId).field("segment", seg).endObject()) + .get(); + } + refresh("test"); // Create new segment + } + indexRandomForConcurrentSearch("test"); + // Execute scroll query + Set retrievedIds = new HashSet<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + try { + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) totalDocs)); + int batchCount = 0; + do { + batchCount++; + for (SearchHit hit : searchResponse.getHits().getHits()) { + assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId())); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents retrieved + assertThat(retrievedIds.size(), equalTo(totalDocs)); + // Verify exact batch count + assertThat(batchCount, equalTo(expectedBatches)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + + /** + * Tests that large scroll batches (>=10 docs) trigger the sequential reader + * optimization and work correctly. 
+ */ + public void testLargeScrollBatchTriggersOptimization() throws Exception { + int numDocs = 200; + // Scroll size >= 10 triggers sequential reader optimization + int scrollSize = randomIntBetween(10, 50); + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen("test"); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test") + .setId(Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh("test"); + indexRandomForConcurrentSearch("test"); + Set retrievedIds = new HashSet<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + try { + assertNoFailures(searchResponse); + do { + for (SearchHit hit : searchResponse.getHits().getHits()) { + retrievedIds.add(hit.getId()); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + assertThat(retrievedIds.size(), equalTo(numDocs)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java index 2114be9ddfd4d..5f5b3c164e282 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java @@ -65,10 +65,14 @@ import org.junit.After; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -91,8 +95,19 @@ /** * Tests for scrolling. + * + *

The {@code @SuppressCodecs("*")} annotation is required because scroll queries + * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document + * access across batches. Different scroll batches may execute on different threads from the + * search thread pool, but access is always sequential (never concurrent). Lucene's + * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this + * legitimate sequential cross-thread usage pattern. The underlying Lucene implementation + * (Lucene90CompressingStoredFieldsReader) is safe for sequential access from different threads + * since there's no concurrent modification of internal state. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) */ -@LuceneTestCase.SuppressCodecs("Asserting") +@LuceneTestCase.SuppressCodecs("*") public class SearchScrollIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollIT(Settings settings) { super(settings); @@ -824,6 +839,80 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { client().prepareSearchScroll(respFromProdIndex.getScrollId()).get(); } + /** + * Tests that scroll queries with StoredFieldsReader caching return correct results + * across multiple batches. Verifies document order, content integrity, no duplicates, + * and no missing documents when using the sequential reader optimization. + */ + public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Exception { + int numDocs = randomIntBetween(100, 300); + // Size >= 10 triggers sequential reader optimization in FetchPhase + int scrollSize = randomIntBetween(10, 35); + + client().admin() + .indices() + .prepareCreate("test") + .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) + .get(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get(); + // Index documents with stored fields + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test") + .setId(Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("field", i).field("text", "document number " + i).endObject()) + .get(); + } + client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); + // Execute scroll query with sorting to ensure deterministic order + Set retrievedIds = new HashSet<>(); + List retrievedOrder = new ArrayList<>(); + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(scrollSize) + .setScroll(TimeValue.timeValueMinutes(2)) + .addSort("field", SortOrder.ASC) + .get(); + + try { + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs)); + int expectedValue = 0; + int batchCount = 0; + do { + batchCount++; + for (SearchHit hit : searchResponse.getHits().getHits()) { + int docId = Integer.parseInt(hit.getId()); + // Verify no duplicates + assertTrue("Duplicate document id: " + docId, retrievedIds.add(docId)); + retrievedOrder.add(docId); + // Verify sort order + assertThat( + "Document out of order at position " + retrievedOrder.size(), + ((Number) hit.getSortValues()[0]).intValue(), + equalTo(expectedValue) + ); + // Verify stored field content matches document id + Map source = hit.getSourceAsMap(); + assertThat("Field value mismatch for doc " + docId, source.get("field"), equalTo(docId)); + assertThat("Text field mismatch for doc " + docId, source.get("text"), equalTo("document 
number " + docId)); + expectedValue++; + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); + assertNoFailures(searchResponse); + + } while (searchResponse.getHits().getHits().length > 0); + // Verify all documents retrieved + assertThat("Not all documents retrieved", retrievedIds.size(), equalTo(numDocs)); + assertThat("Multiple batches should have been used", batchCount, greaterThan(1)); + // Verify complete sequence + for (int i = 0; i < numDocs; i++) { + assertTrue("Missing document: " + i, retrievedIds.contains(i)); + } + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); response.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java index 9aa96347a80f1..2eaf2542b9331 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java @@ -57,7 +57,17 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; -@LuceneTestCase.SuppressCodecs("Asserting") +/** + *

The {@code @SuppressCodecs("*")} annotation is required because scroll queries + * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document + * access across batches. Different scroll batches may execute on different threads from the + * search thread pool, but access is always sequential (never concurrent). Lucene's + * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this + * legitimate sequential cross-thread usage pattern. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 0) public class SearchScrollWithFailingNodesIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public SearchScrollWithFailingNodesIT(Settings settings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 40e8a6d447ba0..92d40d60338dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -80,7 +80,17 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.SuppressCodecs("Asserting") +/** + *

The {@code @SuppressCodecs("*")} annotation is required because scroll queries + * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document + * access across batches. Different scroll batches may execute on different threads from the + * search thread pool, but access is always sequential (never concurrent). Lucene's + * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this + * legitimate sequential cross-thread usage pattern. + * + * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) + */ +@LuceneTestCase.SuppressCodecs("*") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2) public class SearchStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { diff --git a/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java new file mode 100644 index 0000000000000..bf61103a322f5 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.codecs.StoredFieldsReader; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link ScrollContext}'s StoredFieldsReader caching functionality. + * + * The cache stores merge instances of StoredFieldsReader per segment to optimize + * sequential document access during scroll queries by avoiding repeated decompression + * of stored field blocks. 
+ */ +public class ScrollContextReaderCacheTests extends OpenSearchTestCase { + + public void testCachePutAndGet() { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey1 = new Object(); + Object segmentKey2 = new Object(); + StoredFieldsReader reader1 = mock(StoredFieldsReader.class); + StoredFieldsReader reader2 = mock(StoredFieldsReader.class); + // Initially no cached readers + assertNull(scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + // Cache reader for segment 1 + scrollContext.cacheSequentialReader(segmentKey1, reader1); + assertSame(reader1, scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + // Cache reader for segment 2 + scrollContext.cacheSequentialReader(segmentKey2, reader2); + assertSame(reader1, scrollContext.getCachedSequentialReader(segmentKey1)); + assertSame(reader2, scrollContext.getCachedSequentialReader(segmentKey2)); + scrollContext.close(); + } + + public void testCloseReleasesAllReaders() throws IOException { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey1 = new Object(); + Object segmentKey2 = new Object(); + StoredFieldsReader reader1 = mock(StoredFieldsReader.class); + StoredFieldsReader reader2 = mock(StoredFieldsReader.class); + scrollContext.cacheSequentialReader(segmentKey1, reader1); + scrollContext.cacheSequentialReader(segmentKey2, reader2); + scrollContext.close(); + // Verify both readers were closed + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + // After close, cache should be cleared + assertNull(scrollContext.getCachedSequentialReader(segmentKey1)); + assertNull(scrollContext.getCachedSequentialReader(segmentKey2)); + } + + public void testCloseWithEmptyCache() { + ScrollContext scrollContext = new ScrollContext(); + // Should not throw when closing with no cached readers + scrollContext.close(); + assertNull(scrollContext.getCachedSequentialReader(new Object())); + } + + public void testCloseHandlesReaderException() throws IOException { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey1 = new Object(); + Object segmentKey2 = new Object(); + StoredFieldsReader reader1 = mock(StoredFieldsReader.class); + StoredFieldsReader reader2 = mock(StoredFieldsReader.class); + // Make reader1 throw on close + doThrow(new IOException("test exception")).when(reader1).close(); + scrollContext.cacheSequentialReader(segmentKey1, reader1); + scrollContext.cacheSequentialReader(segmentKey2, reader2); + // Should not throw, should continue closing other readers + scrollContext.close(); + // Both readers should have been attempted to close + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + } + + public void testCacheHitAcrossScrollBatches() { + ScrollContext scrollContext = new ScrollContext(); + Object segmentKey = new Object(); + StoredFieldsReader reader = mock(StoredFieldsReader.class); + // Simulate first batch - cache miss, then cache + assertNull(scrollContext.getCachedSequentialReader(segmentKey)); + scrollContext.cacheSequentialReader(segmentKey, reader); + // Simulate subsequent batches - cache hits + for (int batch = 2; batch <= 5; batch++) { + assertSame("Batch " + batch + " should hit cache", reader, scrollContext.getCachedSequentialReader(segmentKey)); + } + scrollContext.close(); + } + + public void testMultipleSegmentsCached() throws IOException { + ScrollContext scrollContext = new 
ScrollContext(); + int numSegments = randomIntBetween(3, 10); + Object[] segmentKeys = new Object[numSegments]; + StoredFieldsReader[] readers = new StoredFieldsReader[numSegments]; + // Cache readers for all segments + for (int i = 0; i < numSegments; i++) { + segmentKeys[i] = new Object(); + readers[i] = mock(StoredFieldsReader.class); + scrollContext.cacheSequentialReader(segmentKeys[i], readers[i]); + } + // Verify all segments are cached + for (int i = 0; i < numSegments; i++) { + assertSame(readers[i], scrollContext.getCachedSequentialReader(segmentKeys[i])); + } + scrollContext.close(); + // Verify all readers were closed + for (int i = 0; i < numSegments; i++) { + verify(readers[i], times(1)).close(); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index a36a028b75e01..9e05d8b219262 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -63,7 +63,6 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.Version; -import org.opensearch.bootstrap.BootstrapForTesting; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -137,7 +136,6 @@ import org.opensearch.test.junit.listeners.LoggingListener; import org.opensearch.test.junit.listeners.ReproduceInfoPrinter; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; import org.opensearch.transport.nio.MockNioTransportPlugin; @@ -329,8 +327,8 @@ public void append(LogEvent event) { Configurator.shutdown(context); })); - BootstrapForTesting.ensureInitialized(); - TransportService.ensureClassloaded(); // ensure server streamables are registered + // BootstrapForTesting.ensureInitialized(); + // TransportService.ensureClassloaded(); // ensure server streamables are registered // filter out joda timezones that are deprecated for the java time migration List jodaTZIds = DateTimeZone.getAvailableIDs() From 1ba51851dfd077fd9879391dd9b2c0643ceaa13b Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Tue, 2 Dec 2025 12:29:57 -0800 Subject: [PATCH 09/11] update changelog Signed-off-by: Prudhvi Godithi --- CHANGELOG.md | 1 + .../scroll/ScrollStoredFieldsCacheIT.java | 52 ++----------------- .../search/scroll/SearchScrollIT.java | 18 ++----- .../SearchScrollWithFailingNodesIT.java | 10 ++-- .../search/stats/SearchStatsIT.java | 10 ++-- .../opensearch/search/fetch/FetchPhase.java | 1 + .../ScrollContextReaderCacheTests.java | 42 +++++---------- .../opensearch/test/OpenSearchTestCase.java | 6 ++- 8 files changed, 35 insertions(+), 105 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e2be80e8201c..8b5654a9e0647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)) - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601)) - Support dynamic consumer configuration update in pull-based ingestion 
([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963)) +- Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112)) ### Changed - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java index 9e8b793863205..448e96d8322c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/ScrollStoredFieldsCacheIT.java @@ -68,7 +68,6 @@ public void testScrollWithSequentialReaderCache() throws Exception { } refresh("test"); indexRandomForConcurrentSearch("test"); - // Execute scroll query Set retrievedIds = new HashSet<>(); SearchResponse searchResponse = client().prepareSearch("test") .setQuery(matchAllQuery()) @@ -84,7 +83,7 @@ public void testScrollWithSequentialReaderCache() throws Exception { for (SearchHit hit : searchResponse.getHits().getHits()) { // Verify no duplicate documents assertTrue("Duplicate document id: " + hit.getId(), retrievedIds.add(hit.getId())); - // Verify document content is correct + // Verify document content is correct _source field assertNotNull(hit.getSourceAsMap()); assertEquals(Integer.parseInt(hit.getId()), hit.getSourceAsMap().get("field")); } @@ -99,14 +98,14 @@ public void testScrollWithSequentialReaderCache() throws Exception { } /** - * Tests scroll queries across multiple segments to verify cache works - * correctly when switching between segments. + * Tests scroll queries across multiple segments with batch sizes that + * trigger the sequential reader optimization (>= 10 docs). */ public void testScrollAcrossMultipleSegments() throws Exception { int docsPerSegment = randomIntBetween(20, 50); int numSegments = randomIntBetween(3, 5); int totalDocs = docsPerSegment * numSegments; - int scrollSize = randomIntBetween(5, 15); + int scrollSize = randomIntBetween(10, 50); int expectedBatches = (totalDocs + scrollSize - 1) / scrollSize; createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); ensureGreen("test"); @@ -119,10 +118,9 @@ public void testScrollAcrossMultipleSegments() throws Exception { .setSource(jsonBuilder().startObject().field("field", docId).field("segment", seg).endObject()) .get(); } - refresh("test"); // Create new segment + refresh("test"); } indexRandomForConcurrentSearch("test"); - // Execute scroll query Set retrievedIds = new HashSet<>(); SearchResponse searchResponse = client().prepareSearch("test") .setQuery(matchAllQuery()) @@ -150,44 +148,4 @@ public void testScrollAcrossMultipleSegments() throws Exception { clearScroll(searchResponse.getScrollId()); } } - - /** - * Tests that large scroll batches (>=10 docs) trigger the sequential reader - * optimization and work correctly. 
- */ - public void testLargeScrollBatchTriggersOptimization() throws Exception { - int numDocs = 200; - // Scroll size >= 10 triggers sequential reader optimization - int scrollSize = randomIntBetween(10, 50); - createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); - ensureGreen("test"); - for (int i = 0; i < numDocs; i++) { - client().prepareIndex("test") - .setId(Integer.toString(i)) - .setSource(jsonBuilder().startObject().field("field", i).endObject()) - .get(); - } - refresh("test"); - indexRandomForConcurrentSearch("test"); - Set retrievedIds = new HashSet<>(); - SearchResponse searchResponse = client().prepareSearch("test") - .setQuery(matchAllQuery()) - .setSize(scrollSize) - .setScroll(TimeValue.timeValueMinutes(2)) - .addSort("field", SortOrder.ASC) - .get(); - try { - assertNoFailures(searchResponse); - do { - for (SearchHit hit : searchResponse.getHits().getHits()) { - retrievedIds.add(hit.getId()); - } - searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); - assertNoFailures(searchResponse); - } while (searchResponse.getHits().getHits().length > 0); - assertThat(retrievedIds.size(), equalTo(numDocs)); - } finally { - clearScroll(searchResponse.getScrollId()); - } - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java index 5f5b3c164e282..ef97f61a8f069 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollIT.java @@ -96,14 +96,10 @@ /** * Tests for scrolling. * - *

The {@code @SuppressCodecs("*")} annotation is required because scroll queries - * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document - * access across batches. Different scroll batches may execute on different threads from the - * search thread pool, but access is always sequential (never concurrent). Lucene's - * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this - * legitimate sequential cross-thread usage pattern. The underlying Lucene implementation - * (Lucene90CompressingStoredFieldsReader) is safe for sequential access from different threads - * since there's no concurrent modification of internal state. + *

{@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances + * across scroll batches for sequential access. Different batches may run on different threads + * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity + * that rejects this valid sequential cross-thread usage. * * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object) */ @@ -846,16 +842,13 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { */ public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Exception { int numDocs = randomIntBetween(100, 300); - // Size >= 10 triggers sequential reader optimization in FetchPhase int scrollSize = randomIntBetween(10, 35); - client().admin() .indices() .prepareCreate("test") .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) .get(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get(); - // Index documents with stored fields for (int i = 0; i < numDocs; i++) { client().prepareIndex("test") .setId(Integer.toString(i)) @@ -864,7 +857,6 @@ public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Ex } client().admin().indices().prepareRefresh().get(); indexRandomForConcurrentSearch("test"); - // Execute scroll query with sorting to ensure deterministic order Set retrievedIds = new HashSet<>(); List retrievedOrder = new ArrayList<>(); SearchResponse searchResponse = client().prepareSearch("test") @@ -873,7 +865,6 @@ public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Ex .setScroll(TimeValue.timeValueMinutes(2)) .addSort("field", SortOrder.ASC) .get(); - try { assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numDocs)); int expectedValue = 0; @@ -899,7 +890,6 @@ public void testScrollWithSequentialReaderCacheReturnsCorrectResults() throws Ex } searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).get(); assertNoFailures(searchResponse); - } while (searchResponse.getHits().getHits().length > 0); // Verify all documents retrieved assertThat("Not all documents retrieved", retrievedIds.size(), equalTo(numDocs)); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java index 2eaf2542b9331..5206d3269b68a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java @@ -58,12 +58,10 @@ import static org.hamcrest.Matchers.lessThan; /** - *
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java
index 2eaf2542b9331..5206d3269b68a 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/scroll/SearchScrollWithFailingNodesIT.java
@@ -58,12 +58,10 @@
 import static org.hamcrest.Matchers.lessThan;
 
 /**
- * The {@code @SuppressCodecs("*")} annotation is required because scroll queries
- * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document
- * access across batches. Different scroll batches may execute on different threads from the
- * search thread pool, but access is always sequential (never concurrent). Lucene's
- * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this
- * legitimate sequential cross-thread usage pattern.
+ * {@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
  *
  * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java
index 92d40d60338dd..428d9f4afc143 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java
@@ -81,12 +81,10 @@
 import static org.hamcrest.Matchers.nullValue;
 
 /**
- * The {@code @SuppressCodecs("*")} annotation is required because scroll queries
- * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document
- * access across batches. Different scroll batches may execute on different threads from the
- * search thread pool, but access is always sequential (never concurrent). Lucene's
- * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this
- * legitimate sequential cross-thread usage pattern.
+ * {@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
  *
  * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java
index 485e4480c58b9..67c5d509c5d09 100644
--- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java
+++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java
@@ -200,6 +200,7 @@ public void execute(SearchContext context, String profileDescription) {
             StoredFieldsReader sequentialReader;
             // For scroll queries, try to get cached reader
             if (context.scrollContext() != null) {
+                // Derive a stable per-segment identifier to use as the cache key
                 Object segmentKey = lf.getCoreCacheHelper() != null ? lf.getCoreCacheHelper().getKey() : currentReaderContext;
                 sequentialReader = context.scrollContext().getCachedSequentialReader(segmentKey);
                 if (sequentialReader == null) {
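[Editor's note — illustrative sketch, not part of the patch series. The FetchPhase hunk above is the consumer of the new cache: it derives a per-segment key and only opens a sequential StoredFieldsReader when the scroll context has nothing cached for that segment. A hedged sketch of that lookup-then-populate flow; getCachedSequentialReader and cacheSequentialReader are the methods this series adds, while openSequentialReader stands in for however the reader is actually obtained from the leaf reader.]

    // Sketch: one sequential StoredFieldsReader per segment, reused for the life of a scroll.
    LeafReader lf = currentReaderContext.reader();
    Object segmentKey = lf.getCoreCacheHelper() != null
        ? lf.getCoreCacheHelper().getKey()            // stable for the lifetime of the segment core
        : currentReaderContext;                       // fallback: identity of the leaf context itself
    StoredFieldsReader reader = context.scrollContext().getCachedSequentialReader(segmentKey);
    if (reader == null) {
        reader = openSequentialReader(lf);            // hypothetical helper, not the patch's actual call
        context.scrollContext().cacheSequentialReader(segmentKey, reader);
    }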
diff --git a/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java
index bf61103a322f5..b9d94b1b9ab29 100644
--- a/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java
+++ b/server/src/test/java/org/opensearch/search/internal/ScrollContextReaderCacheTests.java
@@ -27,7 +27,7 @@
  */
 public class ScrollContextReaderCacheTests extends OpenSearchTestCase {
 
-    public void testCachePutAndGet() {
+    public void testCachePutAndGet() throws IOException {
         ScrollContext scrollContext = new ScrollContext();
         Object segmentKey1 = new Object();
         Object segmentKey2 = new Object();
@@ -45,17 +45,6 @@ public void testCachePutAndGet() {
         assertSame(reader1, scrollContext.getCachedSequentialReader(segmentKey1));
         assertSame(reader2, scrollContext.getCachedSequentialReader(segmentKey2));
         scrollContext.close();
-    }
-
-    public void testCloseReleasesAllReaders() throws IOException {
-        ScrollContext scrollContext = new ScrollContext();
-        Object segmentKey1 = new Object();
-        Object segmentKey2 = new Object();
-        StoredFieldsReader reader1 = mock(StoredFieldsReader.class);
-        StoredFieldsReader reader2 = mock(StoredFieldsReader.class);
-        scrollContext.cacheSequentialReader(segmentKey1, reader1);
-        scrollContext.cacheSequentialReader(segmentKey2, reader2);
-        scrollContext.close();
         // Verify both readers were closed
         verify(reader1, times(1)).close();
         verify(reader2, times(1)).close();
     }
@@ -88,34 +77,27 @@ public void testCloseHandlesReaderException() throws IOException {
         verify(reader2, times(1)).close();
     }
 
-    public void testCacheHitAcrossScrollBatches() {
-        ScrollContext scrollContext = new ScrollContext();
-        Object segmentKey = new Object();
-        StoredFieldsReader reader = mock(StoredFieldsReader.class);
-        // Simulate first batch - cache miss, then cache
-        assertNull(scrollContext.getCachedSequentialReader(segmentKey));
-        scrollContext.cacheSequentialReader(segmentKey, reader);
-        // Simulate subsequent batches - cache hits
-        for (int batch = 2; batch <= 5; batch++) {
-            assertSame("Batch " + batch + " should hit cache", reader, scrollContext.getCachedSequentialReader(segmentKey));
-        }
-        scrollContext.close();
-    }
-
     public void testMultipleSegmentsCached() throws IOException {
         ScrollContext scrollContext = new ScrollContext();
         int numSegments = randomIntBetween(3, 10);
         Object[] segmentKeys = new Object[numSegments];
         StoredFieldsReader[] readers = new StoredFieldsReader[numSegments];
-        // Cache readers for all segments
+        // Cache readers for all segments (simulates first batch per segment)
        for (int i = 0; i < numSegments; i++) {
             segmentKeys[i] = new Object();
             readers[i] = mock(StoredFieldsReader.class);
+            assertNull(scrollContext.getCachedSequentialReader(segmentKeys[i]));
             scrollContext.cacheSequentialReader(segmentKeys[i], readers[i]);
         }
-        // Verify all segments are cached
-        for (int i = 0; i < numSegments; i++) {
-            assertSame(readers[i], scrollContext.getCachedSequentialReader(segmentKeys[i]));
+        // Simulate multiple scroll batches - all should hit cache
+        for (int batch = 2; batch <= 5; batch++) {
+            for (int i = 0; i < numSegments; i++) {
+                assertSame(
+                    "Batch " + batch + ", segment " + i + " should hit cache",
+                    readers[i],
+                    scrollContext.getCachedSequentialReader(segmentKeys[i])
+                );
+            }
         }
         scrollContext.close();
         // Verify all readers were closed
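[Editor's note — illustrative sketch, not part of the patch series. The unit-test diff above pins down the cache lifecycle: a miss on the first batch, the identical instance returned on every later batch, and close() releasing each cached reader exactly once. In sketch form, assuming `reader` is whatever sequential reader the fetch phase opened for the segment:]

    // Sketch: ScrollContext reader-cache lifecycle across scroll batches.
    ScrollContext scrollContext = new ScrollContext();
    Object segmentKey = new Object();                             // stands in for a segment core cache key
    if (scrollContext.getCachedSequentialReader(segmentKey) == null) {
        scrollContext.cacheSequentialReader(segmentKey, reader);  // first batch: miss, then populate
    }
    // Any later batch, possibly on another pool thread, sees the same instance.
    assertSame(reader, scrollContext.getCachedSequentialReader(segmentKey));
    scrollContext.close();                                        // closes every cached reader exactly once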
diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
index 9e05d8b219262..a36a028b75e01 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
@@ -63,6 +63,7 @@
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.tests.util.TimeUnits;
 import org.opensearch.Version;
+import org.opensearch.bootstrap.BootstrapForTesting;
 import org.opensearch.cluster.ClusterModule;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.coordination.PersistedStateRegistry;
@@ -136,6 +137,7 @@
 import org.opensearch.test.junit.listeners.LoggingListener;
 import org.opensearch.test.junit.listeners.ReproduceInfoPrinter;
 import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportService;
 import org.opensearch.transport.client.Client;
 import org.opensearch.transport.client.Requests;
 import org.opensearch.transport.nio.MockNioTransportPlugin;
@@ -327,8 +329,8 @@ public void append(LogEvent event) {
             Configurator.shutdown(context);
         }));
 
-        // BootstrapForTesting.ensureInitialized();
-        // TransportService.ensureClassloaded(); // ensure server streamables are registered
+        BootstrapForTesting.ensureInitialized();
+        TransportService.ensureClassloaded(); // ensure server streamables are registered
 
         // filter out joda timezones that are deprecated for the java time migration
         List<String> jodaTZIds = DateTimeZone.getAvailableIDs()

From 94d599a44e785ff8cda99409af85464bc333c820 Mon Sep 17 00:00:00 2001
From: Prudhvi Godithi
Date: Tue, 2 Dec 2025 14:28:22 -0800
Subject: [PATCH 10/11] Fix SearchSliceIT

Signed-off-by: Prudhvi Godithi
---
 .../search/basic/TransportTwoNodesSearchIT.java    | 12 ++++--------
 .../org/opensearch/search/slice/SearchSliceIT.java | 10 ++++++++++
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java
index 20ea8dbca5282..d4cd5d63edf0c 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/basic/TransportTwoNodesSearchIT.java
@@ -82,14 +82,10 @@
 import static org.hamcrest.Matchers.startsWith;
 
 /**
- * The {@code @SuppressCodecs("*")} annotation is required because scroll queries
- * cache StoredFieldsReader merge instances in ScrollContext to optimize sequential document
- * access across batches. Different scroll batches may execute on different threads from the
- * search thread pool, but access is always sequential (never concurrent). Lucene's
- * AssertingStoredFieldsFormat enforces strict thread affinity that doesn't account for this
- * legitimate sequential cross-thread usage pattern. The underlying Lucene implementation
- * (Lucene90CompressingStoredFieldsReader) is safe for sequential access from different threads
- * since there's no concurrent modification of internal state.
+ * {@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
  *
  * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java
index b381d7cbc18f8..ee3ab01fcdf32 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.search.CreatePitAction;
@@ -69,6 +70,15 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.startsWith;
 
+/**
+ * {@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
+ */
+@LuceneTestCase.SuppressCodecs("*")
 public class SearchSliceIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
     public SearchSliceIT(Settings staticSettings) {
         super(staticSettings);
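[Editor's note — illustrative sketch, not part of the patch series. SearchSliceIT exercises sliced scrolls, where N independent slices of one query each page through a disjoint share of the documents; every slice owns its own scroll context and therefore its own reader cache, which is why the annotation is needed here too. A hedged sketch of opening one slice — SliceBuilder and the slice(...) setter are the client API as we understand it, and the index name is illustrative.]

    // Sketch: slice 0 of 2 of a sliced scroll; a second client would run slice id 1 in parallel.
    SliceBuilder slice = new SliceBuilder(0, 2);      // (slice id, total number of slices)
    SearchResponse response = client.prepareSearch("test")
        .setQuery(QueryBuilders.matchAllQuery())
        .slice(slice)
        .setSize(20)
        .setScroll(TimeValue.timeValueMinutes(2))
        .get();
    // Each slice then pages with prepareSearchScroll(...) exactly like an unsliced scroll.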

From 1cc0d36d95cb5e7303e130edac205d1e4143c3f0 Mon Sep 17 00:00:00 2001
From: Prudhvi Godithi
Date: Tue, 2 Dec 2025 21:03:58 -0800
Subject: [PATCH 11/11] Fix RetryTests

Signed-off-by: Prudhvi Godithi
---
 .../java/org/opensearch/index/reindex/RetryTests.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java
index 82371eff2fb13..6d5dfc293afdd 100644
--- a/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java
+++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/RetryTests.java
@@ -32,6 +32,7 @@
 
 package org.opensearch.index.reindex;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.admin.cluster.node.info.NodeInfo;
 import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.opensearch.action.bulk.BackoffPolicy;
@@ -68,7 +69,15 @@
 /**
  * Integration test for bulk retry behavior. Useful because retrying relies on the way that the
  * rest of OpenSearch throws exceptions and unit tests won't verify that.
+ *
+ * {@code @SuppressCodecs("*")} is needed because we cache StoredFieldsReader instances
+ * across scroll batches for sequential access. Different batches may run on different threads
+ * (but never concurrently). Lucene's AssertingStoredFieldsFormat enforces thread affinity
+ * that rejects this valid sequential cross-thread usage.
+ *
+ * @see org.opensearch.search.internal.ScrollContext#getCachedSequentialReader(Object)
  */
+@LuceneTestCase.SuppressCodecs("*")
 public class RetryTests extends OpenSearchIntegTestCase {
 
     private static final int DOC_COUNT = 20;
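[Editor's note — illustrative sketch, not part of the patch series. The javadoc repeated throughout this series rests on one concurrency claim: scroll batches may hop between pool threads, but batch N+1 never starts before batch N completes, so every handoff carries a happens-before edge and the cached reader is never used concurrently. A minimal sketch of that access pattern; the pool, the readBatch helper, and the doc ranges are ours, not OpenSearch code.]

    // Sketch: sequential (never concurrent) cross-thread use of a single cached reader.
    // Checked exceptions from Future.get() are elided for brevity.
    ExecutorService searchPool = Executors.newFixedThreadPool(4);
    searchPool.submit(() -> readBatch(cachedReader, 0, 10)).get();   // batch 1 finishes first...
    searchPool.submit(() -> readBatch(cachedReader, 10, 20)).get();  // ...before batch 2 is submitted
    searchPool.shutdown();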