Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
Expand Down Expand Up @@ -206,6 +208,21 @@ public void testWriteReadLarge() throws IOException {
}
}

public void testWriteFileMultipleOfChunkSize() throws IOException {
    // Regression test for uploads whose size is an exact multiple of the SDK chunk size:
    // per the PR discussion, such uploads end with a zero-byte "finished" PUT rather than
    // the completion marker riding on the final data chunk.
    final int blobSize = GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE * randomIntBetween(2, 4);
    try (BlobStore blobStore = newBlobStore()) {
        final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.EMPTY);
        final String blobName = randomIdentifier();
        final byte[] writtenBytes = randomByteArrayOfLength(blobSize);
        blobContainer.writeBlob(randomPurpose(), blobName, new BytesArray(writtenBytes), true);

        // Read the blob back and check the round trip is byte-identical
        final BytesReference readBack = readFully(blobContainer.readBlob(randomPurpose(), blobName));
        assertEquals(new BytesArray(writtenBytes), readBack);

        // Clean up so the container is left empty for other tests
        blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single(blobName));
    }
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a bug when we wrote a multiple of the chunk size in bytes, it sends
POST metadata
PUT chunk1
PUT chunk2
...
PUT no data with content-range header to indicate we're done

We had assumed the finished header was sent with the final chunk.


public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {

public TestGoogleCloudStoragePlugin(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,36 +129,33 @@ public void handle(final HttpExchange exchange) throws IOException {
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
if (blob != null) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
final long offset;
final long end;
final BytesReference response;
final int statusCode;
if (rangeHeader == null) {
offset = 0L;
end = blob.contents().length() - 1;
response = blob.contents();
statusCode = RestStatus.OK.getStatus();
} else {
final HttpHeaderParser.Range range = HttpHeaderParser.parseRangeHeader(rangeHeader);
if (range == null) {
throw new AssertionError("Range bytes header does not match expected format: " + rangeHeader);
}
offset = range.start();
end = range.end();
}

if (offset >= blob.contents().length()) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus(), -1);
return;
}
if (range.start() >= blob.contents().length()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, should we be checking the range end here? If you ask for bytes 0-10 of an 8-byte blob, we cannot satisfy that range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, probably. I'll have a look at what the spec says, currently we just send as much of it as we have.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also adding ETags, because it looks like that's the mechanism that the BlobReadChannel uses to detect that the file it's streaming has changed under it. We can implement them trivially using generation I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it looks like

  • We don't yet check that the file hasn't changed when resuming a download (I raised ES-11432 to address at some point)
  • We do need to allow ranges beyond the end of actual content (streaming uses this to request a chunk). There may be more to the story, but if we start enforcing the ranges all that breaks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirmed this testing against the actual API

  • If the start of the range is past the end of the file, you get a 416
  • If the end of the range is past the end of the file, but the start is in it, you get a 206 and as much of the file as can be delivered

exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus(), -1);
return;
}

BytesReference response = blob.contents();
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final int bufferedLength = response.length();
if (offset > 0 || bufferedLength > end) {
response = response.slice(
Math.toIntExact(offset),
Math.toIntExact(Math.min(end + 1 - offset, bufferedLength - offset))
);
final long lastIndex = Math.min(range.end(), blob.contents().length() - 1);
response = blob.contents().slice(Math.toIntExact(range.start()), Math.toIntExact(lastIndex - range.start() + 1));
statusCode = RestStatus.PARTIAL_CONTENT.getStatus();
}
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length());
// I think it's enough to use the generation here, at least until
// we implement "metageneration", at that point we must incorporate both
// See: https://cloud.google.com/storage/docs/metadata#etags
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(statusCode, response.length());
response.writeTo(exchange.getResponseBody());
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,53 @@ public class MockGcsBlobStore {

record BlobVersion(String path, long generation, BytesReference contents) {}

record ResumableUpload(String uploadId, String path, Long ifGenerationMatch, BytesReference contents, boolean completed) {
record ResumableUpload(
String uploadId,
String path,
Long ifGenerationMatch,
BytesReference contents,
Integer finalLength,
boolean completed
) {

ResumableUpload(String uploadId, String path, Long ifGenerationMatch) {
this(uploadId, path, ifGenerationMatch, BytesArray.EMPTY, null, false);
}

public ResumableUpload update(BytesReference contents) {
if (completed) {
throw new IllegalStateException("Blob already completed");
}
return new ResumableUpload(uploadId, path, ifGenerationMatch, contents, null, false);
}

/**
* When we complete, we nullify our reference to the contents to allow it to be collected if it gets overwritten
*/
public ResumableUpload complete() {
if (completed) {
throw new IllegalStateException("Blob already completed");
}
return new ResumableUpload(uploadId, path, ifGenerationMatch, null, contents.length(), true);
}

public ResumableUpload update(BytesReference contents, boolean completed) {
return new ResumableUpload(uploadId, path, ifGenerationMatch, contents, completed);
public HttpHeaderParser.Range getRange() {
int length = length();
if (length > 0) {
return new HttpHeaderParser.Range(0, length - 1);
} else {
return null;
}
}

public int length() {
if (finalLength != null) {
return finalLength;
}
if (contents != null) {
return contents.length();
}
return 0;
}
}

Expand Down Expand Up @@ -93,7 +136,7 @@ BlobVersion updateBlob(String path, Long ifGenerationMatch, BytesReference conte

ResumableUpload createResumableUpload(String path, Long ifGenerationMatch) {
final String uploadId = UUIDs.randomBase64UUID();
final ResumableUpload value = new ResumableUpload(uploadId, path, ifGenerationMatch, BytesArray.EMPTY, false);
final ResumableUpload value = new ResumableUpload(uploadId, path, ifGenerationMatch);
resumableUploads.put(uploadId, value);
return value;
}
Expand All @@ -114,16 +157,14 @@ UpdateResponse updateResumableUpload(String uploadId, HttpHeaderParser.ContentRa
throw failAndThrow("Attempted to update a non-existent resumable: " + uid);
}

if (contentRange.hasRange() == false) {
// Content-Range: */... is a status check https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
ResumableUpload valueToReturn = existing;

// Handle the request, a range indicates a chunk of data was submitted
if (contentRange.hasRange()) {
if (existing.completed) {
updateResponse.set(new UpdateResponse(RestStatus.OK.getStatus(), calculateRangeHeader(blobs.get(existing.path))));
} else {
final HttpHeaderParser.Range range = calculateRangeHeader(existing);
updateResponse.set(new UpdateResponse(RESUME_INCOMPLETE, range));
throw failAndThrow("Attempted to write more to a completed resumable upload");
}
return existing;
} else {

if (contentRange.start() > contentRange.end()) {
throw failAndThrow("Invalid content range " + contentRange);
}
Expand All @@ -143,29 +184,25 @@ UpdateResponse updateResumableUpload(String uploadId, HttpHeaderParser.ContentRa
existing.contents,
requestBody.slice(offset, requestBody.length())
);
// We just received the last chunk, update the blob and remove the resumable upload from the map
if (contentRange.hasSize() && updatedContent.length() == contentRange.size()) {
updateBlob(existing.path(), existing.ifGenerationMatch, updatedContent);
updateResponse.set(new UpdateResponse(RestStatus.OK.getStatus(), null));
return existing.update(BytesArray.EMPTY, true);
}
final ResumableUpload updated = existing.update(updatedContent, false);
updateResponse.set(new UpdateResponse(RESUME_INCOMPLETE, calculateRangeHeader(updated)));
return updated;
valueToReturn = existing.update(updatedContent);
}

// Next we determine the response
if (valueToReturn.completed) {
updateResponse.set(new UpdateResponse(RestStatus.OK.getStatus(), valueToReturn.getRange()));
} else if (contentRange.hasSize() && contentRange.size() == valueToReturn.contents.length()) {
updateBlob(valueToReturn.path(), valueToReturn.ifGenerationMatch(), valueToReturn.contents);
valueToReturn = valueToReturn.complete();
updateResponse.set(new UpdateResponse(RestStatus.OK.getStatus(), valueToReturn.getRange()));
} else {
updateResponse.set(new UpdateResponse(RESUME_INCOMPLETE, valueToReturn.getRange()));
}
return valueToReturn;
});
assert updateResponse.get() != null : "Should always produce an update response";
return updateResponse.get();
}

private static HttpHeaderParser.Range calculateRangeHeader(ResumableUpload resumableUpload) {
return resumableUpload.contents.length() > 0 ? new HttpHeaderParser.Range(0, resumableUpload.contents.length() - 1) : null;
}

private static HttpHeaderParser.Range calculateRangeHeader(BlobVersion blob) {
return blob.contents.length() > 0 ? new HttpHeaderParser.Range(0, blob.contents.length() - 1) : null;
}

record UpdateResponse(int statusCode, HttpHeaderParser.Range rangeHeader) {}

void deleteBlob(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ public void testGetWithBytesRange() {
var end = blobBytes.length() - 1;
assertEquals(
"Exact Range: bytes=0-" + end,
new TestHttpResponse(RestStatus.OK, blobBytes, TestHttpExchange.EMPTY_HEADERS),
new TestHttpResponse(RestStatus.PARTIAL_CONTENT, blobBytes, TestHttpExchange.EMPTY_HEADERS),
getBlobContents(handler, bucket, blobName, null, new HttpHeaderParser.Range(0, end))
);

end = randomIntBetween(blobBytes.length() - 1, Integer.MAX_VALUE);
assertEquals(
"Larger Range: bytes=0-" + end,
new TestHttpResponse(RestStatus.OK, blobBytes, TestHttpExchange.EMPTY_HEADERS),
new TestHttpResponse(RestStatus.PARTIAL_CONTENT, blobBytes, TestHttpExchange.EMPTY_HEADERS),
getBlobContents(handler, bucket, blobName, null, new HttpHeaderParser.Range(0, end))
);

Expand All @@ -181,11 +181,38 @@ public void testGetWithBytesRange() {
end = start + length - 1;
assertEquals(
"Range: bytes=" + start + '-' + end,
new TestHttpResponse(RestStatus.OK, blobBytes.slice(start, length), TestHttpExchange.EMPTY_HEADERS),
new TestHttpResponse(RestStatus.PARTIAL_CONTENT, blobBytes.slice(start, length), TestHttpExchange.EMPTY_HEADERS),
getBlobContents(handler, bucket, blobName, null, new HttpHeaderParser.Range(start, end))
);
}

public void testZeroLengthObjectGets() {
    // Verifies GET behaviour for a zero-byte object: a plain (un-ranged) GET returns
    // 200 OK with an empty body, while any ranged GET is unsatisfiable (416) because
    // there are no bytes the range could cover.
    final var bucketName = randomIdentifier();
    final var gcsHandler = new GoogleCloudStorageHttpHandler(bucketName);
    final var emptyBlobName = "blob_name_" + randomIdentifier();

    // Store an empty blob first (generation precondition 0 == "must not exist yet")
    assertEquals(RestStatus.OK, executeMultipartUpload(gcsHandler, bucketName, emptyBlobName, BytesArray.EMPTY, 0L).restStatus());

    // Un-ranged GET: 200 with the (empty) contents
    assertEquals(
        "No Range",
        new TestHttpResponse(RestStatus.OK, BytesArray.EMPTY, TestHttpExchange.EMPTY_HEADERS),
        getBlobContents(gcsHandler, bucketName, emptyBlobName, null, null)
    );

    // Range starting at 0 still begins past the end of a zero-byte blob -> 416
    assertEquals(
        "Range 0-0",
        new TestHttpResponse(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, BytesArray.EMPTY, TestHttpExchange.EMPTY_HEADERS),
        getBlobContents(gcsHandler, bucketName, emptyBlobName, null, new HttpHeaderParser.Range(0, 0))
    );

    // Arbitrary non-empty range is likewise unsatisfiable
    assertEquals(
        "Random range x-y",
        new TestHttpResponse(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, BytesArray.EMPTY, TestHttpExchange.EMPTY_HEADERS),
        getBlobContents(gcsHandler, bucketName, emptyBlobName, null, new HttpHeaderParser.Range(randomIntBetween(0, 30), randomIntBetween(31, 100)))
    );
}

public void testResumableUpload() {
final var bucket = randomIdentifier();
final var handler = new GoogleCloudStorageHttpHandler(bucket);
Expand Down Expand Up @@ -225,7 +252,7 @@ public void testResumableUpload() {

final var part3 = randomAlphaOfLength(30);
final var uploadPart3Response = handleRequest(handler, "PUT", sessionURI, part3, contentRangeHeader(100, 129, 130));
assertEquals(new TestHttpResponse(RestStatus.OK, TestHttpExchange.EMPTY_HEADERS), uploadPart3Response);
assertEquals(new TestHttpResponse(RestStatus.OK, rangeHeader(0, 129)), uploadPart3Response);

// status check
assertEquals(
Expand Down Expand Up @@ -545,7 +572,6 @@ private static TestHttpResponse executeUpload(
BytesReference bytes,
Long ifGenerationMatch
) {
assert bytes.length() > 20;
if (randomBoolean()) {
return executeResumableUpload(handler, bucket, blobName, bytes, ifGenerationMatch);
} else {
Expand All @@ -560,6 +586,7 @@ private static TestHttpResponse executeResumableUpload(
BytesReference bytes,
Long ifGenerationMatch
) {
assert bytes.length() >= 2 : "We can't split anything smaller than two";
final var createUploadResponse = handleRequest(
handler,
"POST",
Expand All @@ -572,7 +599,7 @@ private static TestHttpResponse executeResumableUpload(
final var sessionURI = locationHeader.substring(locationHeader.indexOf(HOST) + HOST.length());
assertEquals(RestStatus.OK, createUploadResponse.restStatus());

final int partBoundary = randomIntBetween(10, bytes.length() - 1);
final int partBoundary = randomIntBetween(1, bytes.length() - 1);
final var part1 = bytes.slice(0, partBoundary);
final var uploadPart1Response = handleRequest(handler, "PUT", sessionURI, part1, contentRangeHeader(0, partBoundary - 1, null));
assertEquals(RESUME_INCOMPLETE, uploadPart1Response.status());
Expand Down