Skip to content

Commit 2891b3f

Browse files
bcullylkts
andcommitted
Add a hook to filter out documents by query to IndexModule
This is a more limited version of the existing ReaderWrapper that only accepts a query of documents to filter from the reader. It is meant to support temporarily filtering out documents that no longer belong to a shard after it has been split, until the shard is cleaned. The decision about whether to install the filter is made when the index module is created. To reduce overhead in the common case, the query-supplying function may return null in which case the underlying reader is returned directly. Co-authored-by: Oleksandr Kolomiiets <[email protected]>
1 parent cf94ea9 commit 2891b3f

File tree

4 files changed

+369
-3
lines changed

4 files changed

+369
-3
lines changed

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import org.apache.lucene.index.IndexFileNames;
3030
import org.apache.lucene.index.IndexFormatTooNewException;
3131
import org.apache.lucene.index.IndexFormatTooOldException;
32+
import org.apache.lucene.index.IndexReaderContext;
3233
import org.apache.lucene.index.IndexWriter;
3334
import org.apache.lucene.index.IndexWriterConfig;
3435
import org.apache.lucene.index.LeafReader;
3536
import org.apache.lucene.index.LeafReaderContext;
3637
import org.apache.lucene.index.NoMergePolicy;
3738
import org.apache.lucene.index.NoMergeScheduler;
39+
import org.apache.lucene.index.ReaderUtil;
3840
import org.apache.lucene.index.SegmentCommitInfo;
3941
import org.apache.lucene.index.SegmentInfos;
4042
import org.apache.lucene.index.SegmentReader;
@@ -60,9 +62,11 @@
6062
import org.apache.lucene.store.IOContext;
6163
import org.apache.lucene.store.IndexInput;
6264
import org.apache.lucene.store.Lock;
65+
import org.apache.lucene.util.BitSet;
6366
import org.apache.lucene.util.Bits;
6467
import org.apache.lucene.util.BytesRef;
6568
import org.apache.lucene.util.Version;
69+
import org.elasticsearch.ElasticsearchException;
6670
import org.elasticsearch.ExceptionsHelper;
6771
import org.elasticsearch.TransportVersions;
6872
import org.elasticsearch.common.Strings;
@@ -78,6 +82,8 @@
7882
import org.elasticsearch.index.analysis.NamedAnalyzer;
7983
import org.elasticsearch.index.fielddata.IndexFieldData;
8084
import org.elasticsearch.lucene.grouping.TopFieldGroups;
85+
import org.elasticsearch.lucene.util.BitSets;
86+
import org.elasticsearch.lucene.util.MatchAllBitSet;
8187
import org.elasticsearch.search.sort.ShardDocSortField;
8288

8389
import java.io.IOException;
@@ -1013,6 +1019,168 @@ public CacheHelper getReaderCacheHelper() {
10131019
}
10141020
}
10151021

1022+
public static DirectoryReader queryFilteredDirectoryReader(DirectoryReader in, Query query) throws IOException {
1023+
return new QueryFilterDirectoryReader(in, query);
1024+
}
1025+
1026+
private static class QueryFilterDirectoryReader extends FilterDirectoryReader {
1027+
private final Query query;
1028+
1029+
QueryFilterDirectoryReader(DirectoryReader in, Query query) throws IOException {
1030+
super(in, new SubReaderWrapper() {
1031+
@Override
1032+
public LeafReader wrap(LeafReader reader) {
1033+
return new QueryFilterLeafReader(reader, query);
1034+
}
1035+
});
1036+
this.query = query;
1037+
}
1038+
1039+
@Override
1040+
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
1041+
return new QueryFilterDirectoryReader(in, query);
1042+
}
1043+
1044+
@Override
1045+
public CacheHelper getReaderCacheHelper() {
1046+
return in.getReaderCacheHelper();
1047+
}
1048+
}
1049+
1050+
private static class QueryFilterLeafReader extends SequentialStoredFieldsLeafReader {
1051+
private final Query query;
1052+
1053+
private int numDocs = -1;
1054+
private BitSet filteredDocs;
1055+
1056+
protected QueryFilterLeafReader(LeafReader in, Query query) {
1057+
super(in);
1058+
this.query = query;
1059+
}
1060+
1061+
/**
1062+
* Returns all documents that are not deleted and are owned by the current shard.
1063+
* We need to recalculate this every time because `in.getLiveDocs()` can change when deletes are performed.
1064+
*/
1065+
@Override
1066+
public Bits getLiveDocs() {
1067+
ensureFilteredDocumentsPresent();
1068+
Bits actualLiveDocs = in.getLiveDocs();
1069+
1070+
if (filteredDocs == null) {
1071+
return actualLiveDocs;
1072+
}
1073+
1074+
if (filteredDocs instanceof MatchAllBitSet) {
1075+
return new Bits.MatchNoBits(in.maxDoc());
1076+
}
1077+
1078+
var liveDocsBitsWithAllLiveCheck = actualLiveDocs == null ? new MatchAllBitSet(in.maxDoc()) : actualLiveDocs;
1079+
return new FilterBits(liveDocsBitsWithAllLiveCheck, filteredDocs);
1080+
}
1081+
1082+
@Override
1083+
public int numDocs() {
1084+
ensureFilteredDocumentsPresent();
1085+
return numDocs;
1086+
}
1087+
1088+
@Override
1089+
public boolean hasDeletions() {
1090+
// It is possible that there are unowned docs which we are going to present as deletes.
1091+
return true;
1092+
}
1093+
1094+
@Override
1095+
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
1096+
return reader;
1097+
}
1098+
1099+
@Override
1100+
public CacheHelper getCoreCacheHelper() {
1101+
return in.getCoreCacheHelper();
1102+
}
1103+
1104+
@Override
1105+
public CacheHelper getReaderCacheHelper() {
1106+
// Not delegated since we change live docs.
1107+
return null;
1108+
}
1109+
1110+
private void ensureFilteredDocumentsPresent() {
1111+
if (numDocs == -1) {
1112+
synchronized (this) {
1113+
if (numDocs == -1) {
1114+
try {
1115+
filteredDocs = queryFilteredDocs();
1116+
numDocs = calculateNumDocs(in, filteredDocs);
1117+
} catch (Exception e) {
1118+
throw new ElasticsearchException("Failed to execute filtered documents query", e);
1119+
}
1120+
}
1121+
}
1122+
}
1123+
}
1124+
1125+
private BitSet queryFilteredDocs() throws IOException {
1126+
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(in.getContext());
1127+
1128+
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
1129+
searcher.setQueryCache(null);
1130+
1131+
final Query rewrittenQuery = searcher.rewrite(query);
1132+
// TODO there is a possible optimization of checking for MatchAllDocsQuery which would mean that all documents are unowned.
1133+
final Weight weight = searcher.createWeight(rewrittenQuery, ScoreMode.COMPLETE_NO_SCORES, 1f);
1134+
final Scorer s = weight.scorer(in.getContext());
1135+
if (s == null) {
1136+
return null;
1137+
} else {
1138+
return BitSets.of(s.iterator(), in.maxDoc());
1139+
}
1140+
}
1141+
1142+
private static int calculateNumDocs(LeafReader reader, BitSet unownedDocs) {
1143+
final Bits liveDocs = reader.getLiveDocs();
1144+
1145+
// No deleted documents are present, therefore number of documents is total minus unowned.
1146+
if (liveDocs == null) {
1147+
return reader.numDocs() - unownedDocs.cardinality();
1148+
}
1149+
1150+
if (unownedDocs instanceof MatchAllBitSet) {
1151+
return 0;
1152+
}
1153+
1154+
int numDocs = 0;
1155+
for (int i = 0; i < liveDocs.length(); i++) {
1156+
if (liveDocs.get(i) && unownedDocs.get(i) == false) {
1157+
numDocs++;
1158+
}
1159+
}
1160+
return numDocs;
1161+
}
1162+
}
1163+
1164+
static class FilterBits implements Bits {
1165+
private final Bits original;
1166+
private final Bits filteredOut;
1167+
1168+
FilterBits(Bits original, BitSet filteredOut) {
1169+
this.original = original;
1170+
this.filteredOut = filteredOut;
1171+
}
1172+
1173+
@Override
1174+
public boolean get(int index) {
1175+
return original.get(index) && (filteredOut.get(index) == false);
1176+
}
1177+
1178+
@Override
1179+
public int length() {
1180+
return original.length();
1181+
}
1182+
}
1183+
10161184
private static int popCount(Bits bits) {
10171185
assert bits != null;
10181186
int onBits = 0;

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.index.FilterDirectoryReader;
1616
import org.apache.lucene.index.IndexReader;
1717
import org.apache.lucene.index.LeafReader;
18+
import org.apache.lucene.search.Query;
1819
import org.apache.lucene.search.similarities.BM25Similarity;
1920
import org.apache.lucene.search.similarities.Similarity;
2021
import org.apache.lucene.store.Directory;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.cluster.service.ClusterService;
2728
import org.elasticsearch.common.TriFunction;
2829
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
30+
import org.elasticsearch.common.lucene.Lucene;
2931
import org.elasticsearch.common.settings.Setting;
3032
import org.elasticsearch.common.settings.Setting.Property;
3133
import org.elasticsearch.common.settings.Settings;
@@ -168,6 +170,9 @@ public interface DirectoryWrapper {
168170
private final SetOnce<DirectoryWrapper> indexDirectoryWrapper = new SetOnce<>();
169171
private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
170172
new SetOnce<>();
173+
// a limited version of the reader wrapper that only returns a query that can filter out documents
174+
private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderFilter =
175+
new SetOnce<>();
171176
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
172177
private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
173178
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
@@ -391,6 +396,29 @@ public void setReaderWrapper(
391396
this.indexReaderWrapper.set(indexReaderWrapperFactory);
392397
}
393398

399+
/**
400+
* A limited version of {@link IndexModule#setReaderWrapper} that can only filter out documents matching a query.
401+
* <p>
402+
* The returned query may be null in which case no documents are filtered.
403+
* @param indexReaderFilterFactory the factory for creating per-DirectoryReader {@link Query} instances
404+
*/
405+
public void setReaderFilter(Function<IndexService, CheckedFunction<DirectoryReader, Query, IOException>> indexReaderFilterFactory) {
406+
ensureNotFrozen();
407+
// convert the query filter into a reader wrapper
408+
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = (indexService) -> {
409+
var queryFactory = indexReaderFilterFactory.apply(indexService);
410+
return (reader) -> {
411+
Query query = queryFactory.apply(reader);
412+
if (query == null) {
413+
return reader;
414+
}
415+
return Lucene.queryFilteredDirectoryReader(reader, query);
416+
};
417+
};
418+
419+
this.indexReaderFilter.set(readerWrapperFactory);
420+
}
421+
394422
/**
395423
* Sets a {@link Directory} wrapping method that allows to apply a function to the Lucene directory instance
396424
* created by {@link org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory}.
@@ -499,8 +527,23 @@ public IndexService newIndexService(
499527
QueryRewriteInterceptor queryRewriteInterceptor
500528
) throws IOException {
501529
final IndexEventListener eventListener = freeze();
502-
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
503-
.get() == null ? (shard) -> null : indexReaderWrapper.get();
530+
var readerFilterFactory = indexReaderFilter.get();
531+
var readerWrapperFactory = indexReaderWrapper.get();
532+
533+
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrappersFactory = (indexService) -> {
534+
if (readerFilterFactory == null && readerWrapperFactory == null) {
535+
return null;
536+
} else if (readerFilterFactory == null) {
537+
return readerWrapperFactory.apply(indexService);
538+
} else if (readerWrapperFactory == null) {
539+
return readerFilterFactory.apply(indexService);
540+
} else {
541+
var readerFilter = readerFilterFactory.apply(indexService);
542+
var readerWrapper = readerWrapperFactory.apply(indexService);
543+
return (reader) -> readerWrapper.apply(readerFilter.apply(reader));
544+
}
545+
};
546+
504547
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
505548
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
506549
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
@@ -545,7 +588,7 @@ public IndexService newIndexService(
545588
queryCache,
546589
directoryFactory,
547590
eventListener,
548-
readerWrapperFactory,
591+
readerWrappersFactory,
549592
mapperRegistry,
550593
indicesFieldDataCache,
551594
searchOperationListeners,

server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,32 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
550550
IOUtils.close(writer, dir);
551551
}
552552

553+
public void testQueryFilteredDirectoryReader() throws IOException {
554+
try (Directory dir = newDirectory()) {
555+
try (RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
556+
557+
Document doc = new Document();
558+
doc.add(new StringField("foo", "bar", Store.YES));
559+
w.addDocument(doc);
560+
561+
doc = new Document();
562+
doc.add(new StringField("foo", "baz", Store.YES));
563+
w.addDocument(doc);
564+
565+
try (DirectoryReader reader = w.getReader()) {
566+
assertThat(reader.numDocs(), equalTo(2));
567+
DirectoryReader filtered = Lucene.queryFilteredDirectoryReader(reader, new TermQuery(new Term("foo", "bar")));
568+
assertThat(filtered.numDocs(), equalTo(1));
569+
IndexSearcher searcher = newSearcher(filtered);
570+
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
571+
StoredFields storedFields = filtered.storedFields();
572+
assertThat(topDocs.totalHits.value(), equalTo(1L));
573+
assertThat(storedFields.document(topDocs.scoreDocs[0].doc).get("foo"), equalTo("baz"));
574+
}
575+
}
576+
}
577+
}
578+
553579
public void testSortFieldSerialization() throws IOException {
554580
Tuple<SortField, SortField> sortFieldTuple = randomSortField();
555581
SortField deserialized = copyInstance(

0 commit comments

Comments
 (0)