From 2891b3fe1c5a35c9f42fcd428a31962d22a3b794 Mon Sep 17 00:00:00 2001 From: Brendan Cully Date: Mon, 22 Sep 2025 11:14:34 -0700 Subject: [PATCH 1/2] Add a hook to filter out documents by query to IndexModule This is a more limited version of the existing ReaderWrapper that only accepts a query of documents to filter from the reader. It is meant to support temporarily filtering out documents that no longer belong to a shard after it has been split, until the shard is cleaned. The decision about whether to install the filter is made when the index module is created. To reduce overhead in the common case, the query-supplying function may return null in which case the underlying reader is returned directly. Co-authored-by: Oleksandr Kolomiiets --- .../elasticsearch/common/lucene/Lucene.java | 168 ++++++++++++++++++ .../org/elasticsearch/index/IndexModule.java | 49 ++++- .../common/lucene/LuceneTests.java | 26 +++ .../elasticsearch/index/IndexModuleTests.java | 129 ++++++++++++++ 4 files changed, 369 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index b963ff4f649b6..51422140e4b06 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -29,12 +29,14 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergeScheduler; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -60,9 +62,11 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.Lock; +import org.apache.lucene.util.BitSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; @@ -78,6 +82,8 @@ import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.lucene.grouping.TopFieldGroups; +import org.elasticsearch.lucene.util.BitSets; +import org.elasticsearch.lucene.util.MatchAllBitSet; import org.elasticsearch.search.sort.ShardDocSortField; import java.io.IOException; @@ -1013,6 +1019,168 @@ public CacheHelper getReaderCacheHelper() { } } + public static DirectoryReader queryFilteredDirectoryReader(DirectoryReader in, Query query) throws IOException { + return new QueryFilterDirectoryReader(in, query); + } + + private static class QueryFilterDirectoryReader extends FilterDirectoryReader { + private final Query query; + + QueryFilterDirectoryReader(DirectoryReader in, Query query) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new QueryFilterLeafReader(reader, query); + } + }); + this.query = query; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new QueryFilterDirectoryReader(in, query); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + } + + private static class QueryFilterLeafReader extends SequentialStoredFieldsLeafReader { + private final Query query; + + private int numDocs = -1; + private BitSet filteredDocs; + + protected QueryFilterLeafReader(LeafReader in, Query query) { + super(in); + this.query = query; + } + + /** + * Returns all documents that are not deleted and are owned by the current shard. + * We need to recalculate this every time because `in.getLiveDocs()` can change when deletes are performed. + */ + @Override + public Bits getLiveDocs() { + ensureFilteredDocumentsPresent(); + Bits actualLiveDocs = in.getLiveDocs(); + + if (filteredDocs == null) { + return actualLiveDocs; + } + + if (filteredDocs instanceof MatchAllBitSet) { + return new Bits.MatchNoBits(in.maxDoc()); + } + + var liveDocsBitsWithAllLiveCheck = actualLiveDocs == null ? new MatchAllBitSet(in.maxDoc()) : actualLiveDocs; + return new FilterBits(liveDocsBitsWithAllLiveCheck, filteredDocs); + } + + @Override + public int numDocs() { + ensureFilteredDocumentsPresent(); + return numDocs; + } + + @Override + public boolean hasDeletions() { + // It is possible that there are unowned docs which we are going to present as deletes. + return true; + } + + @Override + protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) { + return reader; + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + // Not delegated since we change live docs. + return null; + } + + private void ensureFilteredDocumentsPresent() { + if (numDocs == -1) { + synchronized (this) { + if (numDocs == -1) { + try { + filteredDocs = queryFilteredDocs(); + numDocs = calculateNumDocs(in, filteredDocs); + } catch (Exception e) { + throw new ElasticsearchException("Failed to execute filtered documents query", e); + } + } + } + } + } + + private BitSet queryFilteredDocs() throws IOException { + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(in.getContext()); + + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + + final Query rewrittenQuery = searcher.rewrite(query); + // TODO there is a possible optimization of checking for MatchAllDocsQuery which would mean that all documents are unowned. + final Weight weight = searcher.createWeight(rewrittenQuery, ScoreMode.COMPLETE_NO_SCORES, 1f); + final Scorer s = weight.scorer(in.getContext()); + if (s == null) { + return null; + } else { + return BitSets.of(s.iterator(), in.maxDoc()); + } + } + + private static int calculateNumDocs(LeafReader reader, BitSet unownedDocs) { + final Bits liveDocs = reader.getLiveDocs(); + + // No deleted documents are present, therefore number of documents is total minus unowned. + if (liveDocs == null) { + return reader.numDocs() - unownedDocs.cardinality(); + } + + if (unownedDocs instanceof MatchAllBitSet) { + return 0; + } + + int numDocs = 0; + for (int i = 0; i < liveDocs.length(); i++) { + if (liveDocs.get(i) && unownedDocs.get(i) == false) { + numDocs++; + } + } + return numDocs; + } + } + + static class FilterBits implements Bits { + private final Bits original; + private final Bits filteredOut; + + FilterBits(Bits original, BitSet filteredOut) { + this.original = original; + this.filteredOut = filteredOut; + } + + @Override + public boolean get(int index) { + return original.get(index) && (filteredOut.get(index) == false); + } + + @Override + public int length() { + return original.length(); + } + } + private static int popCount(Bits bits) { assert bits != null; int onBits = 0; diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 42ab9ae362509..b6d978a464bb0 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -15,6 +15,7 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.search.Query; import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; @@ -26,6 +27,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -168,6 +170,9 @@ public interface DirectoryWrapper { private final SetOnce indexDirectoryWrapper = new SetOnce<>(); private final SetOnce>> indexReaderWrapper = new SetOnce<>(); + // a limited version of the reader wrapper that only returns a query that can filter out documents + private final SetOnce>> indexReaderFilter = + new SetOnce<>(); private final Set indexEventListeners = new HashSet<>(); private final Map> similarities = new HashMap<>(); private final Map directoryFactories; @@ -391,6 +396,29 @@ public void setReaderWrapper( this.indexReaderWrapper.set(indexReaderWrapperFactory); } + /** + * A limited version of {@link IndexModule#setReaderWrapper} that can only filter out documents matching a query. + *

+ * The returned query may be null in which case no documents are filtered. + * @param indexReaderFilterFactory the factory for creating per-DirectoryReader @link Query} instances + */ + public void setReaderFilter(Function> indexReaderFilterFactory) { + ensureNotFrozen(); + // convert the query filter into a reader wrapper + Function> readerWrapperFactory = (indexService) -> { + var queryFactory = indexReaderFilterFactory.apply(indexService); + return (reader) -> { + Query query = queryFactory.apply(reader); + if (query == null) { + return reader; + } + return Lucene.queryFilteredDirectoryReader(reader, query); + }; + }; + + this.indexReaderFilter.set(readerWrapperFactory); + } + /** * Sets a {@link Directory} wrapping method that allows to apply a function to the Lucene directory instance * created by {@link org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory}. @@ -499,8 +527,23 @@ public IndexService newIndexService( QueryRewriteInterceptor queryRewriteInterceptor ) throws IOException { final IndexEventListener eventListener = freeze(); - Function> readerWrapperFactory = indexReaderWrapper - .get() == null ? (shard) -> null : indexReaderWrapper.get(); + var readerFilterFactory = indexReaderFilter.get(); + var readerWrapperFactory = indexReaderWrapper.get(); + + Function> readerWrappersFactory = (indexService) -> { + if (readerFilterFactory == null && readerWrapperFactory == null) { + return null; + } else if (readerFilterFactory == null) { + return readerWrapperFactory.apply(indexService); + } else if (readerWrapperFactory == null) { + return readerFilterFactory.apply(indexService); + } else { + var readerFilter = readerFilterFactory.apply(indexService); + var readerWrapper = readerWrapperFactory.apply(indexService); + return (reader) -> readerWrapper.apply(readerFilter.apply(reader)); + } + }; + eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings()); final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories); final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories); @@ -545,7 +588,7 @@ public IndexService newIndexService( queryCache, directoryFactory, eventListener, - readerWrapperFactory, + readerWrappersFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners, diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index f6adb779055b0..2478b3dd52648 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -550,6 +550,32 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception { IOUtils.close(writer, dir); } + public void testQueryFilteredDirectoryReader() throws IOException { + try (Directory dir = newDirectory()) { + try (RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + + Document doc = new Document(); + doc.add(new StringField("foo", "bar", Store.YES)); + w.addDocument(doc); + + doc = new Document(); + doc.add(new StringField("foo", "baz", Store.YES)); + w.addDocument(doc); + + try (DirectoryReader reader = w.getReader()) { + assertThat(reader.numDocs(), equalTo(2)); + DirectoryReader filtered = Lucene.queryFilteredDirectoryReader(reader, new TermQuery(new Term("foo", "bar"))); + assertThat(filtered.numDocs(), equalTo(1)); + IndexSearcher searcher = newSearcher(filtered); + TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + StoredFields storedFields = filtered.storedFields(); + assertThat(topDocs.totalHits.value(), equalTo(1L)); + assertThat(storedFields.document(topDocs.scoreDocs[0].doc).get("foo"), equalTo("baz")); + } + } + } + } + public void testSortFieldSerialization() throws IOException { Tuple sortFieldTuple = randomSortField(); SortField deserialized = copyInstance( diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 088c748bde5f6..55955f9ba770e 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -12,8 +12,12 @@ import org.apache.lucene.analysis.standard.StandardTokenizer; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FieldInvertState; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.CollectionStatistics; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.Weight; @@ -356,6 +360,84 @@ public void testDirectoryWrapper() throws IOException { closeIndexService(indexService); } + public void testDirectoryFilterIsBound() throws IOException { + final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class); + IndexModule module = new IndexModule( + indexSettings, + emptyAnalysisRegistry, + engineFactory, + Collections.emptyMap(), + () -> true, + indexNameExpressionResolver, + Collections.emptyMap(), + mock(SlowLogFieldProvider.class), + MapperMetrics.NOOP, + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP + ); + + // also test that if a wrapper is set, both are applied and the wrapper is outermost + final boolean alsoWrap = randomBoolean(); + // check that we handle returning a null filter (the common case) + final var doFilter = randomBoolean(); + + class Wrapper extends FilterDirectoryReader { + final DirectoryReader wrappedReader; + + Wrapper(DirectoryReader in) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return reader; + } + }); + this.wrappedReader = in; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return null; + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + } + + final var filterApplied = new AtomicBoolean(false); + module.setReaderFilter(s -> { + if (doFilter == false) { + return r -> null; + } else { + filterApplied.set(true); + return r -> new MatchAllDocsQuery(); + } + }); + if (alsoWrap) { + module.setReaderWrapper(s -> Wrapper::new); + } + + IndexService indexService = newIndexService(module); + var wrapper = indexService.getReaderWrapper(); + + try (var directory = newDirectory(); var reader = new DummyDirectoryReader(directory)) { + var wrapped = wrapper.apply(reader); + assertThat(filterApplied.get(), is(doFilter)); + if (alsoWrap) { + assertTrue(wrapped instanceof Wrapper); + wrapped = ((Wrapper) wrapped).wrappedReader; + } + if (false == doFilter) { + assertThat(wrapped, sameInstance(reader)); + } + } + + closeIndexService(indexService); + } + public void testOtherServiceBound() throws IOException { final AtomicBoolean atomicBoolean = new AtomicBoolean(false); final IndexEventListener eventListener = new IndexEventListener() { @@ -489,6 +571,7 @@ public void testFrozen() { assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexEventListener(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexOperationListener(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSimilarity(null, null)).getMessage()); + assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderFilter(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderWrapper(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheProvider(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setDirectoryWrapper(null)).getMessage()); @@ -894,4 +977,50 @@ protected WrappedDirectory(Directory in, ShardRouting shardRouting) { this.shardRouting = shardRouting; } } + + private static class DummyDirectoryReader extends DirectoryReader { + DummyDirectoryReader(Directory directory) throws IOException { + super(directory, new LeafReader[0], null); + } + + @Override + protected DirectoryReader doOpenIfChanged() throws IOException { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public boolean isCurrent() throws IOException { + return false; + } + + @Override + public IndexCommit getIndexCommit() throws IOException { + return null; + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + } } From a6336c8d1023b9fb475fe2388844da424f567bdf Mon Sep 17 00:00:00 2001 From: Brendan Cully Date: Wed, 1 Oct 2025 18:00:43 -0700 Subject: [PATCH 2/2] Alternative approach to search filters Adds a protected hook in Engine to allow wrapping the DirectoryReader supplied to a Searcher. --- .../elasticsearch/common/lucene/Lucene.java | 168 ------------------ .../org/elasticsearch/index/IndexModule.java | 49 +---- .../elasticsearch/index/engine/Engine.java | 8 +- .../common/lucene/LuceneTests.java | 26 --- .../elasticsearch/index/IndexModuleTests.java | 129 -------------- 5 files changed, 10 insertions(+), 370 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 51422140e4b06..b963ff4f649b6 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -29,14 +29,12 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergeScheduler; -import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -62,11 +60,9 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.Lock; -import org.apache.lucene.util.BitSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; @@ -82,8 +78,6 @@ import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.lucene.grouping.TopFieldGroups; -import org.elasticsearch.lucene.util.BitSets; -import org.elasticsearch.lucene.util.MatchAllBitSet; import org.elasticsearch.search.sort.ShardDocSortField; import java.io.IOException; @@ -1019,168 +1013,6 @@ public CacheHelper getReaderCacheHelper() { } } - public static DirectoryReader queryFilteredDirectoryReader(DirectoryReader in, Query query) throws IOException { - return new QueryFilterDirectoryReader(in, query); - } - - private static class QueryFilterDirectoryReader extends FilterDirectoryReader { - private final Query query; - - QueryFilterDirectoryReader(DirectoryReader in, Query query) throws IOException { - super(in, new SubReaderWrapper() { - @Override - public LeafReader wrap(LeafReader reader) { - return new QueryFilterLeafReader(reader, query); - } - }); - this.query = query; - } - - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new QueryFilterDirectoryReader(in, query); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return in.getReaderCacheHelper(); - } - } - - private static class QueryFilterLeafReader extends SequentialStoredFieldsLeafReader { - private final Query query; - - private int numDocs = -1; - private BitSet filteredDocs; - - protected QueryFilterLeafReader(LeafReader in, Query query) { - super(in); - this.query = query; - } - - /** - * Returns all documents that are not deleted and are owned by the current shard. - * We need to recalculate this every time because `in.getLiveDocs()` can change when deletes are performed. - */ - @Override - public Bits getLiveDocs() { - ensureFilteredDocumentsPresent(); - Bits actualLiveDocs = in.getLiveDocs(); - - if (filteredDocs == null) { - return actualLiveDocs; - } - - if (filteredDocs instanceof MatchAllBitSet) { - return new Bits.MatchNoBits(in.maxDoc()); - } - - var liveDocsBitsWithAllLiveCheck = actualLiveDocs == null ? new MatchAllBitSet(in.maxDoc()) : actualLiveDocs; - return new FilterBits(liveDocsBitsWithAllLiveCheck, filteredDocs); - } - - @Override - public int numDocs() { - ensureFilteredDocumentsPresent(); - return numDocs; - } - - @Override - public boolean hasDeletions() { - // It is possible that there are unowned docs which we are going to present as deletes. - return true; - } - - @Override - protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) { - return reader; - } - - @Override - public CacheHelper getCoreCacheHelper() { - return in.getCoreCacheHelper(); - } - - @Override - public CacheHelper getReaderCacheHelper() { - // Not delegated since we change live docs. - return null; - } - - private void ensureFilteredDocumentsPresent() { - if (numDocs == -1) { - synchronized (this) { - if (numDocs == -1) { - try { - filteredDocs = queryFilteredDocs(); - numDocs = calculateNumDocs(in, filteredDocs); - } catch (Exception e) { - throw new ElasticsearchException("Failed to execute filtered documents query", e); - } - } - } - } - } - - private BitSet queryFilteredDocs() throws IOException { - final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(in.getContext()); - - final IndexSearcher searcher = new IndexSearcher(topLevelContext); - searcher.setQueryCache(null); - - final Query rewrittenQuery = searcher.rewrite(query); - // TODO there is a possible optimization of checking for MatchAllDocsQuery which would mean that all documents are unowned. - final Weight weight = searcher.createWeight(rewrittenQuery, ScoreMode.COMPLETE_NO_SCORES, 1f); - final Scorer s = weight.scorer(in.getContext()); - if (s == null) { - return null; - } else { - return BitSets.of(s.iterator(), in.maxDoc()); - } - } - - private static int calculateNumDocs(LeafReader reader, BitSet unownedDocs) { - final Bits liveDocs = reader.getLiveDocs(); - - // No deleted documents are present, therefore number of documents is total minus unowned. - if (liveDocs == null) { - return reader.numDocs() - unownedDocs.cardinality(); - } - - if (unownedDocs instanceof MatchAllBitSet) { - return 0; - } - - int numDocs = 0; - for (int i = 0; i < liveDocs.length(); i++) { - if (liveDocs.get(i) && unownedDocs.get(i) == false) { - numDocs++; - } - } - return numDocs; - } - } - - static class FilterBits implements Bits { - private final Bits original; - private final Bits filteredOut; - - FilterBits(Bits original, BitSet filteredOut) { - this.original = original; - this.filteredOut = filteredOut; - } - - @Override - public boolean get(int index) { - return original.get(index) && (filteredOut.get(index) == false); - } - - @Override - public int length() { - return original.length(); - } - } - private static int popCount(Bits bits) { assert bits != null; int onBits = 0; diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index b6d978a464bb0..42ab9ae362509 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -15,7 +15,6 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; -import org.apache.lucene.search.Query; import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; @@ -27,7 +26,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -170,9 +168,6 @@ public interface DirectoryWrapper { private final SetOnce indexDirectoryWrapper = new SetOnce<>(); private final SetOnce>> indexReaderWrapper = new SetOnce<>(); - // a limited version of the reader wrapper that only returns a query that can filter out documents - private final SetOnce>> indexReaderFilter = - new SetOnce<>(); private final Set indexEventListeners = new HashSet<>(); private final Map> similarities = new HashMap<>(); private final Map directoryFactories; @@ -396,29 +391,6 @@ public void setReaderWrapper( this.indexReaderWrapper.set(indexReaderWrapperFactory); } - /** - * A limited version of {@link IndexModule#setReaderWrapper} that can only filter out documents matching a query. - *

- * The returned query may be null in which case no documents are filtered. - * @param indexReaderFilterFactory the factory for creating per-DirectoryReader @link Query} instances - */ - public void setReaderFilter(Function> indexReaderFilterFactory) { - ensureNotFrozen(); - // convert the query filter into a reader wrapper - Function> readerWrapperFactory = (indexService) -> { - var queryFactory = indexReaderFilterFactory.apply(indexService); - return (reader) -> { - Query query = queryFactory.apply(reader); - if (query == null) { - return reader; - } - return Lucene.queryFilteredDirectoryReader(reader, query); - }; - }; - - this.indexReaderFilter.set(readerWrapperFactory); - } - /** * Sets a {@link Directory} wrapping method that allows to apply a function to the Lucene directory instance * created by {@link org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory}. @@ -527,23 +499,8 @@ public IndexService newIndexService( QueryRewriteInterceptor queryRewriteInterceptor ) throws IOException { final IndexEventListener eventListener = freeze(); - var readerFilterFactory = indexReaderFilter.get(); - var readerWrapperFactory = indexReaderWrapper.get(); - - Function> readerWrappersFactory = (indexService) -> { - if (readerFilterFactory == null && readerWrapperFactory == null) { - return null; - } else if (readerFilterFactory == null) { - return readerWrapperFactory.apply(indexService); - } else if (readerWrapperFactory == null) { - return readerFilterFactory.apply(indexService); - } else { - var readerFilter = readerFilterFactory.apply(indexService); - var readerWrapper = readerWrapperFactory.apply(indexService); - return (reader) -> readerWrapper.apply(readerFilter.apply(reader)); - } - }; - + Function> readerWrapperFactory = indexReaderWrapper + .get() == null ? (shard) -> null : indexReaderWrapper.get(); eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings()); final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories); final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories); @@ -588,7 +545,7 @@ public IndexService newIndexService( queryCache, directoryFactory, eventListener, - readerWrappersFactory, + readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners, diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 5a1c49b54b7ac..eaa24ab6c0e89 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -998,6 +998,11 @@ public final SearcherSupplier acquireSearcherSupplier(Function wra try { ReferenceManager referenceManager = getReferenceManager(scope); ElasticsearchDirectoryReader acquire = referenceManager.acquire(); + DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire); SearcherSupplier reader = new SearcherSupplier(wrapper) { @Override public Searcher acquireSearcherInternal(String source) { @@ -1019,7 +1025,7 @@ public Searcher acquireSearcherInternal(String source) { onSearcherCreation(source, scope); return new Searcher( source, - acquire, + wrappedDirectoryReader, engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(), diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index 2478b3dd52648..f6adb779055b0 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -550,32 +550,6 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception { IOUtils.close(writer, dir); } - public void testQueryFilteredDirectoryReader() throws IOException { - try (Directory dir = newDirectory()) { - try (RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { - - Document doc = new Document(); - doc.add(new StringField("foo", "bar", Store.YES)); - w.addDocument(doc); - - doc = new Document(); - doc.add(new StringField("foo", "baz", Store.YES)); - w.addDocument(doc); - - try (DirectoryReader reader = w.getReader()) { - assertThat(reader.numDocs(), equalTo(2)); - DirectoryReader filtered = Lucene.queryFilteredDirectoryReader(reader, new TermQuery(new Term("foo", "bar"))); - assertThat(filtered.numDocs(), equalTo(1)); - IndexSearcher searcher = newSearcher(filtered); - TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); - StoredFields storedFields = filtered.storedFields(); - assertThat(topDocs.totalHits.value(), equalTo(1L)); - assertThat(storedFields.document(topDocs.scoreDocs[0].doc).get("foo"), equalTo("baz")); - } - } - } - } - public void testSortFieldSerialization() throws IOException { Tuple sortFieldTuple = randomSortField(); SortField deserialized = copyInstance( diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 55955f9ba770e..088c748bde5f6 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -12,12 +12,8 @@ import org.apache.lucene.analysis.standard.StandardTokenizer; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FieldInvertState; -import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.CollectionStatistics; -import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.Weight; @@ -360,84 +356,6 @@ public void testDirectoryWrapper() throws IOException { closeIndexService(indexService); } - public void testDirectoryFilterIsBound() throws IOException { - final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class); - IndexModule module = new IndexModule( - indexSettings, - emptyAnalysisRegistry, - engineFactory, - Collections.emptyMap(), - () -> true, - indexNameExpressionResolver, - Collections.emptyMap(), - mock(SlowLogFieldProvider.class), - MapperMetrics.NOOP, - emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - MergeMetrics.NOOP - ); - - // also test that if a wrapper is set, both are applied and the wrapper is outermost - final boolean alsoWrap = randomBoolean(); - // check that we handle returning a null filter (the common case) - final var doFilter = randomBoolean(); - - class Wrapper extends FilterDirectoryReader { - final DirectoryReader wrappedReader; - - Wrapper(DirectoryReader in) throws IOException { - super(in, new SubReaderWrapper() { - @Override - public LeafReader wrap(LeafReader reader) { - return reader; - } - }); - this.wrappedReader = in; - } - - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return null; - } - - @Override - public CacheHelper getReaderCacheHelper() { - return null; - } - } - - final var filterApplied = new AtomicBoolean(false); - module.setReaderFilter(s -> { - if (doFilter == false) { - return r -> null; - } else { - filterApplied.set(true); - return r -> new MatchAllDocsQuery(); - } - }); - if (alsoWrap) { - module.setReaderWrapper(s -> Wrapper::new); - } - - IndexService indexService = newIndexService(module); - var wrapper = indexService.getReaderWrapper(); - - try (var directory = newDirectory(); var reader = new DummyDirectoryReader(directory)) { - var wrapped = wrapper.apply(reader); - assertThat(filterApplied.get(), is(doFilter)); - if (alsoWrap) { - assertTrue(wrapped instanceof Wrapper); - wrapped = ((Wrapper) wrapped).wrappedReader; - } - if (false == doFilter) { - assertThat(wrapped, sameInstance(reader)); - } - } - - closeIndexService(indexService); - } - public void testOtherServiceBound() throws IOException { final AtomicBoolean atomicBoolean = new AtomicBoolean(false); final IndexEventListener eventListener = new IndexEventListener() { @@ -571,7 +489,6 @@ public void testFrozen() { assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexEventListener(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexOperationListener(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSimilarity(null, null)).getMessage()); - assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderFilter(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderWrapper(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheProvider(null)).getMessage()); assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setDirectoryWrapper(null)).getMessage()); @@ -977,50 +894,4 @@ protected WrappedDirectory(Directory in, ShardRouting shardRouting) { this.shardRouting = shardRouting; } } - - private static class DummyDirectoryReader extends DirectoryReader { - DummyDirectoryReader(Directory directory) throws IOException { - super(directory, new LeafReader[0], null); - } - - @Override - protected DirectoryReader doOpenIfChanged() throws IOException { - return null; - } - - @Override - protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException { - return null; - } - - @Override - protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException { - return null; - } - - @Override - public long getVersion() { - return 0; - } - - @Override - public boolean isCurrent() throws IOException { - return false; - } - - @Override - public IndexCommit getIndexCommit() throws IOException { - return null; - } - - @Override - protected void doClose() throws IOException { - - } - - @Override - public CacheHelper getReaderCacheHelper() { - return null; - } - } }