Skip to content

Commit df9eaff

Browse files
committed
Fix handling of 410 gone, change of default chunk size
1 parent 946f9ac commit df9eaff

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -465,20 +465,24 @@ private void writeBlobResumable(
465465
// operation is billed.
466466
stats.trackPutOperation();
467467
return;
468-
} catch (final StorageException se) {
469-
final int errorCode = se.getCode();
470-
if (errorCode == HTTP_GONE) {
471-
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
472-
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
473-
inputStream.reset();
474-
continue;
475-
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
476-
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
477-
}
478-
if (storageException != null) {
479-
se.addSuppressed(storageException);
468+
} catch (final IOException ioException) {
469+
final StorageException se = (StorageException) ExceptionsHelper.unwrap(ioException, StorageException.class);
470+
if (se != null) {
471+
final int errorCode = se.getCode();
472+
if (errorCode == HTTP_GONE) {
473+
logger.warn(() -> format("Retrying broken resumable upload session for blob %s", blobInfo), se);
474+
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
475+
inputStream.reset();
476+
continue;
477+
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
478+
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
479+
}
480+
if (storageException != null) {
481+
se.addSuppressed(storageException);
482+
}
483+
throw se;
480484
}
481-
throw se;
485+
throw ioException;
482486
}
483487
}
484488
assert storageException != null;

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.sun.net.httpserver.HttpHandler;
2020

2121
import org.apache.http.HttpStatus;
22+
import org.elasticsearch.ExceptionsHelper;
2223
import org.elasticsearch.common.BackoffPolicy;
2324
import org.elasticsearch.common.Strings;
2425
import org.elasticsearch.common.UUIDs;
@@ -273,8 +274,8 @@ public void testWriteBlobWithReadTimeouts() {
273274
}
274275

275276
public void testWriteLargeBlob() throws IOException {
276-
// See {@link BaseWriteChannel#DEFAULT_CHUNK_SIZE}
277-
final int defaultChunkSize = 60 * 256 * 1024;
277+
// See {@link com.google.cloud.storage.BaseStorageWriteChannel.BaseStorageWriteChannel}
278+
final int defaultChunkSize = Math.toIntExact(ByteSizeValue.ofMb(16).getBytes());
278279
final int nbChunks = randomIntBetween(3, 5);
279280
final int lastChunkSize = randomIntBetween(1, defaultChunkSize - 1);
280281
final int totalChunks = nbChunks + 1;
@@ -358,13 +359,16 @@ public void testWriteLargeBlob() throws IOException {
358359
}
359360
}
360361

361-
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
362-
assertTrue(Strings.hasLength(range));
362+
final String contentRangeHeaderValue = exchange.getRequestHeaders().getFirst("Content-Range");
363+
final HttpHeaderParser.ContentRange contentRange = HttpHeaderParser.parseContentRangeHeader(contentRangeHeaderValue);
364+
assertNotNull("Invalid content range header: " + contentRangeHeaderValue, contentRange);
363365

364-
if (range.equals("bytes */*")) {
366+
if (contentRange.hasRange() == false) {
367+
// Content-Range: */... is a status check
368+
// https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
365369
final int receivedSoFar = bytesReceived.get();
366370
if (receivedSoFar > 0) {
367-
exchange.getResponseHeaders().add("Range", Strings.format("bytes=0-%d", receivedSoFar));
371+
exchange.getResponseHeaders().add("Range", Strings.format("bytes=0-%s", receivedSoFar));
368372
}
369373
exchange.getResponseHeaders().add("Content-Length", "0");
370374
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
@@ -375,27 +379,29 @@ public void testWriteLargeBlob() throws IOException {
375379

376380
assertThat(Math.toIntExact(requestBody.length()), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
377381

378-
final HttpHeaderParser.ContentRange contentRange = HttpHeaderParser.parseContentRangeHeader(range);
379382
final int rangeStart = Math.toIntExact(contentRange.start());
380383
final int rangeEnd = Math.toIntExact(contentRange.end());
381384
assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(requestBody.length())));
382385
assertThat(new BytesArray(data, rangeStart, rangeEnd - rangeStart + 1), is(requestBody));
383386
bytesReceived.updateAndGet(existing -> Math.max(existing, rangeEnd));
384387

385388
if (contentRange.size() != null) {
389+
exchange.getResponseHeaders().add("x-goog-stored-content-length", String.valueOf(bytesReceived.get() + 1));
386390
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
387391
return;
388392
} else {
389-
exchange.getResponseHeaders().add("Range", Strings.format("bytes=%d/%d", rangeStart, rangeEnd));
393+
exchange.getResponseHeaders().add("Range", Strings.format("bytes=%s-%s", rangeStart, rangeEnd));
390394
exchange.getResponseHeaders().add("Content-Length", "0");
391395
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
392396
return;
393397
}
394398
}
395-
}
396399

397-
if (randomBoolean()) {
398400
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
401+
} else {
402+
ExceptionsHelper.maybeDieOnAnotherThread(
403+
new AssertionError("Unexpected request" + exchange.getRequestMethod() + " " + exchange.getRequestURI())
404+
);
399405
}
400406
}));
401407

0 commit comments

Comments
 (0)