
Commit ddf2923

Merge branch 'main' into arbitrary-intervals-of-month-year-buckets
2 parents: 650eed7 + 8e34393

File tree

14 files changed: +96, -72 lines changed


docs/changelog/122731.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 122731
+summary: Fork post-snapshot-delete cleanup off master thread
+area: Snapshot/Restore
+type: bug
+issues: []

libs/core/src/main/java/org/elasticsearch/core/UpdateForV9.java

Lines changed: 0 additions & 3 deletions
@@ -24,14 +24,11 @@
     enum Owner {
         CORE_INFRA,
         DATA_MANAGEMENT,
-        DISTRIBUTED_COORDINATION,
         DISTRIBUTED_INDEXING,
         ENTERPRISE_SEARCH,
         MACHINE_LEARNING,
         PROFILING,
         SEARCH_ANALYTICS,
-        SEARCH_FOUNDATIONS,
-        SEARCH_RELEVANCE,
         SECURITY,
     }

muted-tests.yml

Lines changed: 0 additions & 3 deletions
@@ -61,9 +61,6 @@ tests:
 - class: org.elasticsearch.xpack.test.rest.XPackRestIT
   method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index}
   issue: https://github.com/elastic/elasticsearch/issues/115808
-- class: org.elasticsearch.search.StressSearchServiceReaperIT
-  method: testStressReaper
-  issue: https://github.com/elastic/elasticsearch/issues/115816
 - class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests
   issue: https://github.com/elastic/elasticsearch/issues/116087
 - class: org.elasticsearch.xpack.test.rest.XPackRestIT

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml

Lines changed: 7 additions & 0 deletions
@@ -68,3 +68,10 @@ setup:
       wait_for_completion: false

 - match: { acknowledged: true }
+
+# now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
+- do:
+    snapshot.create:
+      repository: test_repo_create_1
+      snapshot: barrier_snapshot
+      wait_for_completion: true

server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

Lines changed: 6 additions & 0 deletions
@@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
             .orElseThrow()
             .queue();

+        // There is one task in the queue for computing and forking the cleanup work.
+        assertThat(queueLength.getAsInt(), equalTo(1));
+
+        safeAwait(barrier); // unblock the barrier thread and let it process the queue
+        safeAwait(barrier); // wait for the queue to be processed
+
         // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
         // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
         // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
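
The doubled safeAwait calls are a two-phase handshake: the first await releases the blocked thread, the second makes the test wait until that thread has drained the queue. A minimal standalone sketch of the same pattern, using a plain java.util.concurrent.CyclicBarrier in place of the test framework's safeAwait (names here are illustrative):

    import java.util.concurrent.CyclicBarrier;

    public class TwoPhaseBarrierDemo {
        public static void main(String[] args) throws Exception {
            CyclicBarrier barrier = new CyclicBarrier(2);

            Thread worker = new Thread(() -> {
                try {
                    barrier.await(); // phase 1: blocked until the main thread releases us
                    System.out.println("worker: processing the queue");
                    barrier.await(); // phase 2: tell the main thread we are done
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            worker.start();

            barrier.await(); // unblock the worker and let it process the queue
            barrier.await(); // wait for the queue to be processed
            worker.join();
        }
    }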

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 6 deletions
@@ -4491,14 +4491,17 @@ public void waitForEngineOrClosedShard(ActionListener<Void> listener) {
     }

     /**
-     * Registers a listener for an event when the shard advances to the provided primary term and segment generation
+     * Registers a listener for an event when the shard advances to the provided primary term and segment generation.
+     * Completes the listener with a {@link IndexShardClosedException} if the shard is closed.
      */
     public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
-        waitForEngineOrClosedShard(
-            listener.delegateFailureAndWrap(
-                (l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l)
-            )
-        );
+        waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
+            if (state == IndexShardState.CLOSED) {
+                l.onFailure(new IndexShardClosedException(shardId));
+            } else {
+                getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
+            }
+        }));
     }

     /**
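
The fix is a fail-fast guard in the listener chain: once the engine-or-closed wait completes, the callback checks the shard state and completes the listener with a dedicated exception rather than letting a call on a closed shard's engine surface as an unrelated failure. A generic sketch of the pattern, with hypothetical types standing in for ActionListener and IndexShardClosedException:

    import java.util.function.Consumer;

    final class ClosedAwareWait {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        enum State { STARTED, CLOSED }

        private volatile State state = State.STARTED;

        // Either fail fast with a "closed" error or hand the listener to the engine.
        void waitFor(Listener<Long> listener, Consumer<Listener<Long>> registerWithEngine) {
            if (state == State.CLOSED) {
                listener.onFailure(new IllegalStateException("shard closed")); // stands in for IndexShardClosedException
            } else {
                registerWithEngine.accept(listener);
            }
        }

        void close() {
            state = State.CLOSED;
        }
    }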

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,7 @@
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;

@@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
      * @return map of index to index metadata blob id to delete
      */
     public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
         Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
         final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
             .stream()
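
The new assert documents and enforces the threading contract: this method does non-trivial set arithmetic over all index metadata generations, so it must run on a SNAPSHOT-pool thread rather than the master thread. A sketch of the assertion style, assuming (as Elasticsearch does) that thread names embed the pool name; this is an illustrative helper, not the real ThreadPool implementation:

    final class ThreadPoolAssertions {
        private ThreadPoolAssertions() {}

        // Returns true so it can be used as `assert currentThreadIsIn("snapshot");`
        // asserts run in tests (with -ea) and compile away in production.
        static boolean currentThreadIsIn(String poolName) {
            final String name = Thread.currentThread().getName();
            assert name.contains("[" + poolName + "]")
                : "expected a [" + poolName + "] thread but running on: " + name;
            return true;
        }

        public static void main(String[] args) throws Exception {
            Thread t = new Thread(() -> {
                assert currentThreadIsIn("snapshot");
                System.out.println("running expensive cleanup computation");
            }, "node-0[snapshot][T#1]");
            t.start();
            t.join();
        }
    }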

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 7 deletions
@@ -1134,14 +1134,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
             );
         })

-            .<RepositoryData>andThen((l, newRepositoryData) -> {
-                l.onResponse(newRepositoryData);
-                // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
-                try (var refs = new RefCountingRunnable(onCompletion)) {
-                    cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
-                    cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
+            .<RepositoryData>andThen(
+                // writeIndexGen finishes on master-service thread so must fork here.
+                snapshotExecutor,
+                threadPool.getThreadContext(),
+                (l, newRepositoryData) -> {
+                    l.onResponse(newRepositoryData);
+                    // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
+                    // deletion
+                    try (var refs = new RefCountingRunnable(onCompletion)) {
+                        cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
+                        cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
+                    }
                 }
-            })
+            )

             .addListener(repositoryDataUpdateListener);
     }
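
The substance of the change is in the new comment: writeIndexGen completes its listener on the master-service thread, so the continuation that computes and kicks off blob cleanup is now forked to the snapshot executor instead of running inline on master. A minimal sketch of that hand-off using a plain CompletableFuture in place of the listener chain (names are illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ForkOffMasterDemo {
        public static void main(String[] args) throws Exception {
            ExecutorService snapshotExecutor =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "snapshot-pool"));

            // Stand-in for writeIndexGen completing on the master-service thread.
            CompletableFuture<String> repositoryUpdated =
                CompletableFuture.supplyAsync(() -> "newRepositoryData");

            // thenAcceptAsync forks the continuation onto snapshotExecutor, so the
            // completing (master) thread is released immediately instead of doing
            // the cleanup work itself.
            repositoryUpdated.thenAcceptAsync(
                newRepositoryData -> System.out.println("cleanup running on " + Thread.currentThread().getName()),
                snapshotExecutor
            ).join();

            snapshotExecutor.shutdown();
        }
    }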

server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final
     public void collect(int doc, long bucket) throws IOException {
         if (values.advanceExact(doc)) {
             maybeGrow(bucket);
-            computeSum(bucket, values, sums, compensations);
+            computeSum(bucket, values.doubleValue(), sums, compensations);
             counts.increment(bucket, 1L);
         }
     }
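
The one-line fix passes the extracted double rather than the NumericDoubleValues handle, matching the signature of the shared SumAggregator.computeSum helper. The helper's body isn't shown in this commit; inferred from the Kahan-summation logic removed from ExtendedStatsAggregator below, it plausibly performs one compensated-summation step against paired value/compensation arrays, along these lines (a sketch only, with plain double[] standing in for BigArrays' DoubleArray):

    final class KahanBuckets {
        // One Kahan step: fold the bucket's accumulated rounding error back in
        // before adding, then store the new sum and the new error term.
        static void computeSum(int bucket, double value, double[] sums, double[] compensations) {
            final double sum = sums[bucket];
            final double corrected = value - compensations[bucket];
            final double newSum = sum + corrected;
            compensations[bucket] = (newSum - sum) - corrected;
            sums[bucket] = newSum;
        }

        public static void main(String[] args) {
            double[] sums = new double[1];
            double[] compensations = new double[1];
            double naive = 0;
            for (int i = 0; i < 10; i++) {
                computeSum(0, 0.1, sums, compensations);
                naive += 0.1;
            }
            System.out.println(naive);   // 0.9999999999999999
            System.out.println(sums[0]); // 1.0 on IEEE-754 doubles; the compensation absorbs the rounding error
        }
    }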

server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java

Lines changed: 11 additions & 22 deletions
@@ -105,7 +105,6 @@ public void collect(int doc, long bucket) throws IOException {
 
     @Override
     protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) {
-        final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
         return new LeafBucketCollectorBase(sub, values) {

             @Override
@@ -114,20 +113,9 @@ public void collect(int doc, long bucket) throws IOException {
                 maybeGrow(bucket);
                 final double value = values.doubleValue();
                 counts.increment(bucket, 1L);
-                // Compute the sum and sum of squires for double values with Kahan summation algorithm
-                // which is more accurate than naive summation.
-                compensatedSum.reset(sums.get(bucket), compensations.get(bucket));
-                compensatedSum.add(value);
-                sums.set(bucket, compensatedSum.value());
-                compensations.set(bucket, compensatedSum.delta());
-
-                compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket));
-                compensatedSum.add(value * value);
-                sumOfSqrs.set(bucket, compensatedSum.value());
-                compensationOfSqrs.set(bucket, compensatedSum.delta());
-
-                mins.set(bucket, Math.min(mins.get(bucket), value));
-                maxes.set(bucket, Math.max(maxes.get(bucket), value));
+                SumAggregator.computeSum(bucket, value, sums, compensations);
+                SumAggregator.computeSum(bucket, value * value, sumOfSqrs, compensationOfSqrs);
+                StatsAggregator.updateMinsAndMaxes(bucket, value, mins, maxes);
             }
         }

@@ -138,13 +126,14 @@ private void maybeGrow(long bucket) {
         if (bucket >= counts.size()) {
             final long from = counts.size();
             final long overSize = BigArrays.overSize(bucket + 1);
-            counts = bigArrays().resize(counts, overSize);
-            sums = bigArrays().resize(sums, overSize);
-            compensations = bigArrays().resize(compensations, overSize);
-            mins = bigArrays().resize(mins, overSize);
-            maxes = bigArrays().resize(maxes, overSize);
-            sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize);
-            compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize);
+            var bigArrays = bigArrays();
+            counts = bigArrays.resize(counts, overSize);
+            sums = bigArrays.resize(sums, overSize);
+            compensations = bigArrays.resize(compensations, overSize);
+            mins = bigArrays.resize(mins, overSize);
+            maxes = bigArrays.resize(maxes, overSize);
+            sumOfSqrs = bigArrays.resize(sumOfSqrs, overSize);
+            compensationOfSqrs = bigArrays.resize(compensationOfSqrs, overSize);
             mins.fill(from, overSize, Double.POSITIVE_INFINITY);
             maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
         }
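
The maybeGrow cleanup only hoists the repeated bigArrays() call into a local, but the surrounding pattern is worth a note: growing to BigArrays.overSize(bucket + 1) over-allocates past the immediately needed size, so per-bucket growth costs amortized O(1) array copies. A toy sketch of the idea — the growth factor here is an assumption for illustration, not BigArrays' actual policy:

    import java.util.Arrays;

    final class OverSizeDemo {
        // Grow ~1/8 past the requested minimum (illustrative policy).
        static int overSize(int minSize) {
            return minSize + (minSize >>> 3) + 16;
        }

        public static void main(String[] args) {
            double[] sums = new double[0];
            int copies = 0;
            for (int bucket = 0; bucket < 1_000_000; bucket++) {
                if (bucket >= sums.length) {
                    sums = Arrays.copyOf(sums, overSize(bucket + 1));
                    copies++;
                }
            }
            // Roughly a hundred resizes instead of a million: growth is amortized O(1).
            System.out.println("resizes: " + copies);
        }
    }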
