diff --git a/docs/changelog/125278.yaml b/docs/changelog/125278.yaml new file mode 100644 index 0000000000000..2601dd52359df --- /dev/null +++ b/docs/changelog/125278.yaml @@ -0,0 +1,5 @@ +pr: 125278 +summary: Integrate GCP client with telemetry +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9ef5289b7accb..86bd975616fe4 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -213,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) { } @Override - protected GoogleCloudStorageService createStorageService(Settings settings) { - return new GoogleCloudStorageService(settings) { + protected GoogleCloudStorageService createStorageService() { + return new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, @@ -260,7 +260,8 @@ public Map getRepositories( this.storageService, clusterService, bigArrays, - recoverySettings + recoverySettings, + repositoriesMetrics ) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { @@ -271,7 +272,9 @@ protected GoogleCloudStorageBlobStore createBlobStore() { storageService, bigArrays, randomIntBetween(1, 8) * 1024, - BackoffPolicy.noBackoff() + BackoffPolicy.noBackoff(), + metadata, + repositoriesMetrics ) { @Override long getLargeBlobThresholdInBytes() { diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 22c600445c609..7996cdf8ab1c6 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.CheckedRunnable; import java.io.IOException; import java.io.InputStream; @@ -27,15 +28,21 @@ import java.util.Iterator; import java.util.Map; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT; + class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { private final GoogleCloudStorageBlobStore blobStore; + private final GoogleCloudStorageOperationsStats tracker; private final String path; GoogleCloudStorageBlobContainer(BlobPath path, GoogleCloudStorageBlobStore blobStore) { super(path); this.blobStore = blobStore; this.path = path.buildAsString(); + this.tracker = blobStore.tracker(); } @Override @@ -73,15 +80,32 @@ public InputStream readBlob(OperationPurpose purpose, final String 
blobName, fin return blobStore.readBlob(purpose, buildKey(blobName), position, length); } + // We don't track the InsertObject operation on the http layer as + // we do with the GET/LIST operations, since this operation + // can trigger multiple underlying http requests but only one + // operation is billed. + private void trackInsertObject(OperationPurpose purpose, CheckedRunnable<IOException> r) throws IOException { + try { + tracker.inc(purpose, INSERT_OBJECT, OPERATION); + r.run(); + } catch (Exception e) { + tracker.inc(purpose, INSERT_OBJECT, OPERATION_EXCEPTION); + throw e; + } + } + @Override public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - blobStore.writeBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); + trackInsertObject( + purpose, + () -> blobStore.writeStreamBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists) + ); } @Override public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { - blobStore.writeBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists); + trackInsertObject(purpose, () -> blobStore.writeFullBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists)); } @Override @@ -92,7 +116,7 @@ public void writeMetadataBlob( boolean atomic, CheckedConsumer<OutputStream, IOException> writer ) throws IOException { - blobStore.writeBlob(purpose, buildKey(blobName), failIfAlreadyExists, writer); + trackInsertObject(purpose, () -> blobStore.writeMetadata(purpose, buildKey(blobName), failIfAlreadyExists, writer)); } @Override diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 7c2b1d55ad732..e789661afc61a 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; @@ -44,7 +45,7 @@ import org.elasticsearch.core.Streams; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayInputStream; @@ -71,6 +72,9 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT; class GoogleCloudStorageBlobStore implements BlobStore { @@ -111,7 +115,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { private
final String clientName; private final String repositoryName; private final GoogleCloudStorageService storageService; - private final GoogleCloudStorageOperationsStats stats; + private final GoogleCloudStorageOperationsStats tracker; private final int bufferSize; private final BigArrays bigArrays; private final BackoffPolicy casBackoffPolicy; @@ -123,20 +127,22 @@ class GoogleCloudStorageBlobStore implements BlobStore { GoogleCloudStorageService storageService, BigArrays bigArrays, int bufferSize, - BackoffPolicy casBackoffPolicy + BackoffPolicy casBackoffPolicy, + RepositoryMetadata metadata, + RepositoriesMetrics repositoriesMetrics ) { this.bucketName = bucketName; this.clientName = clientName; this.repositoryName = repositoryName; this.storageService = storageService; this.bigArrays = bigArrays; - this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless()); + this.tracker = new GoogleCloudStorageOperationsStats(bucketName, metadata, repositoriesMetrics); this.bufferSize = bufferSize; this.casBackoffPolicy = casBackoffPolicy; } private Storage client(OperationPurpose purpose) throws IOException { - return storageService.client(clientName, repositoryName, purpose, stats); + return storageService.client(clientName, repositoryName, purpose, tracker); } @Override @@ -149,6 +155,10 @@ public void close() { storageService.closeRepositoryClients(repositoryName); } + GoogleCloudStorageOperationsStats tracker() { + return tracker; + } + /** * List blobs in the specific bucket under the specified path. The path root is removed. * @@ -266,7 +276,7 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l * @param bytes content of the blob to be written * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + void writeFullBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { if (bytes.length() > getLargeBlobThresholdInBytes()) { // Compute md5 here so #writeBlobResumable forces the integrity check on the resumable upload. 
// This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not @@ -284,7 +294,7 @@ void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, if (bytes.hasArray()) { writeBlobMultipart(purpose, blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists); } else { - writeBlob(purpose, bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo); + writeStreamBlobWithInfo(purpose, bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo); } } } @@ -296,13 +306,18 @@ void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, * @param blobSize expected size of the blob to be written * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + void writeStreamBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(purpose, inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build()); + writeStreamBlobWithInfo(purpose, inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build()); } - private void writeBlob(OperationPurpose purpose, InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) - throws IOException { + private void writeStreamBlobWithInfo( + OperationPurpose purpose, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists, + BlobInfo blobInfo + ) throws IOException { if (blobSize > getLargeBlobThresholdInBytes()) { writeBlobResumable(purpose, blobInfo, inputStream, blobSize, failIfAlreadyExists); } else { @@ -325,7 +340,7 @@ long getLargeBlobThresholdInBytes() { Storage.BlobWriteOption.md5Match() }; private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = { Storage.BlobWriteOption.md5Match() }; - void writeBlob( + void writeMetadata( OperationPurpose purpose, String blobName, boolean failIfAlreadyExists, @@ -335,7 +350,6 @@ void writeBlob( final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5; StorageException storageException = null; - for (int retry = 0; retry < 3; ++retry) { // we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush // the buffer to it and keep writing to the resumable upload. 
If we never exceed the large blob threshold we just write the @@ -397,9 +411,8 @@ public void write(byte[] b, int off, int len) throws IOException { final WritableByteChannel writeChannel = channelRef.get(); if (writeChannel != null) { SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); } else { - writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); + writeFullBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); } return; } catch (final StorageException se) { @@ -463,11 +476,7 @@ private void writeBlobResumable( */ org.elasticsearch.core.Streams.copy(inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer); SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - // We don't track this operation on the http layer as - // we do with the GET/LIST operations since this operations - // can trigger multiple underlying http requests but only one - // operation is billed. - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); + return; } catch (final StorageException se) { final int errorCode = se.getCode(); @@ -519,7 +528,6 @@ private void writeBlobMultipart( // we do with the GET/LIST operations since this operations // can trigger multiple underlying http requests but only one // operation is billed. - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); } catch (final StorageException se) { if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); @@ -638,7 +646,7 @@ private static String buildKey(String keyPath, String s) { @Override public Map stats() { - return stats.tracker().toMap(); + return tracker.toMap(); } private static final class WritableBlobChannel implements WritableByteChannel { @@ -740,6 +748,7 @@ OptionalBytesReference compareAndExchangeRegister( BaseServiceException finalException = null; while (true) { try { + tracker.inc(purpose, INSERT_OBJECT, OPERATION); SocketAccess.doPrivilegedVoidIOException( () -> client(purpose).create( blobInfo, @@ -749,9 +758,9 @@ OptionalBytesReference compareAndExchangeRegister( Storage.BlobTargetOption.generationMatch() ) ); - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); return OptionalBytesReference.of(expected); } catch (Exception e) { + tracker.inc(purpose, INSERT_OBJECT, OPERATION_EXCEPTION); final var serviceException = unwrapServiceException(e); if (serviceException == null) { throw e; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java index 669a228535c63..86e49d296661b 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java @@ -18,57 +18,77 @@ import java.util.regex.Pattern; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.OPERATION_EXCEPTION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.RANGE_NOT_SATISFIED; +import static 
org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.REQUEST; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.REQUEST_EXCEPTION; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Counter.THROTTLE; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.GET_OBJECT; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.LIST_OBJECTS; final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor { private static final Logger logger = LogManager.getLogger("GcpHttpStats"); - private final StatsTracker stats; + private final GoogleCloudStorageOperationsStats stats; private final OperationPurpose purpose; private final Pattern getObjPattern; private final Pattern insertObjPattern; private final Pattern listObjPattern; GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) { - this.stats = stats.tracker(); this.purpose = purpose; - var bucket = stats.bucketName(); - + this.stats = stats; // The specification for the current API (v1) endpoints can be found at: // https://cloud.google.com/storage/docs/json_api/v1 + var bucket = stats.bucket(); this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+"); this.insertObjPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o"); this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o"); } - private void trackRequest(Operation operation) { - stats.trackRequest(purpose, operation); + private void inc(Operation op, GoogleCloudStorageOperationsStats.Counter counter) { + stats.inc(purpose, op, counter); } - private void trackRequestAndOperation(Operation operation) { - stats.trackOperationAndRequest(purpose, operation); + private boolean isSuccessCode(int code) { + return (code >= 200 && code < 300) || code == 308; } @Override public void interceptResponse(final HttpResponse response) { - var respCode = response.getStatusCode(); - // Some of the intermediate and error codes are still counted as "good" requests - if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) { - return; - } var request = response.getRequest(); - + var code = response.getStatusCode(); var path = request.getUrl().getRawPath(); var ignored = false; switch (request.getRequestMethod()) { case "GET" -> { // https://cloud.google.com/storage/docs/json_api/v1/objects/get if (getObjPattern.matcher(path).matches()) { - trackRequestAndOperation(Operation.GET_OBJECT); + inc(GET_OBJECT, REQUEST); + inc(GET_OBJECT, OPERATION); + if (isSuccessCode(code) == false) { + inc(GET_OBJECT, REQUEST_EXCEPTION); + inc(GET_OBJECT, OPERATION_EXCEPTION); + if (code == 429) { + inc(GET_OBJECT, THROTTLE); + } else if (code == 416) { + inc(GET_OBJECT, RANGE_NOT_SATISFIED); + } + } } else if (listObjPattern.matcher(path).matches()) { - trackRequestAndOperation(Operation.LIST_OBJECTS); + inc(LIST_OBJECTS, REQUEST); + inc(LIST_OBJECTS, OPERATION); + if (isSuccessCode(code) == false) { + inc(LIST_OBJECTS, REQUEST_EXCEPTION); + inc(LIST_OBJECTS, OPERATION_EXCEPTION); + if (code == 429) { 
+ inc(LIST_OBJECTS, THROTTLE); + } + } } else { ignored = true; } @@ -76,7 +96,13 @@ public void interceptResponse(final HttpResponse response) { case "POST", "PUT" -> { // https://cloud.google.com/storage/docs/json_api/v1/objects/insert if (insertObjPattern.matcher(path).matches()) { - trackRequest(Operation.INSERT_OBJECT); + inc(INSERT_OBJECT, REQUEST); + if (isSuccessCode(code) == false) { + inc(INSERT_OBJECT, REQUEST_EXCEPTION); + if (code == 429) { + inc(INSERT_OBJECT, THROTTLE); + } + } } else { ignored = true; } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java index 06878a17a97b6..d746e6d08467b 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java @@ -9,8 +9,10 @@ package org.elasticsearch.repositories.gcs; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.blobstore.BlobStoreActionStats; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.repositories.RepositoriesMetrics; import java.util.EnumMap; import java.util.Map; @@ -19,157 +21,102 @@ final class GoogleCloudStorageOperationsStats { - private final String bucketName; - private final StatsTracker tracker; + private final String bucket; - GoogleCloudStorageOperationsStats(String bucketName, boolean isStateless) { - this.bucketName = bucketName; - if (isStateless) { - this.tracker = new ServerlessTracker(bucketName); - } else { - this.tracker = new StatefulTracker(); - } - } - - GoogleCloudStorageOperationsStats(String bucketName) { - this(bucketName, false); - } - - public String bucketName() { - return bucketName; - } - - public StatsTracker tracker() { - return tracker; - } - - public enum Operation { - GET_OBJECT("GetObject"), - LIST_OBJECTS("ListObjects"), - INSERT_OBJECT("InsertObject"); - - private final String key; + /** + * Track operations only, required for REST repository metering API + */ + private final EnumMap operations; - Operation(String key) { - this.key = key; + /** + * Telemetry (APM) + */ + private final RepositoriesMetrics telemetry; + private final EnumMap>> telemetryAttributes; + + GoogleCloudStorageOperationsStats(String bucket, RepositoryMetadata metadata, RepositoriesMetrics repositoriesMetrics) { + this.bucket = bucket; + this.telemetry = repositoriesMetrics; + this.operations = new EnumMap<>(Operation.class); + for (var op : Operation.values()) { + operations.put(op, new LongAdder()); } + this.telemetryAttributes = new EnumMap<>(OperationPurpose.class); + for (var purpose : OperationPurpose.values()) { + var purposeMap = new EnumMap>(Operation.class); + telemetryAttributes.put(purpose, purposeMap); + for (var operation : Operation.values()) { + var attrMap = RepositoriesMetrics.createAttributesMap(metadata, purpose, operation.key); + } - public String key() { - return key; } } - sealed interface StatsTracker permits ServerlessTracker, StatefulTracker { - void trackRequest(OperationPurpose purpose, Operation operation); - - void trackOperation(OperationPurpose purpose, Operation operation); - - Map toMap(); - - default void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { - trackOperation(purpose, operation); - trackRequest(purpose, 
operation); - } + String bucket() { + return bucket; } /** - * Stateful tracker is single dimension: Operation only. The OperationPurpose is ignored. + * Increment counter by 1 for given: + * @param purpose {@link OperationPurpose} + * @param operation {@link Operation} + * @param counter {@link Counter} */ - static final class StatefulTracker implements StatsTracker { - - private final EnumMap counters; - - StatefulTracker() { - this.counters = new EnumMap<>(Operation.class); - for (var operation : Operation.values()) { - counters.put(operation, new Counters(operation.key())); + void inc(OperationPurpose purpose, Operation operation, Counter counter) { + var attr = telemetryAttributes.get(purpose).get(operation); + switch (counter) { + case REQUEST -> telemetry.requestCounter().incrementBy(1, attr); + case REQUEST_EXCEPTION -> { + telemetry.exceptionCounter().incrementBy(1, attr); + telemetry.exceptionHistogram().record(1, attr); } + case RANGE_NOT_SATISFIED -> telemetry.requestRangeNotSatisfiedExceptionCounter().incrementBy(1, attr); + case THROTTLE -> { + telemetry.throttleCounter().incrementBy(1, attr); + telemetry.throttleHistogram().record(1, attr); + } + case OPERATION -> { + telemetry.operationCounter().incrementBy(1, attr); + operations.get(operation).add(1); + } + case OPERATION_EXCEPTION -> telemetry.unsuccessfulOperationCounter().incrementBy(1, attr); } + } - @Override - public void trackRequest(OperationPurpose purpose, Operation operation) { - // dont track requests, only operations - } - - @Override - public void trackOperation(OperationPurpose purpose, Operation operation) { - counters.get(operation).operations().add(1); - } - - @Override - public Map toMap() { - return counters.values().stream().collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> { - var ops = c.operations().sum(); - return new BlobStoreActionStats(ops, ops); - })); - } - + public Map toMap() { + return operations.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().key, e -> { + var ops = e.getValue().sum(); + return new BlobStoreActionStats(ops, ops); + })); } + // void recordExceptionHistogram(OperationPurpose purpose, Operation operation, ) + /** - * Serverless tracker is 2-dimensional: OperationPurpose and Operations. - * Every combination of these has own set of counters: number of operations and number of http requests. - * A single operation might have multiple HTTP requests, for example a single ResumableUpload operation - * can perform a series of HTTP requests with size up to {@link GoogleCloudStorageBlobStore#SDK_DEFAULT_CHUNK_SIZE}. - *
-     * {@code
-     * | Purpose      | Operation       | OperationsCnt | RequestCnt |
-     * |--------------+-----------------+---------------+------------|
-     * | SnapshotData | GetObject       |             1 |          1 |
-     * | SnapshotData | ListObjects     |             2 |          2 |
-     * | SnapshotData | ResumableUpload |             1 |         10 |
-     * | SnapshotData | ...             |               |            |
-     * | Translog     | GetObject       |             5 |          5 |
-     * | ...          |                 |               |            |
-     * }
-     * </pre>
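
A condensed sketch of how the new single tracker is meant to wire telemetry attributes and counters, for readers following the constructor and inc(...) changes above. This is an editor's illustration with the generics spelled out, assuming only the enums and the RepositoriesMetrics.createAttributesMap(...) call introduced in this diff; it is not the patch text itself.

    // Build one attribute map per (purpose, operation) pair up front and reuse it for every increment.
    private final EnumMap<OperationPurpose, EnumMap<Operation, Map<String, Object>>> telemetryAttributes =
        new EnumMap<>(OperationPurpose.class);

    void buildTelemetryAttributes(RepositoryMetadata metadata) {
        for (var purpose : OperationPurpose.values()) {
            var purposeMap = new EnumMap<Operation, Map<String, Object>>(Operation.class);
            telemetryAttributes.put(purpose, purposeMap);
            for (var operation : Operation.values()) {
                purposeMap.put(operation, RepositoriesMetrics.createAttributesMap(metadata, purpose, operation.key()));
            }
        }
    }

    // inc(purpose, operation, counter) then resolves telemetryAttributes.get(purpose).get(operation)
    // and passes it to the matching RepositoriesMetrics instrument; only Counter.OPERATION also bumps
    // the LongAdder map behind toMap(), which serves the REST repository metering API.
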
+ * enumerates telemetry counters */ - static final class ServerlessTracker implements StatsTracker { - private final EnumMap> counters; - - ServerlessTracker(String bucketName) { - this.counters = new EnumMap<>(OperationPurpose.class); - for (var purpose : OperationPurpose.values()) { - var operations = new EnumMap(Operation.class); - for (var operation : Operation.values()) { - operations.put(operation, new Counters(purpose.getKey() + "_" + operation.key())); - } - counters.put(purpose, operations); - } - } - - @Override - public void trackOperation(OperationPurpose purpose, Operation operation) { - counters.get(purpose).get(operation).operations.add(1); - } + enum Counter { + REQUEST, + REQUEST_EXCEPTION, + RANGE_NOT_SATISFIED, + THROTTLE, + OPERATION, + OPERATION_EXCEPTION + } - @Override - public void trackRequest(OperationPurpose purpose, Operation operation) { - counters.get(purpose).get(operation).requests.add(1); - } + enum Operation { + GET_OBJECT("GetObject"), + LIST_OBJECTS("ListObjects"), + INSERT_OBJECT("InsertObject"); - @Override - public void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { - var c = counters.get(purpose).get(operation); - c.requests.add(1); - c.operations.add(1); - } + private final String key; - @Override - public Map toMap() { - return counters.values() - .stream() - .flatMap(ops -> ops.values().stream()) - .collect( - Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())) - ); + Operation(String key) { + this.key = key; } - } - - private record Counters(String name, LongAdder operations, LongAdder requests) { - Counters(String name) { - this(name, new LongAdder(), new LongAdder()); + public String key() { + return key; } } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 81ebe26b1e058..909e410d28f37 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -34,14 +34,14 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin @SuppressWarnings("this-escape") public GoogleCloudStoragePlugin(final Settings settings) { - this.storageService = createStorageService(settings); + this.storageService = createStorageService(); // eagerly load client settings so that secure settings are readable (not closed) reload(settings); } // overridable for tests - protected GoogleCloudStorageService createStorageService(Settings settings) { - return new GoogleCloudStorageService(settings); + protected GoogleCloudStorageService createStorageService() { + return new GoogleCloudStorageService(); } @Override @@ -61,7 +61,8 @@ public Map getRepositories( this.storageService, clusterService, bigArrays, - recoverySettings + recoverySettings, + repositoriesMetrics ) ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 16233d3b391d7..4b50b2f9ce509 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ 
b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -85,6 +86,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { private final TimeValue retryThrottledCasDelayIncrement; private final int retryThrottledCasMaxNumberOfRetries; private final TimeValue retryThrottledCasMaxDelay; + private final RepositoriesMetrics repositoriesMetrics; GoogleCloudStorageRepository( final RepositoryMetadata metadata, @@ -92,7 +94,8 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { final GoogleCloudStorageService storageService, final ClusterService clusterService, final BigArrays bigArrays, - final RecoverySettings recoverySettings + final RecoverySettings recoverySettings, + final RepositoriesMetrics repositoriesMetrics ) { super( metadata, @@ -104,7 +107,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { buildLocation(metadata) ); this.storageService = storageService; - + this.repositoriesMetrics = repositoriesMetrics; this.chunkSize = getSetting(CHUNK_SIZE, metadata); this.bucket = getSetting(BUCKET, metadata); this.clientName = CLIENT_NAME.get(metadata.settings()); @@ -140,7 +143,9 @@ protected GoogleCloudStorageBlobStore createBlobStore() { storageService, bigArrays, bufferSize, - BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay) + BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay), + metadata, + repositoriesMetrics ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index 6263853775693..aad4f9aea642b 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -26,10 +26,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -57,11 +55,6 @@ public class GoogleCloudStorageService { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageService.class); private volatile Map clientSettings = emptyMap(); - private final boolean isStateless; - - public GoogleCloudStorageService(Settings settings) { - this.isStateless = DiscoveryNode.isStateless(settings); - } private record ClientKey(OperationPurpose purpose, String repositoryName) {} @@ -137,10 +130,6 @@ public Storage client( } } - boolean isStateless() { - return isStateless; - } - synchronized void 
closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 0d8933b44b7ab..b08e4b8210e9e 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -21,6 +21,7 @@ import com.sun.net.httpserver.HttpHandler; import org.apache.http.HttpStatus; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -44,6 +45,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.http.ResponseInjectingHttpHandler; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; @@ -140,7 +142,7 @@ protected BlobContainer createBlobContainer( secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); clientSettings.setSecureSettings(secureSettings); - final GoogleCloudStorageService service = new GoogleCloudStorageService(Settings.EMPTY) { + final GoogleCloudStorageService service = new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, @@ -202,7 +204,9 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser service, BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024, - BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)) + BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)), + new RepositoryMetadata("repo", "gcs", Settings.EMPTY), + RepositoriesMetrics.NOOP ); return new GoogleCloudStorageBlobContainer(randomBoolean() ? 
BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index 971e072a18f65..d76bd50a616d5 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -15,6 +15,7 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.sun.net.httpserver.HttpServer; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreActionStats; @@ -31,6 +32,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.mocksocket.MockHttpServer; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -47,7 +49,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; @@ -111,14 +112,7 @@ public void createStorageService() throws Exception { threadPool = new TestThreadPool(getTestClass().getName()); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); - isServerless = randomBoolean(); - Settings settings; - if (isServerless) { - settings = Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, true).build(); - } else { - settings = Settings.EMPTY; - } - googleCloudStorageService = new GoogleCloudStorageService(settings); + googleCloudStorageService = new GoogleCloudStorageService(); googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET); httpServer.createContext("/", googleCloudStorageHttpHandler); httpServer.createContext("/token", new FakeOAuth2HttpHandler()); @@ -271,7 +265,9 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t googleCloudStorageService, BigArrays.NON_RECYCLING_INSTANCE, Math.toIntExact(BUFFER_SIZE.getBytes()), - BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10) + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10), + new RepositoryMetadata(repositoryName, "gcs", Settings.EMPTY), + RepositoriesMetrics.NOOP ); final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer( BlobPath.EMPTY, diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index c3693d70c8469..3c8c3f221f388 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ 
b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -17,12 +17,15 @@ import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -97,7 +100,9 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti storageService, BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024, - BackoffPolicy.noBackoff() + BackoffPolicy.noBackoff(), + new RepositoryMetadata("repo", "gcs", Settings.EMPTY), + RepositoriesMetrics.NOOP ) ) { final BlobContainer container = store.blobContainer(BlobPath.EMPTY); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index 904deb710f24f..437ddfbff9501 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -18,12 +18,14 @@ import org.apache.http.entity.StringEntity; import org.apache.http.protocol.HttpContext; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentBuilder; @@ -45,6 +47,14 @@ public class GoogleCloudStorageServiceTests extends ESTestCase { + private GoogleCloudStorageOperationsStats gcpStats(String bucket) { + return new GoogleCloudStorageOperationsStats( + bucket, + new RepositoryMetadata("repo", "gcp", Settings.EMPTY), + RepositoriesMetrics.NOOP + ); + } + public void testClientInitializer() throws Exception { final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); final TimeValue connectTimeValue = TimeValue.timeValueNanos(randomIntBetween(0, 2000000)); @@ -75,14 +85,14 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) .build(); SetOnce proxy = new SetOnce<>(); - final GoogleCloudStorageService service = new GoogleCloudStorageService(Settings.EMPTY) { + final GoogleCloudStorageService service = new GoogleCloudStorageService() { @Override void notifyProxyIsSet(Proxy p) { proxy.set(p); } }; service.refreshAndClearCache(GoogleCloudStorageClientSettings.load(settings)); - 
GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); + GoogleCloudStorageOperationsStats statsCollector = gcpStats("bucket"); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> service.client("another_client", "repo", randomPurpose(), statsCollector) @@ -118,7 +128,7 @@ public void testReinitClientSettings() throws Exception { final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { final GoogleCloudStorageService storageService = plugin.storageService; - GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); + GoogleCloudStorageOperationsStats statsCollector = gcpStats("bucket"); final Storage client11 = storageService.client("gcs1", "repo1", randomPurpose(), statsCollector); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); final Storage client12 = storageService.client("gcs2", "repo2", randomPurpose(), statsCollector); @@ -159,30 +169,10 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { final OperationPurpose operationPurpose = randomPurpose(); final OperationPurpose differentOperationPurpose = randomValueOtherThan(operationPurpose, BlobStoreTestUtil::randomPurpose); - final Storage repo1Client = storageService.client( - "gcs1", - "repo1", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo1ClientOtherPurpose = storageService.client( - "gcs1", - "repo1", - differentOperationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo2Client = storageService.client( - "gcs1", - "repo2", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo1ClientSecondInstance = storageService.client( - "gcs1", - "repo1", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); + final Storage repo1Client = storageService.client("gcs1", "repo1", operationPurpose, gcpStats("bucket")); + final Storage repo1ClientOtherPurpose = storageService.client("gcs1", "repo1", differentOperationPurpose, gcpStats("bucket")); + final Storage repo2Client = storageService.client("gcs1", "repo2", operationPurpose, gcpStats("bucket")); + final Storage repo1ClientSecondInstance = storageService.client("gcs1", "repo1", operationPurpose, gcpStats("bucket")); assertNotSame(repo1Client, repo2Client); assertNotSame(repo1Client, repo1ClientOtherPurpose);
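
Taken together, the GoogleCloudStorageHttpStatsCollector changes classify every matched HTTP response in the same way. The block below is an editor's condensation of that logic for clarity, not code from the patch; the helper name and the countOperation flag are illustrative. GET/LIST responses also count operations at this layer, while InsertObject operations are counted once per call in GoogleCloudStorageBlobContainer#trackInsertObject, because a single insert can span several HTTP requests.

    // Condensed classification of a matched HTTP response (illustrative helper, not patch code).
    private void track(Operation op, int code, boolean countOperation) {
        inc(op, REQUEST);
        if (countOperation) {
            inc(op, OPERATION);
        }
        if (isSuccessCode(code) == false) {            // success is 2xx or 308 ("resume incomplete")
            inc(op, REQUEST_EXCEPTION);
            if (countOperation) {
                inc(op, OPERATION_EXCEPTION);
            }
            if (code == 429) {
                inc(op, THROTTLE);                     // rate limited by GCS
            } else if (code == 416 && op == Operation.GET_OBJECT) {
                inc(op, RANGE_NOT_SATISFIED);          // requested range not satisfiable
            }
        }
    }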