Skip to content

Commit 30ea0ca

Browse files
committed
Trim points for seqno when no longer internally needed.
1 parent 2310bca commit 30ea0ca

File tree

8 files changed

+284
-12
lines changed

8 files changed

+284
-12
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ public void testSkippingShards() throws Exception {
428428
}
429429
}
430430

431-
public void testTrimId() throws Exception {
431+
public void testTrimIdAndSeqNoPoints() throws Exception {
432432
String dataStreamName = "k8s";
433433
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
434434
putTemplateRequest.indexTemplate(
@@ -496,6 +496,7 @@ public void testTrimId() throws Exception {
496496
new AnalyzeIndexDiskUsageRequest(new String[] { dataStreamName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true)
497497
).actionGet();
498498
var map = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(diskUsageResponse), false);
499+
logger.error("response: {}", map);
499500
assertMap(
500501
map,
501502
matchesMap().extraOk()
@@ -509,6 +510,19 @@ public void testTrimId() throws Exception {
509510
)
510511
)
511512
);
513+
assertMap(
514+
map,
515+
matchesMap().extraOk()
516+
.entry(
517+
indexName,
518+
matchesMap().extraOk()
519+
.entry(
520+
"fields",
521+
matchesMap().extraOk()
522+
.entry("_seq_no", matchesMap().extraOk().entry("points_in_bytes", greaterThanOrEqualTo(1)))
523+
)
524+
)
525+
);
512526

513527
// Check that the minimum retaining seqno has advanced, otherwise _id (and recovery source) doesn't get trimmed away.
514528
var finalIndexName = indexName;
@@ -550,6 +564,18 @@ public void testTrimId() throws Exception {
550564
)
551565
)
552566
);
567+
assertMap(
568+
map,
569+
matchesMap().extraOk()
570+
.entry(
571+
indexName,
572+
matchesMap().extraOk()
573+
.entry(
574+
"fields",
575+
matchesMap().extraOk().entry("_seq_no", matchesMap().extraOk().entry("points_in_bytes", equalTo(0)))
576+
)
577+
)
578+
);
553579

554580
// Check the search api can synthesize _id
555581
final String idxName = indexName;

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.logging.log4j.Logger;
13-
import org.apache.lucene.document.LongPoint;
1413
import org.apache.lucene.document.NumericDocValuesField;
1514
import org.apache.lucene.index.DirectoryReader;
1615
import org.apache.lucene.index.IndexCommit;
@@ -2747,12 +2746,15 @@ private IndexWriterConfig getIndexWriterConfig() {
27472746
MergePolicy mergePolicy = config().getMergePolicy();
27482747
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
27492748
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
2749+
var indexMode = engineConfig.getIndexSettings().getMode();
2750+
boolean pruneSeqnoPoints = indexMode == IndexMode.LOGSDB || indexMode == IndexMode.TIME_SERIES;
27502751
mergePolicy = new RecoverySourcePruneMergePolicy(
27512752
engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled() ? null : SourceFieldMapper.RECOVERY_SOURCE_NAME,
27522753
engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()
27532754
? SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME
27542755
: SourceFieldMapper.RECOVERY_SOURCE_NAME,
2755-
engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES,
2756+
indexMode == IndexMode.TIME_SERIES,
2757+
pruneSeqnoPoints,
27562758
softDeletesPolicy::getRetentionQuery,
27572759
new SoftDeletesRetentionMergePolicy(
27582760
Lucene.SOFT_DELETES_FIELD,
@@ -3448,7 +3450,7 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
34483450
final IndexSearcher searcher = new IndexSearcher(directoryReader);
34493451
searcher.setQueryCache(null);
34503452
final Query query = new BooleanQuery.Builder().add(
3451-
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE),
3453+
SeqNoFieldMapper.rangeQueryForSeqNo(getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE),
34523454
BooleanClause.Occur.MUST
34533455
)
34543456
.add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents

server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.lucene.codecs.DocValuesProducer;
13+
import org.apache.lucene.codecs.PointsReader;
1314
import org.apache.lucene.codecs.StoredFieldsReader;
1415
import org.apache.lucene.index.CodecReader;
1516
import org.apache.lucene.index.FieldInfo;
@@ -18,6 +19,7 @@
1819
import org.apache.lucene.index.MergePolicy;
1920
import org.apache.lucene.index.NumericDocValues;
2021
import org.apache.lucene.index.OneMergeWrappingMergePolicy;
22+
import org.apache.lucene.index.PointValues;
2123
import org.apache.lucene.index.StoredFieldVisitor;
2224
import org.apache.lucene.search.ConjunctionUtils;
2325
import org.apache.lucene.search.DocIdSetIterator;
@@ -31,6 +33,7 @@
3133
import org.elasticsearch.core.Nullable;
3234
import org.elasticsearch.index.codec.FilterDocValuesProducer;
3335
import org.elasticsearch.index.mapper.IdFieldMapper;
36+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3437
import org.elasticsearch.search.internal.FilterStoredFieldVisitor;
3538

3639
import java.io.IOException;
@@ -42,14 +45,22 @@ final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
4245
@Nullable String pruneStoredFieldName,
4346
String pruneNumericDVFieldName,
4447
boolean pruneIdField,
48+
boolean pruneSeqnoPoints,
4549
Supplier<Query> retainSourceQuerySupplier,
4650
MergePolicy in
4751
) {
4852
super(in, toWrap -> new OneMerge(toWrap.segments) {
4953
@Override
5054
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
5155
CodecReader wrapped = toWrap.wrapForMerge(reader);
52-
return wrapReader(pruneStoredFieldName, pruneNumericDVFieldName, pruneIdField, wrapped, retainSourceQuerySupplier);
56+
return wrapReader(
57+
pruneStoredFieldName,
58+
pruneNumericDVFieldName,
59+
pruneIdField,
60+
pruneSeqnoPoints,
61+
wrapped,
62+
retainSourceQuerySupplier
63+
);
5364
}
5465
});
5566
}
@@ -58,6 +69,7 @@ private static CodecReader wrapReader(
5869
String pruneStoredFieldName,
5970
String pruneNumericDVFieldName,
6071
boolean pruneIdField,
72+
boolean pruneSeqnoPoints,
6173
CodecReader reader,
6274
Supplier<Query> retainSourceQuerySupplier
6375
) throws IOException {
@@ -67,7 +79,8 @@ private static CodecReader wrapReader(
6779
}
6880
IndexSearcher s = new IndexSearcher(reader);
6981
s.setQueryCache(null);
70-
Weight weight = s.createWeight(s.rewrite(retainSourceQuerySupplier.get()), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
82+
Query retainSourceQuery = retainSourceQuerySupplier.get();
83+
Weight weight = s.createWeight(s.rewrite(retainSourceQuery), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
7184
Scorer scorer = weight.scorer(reader.getContext());
7285
if (scorer != null) {
7386
BitSet recoverySourceToKeep = BitSet.of(scorer.iterator(), reader.maxDoc());
@@ -80,11 +93,19 @@ private static CodecReader wrapReader(
8093
pruneStoredFieldName,
8194
pruneNumericDVFieldName,
8295
pruneIdField,
96+
pruneSeqnoPoints,
8397
reader,
84-
recoverySourceToKeep
98+
recoverySourceToKeep.cardinality() == 0 ? null : recoverySourceToKeep
8599
);
86100
} else {
87-
return new SourcePruningFilterCodecReader(pruneStoredFieldName, pruneNumericDVFieldName, pruneIdField, reader, null);
101+
return new SourcePruningFilterCodecReader(
102+
pruneStoredFieldName,
103+
pruneNumericDVFieldName,
104+
pruneIdField,
105+
pruneSeqnoPoints,
106+
reader,
107+
null
108+
);
88109
}
89110
}
90111

@@ -93,11 +114,13 @@ private static class SourcePruningFilterCodecReader extends FilterCodecReader {
93114
private final String pruneStoredFieldName;
94115
private final String pruneNumericDVFieldName;
95116
private final boolean pruneIdField;
117+
private final boolean pruneSeqnoPoints;
96118

97119
SourcePruningFilterCodecReader(
98120
@Nullable String pruneStoredFieldName,
99121
String pruneNumericDVFieldName,
100122
boolean pruneIdField,
123+
boolean pruneSeqnoPoints,
101124
CodecReader reader,
102125
BitSet recoverySourceToKeep
103126
) {
@@ -106,6 +129,7 @@ private static class SourcePruningFilterCodecReader extends FilterCodecReader {
106129
this.recoverySourceToKeep = recoverySourceToKeep;
107130
this.pruneNumericDVFieldName = pruneNumericDVFieldName;
108131
this.pruneIdField = pruneIdField;
132+
this.pruneSeqnoPoints = pruneSeqnoPoints;
109133
}
110134

111135
@Override
@@ -149,6 +173,35 @@ public boolean advanceExact(int target) {
149173
};
150174
}
151175

176+
@Override
177+
public PointsReader getPointsReader() {
178+
var pointsReader = super.getPointsReader();
179+
if (pruneSeqnoPoints && recoverySourceToKeep == null) {
180+
return new PointsReader() {
181+
@Override
182+
public void checkIntegrity() throws IOException {
183+
pointsReader.checkIntegrity();
184+
}
185+
186+
@Override
187+
public PointValues getValues(String field) throws IOException {
188+
if (SeqNoFieldMapper.NAME.equals(field)) {
189+
return null;
190+
} else {
191+
return pointsReader.getValues(field);
192+
}
193+
}
194+
195+
@Override
196+
public void close() throws IOException {
197+
pointsReader.close();
198+
}
199+
};
200+
} else {
201+
return pointsReader;
202+
}
203+
}
204+
152205
@Override
153206
public StoredFieldsReader getFieldsReader() {
154207
if (pruneStoredFieldName == null && pruneIdField == false) {

server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.index.engine;
1111

12-
import org.apache.lucene.document.LongPoint;
1312
import org.apache.lucene.index.LeafReader;
1413
import org.apache.lucene.index.LeafReaderContext;
1514
import org.apache.lucene.index.NumericDocValues;
@@ -242,7 +241,7 @@ static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOE
242241
}
243242

244243
static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) {
245-
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
244+
return new BooleanQuery.Builder().add(SeqNoFieldMapper.rangeQueryForSeqNo(fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
246245
.add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST)
247246
.build();
248247
}

server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.index.engine;
1111

12-
import org.apache.lucene.document.LongPoint;
1312
import org.apache.lucene.search.Query;
1413
import org.elasticsearch.core.Releasable;
1514
import org.elasticsearch.core.Releasables;
@@ -148,7 +147,7 @@ synchronized long getMinRetainedSeqNo() {
148147
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
149148
*/
150149
Query getRetentionQuery() {
151-
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
150+
return SeqNoFieldMapper.rangeQueryForSeqNo(getMinRetainedSeqNo(), Long.MAX_VALUE);
152151
}
153152

154153
}

server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,12 @@ public void postParse(DocumentParserContext context) {
245245
protected String contentType() {
246246
return CONTENT_TYPE;
247247
}
248+
249+
/**
250+
* Create a range query that matches all documents whose seq_no is between {@code lowerValue} and {@code upperValue} included.
251+
*/
252+
public static Query rangeQueryForSeqNo(long lowerValue, long upperValue) {
253+
return new SeqnoRangeQuery(lowerValue, upperValue);
254+
}
255+
248256
}

0 commit comments

Comments
 (0)