Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6238090
Specify generation on resume
nicktindall May 2, 2025
e94b246
Add test for contents changed on resume
nicktindall May 2, 2025
bcab09b
Update docs/changelog/127626.yaml
nicktindall May 2, 2025
98a80c3
Assert we get a generation header
nicktindall May 2, 2025
08e1b95
Delete docs/changelog/127626.yaml
nicktindall May 2, 2025
09c6e6e
Add random path when present, don't assert generation header is present
nicktindall May 2, 2025
576109c
Update test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlo…
nicktindall May 4, 2025
4feb87d
Update test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlo…
nicktindall May 4, 2025
0886ba7
Reduce indenting
nicktindall May 4, 2025
cdfffa5
Assert that x-goog-generation is present
nicktindall May 4, 2025
d3c0e6a
Assert that x-goog-generation is present
nicktindall May 5, 2025
f3371e4
Log a warning if we see a missing or malformed generation header
nicktindall May 5, 2025
89b926a
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 17, 2025
fca2161
Add x-goog-generation header to test
nicktindall May 18, 2025
2d2960c
Randomise x-goog-generation
nicktindall May 20, 2025
ce03dae
Tidy, improve naming
nicktindall May 20, 2025
eebcb0c
Test in third-party test
nicktindall May 21, 2025
4fd9b81
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 21, 2025
a8826fb
Clean proxied repo after test
nicktindall May 21, 2025
aeea9f6
Try SO_REUSEADDR to allow restarting proxy
nicktindall May 21, 2025
03a9b8d
Make httpServer field volatile for thread-safe restarts
nicktindall May 22, 2025
289a594
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 22, 2025
a60d7e0
Include repoName in base_path
nicktindall May 22, 2025
b0f9298
Add logging in proxy
nicktindall May 22, 2025
caaf45c
Don't use proxy to test resume
nicktindall May 22, 2025
b800710
Merge remote-tracking branch 'origin/main' into ES-11432_check_genera…
nicktindall May 22, 2025
0f8796b
Don't use proxy to test resume
nicktindall May 22, 2025
16d6c39
Merge remote-tracking branch 'origin/ES-11432_check_generation_on_res…
nicktindall May 22, 2025
a70e2ad
Don't use proxy to test resume
nicktindall May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,53 @@
import com.google.cloud.storage.StorageException;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.RestStatus;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;

import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Base64;
import java.util.Collection;

import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_HOST_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_PORT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

public class GoogleCloudStorageThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
private static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("test.google.fixture", "true"));
private static final String PROXIED_TEST_REPO = "proxied-test-repo";
private static final String PROXIED_CLIENT = "proxied";

@ClassRule
public static GoogleCloudStorageHttpFixture fixture = new GoogleCloudStorageHttpFixture(USE_FIXTURE, "bucket", "o/oauth2/token");
private static WebProxyServer proxyServer;

@BeforeClass
public static void beforeClass() {
proxyServer = new WebProxyServer();
}

@AfterClass
public static void afterClass() throws Exception {
proxyServer.close();
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
Expand All @@ -49,8 +75,15 @@ protected Settings nodeSettings() {
if (USE_FIXTURE) {
builder.put("gcs.client.default.endpoint", fixture.getAddress());
builder.put("gcs.client.default.token_uri", fixture.getAddress() + "/o/oauth2/token");
builder.put("gcs.client.proxied.endpoint", fixture.getAddress());
builder.put("gcs.client.proxied.token_uri", fixture.getAddress() + "/o/oauth2/token");
}

// Add a proxied client so we can test resume on fail
builder.put(PROXY_HOST_SETTING.getConcreteSettingForNamespace(PROXIED_CLIENT).getKey(), proxyServer.getHost());
builder.put(PROXY_PORT_SETTING.getConcreteSettingForNamespace(PROXIED_CLIENT).getKey(), proxyServer.getPort());
builder.put(PROXY_TYPE_SETTING.getConcreteSettingForNamespace(PROXIED_CLIENT).getKey(), "http");

return builder.build();
}

Expand All @@ -64,17 +97,26 @@ protected SecureSettings credentials() {
MockSecureSettings secureSettings = new MockSecureSettings();
if (USE_FIXTURE) {
secureSettings.setFile("gcs.client.default.credentials_file", TestUtils.createServiceAccount(random()));
secureSettings.setFile("gcs.client.proxied.credentials_file", TestUtils.createServiceAccount(random()));
} else {
secureSettings.setFile(
"gcs.client.default.credentials_file",
Base64.getDecoder().decode(System.getProperty("test.google.account"))
);
secureSettings.setFile(
"gcs.client.proxied.credentials_file",
Base64.getDecoder().decode(System.getProperty("test.google.account"))
);
}
return secureSettings;
}

@Override
protected void createRepository(final String repoName) {
createRepository(repoName, "default");
}

private void createRepository(final String repoName, String clientName) {
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
Expand All @@ -85,6 +127,7 @@ protected void createRepository(final String repoName) {
Settings.builder()
.put("bucket", System.getProperty("test.google.bucket"))
.put("base_path", System.getProperty("test.google.base", "/"))
.put("client", clientName)
)
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
Expand All @@ -95,4 +138,51 @@ public void testReadFromPositionLargerThanBlobLength() {
e -> asInstanceOf(StorageException.class, e.getCause()).getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
);
}

public void testResumeAfterUpdate() {
createRepository(PROXIED_TEST_REPO, PROXIED_CLIENT);

try {
// The blob needs to be large enough that it won't be entirely buffered on the first request
final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(10).getBytes());

final BlobStoreRepository repo = getRepository(PROXIED_TEST_REPO);
final String blobKey = randomIdentifier();
final byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
executeOnBlobStore(repo, container -> {
container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true);

try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) {
// Trigger the first request for the blob, partially read it
int read = inputStream.read();
assert read != -1;

// Restart the server (this triggers a retry)
proxyServer.restart();

// Update the file
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
container.writeBlob(randomPurpose(), blobKey, new BytesArray(updatedValue), false);

// Read the rest of the stream, it should throw because the contents changed
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
assertThat(
message,
startsWith(
"Blob object ["
+ container.path().buildAsString()
+ blobKey
+ "] generation [1] unavailable on resume (contents changed, or object deleted):"
)
);
} catch (Exception e) {
fail(e);
}
return null;
});
} finally {
final BlobStoreRepository repository = getRepository(PROXIED_TEST_REPO);
deleteAndAssertEmpty(repository, repository.basePath());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;
private Long lastGeneration;

// Used for testing only
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
Expand Down Expand Up @@ -83,13 +84,22 @@ private InputStream openStream() throws IOException {
try {
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
meteredGet.setReturnRawInputStream(true);
if (lastGeneration != null) {
meteredGet.setGeneration(lastGeneration);
}

if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
if (meteredGet.getRequestHeaders() != null) {
meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
}
}
final HttpResponse resp = meteredGet.executeMedia();
// Store the generation of the first response we received, so we can detect
// if the file has changed if we need to resume
if (lastGeneration == null) {
lastGeneration = parseGenerationHeader(resp);
}

final Long contentLength = resp.getHeaders().getContentLength();
InputStream content = resp.getContent();
if (contentLength != null) {
Expand All @@ -105,9 +115,22 @@ private InputStream openStream() throws IOException {
}
} catch (StorageException storageException) {
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
if (lastGeneration != null) {
throw addSuppressedExceptions(
new NoSuchFileException(
"Blob object ["
+ blobId.getName()
+ "] generation ["
+ lastGeneration
+ "] unavailable on resume (contents changed, or object deleted): "
+ storageException.getMessage()
)
);
} else {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
}
}
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
long currentPosition = Math.addExact(start, currentOffset);
Expand All @@ -124,6 +147,24 @@ private InputStream openStream() throws IOException {
}
}

private Long parseGenerationHeader(HttpResponse response) {
final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
if (generationHeader != null) {
try {
return Long.parseLong(generationHeader);
} catch (NumberFormatException e) {
final String message = "Unexpected value for x-goog-generation header: " + generationHeader;
logger.warn(message);
assert false : message;
}
} else {
String message = "Missing x-goog-generation header";
logger.warn(message);
assert false : message;
}
return null;
}

// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
// We have to implement our own validation logic here
static final class ContentLengthValidatingInputStream extends FilterInputStream {
Expand Down Expand Up @@ -210,7 +251,6 @@ private void ensureOpen() {
}
}

// TODO: check that object did not change when stream is reopened (e.g. based on etag)
private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void setReturnRawInputStream(boolean b) {
get.setReturnRawInputStream(b);
}

public void setGeneration(Long generation) {
get.setGeneration(generation);
}

public HttpHeaders getRequestHeaders() {
return get.getRequestHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.http.HttpStatus;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -56,6 +58,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -71,6 +74,7 @@

import static fixture.gcs.TestUtils.createServiceAccount;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
Expand All @@ -86,6 +90,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "use a http server")
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
Expand Down Expand Up @@ -212,6 +217,11 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
);
}

@Override
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {
exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(randomNonNegativeInt()));
}

public void testShouldRetryOnConnectionRefused() {
// port 1 should never be open
endpointUrlOverride = "http://127.0.0.1:1";
Expand Down Expand Up @@ -242,6 +252,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
addSuccessfulDownloadHeaders(exchange);
final HttpHeaderParser.Range range = getRange(exchange);
final int offset = Math.toIntExact(range.start());
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.end() + 1, bytes.length)));
Expand Down Expand Up @@ -570,6 +581,55 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
container.delete(randomPurpose());
}

public void testContentsChangeWhileStreaming() throws IOException {
GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket");
httpServer.createContext("/", handler);
// The blob needs to be large enough that it won't be entirely buffered on the first request
final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());

final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);

final String key = randomIdentifier();
byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);

BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
assertEquals(new BytesArray(initialValue), reference);

try (InputStream inputStream = container.readBlob(randomPurpose(), key)) {
// Trigger the first chunk to load
int read = inputStream.read();
assert read != -1;

// Restart the server (this triggers a retry)
restartHttpServer();
httpServer.createContext("/", handler);

// Update the file
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered);
container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false);

// Read the rest of the stream, it should throw because the contents changed
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
assertThat(
message,
startsWith(
"Blob object ["
+ container.path().buildAsString()
+ key
+ "] generation [1] unavailable on resume (contents changed, or object deleted):"
)
);
}
}

private void restartHttpServer() throws IOException {
InetSocketAddress currentAddress = httpServer.getAddress();
httpServer.stop(0);
httpServer = MockHttpServer.createHttp(currentAddress, 0);
Comment on lines +627 to +629
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether rebinding to the same address can be flaky sometimes. We can have it as is for now and see how it goes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be OK as long as we have SO_REUSEADDR set? It seemed to work locally so perhaps that's the default?

httpServer.start();
}

private HttpHandler safeHandler(HttpHandler handler) {
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
return exchange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public LowLevelHttpResponse execute() throws IOException {
result.setContent(content);
result.setContentLength(contentLength);
result.setContentType("application/octet-stream");
result.addHeader("x-goog-generation", String.valueOf(randomNonNegativeInt()));
result.setStatusCode(RestStatus.OK.getStatus());
return result;
}
Expand Down
Loading