Skip to content

Commit e760584

Browse files
committed
Merge branch 'main' of github.com:elastic/elasticsearch into esql_bf_kibana_null_examples
2 parents 36ebc18 + c0facac commit e760584

File tree

17 files changed

+259
-96
lines changed

17 files changed

+259
-96
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8989
config.isPromotableToPrimary(),
9090
config.getMapperService(),
9191
config.getEngineResetLock(),
92-
config.getMergeMetrics()
92+
config.getMergeMetrics(),
93+
config.getIndexDeletionPolicyWrapper()
9394
);
9495
}
9596

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,28 @@ public <K, V> Map<K, V> readMapValues(final Writeable.Reader<V> valueReader, fin
802802
return map;
803803
}
804804

805+
/**
806+
* Reads a multiple {@code V}-values and then converts them to a {@code Map} using keyMapper.
807+
*
808+
* @param valueReader The value reader
809+
* @param keyMapper function to create a key from a value
810+
* @param constructor map constructor
811+
* @return Never {@code null}.
812+
*/
813+
public <K, V, M extends Map<K, V>> M readMapValues(
814+
final Writeable.Reader<V> valueReader,
815+
final Function<V, K> keyMapper,
816+
final IntFunction<M> constructor
817+
) throws IOException {
818+
final int size = readArraySize();
819+
final M map = constructor.apply(size);
820+
for (int i = 0; i < size; i++) {
821+
V value = valueReader.read(this);
822+
map.put(keyMapper.apply(value), value);
823+
}
824+
return map;
825+
}
826+
805827
/**
806828
* If the returned map contains any entries it will be mutable. If it is empty it might be immutable.
807829
*/

server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,11 @@ public Map<String, String> getUserData() throws IOException {
7171
public String toString() {
7272
return "FilterIndexCommit{" + "in=" + in + '}';
7373
}
74+
75+
public static IndexCommit unwrap(IndexCommit in) {
76+
while (in instanceof FilterIndexCommit) {
77+
in = ((FilterIndexCommit) in).getIndexCommit();
78+
}
79+
return in;
80+
}
7481
}

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* In particular, this policy will delete index commits whose max sequence number is at most
3939
* the current global checkpoint except the index commit which has the highest max sequence number among those.
4040
*/
41-
public class CombinedDeletionPolicy extends IndexDeletionPolicy {
41+
public class CombinedDeletionPolicy extends ElasticsearchIndexDeletionPolicy {
4242
private final Logger logger;
4343
private final TranslogDeletionPolicy translogDeletionPolicy;
4444
private final SoftDeletesPolicy softDeletesPolicy;
@@ -48,13 +48,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
4848
// when checking for externally acquired index commits that haven't been released
4949
private final Set<IndexCommit> internallyAcquiredIndexCommits;
5050

51-
interface CommitsListener {
52-
53-
void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);
54-
55-
void onDeletedCommit(IndexCommit commit);
56-
}
57-
5851
@Nullable
5952
private final CommitsListener commitsListener;
6053

@@ -187,7 +180,6 @@ private void deleteCommit(IndexCommit commit) throws IOException {
187180
assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice";
188181
logger.debug("Delete index commit [{}]", commitDescription(commit));
189182
commit.delete();
190-
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
191183
}
192184

193185
private void updateRetentionPolicy() throws IOException {
@@ -204,7 +196,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
204196
return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
205197
}
206198

207-
SafeCommitInfo getSafeCommitInfo() {
199+
@Override
200+
public SafeCommitInfo getSafeCommitInfo() {
208201
return safeCommitInfo;
209202
}
210203

@@ -214,7 +207,8 @@ SafeCommitInfo getSafeCommitInfo() {
214207
*
215208
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
216209
*/
217-
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
210+
@Override
211+
public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
218212
return acquireIndexCommit(acquiringSafeCommit, false);
219213
}
220214

@@ -241,7 +235,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern
241235
*
242236
* @return true if the acquired commit can be clean up.
243237
*/
244-
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
238+
@Override
239+
public synchronized boolean releaseIndexCommit(final IndexCommit acquiredCommit) {
245240
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
246241
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
247242
assert acquiredIndexCommits.containsKey(releasingCommit)
@@ -316,7 +311,8 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
316311
/**
317312
* Checks whether the deletion policy is holding on to externally acquired index commits
318313
*/
319-
synchronized boolean hasAcquiredIndexCommitsForTesting() {
314+
@Override
315+
public synchronized boolean hasAcquiredIndexCommitsForTesting() {
320316
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
321317
for (var e : acquiredIndexCommits.entrySet()) {
322318
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
@@ -329,7 +325,8 @@ synchronized boolean hasAcquiredIndexCommitsForTesting() {
329325
/**
330326
* Checks if the deletion policy can delete some index commits with the latest global checkpoint.
331327
*/
332-
boolean hasUnreferencedCommits() {
328+
@Override
329+
public boolean hasUnreferencedCommits() {
333330
return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong();
334331
}
335332

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.IndexCommit;
13+
import org.apache.lucene.index.IndexDeletionPolicy;
14+
15+
import java.util.Set;
16+
17+
public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPolicy {
18+
19+
/**
20+
* Captures the most recent commit point or the most recent safe commit point.
21+
* Index files of the capturing commit point won't be released until the commit reference is closed.
22+
*
23+
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
24+
*/
25+
public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit);
26+
27+
/**
28+
* Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
29+
*
30+
* @return true if the acquired commit can be clean up.
31+
*/
32+
public abstract boolean releaseIndexCommit(IndexCommit acquiredIndexCommit);
33+
34+
/**
35+
* @return information about the safe commit
36+
*/
37+
public abstract SafeCommitInfo getSafeCommitInfo();
38+
39+
public abstract boolean hasAcquiredIndexCommitsForTesting();
40+
41+
public abstract boolean hasUnreferencedCommits();
42+
43+
public interface CommitsListener {
44+
45+
void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);
46+
47+
void onDeletedCommit(IndexCommit commit);
48+
}
49+
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Comparator;
4242
import java.util.List;
4343
import java.util.Objects;
44+
import java.util.function.Function;
4445
import java.util.function.LongSupplier;
4546
import java.util.function.Supplier;
4647

@@ -151,6 +152,11 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
151152

152153
private final MergeMetrics mergeMetrics;
153154

155+
/**
156+
* Allows to pass an {@link ElasticsearchIndexDeletionPolicy} wrapper to egine implementations.
157+
*/
158+
private final Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper;
159+
154160
/**
155161
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
156162
*/
@@ -184,7 +190,8 @@ public EngineConfig(
184190
boolean promotableToPrimary,
185191
MapperService mapperService,
186192
EngineResetLock engineResetLock,
187-
MergeMetrics mergeMetrics
193+
MergeMetrics mergeMetrics,
194+
Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper
188195
) {
189196
this.shardId = shardId;
190197
this.indexSettings = indexSettings;
@@ -233,6 +240,7 @@ public EngineConfig(
233240
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
234241
this.engineResetLock = engineResetLock;
235242
this.mergeMetrics = mergeMetrics;
243+
this.indexDeletionPolicyWrapper = indexDeletionPolicyWrapper;
236244
}
237245

238246
/**
@@ -485,4 +493,11 @@ public EngineResetLock getEngineResetLock() {
485493
public MergeMetrics getMergeMetrics() {
486494
return mergeMetrics;
487495
}
496+
497+
/**
498+
* @return an {@link ElasticsearchIndexDeletionPolicy} wrapper, to be use by engine implementations.
499+
*/
500+
public Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> getIndexDeletionPolicyWrapper() {
501+
return indexDeletionPolicyWrapper;
502+
}
488503
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public class InternalEngine extends Engine {
170170

171171
private final LocalCheckpointTracker localCheckpointTracker;
172172

173-
private final CombinedDeletionPolicy combinedDeletionPolicy;
173+
private final ElasticsearchIndexDeletionPolicy indexDeletionPolicy;
174174

175175
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
176176
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
@@ -277,13 +277,7 @@ public InternalEngine(EngineConfig engineConfig) {
277277
this.totalDiskSpace = ByteSizeValue.of(Environment.getFileStore(translog.location()).getTotalSpace(), ByteSizeUnit.BYTES);
278278
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
279279
this.softDeletesPolicy = newSoftDeletesPolicy();
280-
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
281-
logger,
282-
translogDeletionPolicy,
283-
softDeletesPolicy,
284-
translog::getLastSyncedGlobalCheckpoint,
285-
newCommitsListener()
286-
);
280+
this.indexDeletionPolicy = newIndexDeletionPolicy(engineConfig, logger, translog, softDeletesPolicy);
287281
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
288282
writer = createWriter();
289283
bootstrapAppendOnlyInfoFromWriter(writer);
@@ -391,6 +385,25 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
391385
);
392386
}
393387

388+
protected ElasticsearchIndexDeletionPolicy newIndexDeletionPolicy(
389+
EngineConfig engineConfig,
390+
Logger logger,
391+
Translog translog,
392+
SoftDeletesPolicy softDeletesPolicy
393+
) {
394+
var wrapper = engineConfig.getIndexDeletionPolicyWrapper();
395+
assert wrapper != null : "no index deletion policy wrapper for " + engineConfig.getShardId();
396+
return wrapper.apply(
397+
new CombinedDeletionPolicy(
398+
logger,
399+
translog.getDeletionPolicy(),
400+
softDeletesPolicy,
401+
translog::getLastSyncedGlobalCheckpoint,
402+
newCommitsListener()
403+
)
404+
);
405+
}
406+
394407
@Nullable
395408
private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
396409
IndexCommitListener listener = engineConfig.getIndexCommitListener();
@@ -682,7 +695,7 @@ Translog getTranslog() {
682695

683696
// Package private for testing purposes only
684697
boolean hasAcquiredIndexCommitsForTesting() {
685-
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
698+
return indexDeletionPolicy.hasAcquiredIndexCommitsForTesting();
686699
}
687700

688701
@Override
@@ -748,7 +761,7 @@ public Translog.Location getTranslogLastWriteLocation() {
748761
}
749762

750763
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
751-
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
764+
if (indexDeletionPolicy.hasUnreferencedCommits()) {
752765
indexWriter.deleteUnusedFiles();
753766
}
754767
translog.trimUnreferencedReaders();
@@ -2555,17 +2568,17 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
25552568
future.actionGet();
25562569
logger.trace("finish flush for snapshot");
25572570
}
2558-
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
2571+
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(false));
25592572
}
25602573

25612574
@Override
25622575
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
2563-
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true));
2576+
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(true));
25642577
}
25652578

25662579
private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
25672580
// Revisit the deletion policy if we can clean up the snapshotting commit.
2568-
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
2581+
if (indexDeletionPolicy.releaseIndexCommit(snapshot)) {
25692582
try {
25702583
// Here we don't have to trim translog because snapshotting an index commit
25712584
// does not lock translog or prevents unreferenced files from trimming.
@@ -2578,7 +2591,7 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
25782591

25792592
@Override
25802593
public SafeCommitInfo getSafeCommitInfo() {
2581-
return combinedDeletionPolicy.getSafeCommitInfo();
2594+
return indexDeletionPolicy.getSafeCommitInfo();
25822595
}
25832596

25842597
private boolean failOnTragicEvent(AlreadyClosedException ex) {
@@ -2756,7 +2769,7 @@ private IndexWriterConfig getIndexWriterConfig() {
27562769
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
27572770
iwc.setCommitOnClose(false); // we by default don't commit on close
27582771
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
2759-
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
2772+
iwc.setIndexDeletionPolicy(indexDeletionPolicy);
27602773
iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger));
27612774
iwc.setMergeScheduler(mergeScheduler.getMergeScheduler());
27622775
// Give us the opportunity to upgrade old segments while performing

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3765,7 +3765,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
37653765
routingEntry().isPromotableToPrimary(),
37663766
mapperService(),
37673767
engineResetLock,
3768-
mergeMetrics
3768+
mergeMetrics,
3769+
Function.identity()
37693770
);
37703771
}
37713772

0 commit comments

Comments
 (0)