|
29 | 29 | import org.apache.lucene.index.IndexFileNames; |
30 | 30 | import org.apache.lucene.index.IndexFormatTooNewException; |
31 | 31 | import org.apache.lucene.index.IndexFormatTooOldException; |
| 32 | +import org.apache.lucene.index.IndexReaderContext; |
32 | 33 | import org.apache.lucene.index.IndexWriter; |
33 | 34 | import org.apache.lucene.index.IndexWriterConfig; |
34 | 35 | import org.apache.lucene.index.LeafReader; |
35 | 36 | import org.apache.lucene.index.LeafReaderContext; |
36 | 37 | import org.apache.lucene.index.NoMergePolicy; |
37 | 38 | import org.apache.lucene.index.NoMergeScheduler; |
| 39 | +import org.apache.lucene.index.ReaderUtil; |
38 | 40 | import org.apache.lucene.index.SegmentCommitInfo; |
39 | 41 | import org.apache.lucene.index.SegmentInfos; |
40 | 42 | import org.apache.lucene.index.SegmentReader; |
|
60 | 62 | import org.apache.lucene.store.IOContext; |
61 | 63 | import org.apache.lucene.store.IndexInput; |
62 | 64 | import org.apache.lucene.store.Lock; |
| 65 | +import org.apache.lucene.util.BitSet; |
63 | 66 | import org.apache.lucene.util.Bits; |
64 | 67 | import org.apache.lucene.util.BytesRef; |
65 | 68 | import org.apache.lucene.util.Version; |
| 69 | +import org.elasticsearch.ElasticsearchException; |
66 | 70 | import org.elasticsearch.ExceptionsHelper; |
67 | 71 | import org.elasticsearch.TransportVersions; |
68 | 72 | import org.elasticsearch.common.Strings; |
|
78 | 82 | import org.elasticsearch.index.analysis.NamedAnalyzer; |
79 | 83 | import org.elasticsearch.index.fielddata.IndexFieldData; |
80 | 84 | import org.elasticsearch.lucene.grouping.TopFieldGroups; |
| 85 | +import org.elasticsearch.lucene.util.BitSets; |
| 86 | +import org.elasticsearch.lucene.util.MatchAllBitSet; |
81 | 87 | import org.elasticsearch.search.sort.ShardDocSortField; |
82 | 88 |
|
83 | 89 | import java.io.IOException; |
@@ -1013,6 +1019,168 @@ public CacheHelper getReaderCacheHelper() { |
1013 | 1019 | } |
1014 | 1020 | } |
1015 | 1021 |
|
| 1022 | + public static DirectoryReader queryFilteredDirectoryReader(DirectoryReader in, Query query) throws IOException { |
| 1023 | + return new QueryFilterDirectoryReader(in, query); |
| 1024 | + } |
| 1025 | + |
| 1026 | + private static class QueryFilterDirectoryReader extends FilterDirectoryReader { |
| 1027 | + private final Query query; |
| 1028 | + |
| 1029 | + QueryFilterDirectoryReader(DirectoryReader in, Query query) throws IOException { |
| 1030 | + super(in, new SubReaderWrapper() { |
| 1031 | + @Override |
| 1032 | + public LeafReader wrap(LeafReader reader) { |
| 1033 | + return new QueryFilterLeafReader(reader, query); |
| 1034 | + } |
| 1035 | + }); |
| 1036 | + this.query = query; |
| 1037 | + } |
| 1038 | + |
| 1039 | + @Override |
| 1040 | + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { |
| 1041 | + return new QueryFilterDirectoryReader(in, query); |
| 1042 | + } |
| 1043 | + |
| 1044 | + @Override |
| 1045 | + public CacheHelper getReaderCacheHelper() { |
| 1046 | + return in.getReaderCacheHelper(); |
| 1047 | + } |
| 1048 | + } |
| 1049 | + |
| 1050 | + private static class QueryFilterLeafReader extends SequentialStoredFieldsLeafReader { |
| 1051 | + private final Query query; |
| 1052 | + |
| 1053 | + private int numDocs = -1; |
| 1054 | + private BitSet unownedDocs; |
| 1055 | + |
| 1056 | + protected QueryFilterLeafReader(LeafReader in, Query query) { |
| 1057 | + super(in); |
| 1058 | + this.query = query; |
| 1059 | + } |
| 1060 | + |
| 1061 | + /** |
| 1062 | + * Returns all documents that are not deleted and are owned by the current shard. |
| 1063 | + * We need to recalculate this every time because `in.getLiveDocs()` can change when deletes are performed. |
| 1064 | + */ |
| 1065 | + @Override |
| 1066 | + public Bits getLiveDocs() { |
| 1067 | + ensureUnownedDocumentsPresent(); |
| 1068 | + Bits actualLiveDocs = in.getLiveDocs(); |
| 1069 | + |
| 1070 | + if (unownedDocs == null) { |
| 1071 | + return actualLiveDocs; |
| 1072 | + } |
| 1073 | + |
| 1074 | + if (unownedDocs instanceof MatchAllBitSet) { |
| 1075 | + return new Bits.MatchNoBits(in.maxDoc()); |
| 1076 | + } |
| 1077 | + |
| 1078 | + var liveDocsBitsWithAllLiveCheck = actualLiveDocs == null ? new MatchAllBitSet(in.maxDoc()) : actualLiveDocs; |
| 1079 | + return new FilterBits(liveDocsBitsWithAllLiveCheck, unownedDocs); |
| 1080 | + } |
| 1081 | + |
| 1082 | + @Override |
| 1083 | + public int numDocs() { |
| 1084 | + ensureUnownedDocumentsPresent(); |
| 1085 | + return numDocs; |
| 1086 | + } |
| 1087 | + |
| 1088 | + @Override |
| 1089 | + public boolean hasDeletions() { |
| 1090 | + // It is possible that there are unowned docs which we are going to present as deletes. |
| 1091 | + return true; |
| 1092 | + } |
| 1093 | + |
| 1094 | + @Override |
| 1095 | + protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) { |
| 1096 | + return reader; |
| 1097 | + } |
| 1098 | + |
| 1099 | + @Override |
| 1100 | + public CacheHelper getCoreCacheHelper() { |
| 1101 | + return in.getCoreCacheHelper(); |
| 1102 | + } |
| 1103 | + |
| 1104 | + @Override |
| 1105 | + public CacheHelper getReaderCacheHelper() { |
| 1106 | + // Not delegated since we change live docs. |
| 1107 | + return null; |
| 1108 | + } |
| 1109 | + |
| 1110 | + private void ensureUnownedDocumentsPresent() { |
| 1111 | + if (numDocs == -1) { |
| 1112 | + synchronized (this) { |
| 1113 | + if (numDocs == -1) { |
| 1114 | + try { |
| 1115 | + unownedDocs = queryUnownedDocs(); |
| 1116 | + numDocs = calculateNumDocs(in, unownedDocs); |
| 1117 | + } catch (Exception e) { |
| 1118 | + throw new ElasticsearchException("Failed to execute unowned documents query", e); |
| 1119 | + } |
| 1120 | + } |
| 1121 | + } |
| 1122 | + } |
| 1123 | + } |
| 1124 | + |
| 1125 | + private BitSet queryUnownedDocs() throws IOException { |
| 1126 | + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(in.getContext()); |
| 1127 | + |
| 1128 | + final IndexSearcher searcher = new IndexSearcher(topLevelContext); |
| 1129 | + searcher.setQueryCache(null); |
| 1130 | + |
| 1131 | + final Query rewrittenQuery = searcher.rewrite(query); |
| 1132 | + // TODO there is a possible optimization of checking for MatchAllDocsQuery which would mean that all documents are unowned. |
| 1133 | + final Weight weight = searcher.createWeight(rewrittenQuery, ScoreMode.COMPLETE_NO_SCORES, 1f); |
| 1134 | + final Scorer s = weight.scorer(in.getContext()); |
| 1135 | + if (s == null) { |
| 1136 | + return null; |
| 1137 | + } else { |
| 1138 | + return BitSets.of(s.iterator(), in.maxDoc()); |
| 1139 | + } |
| 1140 | + } |
| 1141 | + |
| 1142 | + private static int calculateNumDocs(LeafReader reader, BitSet unownedDocs) { |
| 1143 | + final Bits liveDocs = reader.getLiveDocs(); |
| 1144 | + |
| 1145 | + // No deleted documents are present, therefore number of documents is total minus unowned. |
| 1146 | + if (liveDocs == null) { |
| 1147 | + return reader.numDocs() - unownedDocs.cardinality(); |
| 1148 | + } |
| 1149 | + |
| 1150 | + if (unownedDocs instanceof MatchAllBitSet) { |
| 1151 | + return 0; |
| 1152 | + } |
| 1153 | + |
| 1154 | + int numDocs = 0; |
| 1155 | + for (int i = 0; i < liveDocs.length(); i++) { |
| 1156 | + if (liveDocs.get(i) && unownedDocs.get(i) == false) { |
| 1157 | + numDocs++; |
| 1158 | + } |
| 1159 | + } |
| 1160 | + return numDocs; |
| 1161 | + } |
| 1162 | + } |
| 1163 | + |
| 1164 | + static class FilterBits implements Bits { |
| 1165 | + private final Bits original; |
| 1166 | + private final Bits filteredOut; |
| 1167 | + |
| 1168 | + FilterBits(Bits original, BitSet filteredOut) { |
| 1169 | + this.original = original; |
| 1170 | + this.filteredOut = filteredOut; |
| 1171 | + } |
| 1172 | + |
| 1173 | + @Override |
| 1174 | + public boolean get(int index) { |
| 1175 | + return original.get(index) && (filteredOut.get(index) == false); |
| 1176 | + } |
| 1177 | + |
| 1178 | + @Override |
| 1179 | + public int length() { |
| 1180 | + return original.length(); |
| 1181 | + } |
| 1182 | + } |
| 1183 | + |
1016 | 1184 | private static int popCount(Bits bits) { |
1017 | 1185 | assert bits != null; |
1018 | 1186 | int onBits = 0; |
|
0 commit comments