Skip to content
168 changes: 168 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NoMergeScheduler;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
Expand All @@ -60,9 +62,11 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
Expand All @@ -78,6 +82,8 @@
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.lucene.grouping.TopFieldGroups;
import org.elasticsearch.lucene.util.BitSets;
import org.elasticsearch.lucene.util.MatchAllBitSet;
import org.elasticsearch.search.sort.ShardDocSortField;

import java.io.IOException;
Expand Down Expand Up @@ -1013,6 +1019,168 @@ public CacheHelper getReaderCacheHelper() {
}
}

/**
 * Wraps the given reader in a view that hides every document matching {@code query}.
 *
 * @param in the directory reader to wrap
 * @param query the query selecting the documents that must be presented as deleted
 * @return a directory reader over {@code in} that filters out the matching documents
 * @throws IOException if the wrapping reader cannot be created
 */
public static DirectoryReader queryFilteredDirectoryReader(DirectoryReader in, Query query) throws IOException {
    final DirectoryReader filteredView = new QueryFilterDirectoryReader(in, query);
    return filteredView;
}

/**
 * A {@link FilterDirectoryReader} that wraps each of its leaves in a {@link QueryFilterLeafReader} built from the same query.
 */
private static class QueryFilterDirectoryReader extends FilterDirectoryReader {
    private final Query query;

    QueryFilterDirectoryReader(DirectoryReader in, Query query) throws IOException {
        super(in, new QueryFilterSubReaderWrapper(query));
        this.query = query;
    }

    @Override
    protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
        // Re-wrap with the same query so that the new reader filters the same documents.
        return new QueryFilterDirectoryReader(in, query);
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        // Delegated to the wrapped directory reader.
        return in.getReaderCacheHelper();
    }

    /**
     * Wraps every leaf reader in a {@link QueryFilterLeafReader} using a fixed query.
     */
    private static final class QueryFilterSubReaderWrapper extends SubReaderWrapper {
        private final Query filterQuery;

        QueryFilterSubReaderWrapper(Query filterQuery) {
            this.filterQuery = filterQuery;
        }

        @Override
        public LeafReader wrap(LeafReader reader) {
            return new QueryFilterLeafReader(reader, filterQuery);
        }
    }
}

/**
 * A leaf reader that presents every document matching {@code query} as deleted.
 * <p>
 * The set of matching documents is computed lazily, at most once per instance, on the first call to
 * {@link #getLiveDocs()} or {@link #numDocs()}.
 */
private static class QueryFilterLeafReader extends SequentialStoredFieldsLeafReader {
    private final Query query;

    // -1 means "not computed yet". The field is volatile because it is read outside the lock in
    // ensureFilteredDocumentsPresent(); it is always written after filteredDocs, so a reader that observes a
    // computed value here is also guaranteed to observe the matching filteredDocs.
    private volatile int numDocs = -1;
    // Documents of this leaf that match the query; null when the query has no scorer for this leaf (nothing matches).
    private BitSet filteredDocs;

    protected QueryFilterLeafReader(LeafReader in, Query query) {
        super(in);
        this.query = query;
    }

    /**
     * Returns all documents that are not deleted and do not match the filter query.
     * We need to recalculate this every time because `in.getLiveDocs()` can change when deletes are performed.
     *
     * @return the live docs of the wrapped reader with the filtered documents cleared; may be {@code null} when
     *         the wrapped reader has no deletions and no document of this leaf matches the query
     */
    @Override
    public Bits getLiveDocs() {
        ensureFilteredDocumentsPresent();
        Bits actualLiveDocs = in.getLiveDocs();

        // Nothing matched the query in this leaf: the wrapped reader's live docs are already correct.
        if (filteredDocs == null) {
            return actualLiveDocs;
        }

        // Every document matched the query: nothing is live.
        if (filteredDocs instanceof MatchAllBitSet) {
            return new Bits.MatchNoBits(in.maxDoc());
        }

        // A null live docs means "all live"; materialize that so FilterBits always has something to delegate to.
        var liveDocsBitsWithAllLiveCheck = actualLiveDocs == null ? new MatchAllBitSet(in.maxDoc()) : actualLiveDocs;
        return new FilterBits(liveDocsBitsWithAllLiveCheck, filteredDocs);
    }

    /**
     * Returns the number of documents that are live in the wrapped reader and do not match the filter query.
     * The value is computed once and cached for the lifetime of this instance.
     */
    @Override
    public int numDocs() {
        ensureFilteredDocumentsPresent();
        return numDocs;
    }

    @Override
    public boolean hasDeletions() {
        // It is possible that there are filtered docs which we are going to present as deletes.
        return true;
    }

    @Override
    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
        return reader;
    }

    @Override
    public CacheHelper getCoreCacheHelper() {
        return in.getCoreCacheHelper();
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        // Not delegated since we change live docs.
        return null;
    }

    /**
     * Runs the filter query against this leaf on first use and caches the matching documents and the resulting
     * document count. If the computation fails, nothing is cached and the next call tries again.
     *
     * @throws ElasticsearchException if executing the query or counting the documents fails
     */
    private void ensureFilteredDocumentsPresent() {
        if (numDocs == -1) {
            synchronized (this) {
                if (numDocs == -1) {
                    try {
                        filteredDocs = queryFilteredDocs();
                        // Must be assigned last: this volatile write publishes filteredDocs to lock-free readers.
                        numDocs = calculateNumDocs(in, filteredDocs);
                    } catch (Exception e) {
                        throw new ElasticsearchException("Failed to execute filtered documents query", e);
                    }
                }
            }
        }
    }

    /**
     * Executes the filter query against this leaf.
     *
     * @return the documents of this leaf matching the query, or {@code null} if the query has no scorer for this leaf
     * @throws IOException if rewriting or executing the query fails
     */
    private BitSet queryFilteredDocs() throws IOException {
        final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(in.getContext());

        final IndexSearcher searcher = new IndexSearcher(topLevelContext);
        searcher.setQueryCache(null);

        final Query rewrittenQuery = searcher.rewrite(query);
        // TODO there is a possible optimization of checking for MatchAllDocsQuery which would mean that all documents are filtered.
        final Weight weight = searcher.createWeight(rewrittenQuery, ScoreMode.COMPLETE_NO_SCORES, 1f);
        final Scorer s = weight.scorer(in.getContext());
        if (s == null) {
            return null;
        } else {
            return BitSets.of(s.iterator(), in.maxDoc());
        }
    }

    /**
     * Counts the documents of {@code reader} that are live and not contained in {@code unownedDocs}.
     *
     * @param reader the wrapped leaf reader
     * @param unownedDocs the documents to exclude, or {@code null} if no document is excluded
     * @return the number of live documents that are not excluded
     */
    private static int calculateNumDocs(LeafReader reader, BitSet unownedDocs) {
        // The query matched nothing in this leaf, so the wrapped reader's count is already correct.
        if (unownedDocs == null) {
            return reader.numDocs();
        }

        final Bits liveDocs = reader.getLiveDocs();

        // No deleted documents are present, therefore number of documents is total minus unowned.
        if (liveDocs == null) {
            return reader.numDocs() - unownedDocs.cardinality();
        }

        if (unownedDocs instanceof MatchAllBitSet) {
            return 0;
        }

        int numDocs = 0;
        for (int i = 0; i < liveDocs.length(); i++) {
            if (liveDocs.get(i) && unownedDocs.get(i) == false) {
                numDocs++;
            }
        }
        return numDocs;
    }
}

/**
 * A {@link Bits} view in which a bit is set only when it is set in the base bits and not set in the excluded bits.
 * The reported length is that of the base bits.
 */
static class FilterBits implements Bits {
    private final Bits base;
    private final Bits excluded;

    FilterBits(Bits original, BitSet filteredOut) {
        this.base = original;
        this.excluded = filteredOut;
    }

    @Override
    public boolean get(int index) {
        // The excluded bits are consulted only when the base bit is set.
        if (base.get(index) == false) {
            return false;
        }
        return excluded.get(index) == false;
    }

    @Override
    public int length() {
        return base.length();
    }
}

private static int popCount(Bits bits) {
assert bits != null;
int onBits = 0;
Expand Down
49 changes: 46 additions & 3 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -168,6 +170,9 @@ public interface DirectoryWrapper {
private final SetOnce<DirectoryWrapper> indexDirectoryWrapper = new SetOnce<>();
private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
new SetOnce<>();
// A limited form of the reader wrapper. It is populated by setReaderFilter, which converts a per-reader query factory
// into a reader wrapper that filters out the documents matching the query (or returns the reader unchanged when the
// query is null).
private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderFilter =
new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
Expand Down Expand Up @@ -391,6 +396,29 @@ public void setReaderWrapper(
this.indexReaderWrapper.set(indexReaderWrapperFactory);
}

/**
 * A limited version of {@link IndexModule#setReaderWrapper} that can only filter out documents matching a query.
 * <p>
 * The query produced for a given reader may be {@code null}, in which case no documents are filtered and the reader
 * is used as is.
 *
 * @param indexReaderFilterFactory the factory for creating per-DirectoryReader {@link Query} instances
 */
public void setReaderFilter(Function<IndexService, CheckedFunction<DirectoryReader, Query, IOException>> indexReaderFilterFactory) {
    ensureNotFrozen();
    // Store the query filter as a regular reader wrapper factory.
    this.indexReaderFilter.set(indexService -> {
        final CheckedFunction<DirectoryReader, Query, IOException> filterQuerySupplier = indexReaderFilterFactory.apply(indexService);
        return directoryReader -> {
            final Query filterQuery = filterQuerySupplier.apply(directoryReader);
            return filterQuery == null ? directoryReader : Lucene.queryFilteredDirectoryReader(directoryReader, filterQuery);
        };
    });
}

/**
* Sets a {@link Directory} wrapping method that allows to apply a function to the Lucene directory instance
* created by {@link org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory}.
Expand Down Expand Up @@ -499,8 +527,23 @@ public IndexService newIndexService(
QueryRewriteInterceptor queryRewriteInterceptor
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
.get() == null ? (shard) -> null : indexReaderWrapper.get();
var readerFilterFactory = indexReaderFilter.get();
var readerWrapperFactory = indexReaderWrapper.get();

Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrappersFactory = (indexService) -> {
if (readerFilterFactory == null && readerWrapperFactory == null) {
return null;
} else if (readerFilterFactory == null) {
return readerWrapperFactory.apply(indexService);
} else if (readerWrapperFactory == null) {
return readerFilterFactory.apply(indexService);
} else {
var readerFilter = readerFilterFactory.apply(indexService);
var readerWrapper = readerWrapperFactory.apply(indexService);
return (reader) -> readerWrapper.apply(readerFilter.apply(reader));
}
};

eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
Expand Down Expand Up @@ -545,7 +588,7 @@ public IndexService newIndexService(
queryCache,
directoryFactory,
eventListener,
readerWrapperFactory,
readerWrappersFactory,
mapperRegistry,
indicesFieldDataCache,
searchOperationListeners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,32 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
IOUtils.close(writer, dir);
}

/**
 * Indexes two documents, filters out the one matching {@code foo:bar} and checks that only the other one remains
 * visible through the filtered reader.
 */
public void testQueryFilteredDirectoryReader() throws IOException {
    try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
        for (String value : new String[] { "bar", "baz" }) {
            Document document = new Document();
            document.add(new StringField("foo", value, Store.YES));
            indexWriter.addDocument(document);
        }

        try (DirectoryReader unfiltered = indexWriter.getReader()) {
            assertThat(unfiltered.numDocs(), equalTo(2));

            Query hideBar = new TermQuery(new Term("foo", "bar"));
            DirectoryReader filtered = Lucene.queryFilteredDirectoryReader(unfiltered, hideBar);
            assertThat(filtered.numDocs(), equalTo(1));

            IndexSearcher filteredSearcher = newSearcher(filtered);
            TopDocs hits = filteredSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
            StoredFields fields = filtered.storedFields();
            assertThat(hits.totalHits.value(), equalTo(1L));

            // The only visible document must be the one that does not match the filter query.
            int survivingDocId = hits.scoreDocs[0].doc;
            assertThat(fields.document(survivingDocId).get("foo"), equalTo("baz"));
        }
    }
}

public void testSortFieldSerialization() throws IOException {
Tuple<SortField, SortField> sortFieldTuple = randomSortField();
SortField deserialized = copyInstance(
Expand Down
Loading