5 changes: 5 additions & 0 deletions docs/changelog/125278.yaml
@@ -0,0 +1,5 @@
pr: 125278
summary: Integrate GCP client with telemetry
area: Snapshot/Restore
type: enhancement
issues: []
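
The change threads a RepositoriesMetrics instance from the plugin down through GoogleCloudStorageBlobStore into a per-repository GoogleCloudStorageOperationsStats tracker, so each billed GCS operation is counted, with failures counted separately as well. As a rough sketch of the counting pattern only — the class below is hypothetical and keeps in-memory totals, whereas the real tracker records into Elasticsearch's RepositoriesMetrics — the shape is:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for GoogleCloudStorageOperationsStats: one counter
// per (operation, counter-kind) pair, incremented from the request path.
final class OperationCounters {
    enum Operation { INSERT_OBJECT, GET_OBJECT, LIST_OBJECTS }
    enum Counter { OPERATION, OPERATION_EXCEPTION }

    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void inc(Operation op, Counter kind) {
        counters.computeIfAbsent(op + "/" + kind, k -> new LongAdder()).increment();
    }

    long total(Operation op, Counter kind) {
        LongAdder adder = counters.get(op + "/" + kind);
        return adder == null ? 0L : adder.sum();
    }
}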
@@ -213,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings) {
protected GoogleCloudStorageService createStorageService() {
return new GoogleCloudStorageService() {
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
@@ -260,7 +260,8 @@ public Map<String, Repository.Factory> getRepositories(
this.storageService,
clusterService,
bigArrays,
recoverySettings
recoverySettings,
repositoriesMetrics
) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
@@ -271,7 +272,9 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
storageService,
bigArrays,
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff()
BackoffPolicy.noBackoff(),
metadata,
repositoriesMetrics
) {
@Override
long getLargeBlobThresholdInBytes() {
@@ -20,22 +20,29 @@
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;

import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT;

class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {

private final GoogleCloudStorageBlobStore blobStore;
private final GoogleCloudStorageOperationsStats tracker;
private final String path;

GoogleCloudStorageBlobContainer(BlobPath path, GoogleCloudStorageBlobStore blobStore) {
super(path);
this.blobStore = blobStore;
this.path = path.buildAsString();
this.tracker = blobStore.tracker();
}

@Override
@@ -73,15 +80,32 @@ public InputStream readBlob(OperationPurpose purpose, final String blobName, fin
return blobStore.readBlob(purpose, buildKey(blobName), position, length);
}

// We don't track the InsertObject operation on the http layer as
// we do with the GET/LIST operations, since a single insert
// can trigger multiple underlying http requests while only one
// operation is billed.
private void trackInsertObject(OperationPurpose purpose, CheckedRunnable<IOException> r) throws IOException {
try {
tracker.inc(purpose, INSERT_OBJECT, OPERATION);
r.run();
} catch (Exception e) {
tracker.inc(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw e;
}
}

@Override
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
blobStore.writeBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
trackInsertObject(
purpose,
() -> blobStore.writeStreamBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists)
);
}

@Override
public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists);
trackInsertObject(purpose, () -> blobStore.writeFullBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists));
}

@Override
@@ -92,7 +116,7 @@ public void writeMetadataBlob(
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer
) throws IOException {
blobStore.writeBlob(purpose, buildKey(blobName), failIfAlreadyExists, writer);
trackInsertObject(purpose, () -> blobStore.writeMetadata(purpose, buildKey(blobName), failIfAlreadyExists, writer));
}

@Override
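All three write paths in this container now funnel through trackInsertObject: the operation counter is incremented before the write runs, and a failure additionally increments the exception counter, so a failed insert shows up in both series. A minimal sketch of the same count-run-count-exception shape, reusing the illustrative OperationCounters above (the surrounding type is hypothetical):

import java.io.IOException;

// Sketch of the wrapper pattern from trackInsertObject in the diff above.
final class TrackedWrites {
    @FunctionalInterface
    interface IORunnable {
        void run() throws IOException;
    }

    private final OperationCounters counters = new OperationCounters();

    void trackInsert(IORunnable write) throws IOException {
        try {
            // One logical InsertObject per call, however many HTTP requests
            // the client issues underneath.
            counters.inc(OperationCounters.Operation.INSERT_OBJECT, OperationCounters.Counter.OPERATION);
            write.run();
        } catch (Exception e) {
            counters.inc(OperationCounters.Operation.INSERT_OBJECT, OperationCounters.Counter.OPERATION_EXCEPTION);
            throw e; // precise rethrow keeps the IOException signature
        }
    }
}

This is also why the insert path is counted here rather than in the HTTP layer: a single insert (for example a resumable upload) may issue several HTTP requests but is billed as one operation.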
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -44,7 +45,7 @@
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
@@ -71,6 +72,9 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT;

class GoogleCloudStorageBlobStore implements BlobStore {

@@ -111,7 +115,7 @@ class GoogleCloudStorageBlobStore {
private final String clientName;
private final String repositoryName;
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final GoogleCloudStorageOperationsStats tracker;
private final int bufferSize;
private final BigArrays bigArrays;
private final BackoffPolicy casBackoffPolicy;
@@ -123,20 +127,22 @@ class GoogleCloudStorageBlobStore {
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize,
BackoffPolicy casBackoffPolicy
BackoffPolicy casBackoffPolicy,
RepositoryMetadata metadata,
RepositoriesMetrics repositoriesMetrics
) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless());
this.tracker = new GoogleCloudStorageOperationsStats(bucketName, metadata, repositoriesMetrics);
this.bufferSize = bufferSize;
this.casBackoffPolicy = casBackoffPolicy;
}

private Storage client(OperationPurpose purpose) throws IOException {
return storageService.client(clientName, repositoryName, purpose, stats);
return storageService.client(clientName, repositoryName, purpose, tracker);
}

@Override
@@ -149,6 +155,10 @@ public void close() {
storageService.closeRepositoryClients(repositoryName);
}

GoogleCloudStorageOperationsStats tracker() {
return tracker;
}

/**
* List blobs in the specific bucket under the specified path. The path root is removed.
*
@@ -266,7 +276,7 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l
* @param bytes content of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
void writeFullBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
[Review comment from the PR author] Overloaded writeBlob is hard to read, especially when we call one from the inside of another.
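Concretely, the rename replaces three writeBlob overloads with one intention-revealing name per payload kind. The signatures below are as they appear elsewhere in this diff; the interface wrapper is only for illustration, and compiling it would require the Elasticsearch types on the classpath:

// One entry point per payload kind instead of three writeBlob overloads.
interface GcsWrites {
    void writeFullBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException;
    void writeStreamBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
    void writeMetadata(OperationPurpose purpose, String blobName, boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer) throws IOException;
}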
if (bytes.length() > getLargeBlobThresholdInBytes()) {
// Compute md5 here so #writeBlobResumable forces the integrity check on the resumable upload.
// This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not
@@ -284,7 +294,7 @@ void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes,
if (bytes.hasArray()) {
writeBlobMultipart(purpose, blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists);
} else {
writeBlob(purpose, bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo);
writeStreamBlobWithInfo(purpose, bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo);
}
}
}
@@ -296,13 +306,18 @@ void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes,
* @param blobSize expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
void writeStreamBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
writeBlob(purpose, inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
writeStreamBlobWithInfo(purpose, inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
}

private void writeBlob(OperationPurpose purpose, InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo)
throws IOException {
private void writeStreamBlobWithInfo(
OperationPurpose purpose,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
BlobInfo blobInfo
) throws IOException {
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(purpose, blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
@@ -325,7 +340,7 @@ long getLargeBlobThresholdInBytes() {
Storage.BlobWriteOption.md5Match() };
private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = { Storage.BlobWriteOption.md5Match() };

void writeBlob(
void writeMetadata(
OperationPurpose purpose,
String blobName,
boolean failIfAlreadyExists,
@@ -335,7 +350,6 @@ void writeBlob(
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;

StorageException storageException = null;

for (int retry = 0; retry < 3; ++retry) {
// we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush
// the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the
@@ -397,9 +411,8 @@ public void write(byte[] b, int off, int len) throws IOException {
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
} else {
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
writeFullBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
}
return;
} catch (final StorageException se) {
@@ -463,11 +476,7 @@ private void writeBlobResumable(
*/
org.elasticsearch.core.Streams.copy(inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer);
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
// We don't track this operation on the http layer as
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);

return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
@@ -519,7 +528,6 @@ private void writeBlobMultipart(
// we do with the GET/LIST operations, since these operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
@@ -638,7 +646,7 @@ private static String buildKey(String keyPath, String s) {

@Override
public Map<String, BlobStoreActionStats> stats() {
return stats.tracker().toMap();
return tracker.toMap();
}

private static final class WritableBlobChannel implements WritableByteChannel {
@@ -740,6 +748,7 @@ OptionalBytesReference compareAndExchangeRegister(
BaseServiceException finalException = null;
while (true) {
try {
tracker.inc(purpose, INSERT_OBJECT, OPERATION);
SocketAccess.doPrivilegedVoidIOException(
() -> client(purpose).create(
blobInfo,
Expand All @@ -749,9 +758,9 @@ OptionalBytesReference compareAndExchangeRegister(
Storage.BlobTargetOption.generationMatch()
)
);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
tracker.inc(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
final var serviceException = unwrapServiceException(e);
if (serviceException == null) {
throw e;