Skip to content

Commit 268e39b

Browse files
authored
Make GoogleCloudStorageRetryingInputStream request same generation on resume (elastic#127626)
1 parent 61faf42 commit 268e39b

File tree

8 files changed

+197
-20
lines changed

8 files changed

+197
-20
lines changed

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,27 @@
1515
import com.google.cloud.storage.StorageException;
1616

1717
import org.elasticsearch.action.support.master.AcknowledgedResponse;
18+
import org.elasticsearch.common.bytes.BytesArray;
1819
import org.elasticsearch.common.settings.MockSecureSettings;
1920
import org.elasticsearch.common.settings.SecureSettings;
2021
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.unit.ByteSizeValue;
2123
import org.elasticsearch.core.Booleans;
2224
import org.elasticsearch.plugins.Plugin;
2325
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
26+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
2427
import org.elasticsearch.rest.RestStatus;
2528
import org.junit.ClassRule;
2629

30+
import java.io.InputStream;
31+
import java.nio.file.NoSuchFileException;
2732
import java.util.Base64;
2833
import java.util.Collection;
2934

35+
import static org.elasticsearch.common.io.Streams.readFully;
36+
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
3037
import static org.hamcrest.Matchers.blankOrNullString;
38+
import static org.hamcrest.Matchers.containsString;
3139
import static org.hamcrest.Matchers.equalTo;
3240
import static org.hamcrest.Matchers.not;
3341

@@ -95,4 +103,37 @@ public void testReadFromPositionLargerThanBlobLength() {
95103
e -> asInstanceOf(StorageException.class, e.getCause()).getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
96104
);
97105
}
106+
107+
/**
 * Checks that a read which has to resume after the blob was overwritten fails instead of
 * silently continuing with the new contents.
 * <p>
 * Steps: write a 5 MB random blob, read a single byte from it, close the underlying stream via
 * {@code closeCurrentStream()}, overwrite the blob with new random bytes, then read the rest.
 * The remaining read is expected to throw {@link NoSuchFileException} with a message containing
 * "unavailable on resume (contents changed, or object deleted):".
 */
public void testResumeAfterUpdate() {
108+
109+
// The blob needs to be large enough that it won't be entirely buffered on the first request
110+
final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes());
111+
112+
final BlobStoreRepository repo = getRepository();
113+
final String blobKey = randomIdentifier();
114+
final byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
115+
executeOnBlobStore(repo, container -> {
116+
// NOTE(review): the trailing boolean is presumably failIfAlreadyExists (true here, false for the overwrite below) - confirm
container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true);
117+
118+
try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) {
119+
// Trigger the first request for the blob, partially read it
120+
int read = inputStream.read();
121+
assert read != -1;
122+
123+
// Close the current underlying stream (this will force a resume)
124+
asInstanceOf(GoogleCloudStorageRetryingInputStream.class, inputStream).closeCurrentStream();
125+
126+
// Update the file
127+
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
128+
container.writeBlob(randomPurpose(), blobKey, new BytesArray(updatedValue), false);
129+
130+
// Read the rest of the stream, it should throw because the contents changed
131+
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
132+
assertThat(message, containsString("unavailable on resume (contents changed, or object deleted):"));
133+
} catch (Exception e) {
134+
// Any exception other than the one asserted above fails the test
fail(e);
135+
}
136+
// The lambda has to return a value; the result is not used by this test
return null;
137+
});
138+
}
98139
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
5252
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
5353
private long currentOffset;
5454
private boolean closed;
55+
private Long lastGeneration;
5556

5657
// Used for testing only
5758
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
@@ -83,13 +84,22 @@ private InputStream openStream() throws IOException {
8384
try {
8485
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
8586
meteredGet.setReturnRawInputStream(true);
87+
if (lastGeneration != null) {
88+
meteredGet.setGeneration(lastGeneration);
89+
}
8690

8791
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
8892
if (meteredGet.getRequestHeaders() != null) {
8993
meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
9094
}
9195
}
9296
final HttpResponse resp = meteredGet.executeMedia();
97+
// Store the generation of the first response we received, so we can detect
98+
// whether the object has changed when we need to resume
99+
if (lastGeneration == null) {
100+
lastGeneration = parseGenerationHeader(resp);
101+
}
102+
93103
final Long contentLength = resp.getHeaders().getContentLength();
94104
InputStream content = resp.getContent();
95105
if (contentLength != null) {
@@ -105,9 +115,22 @@ private InputStream openStream() throws IOException {
105115
}
106116
} catch (StorageException storageException) {
107117
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
108-
throw addSuppressedExceptions(
109-
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
110-
);
118+
if (lastGeneration != null) {
119+
throw addSuppressedExceptions(
120+
new NoSuchFileException(
121+
"Blob object ["
122+
+ blobId.getName()
123+
+ "] generation ["
124+
+ lastGeneration
125+
+ "] unavailable on resume (contents changed, or object deleted): "
126+
+ storageException.getMessage()
127+
)
128+
);
129+
} else {
130+
throw addSuppressedExceptions(
131+
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
132+
);
133+
}
111134
}
112135
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
113136
long currentPosition = Math.addExact(start, currentOffset);
@@ -124,6 +147,24 @@ private InputStream openStream() throws IOException {
124147
}
125148
}
126149

150+
/**
 * Extracts the object generation from the {@code x-goog-generation} header of a response.
 * <p>
 * A missing or non-numeric header is not fatal in production: a warning is logged and
 * {@code null} is returned. With assertions enabled, either case trips {@code assert false}.
 *
 * @param response the response whose headers are inspected
 * @return the header value parsed as a long, or {@code null} if the header is absent or is not
 *         a valid long
 */
private Long parseGenerationHeader(HttpResponse response) {
151+
final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
152+
if (generationHeader != null) {
153+
try {
154+
return Long.parseLong(generationHeader);
155+
} catch (NumberFormatException e) {
156+
// Header present but not a number: warn, fail under assertions, then fall through to return null
final String message = "Unexpected value for x-goog-generation header: " + generationHeader;
157+
logger.warn(message);
158+
assert false : message;
159+
}
160+
} else {
161+
// Header absent: warn, fail under assertions, then fall through to return null
String message = "Missing x-goog-generation header";
162+
logger.warn(message);
163+
assert false : message;
164+
}
165+
return null;
166+
}
167+
127168
// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
128169
// We have to implement our own validation logic here
129170
static final class ContentLengthValidatingInputStream extends FilterInputStream {
@@ -203,14 +244,21 @@ public int read(byte[] b, int off, int len) throws IOException {
203244
}
204245
}
205246

247+
/**
248+
* Closes only the current underlying stream; this stream's own {@code closed} flag is not
* modified here. Used by tests to exercise the resume path.
249+
*/
250+
// @VisibleForTesting
251+
void closeCurrentStream() throws IOException {
252+
currentStream.close();
253+
}
254+
206255
/**
 * Guards against use of this stream after it has been closed.
 * <p>
 * With assertions enabled the {@code assert false} fires first; otherwise an
 * {@link IllegalStateException} with the same message is thrown.
 *
 * @throws IllegalStateException if {@code closed} is set
 */
private void ensureOpen() {
207256
if (closed) {
208257
assert false : "using GoogleCloudStorageRetryingInputStream after close";
209258
throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
210259
}
211260
}
212261

213-
// TODO: check that object did not change when stream is reopened (e.g. based on etag)
214262
private void reopenStreamOrFail(StorageException e) throws IOException {
215263
if (attempt >= maxAttempts) {
216264
throw addSuppressedExceptions(e);

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ public void setReturnRawInputStream(boolean b) {
140140
get.setReturnRawInputStream(b);
141141
}
142142

143+
/**
 * Forwards the given generation to the wrapped {@code get} request.
 *
 * @param generation the generation value passed through to {@code get.setGeneration}
 */
public void setGeneration(Long generation) {
144+
get.setGeneration(generation);
145+
}
146+
143147
/**
 * Returns the request headers of the wrapped {@code get} request.
 *
 * @return whatever {@code get.getRequestHeaders()} returns; the caller in
 *         GoogleCloudStorageRetryingInputStream null-checks the result before use
 */
public HttpHeaders getRequestHeaders() {
144148
return get.getRequestHeaders();
145149
}

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.http.HttpTransportOptions;
2020
import com.google.cloud.storage.StorageException;
2121
import com.google.cloud.storage.StorageOptions;
22+
import com.sun.net.httpserver.HttpExchange;
2223
import com.sun.net.httpserver.HttpHandler;
2324

2425
import org.apache.http.HttpStatus;
@@ -45,6 +46,7 @@
4546
import org.elasticsearch.core.SuppressForbidden;
4647
import org.elasticsearch.core.TimeValue;
4748
import org.elasticsearch.http.ResponseInjectingHttpHandler;
49+
import org.elasticsearch.mocksocket.MockHttpServer;
4850
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
4951
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
5052
import org.elasticsearch.rest.RestStatus;
@@ -56,6 +58,7 @@
5658
import java.io.InputStream;
5759
import java.net.InetSocketAddress;
5860
import java.net.SocketTimeoutException;
61+
import java.nio.file.NoSuchFileException;
5962
import java.util.Arrays;
6063
import java.util.HashMap;
6164
import java.util.Iterator;
@@ -71,6 +74,7 @@
7174

7275
import static fixture.gcs.TestUtils.createServiceAccount;
7376
import static java.nio.charset.StandardCharsets.UTF_8;
77+
import static org.elasticsearch.common.io.Streams.readFully;
7478
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
7579
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
7680
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
@@ -86,6 +90,7 @@
8690
import static org.hamcrest.Matchers.is;
8791
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8892
import static org.hamcrest.Matchers.notNullValue;
93+
import static org.hamcrest.Matchers.startsWith;
8994

9095
@SuppressForbidden(reason = "use a http server")
9196
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
@@ -212,6 +217,11 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
212217
);
213218
}
214219

220+
/**
 * Adds an {@code x-goog-generation} response header carrying a random non-negative integer,
 * so that mocked download responses include a generation value.
 *
 * @param exchange the exchange whose response headers are extended
 */
@Override
221+
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {
222+
exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(randomNonNegativeInt()));
223+
}
224+
215225
public void testShouldRetryOnConnectionRefused() {
216226
// port 1 should never be open
217227
endpointUrlOverride = "http://127.0.0.1:1";
@@ -242,6 +252,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
242252
httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> {
243253
Streams.readFully(exchange.getRequestBody());
244254
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
255+
addSuccessfulDownloadHeaders(exchange);
245256
final HttpHeaderParser.Range range = getRange(exchange);
246257
final int offset = Math.toIntExact(range.start());
247258
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.end() + 1, bytes.length)));
@@ -570,6 +581,55 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
570581
container.delete(randomPurpose());
571582
}
572583

584+
/**
 * Checks, against the mock GCS handler, that a read which is retried after the blob was
 * overwritten fails with {@link NoSuchFileException} rather than returning the new contents.
 * <p>
 * Steps: write a 30 MB random blob and verify it reads back intact, open a second read and
 * consume one byte, restart the HTTP server, overwrite the blob, then read the remainder.
 * The expected message names the blob path and "generation [1]".
 */
public void testContentsChangeWhileStreaming() throws IOException {
585+
GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket");
586+
httpServer.createContext("/", handler);
587+
// The blob needs to be large enough that it won't be entirely buffered on the first request
588+
final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());
589+
590+
// NOTE(review): first argument is presumably the max retry count, the nulls select defaults - confirm against createBlobContainer
final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);
591+
592+
final String key = randomIdentifier();
593+
byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
594+
container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);
595+
596+
// Sanity check: an uninterrupted read returns exactly what was written
// NOTE(review): the stream returned by readBlob here is never closed - consider try-with-resources
BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
597+
assertEquals(new BytesArray(initialValue), reference);
598+
599+
try (InputStream inputStream = container.readBlob(randomPurpose(), key)) {
600+
// Trigger the first chunk to load
601+
int read = inputStream.read();
602+
assert read != -1;
603+
604+
// Restart the server (this triggers a retry)
605+
restartHttpServer();
606+
// The replacement server starts without contexts, so the same handler is registered again
httpServer.createContext("/", handler);
607+
608+
// Update the file
609+
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
610+
container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false);
611+
612+
// Read the rest of the stream, it should throw because the contents changed
613+
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
614+
assertThat(
615+
message,
616+
startsWith(
617+
"Blob object ["
618+
+ container.path().buildAsString()
619+
+ key
620+
+ "] generation [1] unavailable on resume (contents changed, or object deleted):"
621+
)
622+
);
623+
}
624+
}
625+
626+
/**
 * Replaces {@code httpServer} with a fresh server bound to the same address.
 * <p>
 * The old server is stopped with a delay of 0 and the new one is started immediately. No
 * contexts are registered here; callers must re-register their handlers on the new server.
 *
 * @throws IOException if creating the replacement server fails
 */
private void restartHttpServer() throws IOException {
627+
// Capture the address first so the replacement listens on the same host and port
InetSocketAddress currentAddress = httpServer.getAddress();
628+
httpServer.stop(0);
629+
// NOTE(review): second argument is presumably the socket backlog - confirm against MockHttpServer.createHttp
httpServer = MockHttpServer.createHttp(currentAddress, 0);
630+
httpServer.start();
631+
}
632+
573633
private HttpHandler safeHandler(HttpHandler handler) {
574634
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
575635
return exchange -> {

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public LowLevelHttpResponse execute() throws IOException {
170170
result.setContent(content);
171171
result.setContentLength(contentLength);
172172
result.setContentType("application/octet-stream");
173+
result.addHeader("x-goog-generation", String.valueOf(randomNonNegativeInt()));
173174
result.setStatusCode(RestStatus.OK.getStatus());
174175
return result;
175176
}

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import com.sun.net.httpserver.HttpExchange;
1212
import com.sun.net.httpserver.HttpHandler;
1313

14-
import org.apache.logging.log4j.LogManager;
15-
import org.apache.logging.log4j.Logger;
1614
import org.elasticsearch.common.Strings;
1715
import org.elasticsearch.common.bytes.BytesReference;
1816
import org.elasticsearch.common.io.Streams;
@@ -44,8 +42,8 @@
4442
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
4543
public class GoogleCloudStorageHttpHandler implements HttpHandler {
4644

47-
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
4845
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
46+
private static final String GENERATION = "generation";
4947

5048
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
5149
private final MockGcsBlobStore mockGcsBlobStore;
@@ -82,7 +80,8 @@ public void handle(final HttpExchange exchange) throws IOException {
8280
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
8381
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
8482
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
85-
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
83+
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
84+
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation);
8685
writeBlobVersionAsJson(exchange, blob);
8786
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
8887
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
@@ -116,7 +115,8 @@ public void handle(final HttpExchange exchange) throws IOException {
116115
// Download Object https://cloud.google.com/storage/docs/request-body
117116
final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "");
118117
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
119-
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
118+
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
119+
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation);
120120
if (blob != null) {
121121
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
122122
final BytesReference response;
@@ -144,6 +144,7 @@ public void handle(final HttpExchange exchange) throws IOException {
144144
// we implement "metageneration", at that point we must incorporate both
145145
// See: https://cloud.google.com/storage/docs/metadata#etags
146146
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
147+
exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(blob.generation()));
147148
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
148149
exchange.sendResponseHeaders(statusCode, response.length());
149150
response.writeTo(exchange.getResponseBody());

0 commit comments

Comments
 (0)