Skip to content

Commit 69fa6b3

Browse files
authored
Run newShardSnapshotTask tasks concurrently (#126478)
In #88707 we changed the behaviour here to run the shard-snapshot initialization tasks all in sequence. However, these tasks do nontrivial work, since they may need to flush in order to acquire the relevant index commit, so this commit goes back to distributing them across the `SNAPSHOT` pool. Backport of #126452 to `8.x`.
1 parent 1e6cdc3 commit 69fa6b3

File tree

3 files changed

+103
-4
lines changed

3 files changed

+103
-4
lines changed

docs/changelog/126452.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126452
2+
summary: Run `newShardSnapshotTask` tasks concurrently
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,23 @@
99

1010
package org.elasticsearch.snapshots;
1111

12+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
13+
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
14+
import org.elasticsearch.cluster.SnapshotsInProgress;
1215
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
1316
import org.elasticsearch.plugins.Plugin;
1417
import org.elasticsearch.snapshots.mockstore.MockRepository;
18+
import org.elasticsearch.test.ClusterServiceUtils;
1519
import org.elasticsearch.test.ESIntegTestCase;
1620
import org.elasticsearch.test.disruption.NetworkDisruption;
1721
import org.elasticsearch.test.transport.MockTransportService;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
import org.elasticsearch.threadpool.ThreadPoolStats;
1824

1925
import java.util.Arrays;
2026
import java.util.Collection;
2127
import java.util.List;
28+
import java.util.concurrent.CyclicBarrier;
2229
import java.util.concurrent.TimeUnit;
2330

2431
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -89,4 +96,58 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
8996
assertThat(snapshotInfo.successfulShards(), equalTo(shards));
9097
}, 30L, TimeUnit.SECONDS);
9198
}
99+
100+
/**
 * Checks how many tasks are queued on the data node's SNAPSHOT pool when a snapshot starts while every
 * SNAPSHOT thread is busy.
 *
 * The test occupies all SNAPSHOT threads with tasks parked on a barrier, issues a create-snapshot request,
 * and then asserts that the SNAPSHOT queue length equals min(snapshot thread count, shard count) + 1.
 * Finally it releases the threads and waits until the master's cluster state has no snapshots in progress.
 */
public void testStartSnapshotsConcurrently() {
101+
internalCluster().startMasterOnlyNode();
102+
final var dataNode = internalCluster().startDataOnlyNode();
103+
104+
final var repoName = randomIdentifier();
105+
createRepository(repoName, "fs");
106+
107+
// look up the thread pool of the data node so that its SNAPSHOT pool can be blocked and inspected
final var threadPool = internalCluster().getInstance(ThreadPool.class, dataNode);
108+
final var snapshotThreadCount = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax();
109+
110+
final var indexName = randomIdentifier();
111+
// shard count may be below or above the SNAPSHOT thread count, covering both arms of the Math.min() below
final var shardCount = between(1, snapshotThreadCount * 2);
112+
assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(shardCount)));
113+
indexRandomDocs(indexName, scaledRandomIntBetween(50, 100));
114+
115+
final var snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
116+
// one party per submitted blocking task plus one for the test thread
final var barrier = new CyclicBarrier(snapshotThreadCount + 1);
117+
for (int i = 0; i < snapshotThreadCount; i++) {
118+
snapshotExecutor.submit(() -> {
119+
// first await: signals that this task is running; second await: holds the thread until released
safeAwait(barrier);
120+
safeAwait(barrier);
121+
});
122+
}
123+
124+
// wait until the snapshot threads are all blocked
125+
safeAwait(barrier);
126+
127+
// send the create-snapshot request and wait for its response while the SNAPSHOT threads are still blocked
safeGet(
128+
client().execute(
129+
TransportCreateSnapshotAction.TYPE,
130+
new CreateSnapshotRequest(TEST_REQUEST_TIMEOUT, repoName, randomIdentifier())
131+
)
132+
);
133+
134+
// one task for each snapshot thread (throttled) or shard (if fewer), plus one for runSyncTasksEagerly()
135+
assertEquals(Math.min(snapshotThreadCount, shardCount) + 1, getSnapshotQueueLength(threadPool));
136+
137+
// release all the snapshot threads
138+
safeAwait(barrier);
139+
140+
// wait for completion
141+
safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
142+
}
143+
144+
/**
 * Returns the queue value reported by the given thread pool's stats for the SNAPSHOT pool.
 *
 * @param threadPool the thread pool whose stats are inspected
 * @return the {@code queue()} value of the stats entry whose name equals {@link ThreadPool.Names#SNAPSHOT}
 * @throws AssertionError if the stats contain no entry for the SNAPSHOT pool
 */
private static int getSnapshotQueueLength(ThreadPool threadPool) {
145+
for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) {
146+
if (stats.name().equals(ThreadPool.Names.SNAPSHOT)) {
147+
return stats.queue();
148+
}
149+
}
150+
151+
// no matching entry: fail loudly rather than returning a misleading queue length
throw new AssertionError("threadpool stats for [" + ThreadPool.Names.SNAPSHOT + "] not found");
152+
}
92153
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.util.Maps;
3030
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3131
import org.elasticsearch.core.Nullable;
32+
import org.elasticsearch.core.Releasable;
3233
import org.elasticsearch.index.IndexVersion;
3334
import org.elasticsearch.index.engine.Engine;
3435
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -54,7 +55,6 @@
5455
import org.elasticsearch.transport.TransportService;
5556

5657
import java.io.IOException;
57-
import java.util.ArrayList;
5858
import java.util.HashMap;
5959
import java.util.Iterator;
6060
import java.util.List;
@@ -87,6 +87,9 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl
8787
// A map of snapshots to the shardIds that we already reported to the master as failed
8888
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator;
8989

90+
// Runs the tasks that start each shard snapshot (e.g. acquiring the index commit)
91+
private final ThrottledTaskRunner startShardSnapshotTaskRunner;
92+
9093
// Runs the tasks that promptly notify shards of aborted snapshots so that resources can be released ASAP
9194
private final ThrottledTaskRunner notifyOnAbortTaskRunner;
9295

@@ -114,6 +117,11 @@ public SnapshotShardsService(
114117
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
115118
threadPool.generic()
116119
);
120+
this.startShardSnapshotTaskRunner = new ThrottledTaskRunner(
121+
"start-shard-snapshots",
122+
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
123+
threadPool.executor(ThreadPool.Names.SNAPSHOT)
124+
);
117125
}
118126

119127
@Override
@@ -304,7 +312,6 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
304312

305313
final var newSnapshotShards = shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>());
306314

307-
final List<Runnable> shardSnapshotTasks = new ArrayList<>(shardsToStart.size());
308315
for (final Map.Entry<ShardId, ShardGeneration> shardEntry : shardsToStart.entrySet()) {
309316
final ShardId shardId = shardEntry.getKey();
310317
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardEntry.getValue());
@@ -316,10 +323,36 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
316323
: "Found non-null, non-numeric shard generation ["
317324
+ snapshotStatus.generation()
318325
+ "] for snapshot with old-format compatibility";
319-
shardSnapshotTasks.add(newShardSnapshotTask(shardId, snapshot, indexId, snapshotStatus, entry.version(), entry.startTime()));
326+
final var shardSnapshotTask = newShardSnapshotTask(
327+
shardId,
328+
snapshot,
329+
indexId,
330+
snapshotStatus,
331+
entry.version(),
332+
entry.startTime()
333+
);
334+
startShardSnapshotTaskRunner.enqueueTask(new ActionListener<>() {
335+
@Override
336+
public void onResponse(Releasable releasable) {
337+
try (releasable) {
338+
shardSnapshotTask.run();
339+
}
340+
}
341+
342+
@Override
343+
public void onFailure(Exception e) {
344+
final var wrapperException = new IllegalStateException(
345+
"impossible failure starting shard snapshot for " + shardId + " in " + snapshot,
346+
e
347+
);
348+
logger.error(wrapperException.getMessage(), wrapperException);
349+
assert false : wrapperException; // impossible
350+
}
351+
});
320352
}
321353

322-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run));
354+
// apply some backpressure by reserving one SNAPSHOT thread for the startup work
355+
startShardSnapshotTaskRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT));
323356
}
324357

325358
private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInProgress.Entry entry) {

0 commit comments

Comments
 (0)