Skip to content

Commit d470e5a

Browse files
authored
Merge branch 'main' into read-failure-store-privilege-role-building
2 parents 8036ecf + f220aba commit d470e5a

File tree

12 files changed

+71
-66
lines changed

12 files changed

+71
-66
lines changed

docs/changelog/122731.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122731
2+
summary: Fork post-snapshot-delete cleanup off master thread
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

libs/core/src/main/java/org/elasticsearch/core/UpdateForV9.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@
2424
enum Owner {
2525
CORE_INFRA,
2626
DATA_MANAGEMENT,
27-
DISTRIBUTED_COORDINATION,
2827
DISTRIBUTED_INDEXING,
2928
ENTERPRISE_SEARCH,
3029
MACHINE_LEARNING,
3130
PROFILING,
3231
SEARCH_ANALYTICS,
33-
SEARCH_FOUNDATIONS,
34-
SEARCH_RELEVANCE,
3532
SECURITY,
3633
}
3734

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ tests:
6161
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
6262
method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index}
6363
issue: https://github.com/elastic/elasticsearch/issues/115808
64-
- class: org.elasticsearch.search.StressSearchServiceReaperIT
65-
method: testStressReaper
66-
issue: https://github.com/elastic/elasticsearch/issues/115816
6764
- class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests
6865
issue: https://github.com/elastic/elasticsearch/issues/116087
6966
- class: org.elasticsearch.xpack.test.rest.XPackRestIT

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,10 @@ setup:
6868
wait_for_completion: false
6969

7070
- match: { acknowledged: true }
71+
72+
# now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
73+
- do:
74+
snapshot.create:
75+
repository: test_repo_create_1
76+
snapshot: barrier_snapshot
77+
wait_for_completion: true

server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
508508
.orElseThrow()
509509
.queue();
510510

511+
// There is one task in the queue for computing and forking the cleanup work.
512+
assertThat(queueLength.getAsInt(), equalTo(1));
513+
514+
safeAwait(barrier); // unblock the barrier thread and let it process the queue
515+
safeAwait(barrier); // wait for the queue to be processed
516+
511517
// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
512518
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
513519
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.snapshots.SnapshotInfo;
3030
import org.elasticsearch.snapshots.SnapshotState;
3131
import org.elasticsearch.snapshots.SnapshotsService;
32+
import org.elasticsearch.threadpool.ThreadPool;
3233
import org.elasticsearch.xcontent.XContentBuilder;
3334
import org.elasticsearch.xcontent.XContentParser;
3435

@@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
377378
* @return map of index to index metadata blob id to delete
378379
*/
379380
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
381+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
380382
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
381383
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
382384
.stream()

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,14 +1134,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
11341134
);
11351135
})
11361136

1137-
.<RepositoryData>andThen((l, newRepositoryData) -> {
1138-
l.onResponse(newRepositoryData);
1139-
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1140-
try (var refs = new RefCountingRunnable(onCompletion)) {
1141-
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1142-
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1137+
.<RepositoryData>andThen(
1138+
// writeIndexGen finishes on master-service thread so must fork here.
1139+
snapshotExecutor,
1140+
threadPool.getThreadContext(),
1141+
(l, newRepositoryData) -> {
1142+
l.onResponse(newRepositoryData);
1143+
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
1144+
// deletion
1145+
try (var refs = new RefCountingRunnable(onCompletion)) {
1146+
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1147+
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1148+
}
11431149
}
1144-
})
1150+
)
11451151

11461152
.addListener(repositoryDataUpdateListener);
11471153
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final
5858
public void collect(int doc, long bucket) throws IOException {
5959
if (values.advanceExact(doc)) {
6060
maybeGrow(bucket);
61-
computeSum(bucket, values, sums, compensations);
61+
computeSum(bucket, values.doubleValue(), sums, compensations);
6262
counts.increment(bucket, 1L);
6363
}
6464
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ public void collect(int doc, long bucket) throws IOException {
105105

106106
@Override
107107
protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) {
108-
final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
109108
return new LeafBucketCollectorBase(sub, values) {
110109

111110
@Override
@@ -114,20 +113,9 @@ public void collect(int doc, long bucket) throws IOException {
114113
maybeGrow(bucket);
115114
final double value = values.doubleValue();
116115
counts.increment(bucket, 1L);
117-
// Compute the sum and sum of squires for double values with Kahan summation algorithm
118-
// which is more accurate than naive summation.
119-
compensatedSum.reset(sums.get(bucket), compensations.get(bucket));
120-
compensatedSum.add(value);
121-
sums.set(bucket, compensatedSum.value());
122-
compensations.set(bucket, compensatedSum.delta());
123-
124-
compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket));
125-
compensatedSum.add(value * value);
126-
sumOfSqrs.set(bucket, compensatedSum.value());
127-
compensationOfSqrs.set(bucket, compensatedSum.delta());
128-
129-
mins.set(bucket, Math.min(mins.get(bucket), value));
130-
maxes.set(bucket, Math.max(maxes.get(bucket), value));
116+
SumAggregator.computeSum(bucket, value, sums, compensations);
117+
SumAggregator.computeSum(bucket, value * value, sumOfSqrs, compensationOfSqrs);
118+
StatsAggregator.updateMinsAndMaxes(bucket, value, mins, maxes);
131119
}
132120
}
133121

@@ -138,13 +126,14 @@ private void maybeGrow(long bucket) {
138126
if (bucket >= counts.size()) {
139127
final long from = counts.size();
140128
final long overSize = BigArrays.overSize(bucket + 1);
141-
counts = bigArrays().resize(counts, overSize);
142-
sums = bigArrays().resize(sums, overSize);
143-
compensations = bigArrays().resize(compensations, overSize);
144-
mins = bigArrays().resize(mins, overSize);
145-
maxes = bigArrays().resize(maxes, overSize);
146-
sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize);
147-
compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize);
129+
var bigArrays = bigArrays();
130+
counts = bigArrays.resize(counts, overSize);
131+
sums = bigArrays.resize(sums, overSize);
132+
compensations = bigArrays.resize(compensations, overSize);
133+
mins = bigArrays.resize(mins, overSize);
134+
maxes = bigArrays.resize(maxes, overSize);
135+
sumOfSqrs = bigArrays.resize(sumOfSqrs, overSize);
136+
compensationOfSqrs = bigArrays.resize(compensationOfSqrs, overSize);
148137
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
149138
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
150139
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public void collect(int doc, long bucket) throws IOException {
9393
}
9494

9595
private LeafBucketCollector getLeafCollector(GeoPointValues values, LeafBucketCollector sub) {
96-
final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
9796
return new LeafBucketCollectorBase(sub, values) {
9897
@Override
9998
public void collect(int doc, long bucket) throws IOException {
@@ -104,16 +103,8 @@ public void collect(int doc, long bucket) throws IOException {
104103
// Compute the sum of double values with Kahan summation algorithm which is more
105104
// accurate than naive summation.
106105
final GeoPoint value = values.pointValue();
107-
// latitude
108-
compensatedSum.reset(latSum.get(bucket), latCompensations.get(bucket));
109-
compensatedSum.add(value.getLat());
110-
latSum.set(bucket, compensatedSum.value());
111-
latCompensations.set(bucket, compensatedSum.delta());
112-
// longitude
113-
compensatedSum.reset(lonSum.get(bucket), lonCompensations.get(bucket));
114-
compensatedSum.add(value.getLon());
115-
lonSum.set(bucket, compensatedSum.value());
116-
lonCompensations.set(bucket, compensatedSum.delta());
106+
SumAggregator.computeSum(bucket, value.getLat(), latSum, latCompensations);
107+
SumAggregator.computeSum(bucket, value.getLon(), lonSum, lonCompensations);
117108
}
118109
}
119110
};

0 commit comments

Comments
 (0)