Skip to content

Commit 6238090

Browse files
committed
Specify generation on resume
1 parent 73872c8 commit 6238090

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
5252
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
5353
private long currentOffset;
5454
private boolean closed;
55+
private Long lastGeneration;
5556

5657
// Used for testing only
5758
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
@@ -84,13 +85,22 @@ private InputStream openStream() throws IOException {
8485
return SocketAccess.doPrivilegedIOException(() -> {
8586
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
8687
meteredGet.setReturnRawInputStream(true);
88+
if (lastGeneration != null) {
89+
meteredGet.setGeneration(lastGeneration);
90+
}
8791

8892
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
8993
if (meteredGet.getRequestHeaders() != null) {
9094
meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
9195
}
9296
}
9397
final HttpResponse resp = meteredGet.executeMedia();
98+
// Store the generation of the first response we received, so we can detect
99+
// if the file has changed if we need to resume
100+
if (lastGeneration == null) {
101+
lastGeneration = parseGenerationHeader(resp);
102+
}
103+
94104
final Long contentLength = resp.getHeaders().getContentLength();
95105
InputStream content = resp.getContent();
96106
if (contentLength != null) {
@@ -107,9 +117,22 @@ private InputStream openStream() throws IOException {
107117
}
108118
} catch (StorageException storageException) {
109119
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
110-
throw addSuppressedExceptions(
111-
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
112-
);
120+
if (lastGeneration != null) {
121+
throw addSuppressedExceptions(
122+
new NoSuchFileException(
123+
"Blob object ["
124+
+ blobId.getName()
125+
+ "] generation ["
126+
+ lastGeneration
127+
+ "] unavailable on resume: "
128+
+ storageException.getMessage()
129+
)
130+
);
131+
} else {
132+
throw addSuppressedExceptions(
133+
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
134+
);
135+
}
113136
}
114137
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
115138
long currentPosition = Math.addExact(start, currentOffset);
@@ -126,6 +149,19 @@ private InputStream openStream() throws IOException {
126149
}
127150
}
128151

152+
private Long parseGenerationHeader(HttpResponse response) {
153+
final Object generationHeader = response.getHeaders().get("x-goog-generation");
154+
if (generationHeader instanceof String generationHeaderString) {
155+
try {
156+
return Long.parseLong(generationHeaderString);
157+
} catch (NumberFormatException e) {
158+
assert false : "Unexpected value for x-goog-generation header: " + generationHeaderString;
159+
return null;
160+
}
161+
}
162+
return null;
163+
}
164+
129165
// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
130166
// We have to implement our own validation logic here
131167
static final class ContentLengthValidatingInputStream extends FilterInputStream {

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ public void setReturnRawInputStream(boolean b) {
144144
get.setReturnRawInputStream(b);
145145
}
146146

147+
public void setGeneration(Long generation) {
148+
get.setGeneration(generation);
149+
}
150+
147151
public HttpHeaders getRequestHeaders() {
148152
return get.getRequestHeaders();
149153
}

0 commit comments

Comments
 (0)