Skip to content

Commit 47be542

Browse files
Add a shard snapshot current status debug string (#118198)
SnapshotShardsService will update the new debug string field in IndexShardSnapshotStatus as a shard snapshot operation proceeds so that the current state can be logged. The SnapshotShutdownProgressTracker will iterate through the SnapshotShardsService's list of shard snapshots and log their current status. We want to know where in the code a shard snapshot operation potentially gets stuck. This new field should be updated as frequently as is reasonable to inform on the shard snapshot's progress. Closes ES-10261
1 parent a27e5db commit 47be542

File tree

6 files changed

+166
-25
lines changed

6 files changed

+166
-25
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
524524
"Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]"
525525
)
526526
);
527+
mockLog.addExpectation(
528+
new MockLog.SeenEventExpectation(
529+
"SnapshotShutdownProgressTracker index shard snapshot status messages",
530+
SnapshotShutdownProgressTracker.class.getCanonicalName(),
531+
Level.INFO,
532+
// Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store.
533+
"statusDescription='enqueued file snapshot tasks: threads running concurrent file uploads'"
534+
)
535+
);
527536

528537
putShutdownForRemovalMetadata(nodeForRemoval, clusterService);
529538

@@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
583592
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
584593
)
585594
);
595+
mockLog.addExpectation(
596+
new MockLog.SeenEventExpectation(
597+
"SnapshotShutdownProgressTracker index shard snapshot messages",
598+
SnapshotShutdownProgressTracker.class.getCanonicalName(),
599+
Level.INFO,
600+
"statusDescription='finished: master notification attempt complete'"
601+
)
602+
);
586603

587604
// Release the master node to respond
588605
snapshotStatusUpdateLatch.countDown();

server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public enum AbortStatus {
9898
private long processedSize;
9999
private String failure;
100100
private final SubscribableListener<AbortStatus> abortListeners = new SubscribableListener<>();
101+
private volatile String statusDescription;
101102

102103
private IndexShardSnapshotStatus(
103104
final Stage stage,
@@ -110,7 +111,8 @@ private IndexShardSnapshotStatus(
110111
final long totalSize,
111112
final long processedSize,
112113
final String failure,
113-
final ShardGeneration generation
114+
final ShardGeneration generation,
115+
final String statusDescription
114116
) {
115117
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
116118
this.generation = new AtomicReference<>(generation);
@@ -124,6 +126,7 @@ private IndexShardSnapshotStatus(
124126
this.processedSize = processedSize;
125127
this.incrementalSize = incrementalSize;
126128
this.failure = failure;
129+
updateStatusDescription(statusDescription);
127130
}
128131

129132
public synchronized Copy moveToStarted(
@@ -272,6 +275,15 @@ public synchronized void addProcessedFiles(int count, long totalSize) {
272275
processedSize += totalSize;
273276
}
274277

278+
/**
279+
* Updates the string explanation for what the snapshot is actively doing right now.
280+
*/
281+
public void updateStatusDescription(String statusString) {
282+
assert statusString != null;
283+
assert statusString.isEmpty() == false;
284+
this.statusDescription = statusString;
285+
}
286+
275287
/**
276288
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
277289
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
@@ -289,20 +301,21 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
289301
incrementalSize,
290302
totalSize,
291303
processedSize,
292-
failure
304+
failure,
305+
statusDescription
293306
);
294307
}
295308

296309
public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) {
297-
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
310+
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing");
298311
}
299312

300313
public static IndexShardSnapshotStatus.Copy newFailed(final String failure) {
301314
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
302315
if (failure == null) {
303316
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
304317
}
305-
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy();
318+
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "initialized as failed").asCopy();
306319
}
307320

308321
public static IndexShardSnapshotStatus.Copy newDone(
@@ -326,7 +339,8 @@ public static IndexShardSnapshotStatus.Copy newDone(
326339
size,
327340
incrementalSize,
328341
null,
329-
generation
342+
generation,
343+
"initialized as done"
330344
).asCopy();
331345
}
332346

@@ -345,6 +359,7 @@ public static class Copy {
345359
private final long processedSize;
346360
private final long incrementalSize;
347361
private final String failure;
362+
private final String statusDescription;
348363

349364
public Copy(
350365
final Stage stage,
@@ -356,7 +371,8 @@ public Copy(
356371
final long incrementalSize,
357372
final long totalSize,
358373
final long processedSize,
359-
final String failure
374+
final String failure,
375+
final String statusDescription
360376
) {
361377
this.stage = stage;
362378
this.startTime = startTime;
@@ -368,6 +384,7 @@ public Copy(
368384
this.processedSize = processedSize;
369385
this.incrementalSize = incrementalSize;
370386
this.failure = failure;
387+
this.statusDescription = statusDescription;
371388
}
372389

373390
public Stage getStage() {
@@ -410,6 +427,10 @@ public String getFailure() {
410427
return failure;
411428
}
412429

430+
public String getStatusDescription() {
431+
return statusDescription;
432+
}
433+
413434
@Override
414435
public String toString() {
415436
return "index shard snapshot status ("
@@ -433,6 +454,8 @@ public String toString() {
433454
+ processedSize
434455
+ ", failure='"
435456
+ failure
457+
+ "', statusDescription='"
458+
+ statusDescription
436459
+ '\''
437460
+ ')';
438461
}
@@ -461,6 +484,8 @@ public String toString() {
461484
+ processedSize
462485
+ ", failure='"
463486
+ failure
487+
+ "', statusDescription='"
488+
+ statusDescription
464489
+ '\''
465490
+ ')';
466491
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3186,6 +3186,7 @@ private void writeAtomic(
31863186

31873187
@Override
31883188
public void snapshotShard(SnapshotShardContext context) {
3189+
context.status().updateStatusDescription("queued in snapshot task runner");
31893190
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
31903191
}
31913192

@@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
31983199
final ShardId shardId = store.shardId();
31993200
final SnapshotId snapshotId = context.snapshotId();
32003201
final IndexShardSnapshotStatus snapshotStatus = context.status();
3202+
snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot");
32013203
final long startTime = threadPool.absoluteTimeInMillis();
32023204
try {
32033205
final ShardGeneration generation = snapshotStatus.generation();
@@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
32063208
final Set<String> blobs;
32073209
if (generation == null) {
32083210
snapshotStatus.ensureNotAborted();
3211+
snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes");
32093212
try {
32103213
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
32113214
} catch (IOException e) {
@@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
32163219
}
32173220

32183221
snapshotStatus.ensureNotAborted();
3222+
snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs");
32193223
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
32203224
context.indexId(),
32213225
shardId.id(),
@@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
33163320
indexCommitPointFiles = filesFromSegmentInfos;
33173321
}
33183322

3323+
snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot");
33193324
snapshotStatus.moveToStarted(
33203325
startTime,
33213326
indexIncrementalFileCount,
@@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
33423347
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
33433348
Boolean.toString(writeFileInfoWriterUUID)
33443349
);
3350+
snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation");
33453351
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
33463352
updatedBlobStoreIndexShardSnapshots,
33473353
shardContainer,
@@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
33873393
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
33883394
Boolean.toString(writeFileInfoWriterUUID)
33893395
);
3396+
snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file");
33903397
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
33913398
} catch (IOException e) {
33923399
throw new IndexShardSnapshotFailedException(
@@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
34013408
}
34023409
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
34033410
try {
3411+
snapshotStatus.updateStatusDescription("no shard generations: deleting blobs");
34043412
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
34053413
} catch (IOException e) {
34063414
logger.warn(
@@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
34143422
// filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure.
34153423
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
34163424
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
3425+
snapshotStatus.updateStatusDescription("all files uploaded: finalizing");
34173426
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();
34183427

34193428
// now create and write the commit point
@@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
34353444
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
34363445
Boolean.toString(writeFileInfoWriterUUID)
34373446
);
3447+
snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file");
34383448
INDEX_SHARD_SNAPSHOT_FORMAT.write(
34393449
blobStoreIndexShardSnapshot,
34403450
shardContainer,
@@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) {
34513461
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
34523462
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
34533463
);
3464+
snapshotStatus.updateStatusDescription("all files uploaded: done");
34543465
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
34553466
context.onResponse(shardSnapshotResult);
34563467
}, e -> {
34573468
try {
3469+
snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e);
34583470
shardContainer.deleteBlobsIgnoringIfNotExists(
34593471
OperationPurpose.SNAPSHOT_DATA,
34603472
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
@@ -3517,6 +3529,7 @@ protected void snapshotFiles(
35173529
) {
35183530
final int noOfFilesToSnapshot = filesToSnapshot.size();
35193531
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener);
3532+
context.status().updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads");
35203533
for (int i = 0; i < noOfFilesToSnapshot; i++) {
35213534
shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener);
35223535
}

0 commit comments

Comments
 (0)