Skip to content

Commit 94c385d

Browse files
authored
Run newShardSnapshotTask tasks concurrently (#126452)
In #88707 we changed the behaviour here to run all the shard-snapshot initialization tasks in sequence on a single thread. However, these tasks do nontrivial work, since they may flush to acquire the relevant index commit, so this commit goes back to distributing them across the `SNAPSHOT` pool.
1 parent 2c0b71f commit 94c385d

File tree

3 files changed

+103
-5
lines changed

3 files changed

+103
-5
lines changed

docs/changelog/126452.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126452
2+
summary: Run `newShardSnapshotTask` tasks concurrently
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,30 @@
1313
import org.apache.logging.log4j.core.LogEvent;
1414
import org.elasticsearch.ElasticsearchException;
1515
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
17+
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
1618
import org.elasticsearch.action.support.ActionTestUtils;
1719
import org.elasticsearch.action.support.SubscribableListener;
20+
import org.elasticsearch.cluster.SnapshotsInProgress;
1821
import org.elasticsearch.cluster.coordination.Coordinator;
1922
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
2023
import org.elasticsearch.plugins.Plugin;
2124
import org.elasticsearch.snapshots.mockstore.MockRepository;
25+
import org.elasticsearch.test.ClusterServiceUtils;
2226
import org.elasticsearch.test.ESIntegTestCase;
2327
import org.elasticsearch.test.MockLog;
2428
import org.elasticsearch.test.disruption.NetworkDisruption;
2529
import org.elasticsearch.test.junit.annotations.TestLogging;
2630
import org.elasticsearch.test.transport.MockTransportService;
31+
import org.elasticsearch.threadpool.ThreadPool;
32+
import org.elasticsearch.threadpool.ThreadPoolStats;
2733
import org.elasticsearch.transport.TestTransportChannel;
2834
import org.elasticsearch.transport.TransportResponse;
2935

3036
import java.util.Arrays;
3137
import java.util.Collection;
3238
import java.util.List;
39+
import java.util.concurrent.CyclicBarrier;
3340
import java.util.concurrent.TimeUnit;
3441
import java.util.concurrent.atomic.AtomicBoolean;
3542

@@ -166,6 +173,59 @@ public boolean innerMatch(LogEvent event) {
166173
}
167174
}
168175
);
176+
}
177+
178+
public void testStartSnapshotsConcurrently() {
179+
internalCluster().startMasterOnlyNode();
180+
final var dataNode = internalCluster().startDataOnlyNode();
181+
182+
final var repoName = randomIdentifier();
183+
createRepository(repoName, "fs");
184+
185+
final var threadPool = internalCluster().getInstance(ThreadPool.class, dataNode);
186+
final var snapshotThreadCount = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax();
187+
188+
final var indexName = randomIdentifier();
189+
final var shardCount = between(1, snapshotThreadCount * 2);
190+
assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(shardCount)));
191+
indexRandomDocs(indexName, scaledRandomIntBetween(50, 100));
192+
193+
final var snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
194+
final var barrier = new CyclicBarrier(snapshotThreadCount + 1);
195+
for (int i = 0; i < snapshotThreadCount; i++) {
196+
snapshotExecutor.submit(() -> {
197+
safeAwait(barrier);
198+
safeAwait(barrier);
199+
});
200+
}
201+
202+
// wait until the snapshot threads are all blocked
203+
safeAwait(barrier);
204+
205+
safeGet(
206+
client().execute(
207+
TransportCreateSnapshotAction.TYPE,
208+
new CreateSnapshotRequest(TEST_REQUEST_TIMEOUT, repoName, randomIdentifier())
209+
)
210+
);
211+
212+
// one task for each snapshot thread (throttled) or shard (if fewer), plus one for runSyncTasksEagerly()
213+
assertEquals(Math.min(snapshotThreadCount, shardCount) + 1, getSnapshotQueueLength(threadPool));
214+
215+
// release all the snapshot threads
216+
safeAwait(barrier);
217+
218+
// wait for completion
219+
safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
220+
}
221+
222+
private static int getSnapshotQueueLength(ThreadPool threadPool) {
223+
for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) {
224+
if (stats.name().equals(ThreadPool.Names.SNAPSHOT)) {
225+
return stats.queue();
226+
}
227+
}
169228

229+
throw new AssertionError("threadpool stats for [" + ThreadPool.Names.SNAPSHOT + "] not found");
170230
}
171231
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.util.Maps;
3232
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3333
import org.elasticsearch.core.Nullable;
34+
import org.elasticsearch.core.Releasable;
3435
import org.elasticsearch.index.IndexVersion;
3536
import org.elasticsearch.index.engine.Engine;
3637
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -56,7 +57,6 @@
5657
import org.elasticsearch.transport.TransportService;
5758

5859
import java.io.IOException;
59-
import java.util.ArrayList;
6060
import java.util.HashMap;
6161
import java.util.HashSet;
6262
import java.util.Iterator;
@@ -98,6 +98,9 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl
9898
// A map of snapshots to the shardIds that we already reported to the master as failed
9999
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator;
100100

101+
// Runs the tasks that start each shard snapshot (e.g. acquiring the index commit)
102+
private final ThrottledTaskRunner startShardSnapshotTaskRunner;
103+
101104
// Runs the tasks that promptly notify shards of aborted snapshots so that resources can be released ASAP
102105
private final ThrottledTaskRunner notifyOnAbortTaskRunner;
103106

@@ -131,6 +134,11 @@ public SnapshotShardsService(
131134
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
132135
threadPool.generic()
133136
);
137+
this.startShardSnapshotTaskRunner = new ThrottledTaskRunner(
138+
"start-shard-snapshots",
139+
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
140+
threadPool.executor(ThreadPool.Names.SNAPSHOT)
141+
);
134142
}
135143

136144
@Override
@@ -384,7 +392,6 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
384392

385393
final var newSnapshotShards = shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>());
386394

387-
final List<Runnable> shardSnapshotTasks = new ArrayList<>(shardsToStart.size());
388395
for (final Map.Entry<ShardId, ShardGeneration> shardEntry : shardsToStart.entrySet()) {
389396
final ShardId shardId = shardEntry.getKey();
390397
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardEntry.getValue());
@@ -396,11 +403,37 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
396403
: "Found non-null, non-numeric shard generation ["
397404
+ snapshotStatus.generation()
398405
+ "] for snapshot with old-format compatibility";
399-
shardSnapshotTasks.add(newShardSnapshotTask(shardId, snapshot, indexId, snapshotStatus, entry.version(), entry.startTime()));
400-
snapshotStatus.updateStatusDescription("shard snapshot scheduled to start");
406+
final var shardSnapshotTask = newShardSnapshotTask(
407+
shardId,
408+
snapshot,
409+
indexId,
410+
snapshotStatus,
411+
entry.version(),
412+
entry.startTime()
413+
);
414+
startShardSnapshotTaskRunner.enqueueTask(new ActionListener<>() {
415+
@Override
416+
public void onResponse(Releasable releasable) {
417+
try (releasable) {
418+
shardSnapshotTask.run();
419+
}
420+
}
421+
422+
@Override
423+
public void onFailure(Exception e) {
424+
final var wrapperException = new IllegalStateException(
425+
"impossible failure starting shard snapshot for " + shardId + " in " + snapshot,
426+
e
427+
);
428+
logger.error(wrapperException.getMessage(), wrapperException);
429+
assert false : wrapperException; // impossible
430+
}
431+
});
432+
snapshotStatus.updateStatusDescription("shard snapshot enqueued to start");
401433
}
402434

403-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run));
435+
// apply some backpressure by reserving one SNAPSHOT thread for the startup work
436+
startShardSnapshotTaskRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT));
404437
}
405438

406439
private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInProgress.Entry entry) {

0 commit comments

Comments
 (0)