Skip to content

Commit e94b246

Browse files
committed
Add test for contents changed on resume
1 parent 6238090 commit e94b246

File tree

4 files changed

+81
-19
lines changed

4 files changed

+81
-19
lines changed

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private InputStream openStream() throws IOException {
124124
+ blobId.getName()
125125
+ "] generation ["
126126
+ lastGeneration
127-
+ "] unavailable on resume: "
127+
+ "] unavailable on resume (contents changed, or object deleted): "
128128
+ storageException.getMessage()
129129
)
130130
);
@@ -150,12 +150,12 @@ private InputStream openStream() throws IOException {
150150
}
151151

152152
private Long parseGenerationHeader(HttpResponse response) {
153-
final Object generationHeader = response.getHeaders().get("x-goog-generation");
154-
if (generationHeader instanceof String generationHeaderString) {
153+
final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
154+
if (generationHeader != null) {
155155
try {
156-
return Long.parseLong(generationHeaderString);
156+
return Long.parseLong(generationHeader);
157157
} catch (NumberFormatException e) {
158-
assert false : "Unexpected value for x-goog-generation header: " + generationHeaderString;
158+
assert false : "Unexpected value for x-goog-generation header: " + generationHeader;
159159
return null;
160160
}
161161
}
@@ -248,7 +248,6 @@ private void ensureOpen() {
248248
}
249249
}
250250

251-
// TODO: check that object did not change when stream is reopened (e.g. based on etag)
252251
private void reopenStreamOrFail(StorageException e) throws IOException {
253252
if (attempt >= maxAttempts) {
254253
throw addSuppressedExceptions(e);

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.core.SuppressForbidden;
4646
import org.elasticsearch.core.TimeValue;
4747
import org.elasticsearch.http.ResponseInjectingHttpHandler;
48+
import org.elasticsearch.mocksocket.MockHttpServer;
4849
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
4950
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
5051
import org.elasticsearch.rest.RestStatus;
@@ -56,6 +57,7 @@
5657
import java.io.InputStream;
5758
import java.net.InetSocketAddress;
5859
import java.net.SocketTimeoutException;
60+
import java.nio.file.NoSuchFileException;
5961
import java.util.Arrays;
6062
import java.util.HashMap;
6163
import java.util.Iterator;
@@ -71,6 +73,7 @@
7173

7274
import static fixture.gcs.TestUtils.createServiceAccount;
7375
import static java.nio.charset.StandardCharsets.UTF_8;
76+
import static org.elasticsearch.common.io.Streams.readFully;
7477
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
7578
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
7679
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
@@ -86,6 +89,7 @@
8689
import static org.hamcrest.Matchers.is;
8790
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8891
import static org.hamcrest.Matchers.notNullValue;
92+
import static org.hamcrest.Matchers.startsWith;
8993

9094
@SuppressForbidden(reason = "use a http server")
9195
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
@@ -570,6 +574,49 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
570574
container.delete(randomPurpose());
571575
}
572576

577+
public void testContentsChangeWhileStreaming() throws IOException {
578+
GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket");
579+
httpServer.createContext("/", handler);
580+
final int enoughBytesToTriggerChunkedDownload = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());
581+
582+
final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);
583+
584+
final String key = randomIdentifier();
585+
byte[] initialValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
586+
container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);
587+
588+
BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
589+
assertEquals(new BytesArray(initialValue), reference);
590+
591+
try (InputStream inputStream = container.readBlob(randomPurpose(), key)) {
592+
// Trigger the first chunk to load
593+
int read = inputStream.read();
594+
assert read != -1;
595+
596+
// Restart the server (this triggers a retry)
597+
restartHttpServer();
598+
httpServer.createContext("/", handler);
599+
600+
// Update the file
601+
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
602+
container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false);
603+
604+
// Read the rest of the stream, it should throw because the contents changed
605+
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
606+
assertThat(
607+
message,
608+
startsWith("Blob object [" + key + "] generation [1] unavailable on resume (contents changed, or object deleted):")
609+
);
610+
}
611+
}
612+
613+
private void restartHttpServer() throws IOException {
614+
InetSocketAddress currentAddress = httpServer.getAddress();
615+
httpServer.stop(0);
616+
httpServer = MockHttpServer.createHttp(currentAddress, 0);
617+
httpServer.start();
618+
}
619+
573620
private HttpHandler safeHandler(HttpHandler handler) {
574621
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
575622
return exchange -> {

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import com.sun.net.httpserver.HttpExchange;
1212
import com.sun.net.httpserver.HttpHandler;
1313

14-
import org.apache.logging.log4j.LogManager;
15-
import org.apache.logging.log4j.Logger;
1614
import org.elasticsearch.common.Strings;
1715
import org.elasticsearch.common.bytes.BytesReference;
1816
import org.elasticsearch.common.io.Streams;
@@ -44,8 +42,8 @@
4442
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
4543
public class GoogleCloudStorageHttpHandler implements HttpHandler {
4644

47-
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
4845
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
46+
private static final String GENERATION = "generation";
4947

5048
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
5149
private final MockGcsBlobStore mockGcsBlobStore;
@@ -82,7 +80,8 @@ public void handle(final HttpExchange exchange) throws IOException {
8280
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
8381
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
8482
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
85-
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
83+
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
84+
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation);
8685
writeBlobVersionAsJson(exchange, blob);
8786
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
8887
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
@@ -116,7 +115,8 @@ public void handle(final HttpExchange exchange) throws IOException {
116115
// Download Object https://cloud.google.com/storage/docs/request-body
117116
final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "");
118117
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
119-
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
118+
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
119+
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation);
120120
if (blob != null) {
121121
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
122122
final BytesReference response;
@@ -144,6 +144,7 @@ public void handle(final HttpExchange exchange) throws IOException {
144144
// we implement "metageneration", at that point we must incorporate both
145145
// See: https://cloud.google.com/storage/docs/metadata#etags
146146
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
147+
exchange.getResponseHeaders().add("X-Goog-Generation", String.valueOf(blob.generation()));
147148
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
148149
exchange.sendResponseHeaders(statusCode, response.length());
149150
response.writeTo(exchange.getResponseBody());

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlobStore.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,29 @@ public int length() {
8888
}
8989
}
9090

91-
BlobVersion getBlob(String path, Long ifGenerationMatch) {
91+
/**
92+
* Get the blob at the specified path
93+
*
94+
* @param path The path
95+
* @param ifGenerationMatch The ifGenerationMatch parameter value (if present)
96+
* @param generation The generation parameter value (if present)
97+
* @return The blob if it exists
98+
* @throws BlobNotFoundException if there is no blob at the path, or it's generation does not match the generation parameter
99+
* @throws GcsRestException if the blob's generation does not match the ifGenerationMatch parameter
100+
*/
101+
BlobVersion getBlob(String path, Long ifGenerationMatch, Long generation) {
92102
final BlobVersion blob = blobs.get(path);
93103
if (blob == null) {
94104
throw new BlobNotFoundException(path);
95105
} else {
96-
if (ifGenerationMatch != null) {
97-
if (blob.generation != ifGenerationMatch) {
98-
throw new GcsRestException(
99-
RestStatus.PRECONDITION_FAILED,
100-
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
101-
);
102-
}
106+
if (generation != null && generation != blob.generation) {
107+
throw new BlobNotFoundException(blob.path, blob.generation);
108+
}
109+
if (ifGenerationMatch != null && blob.generation != ifGenerationMatch) {
110+
throw new GcsRestException(
111+
RestStatus.PRECONDITION_FAILED,
112+
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
113+
);
103114
}
104115
return blob;
105116
}
@@ -324,6 +335,10 @@ static class BlobNotFoundException extends GcsRestException {
324335
BlobNotFoundException(String path) {
325336
super(RestStatus.NOT_FOUND, "Blob not found: " + path);
326337
}
338+
339+
BlobNotFoundException(String path, long generation) {
340+
super(RestStatus.NOT_FOUND, "Blob not found: " + path + ", generation " + generation);
341+
}
327342
}
328343

329344
static class GcsRestException extends RuntimeException {

0 commit comments

Comments
 (0)