Skip to content

Commit 39f973e

Browse files
committed
Unwrap StorageException in our wrapper to simplify retry logic
1 parent 437456c commit 39f973e

File tree

1 file changed

+32
-32
lines changed

1 file changed

+32
-32
lines changed

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -402,24 +402,19 @@ public void write(byte[] b, int off, int len) throws IOException {
402402
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
403403
}
404404
return;
405-
} catch (IOException ioException) {
406-
// This is for handling exceptions thrown by resumable uploads. BaseStorageWriteChannel.write wraps
407-
// the StorageException in an IOException, so we need to unwrap it.
408-
final StorageException se = (StorageException) ExceptionsHelper.unwrap(ioException, StorageException.class);
409-
if (se != null) {
410-
final int errorCode = se.getCode();
411-
if (errorCode == HTTP_GONE) {
412-
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
413-
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
414-
continue;
415-
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
416-
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
417-
}
405+
} catch (final StorageException se) {
406+
final int errorCode = se.getCode();
407+
if (errorCode == HTTP_GONE) {
408+
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
409+
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
410+
continue;
411+
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
412+
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
418413
}
419414
if (storageException != null) {
420-
ioException.addSuppressed(storageException);
415+
se.addSuppressed(storageException);
421416
}
422-
throw ioException;
417+
throw se;
423418
}
424419
}
425420
assert storageException != null;
@@ -474,25 +469,20 @@ private void writeBlobResumable(
474469
// operation is billed.
475470
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
476471
return;
477-
} catch (final IOException ioException) {
478-
// This is for handling exceptions thrown by resumable uploads. BaseStorageWriteChannel.write wraps
479-
// the StorageException in an IOException, so we need to unwrap it.
480-
final StorageException se = (StorageException) ExceptionsHelper.unwrap(ioException, StorageException.class);
481-
if (se != null) {
482-
final int errorCode = se.getCode();
483-
if (errorCode == HTTP_GONE) {
484-
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
485-
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
486-
inputStream.reset();
487-
continue;
488-
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
489-
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
490-
}
472+
} catch (final StorageException se) {
473+
final int errorCode = se.getCode();
474+
if (errorCode == HTTP_GONE) {
475+
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
476+
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
477+
inputStream.reset();
478+
continue;
479+
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
480+
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
491481
}
492482
if (storageException != null) {
493-
ioException.addSuppressed(storageException);
483+
se.addSuppressed(storageException);
494484
}
495-
throw ioException;
485+
throw se;
496486
}
497487
}
498488
assert storageException != null;
@@ -662,7 +652,17 @@ private static final class WritableBlobChannel implements WritableByteChannel {
662652
@SuppressForbidden(reason = "channel is based on a socket")
663653
@Override
664654
public int write(final ByteBuffer src) throws IOException {
665-
return SocketAccess.doPrivilegedIOException(() -> channel.write(src));
655+
try {
656+
return SocketAccess.doPrivilegedIOException(() -> channel.write(src));
657+
} catch (IOException e) {
658+
// BaseStorageWriteChannel#write wraps StorageException in an IOException, but BaseStorageWriteChannel#close
659+
// does not, if we unwrap StorageExceptions here, it simplifies our retry-on-gone logic
660+
final StorageException storageException = (StorageException) ExceptionsHelper.unwrap(e, StorageException.class);
661+
if (storageException != null) {
662+
throw storageException;
663+
}
664+
throw e;
665+
}
666666
}
667667

668668
@Override

0 commit comments

Comments
 (0)