Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings) {
protected GoogleCloudStorageService createStorageService() {
return new GoogleCloudStorageService() {
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
Expand Down Expand Up @@ -260,7 +260,8 @@ public Map<String, Repository.Factory> getRepositories(
this.storageService,
clusterService,
bigArrays,
recoverySettings
recoverySettings,
repositoriesMetrics
) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
Expand All @@ -271,7 +272,9 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
storageService,
bigArrays,
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff()
BackoffPolicy.noBackoff(),
metadata,
repositoriesMetrics
) {
@Override
long getLargeBlobThresholdInBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -44,7 +45,7 @@
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
Expand All @@ -71,6 +72,9 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT;

class GoogleCloudStorageBlobStore implements BlobStore {

Expand Down Expand Up @@ -123,14 +127,16 @@ class GoogleCloudStorageBlobStore implements BlobStore {
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize,
BackoffPolicy casBackoffPolicy
BackoffPolicy casBackoffPolicy,
RepositoryMetadata metadata,
RepositoriesMetrics repositoriesMetrics
) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless());
this.stats = new GoogleCloudStorageOperationsStats(bucketName, metadata, repositoriesMetrics);
this.bufferSize = bufferSize;
this.casBackoffPolicy = casBackoffPolicy;
}
Expand Down Expand Up @@ -335,7 +341,6 @@ void writeBlob(
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;

StorageException storageException = null;

for (int retry = 0; retry < 3; ++retry) {
// we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush
// the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the
Expand Down Expand Up @@ -397,10 +402,11 @@ public void write(byte[] b, int off, int len) throws IOException {
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);

} else {
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
}
stats.incMetric(purpose, INSERT_OBJECT, OPERATION);
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand All @@ -409,15 +415,18 @@ public void write(byte[] b, int off, int len) throws IOException {
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
continue;
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
if (storageException != null) {
se.addSuppressed(storageException);
}
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw se;
}
}
assert storageException != null;
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw storageException;
}

Expand Down Expand Up @@ -467,7 +476,7 @@ private void writeBlobResumable(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
stats.incMetric(purpose,INSERT_OBJECT,OPERATION);
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand All @@ -477,15 +486,18 @@ private void writeBlobResumable(
inputStream.reset();
continue;
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
if (storageException != null) {
se.addSuppressed(storageException);
}
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw se;
}
}
assert storageException != null;
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
throw storageException;
}

Expand Down Expand Up @@ -519,8 +531,9 @@ private void writeBlobMultipart(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
stats.incMetric(purpose, INSERT_OBJECT, OPERATION);
} catch (final StorageException se) {
stats.incMetric(purpose, INSERT_OBJECT, OPERATION_EXCEPTION);
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
Expand Down Expand Up @@ -638,7 +651,7 @@ private static String buildKey(String keyPath, String s) {

@Override
public Map<String, BlobStoreActionStats> stats() {
return stats.tracker().toMap();
return stats.toMap();
}

private static final class WritableBlobChannel implements WritableByteChannel {
Expand Down Expand Up @@ -749,7 +762,7 @@ OptionalBytesReference compareAndExchangeRegister(
Storage.BlobTargetOption.generationMatch()
)
);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
stats.incMetric(purpose, INSERT_OBJECT, OPERATION);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,81 @@

import java.util.regex.Pattern;

import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.REQUEST;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.REQUEST_EXCEPTION;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.GET_OBJECT;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.LIST_OBJECTS;

final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {

private static final Logger logger = LogManager.getLogger("GcpHttpStats");

private final StatsTracker stats;
private final GoogleCloudStorageOperationsStats stats;
private final OperationPurpose purpose;
private final Pattern getObjPattern;
private final Pattern insertObjPattern;
private final Pattern listObjPattern;

GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) {
this.stats = stats.tracker();
this.purpose = purpose;
var bucket = stats.bucketName();

this.stats = stats;
// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
var bucket = stats.bucket();
this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+");
this.insertObjPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o");
this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o");
}

private void trackRequest(Operation operation) {
stats.trackRequest(purpose, operation);
private void incMetric(Operation op, GoogleCloudStorageOperationsStats.Counter metric) {
stats.incMetric(purpose, op, metric);
}

private void trackRequestAndOperation(Operation operation) {
stats.trackOperationAndRequest(purpose, operation);
private boolean isSuccessCode(int code) {
return (code >= 200 && code < 300) || code == 308;
}

@Override
public void interceptResponse(final HttpResponse response) {
var respCode = response.getStatusCode();
// Some of the intermediate and error codes are still counted as "good" requests
if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) {
return;
}
var request = response.getRequest();

var code = response.getStatusCode();
var path = request.getUrl().getRawPath();
var ignored = false;
switch (request.getRequestMethod()) {
case "GET" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
if (getObjPattern.matcher(path).matches()) {
trackRequestAndOperation(Operation.GET_OBJECT);
if (isSuccessCode(code) || code == 404) {
incMetric(GET_OBJECT, REQUEST);
incMetric(GET_OBJECT, OPERATION);
} else {
incMetric(GET_OBJECT, REQUEST_EXCEPTION);
incMetric(GET_OBJECT, OPERATION_EXCEPTION);
}
} else if (listObjPattern.matcher(path).matches()) {
trackRequestAndOperation(Operation.LIST_OBJECTS);
if (isSuccessCode(code)) {
incMetric(LIST_OBJECTS, REQUEST);
incMetric(LIST_OBJECTS, OPERATION);
} else {
incMetric(LIST_OBJECTS, REQUEST_EXCEPTION);
incMetric(LIST_OBJECTS, OPERATION_EXCEPTION);
}
} else {
ignored = true;
}
}
case "POST", "PUT" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
if (insertObjPattern.matcher(path).matches()) {
trackRequest(Operation.INSERT_OBJECT);
if (isSuccessCode(code)) {
incMetric(INSERT_OBJECT, REQUEST);
} else {
incMetric(INSERT_OBJECT, REQUEST_EXCEPTION);
}
} else {
ignored = true;
}
Expand Down
Loading