Skip to content

Commit 282dfed

Browse files
authored
Merge branch 'main' into es-10851-allow-reindex-when-completed
2 parents ad785dd + ad220c1 commit 282dfed

File tree

16 files changed

+103
-75
lines changed

16 files changed

+103
-75
lines changed

docs/changelog/122731.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122731
2+
summary: Fork post-snapshot-delete cleanup off master thread
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

libs/core/src/main/java/org/elasticsearch/core/UpdateForV9.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@
2424
enum Owner {
2525
CORE_INFRA,
2626
DATA_MANAGEMENT,
27-
DISTRIBUTED_COORDINATION,
2827
DISTRIBUTED_INDEXING,
2928
ENTERPRISE_SEARCH,
3029
MACHINE_LEARNING,
3130
PROFILING,
3231
SEARCH_ANALYTICS,
33-
SEARCH_FOUNDATIONS,
34-
SEARCH_RELEVANCE,
3532
SECURITY,
3633
}
3734

muted-tests.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ tests:
6161
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
6262
method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index}
6363
issue: https://github.com/elastic/elasticsearch/issues/115808
64-
- class: org.elasticsearch.search.StressSearchServiceReaperIT
65-
method: testStressReaper
66-
issue: https://github.com/elastic/elasticsearch/issues/115816
6764
- class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests
6865
issue: https://github.com/elastic/elasticsearch/issues/116087
6966
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
@@ -329,6 +326,8 @@ tests:
329326
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
330327
method: testEnrichExplosionManyMatches
331328
issue: https://github.com/elastic/elasticsearch/issues/122913
329+
- class: org.elasticsearch.xpack.search.AsyncSearchSecurityIT
330+
issue: https://github.com/elastic/elasticsearch/issues/122940
332331

333332
# Examples:
334333
#

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,10 @@ setup:
6868
wait_for_completion: false
6969

7070
- match: { acknowledged: true }
71+
72+
# now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
73+
- do:
74+
snapshot.create:
75+
repository: test_repo_create_1
76+
snapshot: barrier_snapshot
77+
wait_for_completion: true

server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
508508
.orElseThrow()
509509
.queue();
510510

511+
// There is one task in the queue for computing and forking the cleanup work.
512+
assertThat(queueLength.getAsInt(), equalTo(1));
513+
514+
safeAwait(barrier); // unblock the barrier thread and let it process the queue
515+
safeAwait(barrier); // wait for the queue to be processed
516+
511517
// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
512518
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
513519
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2925,7 +2925,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
29252925
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
29262926
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
29272927
*/
2928-
final Map<String, String> extraCommitUserData = getCommitExtraUserData();
2928+
final Map<String, String> extraCommitUserData = getCommitExtraUserData(localCheckpoint);
29292929
final Map<String, String> commitData = Maps.newMapWithExpectedSize(8 + extraCommitUserData.size());
29302930
commitData.putAll(extraCommitUserData);
29312931
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
@@ -2973,8 +2973,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
29732973
/**
29742974
* Allows InternalEngine extenders to return custom key-value pairs which will be included in the Lucene commit user-data. Custom user
29752975
data keys can be overwritten if they conflict with keys used by InternalEngine.
2976+
*
2977+
* @param localCheckpoint the local checkpoint of the commit
29762978
*/
2977-
protected Map<String, String> getCommitExtraUserData() {
2979+
protected Map<String, String> getCommitExtraUserData(final long localCheckpoint) {
29782980
return Collections.emptyMap();
29792981
}
29802982

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4491,14 +4491,17 @@ public void waitForEngineOrClosedShard(ActionListener<Void> listener) {
44914491
}
44924492

44934493
/**
4494-
* Registers a listener for an event when the shard advances to the provided primary term and segment generation
4494+
* Registers a listener for an event when the shard advances to the provided primary term and segment generation.
4495+
* Completes the listener with a {@link IndexShardClosedException} if the shard is closed.
44954496
*/
44964497
public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
4497-
waitForEngineOrClosedShard(
4498-
listener.delegateFailureAndWrap(
4499-
(l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l)
4500-
)
4501-
);
4498+
waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
4499+
if (state == IndexShardState.CLOSED) {
4500+
l.onFailure(new IndexShardClosedException(shardId));
4501+
} else {
4502+
getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
4503+
}
4504+
}));
45024505
}
45034506

45044507
/**

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.snapshots.SnapshotInfo;
3030
import org.elasticsearch.snapshots.SnapshotState;
3131
import org.elasticsearch.snapshots.SnapshotsService;
32+
import org.elasticsearch.threadpool.ThreadPool;
3233
import org.elasticsearch.xcontent.XContentBuilder;
3334
import org.elasticsearch.xcontent.XContentParser;
3435

@@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
377378
* @return map of index to index metadata blob id to delete
378379
*/
379380
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
381+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
380382
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
381383
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
382384
.stream()

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,14 +1134,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
11341134
);
11351135
})
11361136

1137-
.<RepositoryData>andThen((l, newRepositoryData) -> {
1138-
l.onResponse(newRepositoryData);
1139-
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1140-
try (var refs = new RefCountingRunnable(onCompletion)) {
1141-
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1142-
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1137+
.<RepositoryData>andThen(
1138+
// writeIndexGen finishes on master-service thread so must fork here.
1139+
snapshotExecutor,
1140+
threadPool.getThreadContext(),
1141+
(l, newRepositoryData) -> {
1142+
l.onResponse(newRepositoryData);
1143+
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
1144+
// deletion
1145+
try (var refs = new RefCountingRunnable(onCompletion)) {
1146+
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1147+
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1148+
}
11431149
}
1144-
})
1150+
)
11451151

11461152
.addListener(repositoryDataUpdateListener);
11471153
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final
5858
public void collect(int doc, long bucket) throws IOException {
5959
if (values.advanceExact(doc)) {
6060
maybeGrow(bucket);
61-
computeSum(bucket, values, sums, compensations);
61+
computeSum(bucket, values.doubleValue(), sums, compensations);
6262
counts.increment(bucket, 1L);
6363
}
6464
}

0 commit comments

Comments
 (0)