14
14
import org .elasticsearch .action .ActionRunnable ;
15
15
import org .elasticsearch .action .admin .cluster .repositories .integrity .VerifyRepositoryIntegrityAction ;
16
16
import org .elasticsearch .action .support .ListenableActionFuture ;
17
+ import org .elasticsearch .cluster .metadata .Metadata ;
17
18
import org .elasticsearch .common .CheckedSupplier ;
18
19
import org .elasticsearch .common .blobstore .support .BlobMetadata ;
19
20
import org .elasticsearch .common .unit .ByteSizeUnit ;
37
38
import java .util .Iterator ;
38
39
import java .util .List ;
39
40
import java .util .Map ;
40
- import java .util .Objects ;
41
41
import java .util .Queue ;
42
42
import java .util .Set ;
43
43
import java .util .concurrent .ConcurrentLinkedQueue ;
@@ -100,9 +100,10 @@ public void close() {
100
100
}
101
101
102
102
private void addFailure (String format , Object ... args ) {
103
- if (failureCount .incrementAndGet () <= verifyRequest .getMaxFailures ()) {
103
+ final var failureNumber = failureCount .incrementAndGet ();
104
+ if (failureNumber <= verifyRequest .getMaxFailures ()) {
104
105
final var failure = format (format , args );
105
- logger .debug ("[{}] found metadata verification failure: {}" , repositoryName , failure );
106
+ logger .debug ("[{}] found metadata verification failure [{}] : {}" , repositoryName , failureNumber , failure );
106
107
failures .add (new RepositoryVerificationException (repositoryName , failure ));
107
108
}
108
109
}
@@ -111,8 +112,9 @@ private void addFailure(Exception exception) {
111
112
if (isCancelledSupplier .getAsBoolean () && exception instanceof TaskCancelledException ) {
112
113
return ;
113
114
}
114
- if (failureCount .incrementAndGet () <= verifyRequest .getMaxFailures ()) {
115
- logger .debug (() -> format ("[%s] exception during metadata verification: {}" , repositoryName ), exception );
115
+ final var failureNumber = failureCount .incrementAndGet ();
116
+ if (failureNumber <= verifyRequest .getMaxFailures ()) {
117
+ logger .debug (() -> format ("[%s] exception [%d] during metadata verification" , repositoryName , failureNumber ), exception );
116
118
failures .add (
117
119
exception instanceof RepositoryVerificationException rve
118
120
? rve
@@ -168,13 +170,28 @@ private void verifySnapshot(RefCounted snapshotRefs, SnapshotId snapshotId) {
168
170
}
169
171
}));
170
172
171
- forkSupply (snapshotRefs , () -> blobStoreRepository . getSnapshotGlobalMetadata (snapshotId ), metadata -> {
172
- if (metadata .indices ().isEmpty () == false ) {
173
+ forkSupply (snapshotRefs , () -> getSnapshotGlobalMetadata (snapshotId ), metadata -> {
174
+ if (metadata != null && metadata .indices ().isEmpty () == false ) {
173
175
addFailure ("snapshot [%s] contains unexpected index metadata within global metadata" , snapshotId );
174
176
}
175
177
});
176
178
}
177
179
180
+ private Metadata getSnapshotGlobalMetadata (SnapshotId snapshotId ) {
181
+ try {
182
+ return blobStoreRepository .getSnapshotGlobalMetadata (snapshotId );
183
+ } catch (Exception e ) {
184
+ addFailure (
185
+ new RepositoryVerificationException (
186
+ repositoryName ,
187
+ format ("failed to get snapshot global metadata for [%s]" , snapshotId ),
188
+ e
189
+ )
190
+ );
191
+ return null ;
192
+ }
193
+ }
194
+
178
195
private void verifyIndices () {
179
196
final var indicesMap = repositoryData .getIndices ();
180
197
@@ -232,22 +249,14 @@ private void verifyIndexSnapshot(RefCounted indexSnapshotRefs, SnapshotId snapsh
232
249
shardCountListenersByBlobId .computeIfAbsent (indexMetaBlobId , ignored -> {
233
250
final var shardCountFuture = new ListenableActionFuture <Integer >();
234
251
forkSupply (() -> {
235
- final var shardCount = blobStoreRepository .getSnapshotIndexMetaData (repositoryData , snapshotId , indexId )
236
- .getNumberOfShards ();
252
+ final var shardCount = getNumberOfShards (indexMetaBlobId , snapshotId );
237
253
for (int i = 0 ; i < shardCount ; i ++) {
238
254
shardContainerContentsListener .computeIfAbsent (i , shardId -> {
239
255
final var shardContainerContentsFuture = new ListenableActionFuture <ShardContainerContents >();
240
256
forkSupply (
241
257
() -> new ShardContainerContents (
242
258
blobStoreRepository .shardContainer (indexId , shardId ).listBlobs (),
243
- blobStoreRepository .getBlobStoreIndexShardSnapshots (
244
- indexId ,
245
- shardId ,
246
- Objects .requireNonNull (
247
- repositoryData .shardGenerations ().getShardGen (indexId , shardId ),
248
- "shard generations for " + indexId + "/" + shardId
249
- )
250
- )
259
+ getBlobStoreIndexShardSnapshots (shardId )
251
260
),
252
261
shardContainerContentsFuture
253
262
);
@@ -266,10 +275,7 @@ private void verifyIndexSnapshot(RefCounted indexSnapshotRefs, SnapshotId snapsh
266
275
indexSnapshotRefs ,
267
276
shardContainerContents -> forkSupply (
268
277
indexSnapshotRefs ,
269
- () -> blobStoreRepository .loadShardSnapshot (
270
- blobStoreRepository .shardContainer (indexId , shardId ),
271
- snapshotId
272
- ),
278
+ () -> getBlobStoreIndexShardSnapshot (snapshotId , shardId ),
273
279
shardSnapshot -> verifyShardSnapshot (snapshotId , shardId , shardContainerContents , shardSnapshot )
274
280
)
275
281
)
@@ -278,12 +284,60 @@ private void verifyIndexSnapshot(RefCounted indexSnapshotRefs, SnapshotId snapsh
278
284
}));
279
285
}
280
286
287
+ private BlobStoreIndexShardSnapshot getBlobStoreIndexShardSnapshot (SnapshotId snapshotId , int shardId ) {
288
+ try {
289
+ return blobStoreRepository .loadShardSnapshot (blobStoreRepository .shardContainer (indexId , shardId ), snapshotId );
290
+ } catch (Exception e ) {
291
+ addFailure (
292
+ new RepositoryVerificationException (
293
+ repositoryName ,
294
+ format ("failed to load shard %s/%d snapshot for %s" , indexId , shardId , snapshotId ),
295
+ e
296
+ )
297
+ );
298
+ return null ;
299
+ }
300
+ }
301
+
302
+ private int getNumberOfShards (String indexMetaBlobId , SnapshotId snapshotId ) {
303
+ try {
304
+ return blobStoreRepository .getSnapshotIndexMetaData (repositoryData , snapshotId , indexId ).getNumberOfShards ();
305
+ } catch (Exception e ) {
306
+ addFailure (
307
+ new RepositoryVerificationException (
308
+ repositoryName ,
309
+ format ("failed to load index %s metadata for %s from blob [%s]" , indexId , snapshotId , indexMetaBlobId ),
310
+ e
311
+ )
312
+ );
313
+ return 0 ;
314
+ }
315
+ }
316
+
317
+ private BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots (int shardId ) {
318
+ final var shardGen = repositoryData .shardGenerations ().getShardGen (indexId , shardId );
319
+ if (shardGen == null ) {
320
+ addFailure ("unknown shard generation for %s/%d" , indexId , shardId );
321
+ return null ;
322
+ }
323
+ try {
324
+ return blobStoreRepository .getBlobStoreIndexShardSnapshots (indexId , shardId , shardGen );
325
+ } catch (Exception e ) {
326
+ addFailure (e );
327
+ return null ;
328
+ }
329
+ }
330
+
281
331
private void verifyShardSnapshot (
282
332
SnapshotId snapshotId ,
283
333
int shardId ,
284
334
ShardContainerContents shardContainerContents ,
285
335
BlobStoreIndexShardSnapshot shardSnapshot
286
336
) {
337
+ if (shardSnapshot == null ) {
338
+ return ;
339
+ }
340
+
287
341
if (shardSnapshot .snapshot ().equals (snapshotId .getName ()) == false ) {
288
342
addFailure (
289
343
"snapshot [%s] for shard [%s/%d] has mismatched name [%s]" ,
@@ -298,17 +352,20 @@ private void verifyShardSnapshot(
298
352
verifyFileInfo (snapshotId .toString (), shardId , shardContainerContents .blobsByName (), fileInfo );
299
353
}
300
354
301
- boolean foundSnapshot = false ;
302
- for (SnapshotFiles summary : shardContainerContents .blobStoreIndexShardSnapshots ().snapshots ()) {
303
- if (summary .snapshot ().equals (snapshotId .getName ())) {
304
- foundSnapshot = true ;
305
- verifyConsistentShardFiles (snapshotId , shardId , shardSnapshot , summary );
306
- break ;
355
+ final var blobStoreIndexShardSnapshots = shardContainerContents .blobStoreIndexShardSnapshots ();
356
+ if (blobStoreIndexShardSnapshots != null ) {
357
+ boolean foundSnapshot = false ;
358
+ for (SnapshotFiles summary : blobStoreIndexShardSnapshots .snapshots ()) {
359
+ if (summary .snapshot ().equals (snapshotId .getName ())) {
360
+ foundSnapshot = true ;
361
+ verifyConsistentShardFiles (snapshotId , shardId , shardSnapshot , summary );
362
+ break ;
363
+ }
307
364
}
308
- }
309
365
310
- if (foundSnapshot == false ) {
311
- addFailure ("snapshot [%s] for shard [%s/%d] has no entry in the shard-level summary" , snapshotId , indexId , shardId );
366
+ if (foundSnapshot == false ) {
367
+ addFailure ("snapshot [%s] for shard [%s/%d] has no entry in the shard-level summary" , snapshotId , indexId , shardId );
368
+ }
312
369
}
313
370
}
314
371
0 commit comments