diff --git a/docs/changelog/125452.yaml b/docs/changelog/125452.yaml new file mode 100644 index 0000000000000..e12d9c7b69b6a --- /dev/null +++ b/docs/changelog/125452.yaml @@ -0,0 +1,5 @@ +pr: 125452 +summary: Add GCS telemetry with `ThreadLocal` +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 8a15b85262b6c..0597ab303004e 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -17,7 +17,6 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRetryStrategy; -import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -57,15 +56,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.elasticsearch.common.io.Streams.readFully; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME; @@ -107,7 +103,11 @@ protected Map createHttpHandlers() { @Override protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { - return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3)); + if (delegate instanceof FakeOAuth2HttpHandler) { + return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3)); + } else { + return new GoogleCloudStorageStatsCollectorHttpHandler(new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3))); + } } @Override @@ -223,6 +223,11 @@ public void testWriteFileMultipleOfChunkSize() throws IOException { } } + @Override + public void testRequestStats() throws Exception { + super.testRequestStats(); + } + public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin { public TestGoogleCloudStoragePlugin(Settings settings) { @@ -230,8 +235,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) { } @Override - protected GoogleCloudStorageService createStorageService(Settings settings) { - return new GoogleCloudStorageService(settings) { + protected GoogleCloudStorageService createStorageService(boolean isServerless) { + return new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, @@ -277,7 +282,8 @@ public Map getRepositories( this.storageService, clusterService, bigArrays, - recoverySettings + recoverySettings, + new GcsRepositoryStatsCollector() ) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { @@ -288,7 +294,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() { storageService, bigArrays, randomIntBetween(1, 8) * 1024, - BackoffPolicy.noBackoff() + BackoffPolicy.noBackoff(), + this.statsCollector() ) { @Override long getLargeBlobThresholdInBytes() { @@ -356,8 +363,6 @@ protected boolean canFailRequest(final HttpExchange exchange) { @SuppressForbidden(reason = "this tests uses a HttpServer to emulate an GCS endpoint") private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler { - public static final Pattern contentRangeMatcher = Pattern.compile("bytes \\d+-(\\d+)/(\\d+)"); - GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) { super(delegate); } @@ -365,34 +370,17 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta @Override public void maybeTrack(HttpExchange exchange) { final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - final Headers requestHeaders = exchange.getRequestHeaders(); if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) { - trackRequest(Operation.GET_OBJECT.key()); + trackRequest(StorageOperation.GET.key()); } else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) { - trackRequest(Operation.LIST_OBJECTS.key()); - } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) { - // Resumable uploads are billed as a single operation, that's the reason we're tracking - // the request only when it's the last part. - // See https://cloud.google.com/storage/docs/resumable-uploads#introduction - trackRequest(Operation.INSERT_OBJECT.key()); + trackRequest(StorageOperation.LIST.key()); + } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=resumable*", request)) { + trackRequest(StorageOperation.INSERT.key()); + } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request)) { + trackRequest(StorageOperation.INSERT.key()); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) { - trackRequest(Operation.INSERT_OBJECT.key()); + trackRequest(StorageOperation.INSERT.key()); } } - - boolean isLastPart(Headers requestHeaders) { - if (requestHeaders.containsKey("Content-range") == false) return false; - - // https://cloud.google.com/storage/docs/json_api/v1/parameters#contentrange - final String contentRange = requestHeaders.getFirst("Content-range"); - - final Matcher matcher = contentRangeMatcher.matcher(contentRange); - - if (matcher.matches() == false) return false; - - String upperBound = matcher.group(1); - String totalLength = matcher.group(2); - return Integer.parseInt(upperBound) == Integer.parseInt(totalLength) - 1; - } } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsRepositoryStatsCollector.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsRepositoryStatsCollector.java new file mode 100644 index 0000000000000..c767fdf2bb1ca --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsRepositoryStatsCollector.java @@ -0,0 +1,273 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +import com.google.api.client.http.HttpResponseInterceptor; + +import org.apache.lucene.util.IORunnable; +import org.apache.lucene.util.IOSupplier; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.blobstore.BlobStoreActionStats; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.TimeProvider; +import org.elasticsearch.repositories.RepositoriesMetrics; + +import java.io.IOException; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Supplier; + +/** + * Stats collector class that performs metrics initialization and propagation through GCS client + * calls. This class encapsulates ThreadLocal metrics access. + */ +public class GcsRepositoryStatsCollector { + + static final TimeProvider NOOP_TIMER = new TimeProvider() { + @Override + public long relativeTimeInMillis() { + return 0; + } + + @Override + public long relativeTimeInNanos() { + return 0; + } + + @Override + public long rawRelativeTimeInMillis() { + return 0; + } + + @Override + public long absoluteTimeInMillis() { + return 0; + } + }; + + private static final ThreadLocal OPERATION_STATS = new ThreadLocal<>(); + + /** + * All 2xx and 308 codes are success. 404 is expected on blob existence check. + * Also, 404 is billable error. Other errors 307, 4xx, and 5xx are not billable. + * GCS pricing tables. + */ + public static final HttpResponseInterceptor METERING_INTERCEPTOR = response -> { + var stats = getThreadLocal(); + var code = response.getStatusCode(); + stats.requestAttempts += 1; + stats.isLastRequestSucceed = true; + if (((code >= 200 && code < 300) || code == 308 || code == 404) == false) { + stats.requestError += 1; + stats.isLastRequestSucceed = false; + switch (code) { + case 416 -> stats.requestRangeError += 1; + case 429 -> stats.requestThrottle += 1; + } + } + }; + + /** + * Track operations for billing and REST API + */ + private final EnumMap> collectors; + + /** + * Telemetry (APM) + */ + private final RepositoriesMetrics telemetry; + private final EnumMap>> telemetryAttributes; + private final TimeProvider timer; + + GcsRepositoryStatsCollector() { + this(NOOP_TIMER, new RepositoryMetadata(GoogleCloudStorageRepository.TYPE, "", Settings.EMPTY), RepositoriesMetrics.NOOP); + } + + GcsRepositoryStatsCollector(TimeProvider timer, RepositoryMetadata metadata, RepositoriesMetrics repositoriesMetrics) { + this.timer = timer; + this.telemetry = repositoriesMetrics; + this.collectors = new EnumMap<>(OperationPurpose.class); + for (var purpose : OperationPurpose.values()) { + var operationsMap = new EnumMap(StorageOperation.class); + for (var op : StorageOperation.values()) { + operationsMap.put(op, new Collector(new LongAdder(), new LongAdder())); + } + collectors.put(purpose, operationsMap); + } + this.telemetryAttributes = new EnumMap<>(OperationPurpose.class); + for (var purpose : OperationPurpose.values()) { + var purposeMap = new EnumMap>(StorageOperation.class); + telemetryAttributes.put(purpose, purposeMap); + for (var operation : StorageOperation.values()) { + var attrMap = RepositoriesMetrics.createAttributesMap(metadata, purpose, operation.key); + purposeMap.put(operation, attrMap); + } + } + } + + private static OperationStats initAndGetThreadLocal(OperationPurpose purpose, StorageOperation operation) { + assert OPERATION_STATS.get() == null : "cannot init stats, thread local is not empty"; + var stats = new OperationStats(purpose, operation); + OPERATION_STATS.set(stats); + return stats; + } + + static OperationStats getThreadLocal() { + var stats = OPERATION_STATS.get(); + assert stats != null : "must initialize operation stats"; + return stats; + } + + private static void setThreadLocal(OperationStats stats) { + assert OPERATION_STATS.get() == null : "cannot set stats, thread local is not empty"; + OPERATION_STATS.set(stats); + } + + private static void clearThreadLocal() { + assert OPERATION_STATS.get() != null : "cannot clear already emptied thread local"; + OPERATION_STATS.remove(); + } + + /** + * Continue collecting metrics with given OperationStats. Useful for readers and writers. + */ + public T continueWithStats(OperationStats stats, IOSupplier blobFn) throws IOException { + setThreadLocal(stats); + var t = timer.absoluteTimeInMillis(); + try { + return blobFn.get(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + clearThreadLocal(); + } + } + + /** + * Final step in continual collection + */ + public void finishRunnable(OperationStats stats, IORunnable runnable) throws IOException { + setThreadLocal(stats); + var t = timer.absoluteTimeInMillis(); + try { + runnable.run(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + collect(stats); + clearThreadLocal(); + } + } + + public void collectIORunnable(OperationPurpose purpose, StorageOperation operation, IORunnable runnable) throws IOException { + var stats = initAndGetThreadLocal(purpose, operation); + var t = timer.absoluteTimeInMillis(); + try { + runnable.run(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + clearThreadLocal(); + } + } + + public void collectRunnable(OperationPurpose purpose, StorageOperation operation, Runnable runnable) { + var t = timer.absoluteTimeInMillis(); + var stats = initAndGetThreadLocal(purpose, operation); + try { + runnable.run(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + collect(stats); + clearThreadLocal(); + } + } + + public T collectIOSupplier(OperationPurpose purpose, StorageOperation operation, IOSupplier blobFn) throws IOException { + var t = timer.absoluteTimeInMillis(); + var stats = initAndGetThreadLocal(purpose, operation); + try { + return blobFn.get(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + collect(stats); + clearThreadLocal(); + } + } + + public T collectSupplier(OperationPurpose purpose, StorageOperation operation, Supplier blobFn) { + var t = timer.absoluteTimeInMillis(); + var stats = initAndGetThreadLocal(purpose, operation); + try { + return blobFn.get(); + } finally { + stats.totalDuration += timer.absoluteTimeInMillis() - t; + collect(stats); + clearThreadLocal(); + } + } + + private void collect(OperationStats stats) { + if (stats.requestAttempts == 0) { + return; // nothing happened + } + var purpose = stats.purpose; + var operation = stats.operation; + var operationSuccess = stats.isLastRequestSucceed ? 1 : 0; + var operationError = stats.isLastRequestSucceed ? 0 : 1; + var collector = collectors.get(purpose).get(operation); + assert collector != null; + collector.operations.add(operationSuccess); + collector.requests.add(stats.requestAttempts); + + var attr = telemetryAttributes.get(purpose).get(operation); + assert attr != null; + telemetry.operationCounter().incrementBy(operationSuccess, attr); + telemetry.unsuccessfulOperationCounter().incrementBy(operationError, attr); + telemetry.requestCounter().incrementBy(stats.requestAttempts, attr); + telemetry.exceptionCounter().incrementBy(stats.requestError, attr); + telemetry.exceptionHistogram().record(stats.requestError, attr); + telemetry.throttleCounter().incrementBy(stats.requestThrottle, attr); + telemetry.throttleHistogram().record(stats.requestThrottle, attr); + telemetry.requestRangeNotSatisfiedExceptionCounter().incrementBy(stats.requestRangeError, attr); + telemetry.httpRequestTimeInMillisHistogram().record(stats.totalDuration, attr); + } + + public Map operationsStats(boolean isServerless) { + var out = new HashMap(); + // Map<'Purpose_Operation', + for (var purposeCollector : collectors.entrySet()) { + for (var operationCollector : collectors.get(purposeCollector.getKey()).entrySet()) { + var collector = operationCollector.getValue(); + var operations = collector.operations.sum(); + var requests = collector.requests.sum(); + if (isServerless) { + // Map<'Purpose_Operation', + out.put( + purposeCollector.getKey().getKey() + "_" + operationCollector.getKey().key(), + new BlobStoreActionStats(operations, requests) + ); + } else { + // merge all purposes into Map<'Operation', + out.compute(operationCollector.getKey().key(), (k, v) -> { + if (v == null) { + return new BlobStoreActionStats(operations, requests); + } else { + return v.add(new BlobStoreActionStats(operations, requests)); + } + }); + } + } + } + return out; + } + + record Collector(LongAdder operations, LongAdder requests) {} +} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index a529d08ce47b5..c4b73f35ac6e8 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -9,7 +9,6 @@ package org.elasticsearch.repositories.gcs; -import com.google.api.gax.paging.Page; import com.google.cloud.BaseServiceException; import com.google.cloud.BatchResult; import com.google.cloud.WriteChannel; @@ -44,7 +43,6 @@ import org.elasticsearch.core.Streams; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayInputStream; @@ -111,7 +109,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { private final String clientName; private final String repositoryName; private final GoogleCloudStorageService storageService; - private final GoogleCloudStorageOperationsStats stats; + private final GcsRepositoryStatsCollector statsCollector; private final int bufferSize; private final BigArrays bigArrays; private final BackoffPolicy casBackoffPolicy; @@ -123,20 +121,21 @@ class GoogleCloudStorageBlobStore implements BlobStore { GoogleCloudStorageService storageService, BigArrays bigArrays, int bufferSize, - BackoffPolicy casBackoffPolicy + BackoffPolicy casBackoffPolicy, + GcsRepositoryStatsCollector statsCollector ) { this.bucketName = bucketName; this.clientName = clientName; this.repositoryName = repositoryName; this.storageService = storageService; this.bigArrays = bigArrays; - this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless()); + this.statsCollector = statsCollector; this.bufferSize = bufferSize; this.casBackoffPolicy = casBackoffPolicy; } - private Storage client(OperationPurpose purpose) throws IOException { - return storageService.client(clientName, repositoryName, purpose, stats); + private MeteredStorage client() throws IOException { + return storageService.client(clientName, repositoryName, statsCollector); } @Override @@ -174,7 +173,7 @@ Map listBlobsByPrefix(OperationPurpose purpose, String pat final String pathPrefix = buildKey(path, prefix); final Map mapBuilder = new HashMap<>(); SocketAccess.doPrivilegedVoidIOException( - () -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix)) + () -> client().meteredList(purpose, bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix)) .iterateAll() .forEach(blob -> { assert blob.getName().startsWith(path); @@ -191,7 +190,7 @@ Map listChildren(OperationPurpose purpose, BlobPath path) final String pathStr = path.buildAsString(); final Map mapBuilder = new HashMap<>(); SocketAccess.doPrivilegedVoidIOException( - () -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)) + () -> client().meteredList(purpose, bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)) .iterateAll() .forEach(blob -> { if (blob.isDirectory()) { @@ -217,7 +216,7 @@ Map listChildren(OperationPurpose purpose, BlobPath path) */ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException { final BlobId blobId = BlobId.of(bucketName, blobName); - final Blob blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId)); + final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().meteredGet(purpose, blobId)); return blob != null; } @@ -229,7 +228,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException * @return the InputStream used to read the blob's content */ InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(client(purpose), BlobId.of(bucketName, blobName)); + return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName)); } /** @@ -252,7 +251,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l return new ByteArrayInputStream(new byte[0]); } else { return new GoogleCloudStorageRetryingInputStream( - client(purpose), + purpose, + client(), BlobId.of(bucketName, blobName), position, Math.addExact(position, length - 1) @@ -375,8 +375,8 @@ public void write(byte[] b, int off, int len) throws IOException { } private void initResumableStream() throws IOException { - final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( - () -> client(purpose).writer(blobInfo, writeOptions) + final var writeChannel = SocketAccess.doPrivilegedIOException( + () -> client().meteredWriter(purpose, blobInfo, writeOptions) ); channelRef.set(writeChannel); resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) { @@ -397,7 +397,6 @@ public void write(byte[] b, int off, int len) throws IOException { final WritableByteChannel writeChannel = channelRef.get(); if (writeChannel != null) { SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); } else { writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); } @@ -455,7 +454,7 @@ private void writeBlobResumable( for (int retry = 0; retry < 3; ++retry) { try { final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( - () -> client(purpose).writer(blobInfo, writeOptions) + () -> client().meteredWriter(purpose, blobInfo, writeOptions) ); /* * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy @@ -463,11 +462,6 @@ private void writeBlobResumable( */ org.elasticsearch.core.Streams.copy(inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer); SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - // We don't track this operation on the http layer as - // we do with the GET/LIST operations since this operations - // can trigger multiple underlying http requests but only one - // operation is billed. - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); return; } catch (final StorageException se) { final int errorCode = se.getCode(); @@ -514,12 +508,9 @@ private void writeBlobMultipart( final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : new Storage.BlobTargetOption[0]; - SocketAccess.doPrivilegedVoidIOException(() -> client(purpose).create(blobInfo, buffer, offset, blobSize, targetOptions)); - // We don't track this operation on the http layer as - // we do with the GET/LIST operations since this operations - // can trigger multiple underlying http requests but only one - // operation is billed. - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); + SocketAccess.doPrivilegedVoidIOException( + () -> client().meteredCreate(purpose, blobInfo, buffer, offset, blobSize, targetOptions) + ); } catch (final StorageException se) { if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); @@ -537,11 +528,11 @@ private void writeBlobMultipart( DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException { return SocketAccess.doPrivilegedIOException(() -> { DeleteResult deleteResult = DeleteResult.ZERO; - Page page = client(purpose).list(bucketName, BlobListOption.prefix(pathStr)); + MeteredStorage.MeteredBlobPage meteredPage = client().meteredList(purpose, bucketName, BlobListOption.prefix(pathStr)); do { final AtomicLong blobsDeleted = new AtomicLong(0L); final AtomicLong bytesDeleted = new AtomicLong(0L); - final Iterator blobs = page.getValues().iterator(); + var blobs = meteredPage.getValues().iterator(); deleteBlobs(purpose, new Iterator<>() { @Override public boolean hasNext() { @@ -557,8 +548,8 @@ public String next() { } }); deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get()); - page = page.getNextPage(); - } while (page != null); + meteredPage = meteredPage.getNextPage(); + } while (meteredPage != null); return deleteResult; }); } @@ -588,7 +579,7 @@ public BlobId next() { try { SocketAccess.doPrivilegedVoidIOException(() -> { final AtomicReference ioe = new AtomicReference<>(); - StorageBatch batch = client(purpose).batch(); + StorageBatch batch = client().batch(); int pendingDeletesInBatch = 0; while (blobIdsToDelete.hasNext()) { BlobId blob = blobIdsToDelete.next(); @@ -612,7 +603,7 @@ public void error(StorageException exception) { pendingDeletesInBatch++; if (pendingDeletesInBatch % MAX_DELETES_PER_BATCH == 0) { batch.submit(); - batch = client(purpose).batch(); + batch = client().batch(); pendingDeletesInBatch = 0; } } @@ -638,7 +629,7 @@ private static String buildKey(String keyPath, String s) { @Override public Map stats() { - return stats.tracker().toMap(); + return statsCollector.operationsStats(storageService.isServerless()); } private static final class WritableBlobChannel implements WritableByteChannel { @@ -679,8 +670,8 @@ public void close() { OptionalBytesReference getRegister(OperationPurpose purpose, String blobName, String container, String key) throws IOException { final var blobId = BlobId.of(bucketName, blobName); try ( - var readChannel = SocketAccess.doPrivilegedIOException(() -> client(purpose).reader(blobId)); - var stream = new PrivilegedReadChannelStream(readChannel) + var meteredReadChannel = SocketAccess.doPrivilegedIOException(() -> client().meteredReader(purpose, blobId)); + var stream = new PrivilegedReadChannelStream(meteredReadChannel) ) { return OptionalBytesReference.of(BlobContainerUtils.getRegisterUsingConsistentRead(stream, container, key)); } catch (Exception e) { @@ -706,7 +697,7 @@ OptionalBytesReference compareAndExchangeRegister( BlobContainerUtils.ensureValidRegisterContent(updated); final var blobId = BlobId.of(bucketName, blobName); - final var blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId)); + final var blob = SocketAccess.doPrivilegedIOException(() -> client().meteredGet(purpose, blobId)); final long generation; if (blob == null || blob.getGeneration() == null) { @@ -719,7 +710,7 @@ OptionalBytesReference compareAndExchangeRegister( try ( var stream = new PrivilegedReadChannelStream( SocketAccess.doPrivilegedIOException( - () -> client(purpose).reader(blobId, Storage.BlobSourceOption.generationMatch(generation)) + () -> client().meteredReader(purpose, blobId, Storage.BlobSourceOption.generationMatch(generation)) ) ) ) { @@ -751,7 +742,8 @@ OptionalBytesReference compareAndExchangeRegister( while (true) { try { SocketAccess.doPrivilegedVoidIOException( - () -> client(purpose).create( + () -> client().meteredCreate( + purpose, blobInfo, bytesRef.bytes, bytesRef.offset, @@ -759,7 +751,6 @@ OptionalBytesReference compareAndExchangeRegister( Storage.BlobTargetOption.generationMatch() ) ); - stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); return OptionalBytesReference.of(expected); } catch (Exception e) { final var serviceException = unwrapServiceException(e); diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java deleted file mode 100644 index 669a228535c63..0000000000000 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.repositories.gcs; - -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpResponseInterceptor; - -import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; - -import java.util.regex.Pattern; - -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker; - -final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor { - - private static final Logger logger = LogManager.getLogger("GcpHttpStats"); - - private final StatsTracker stats; - private final OperationPurpose purpose; - private final Pattern getObjPattern; - private final Pattern insertObjPattern; - private final Pattern listObjPattern; - - GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) { - this.stats = stats.tracker(); - this.purpose = purpose; - var bucket = stats.bucketName(); - - // The specification for the current API (v1) endpoints can be found at: - // https://cloud.google.com/storage/docs/json_api/v1 - this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+"); - this.insertObjPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o"); - this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o"); - } - - private void trackRequest(Operation operation) { - stats.trackRequest(purpose, operation); - } - - private void trackRequestAndOperation(Operation operation) { - stats.trackOperationAndRequest(purpose, operation); - } - - @Override - public void interceptResponse(final HttpResponse response) { - var respCode = response.getStatusCode(); - // Some of the intermediate and error codes are still counted as "good" requests - if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) { - return; - } - var request = response.getRequest(); - - var path = request.getUrl().getRawPath(); - var ignored = false; - switch (request.getRequestMethod()) { - case "GET" -> { - // https://cloud.google.com/storage/docs/json_api/v1/objects/get - if (getObjPattern.matcher(path).matches()) { - trackRequestAndOperation(Operation.GET_OBJECT); - } else if (listObjPattern.matcher(path).matches()) { - trackRequestAndOperation(Operation.LIST_OBJECTS); - } else { - ignored = true; - } - } - case "POST", "PUT" -> { - // https://cloud.google.com/storage/docs/json_api/v1/objects/insert - if (insertObjPattern.matcher(path).matches()) { - trackRequest(Operation.INSERT_OBJECT); - } else { - ignored = true; - } - } - default -> ignored = true; - } - if (ignored) { - logger.debug( - "not handling request:{} {} response:{} {}", - request.getRequestMethod(), - path, - response.getStatusCode(), - response.getStatusMessage() - ); - } - } -} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java deleted file mode 100644 index 06878a17a97b6..0000000000000 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.repositories.gcs; - -import org.elasticsearch.common.blobstore.BlobStoreActionStats; -import org.elasticsearch.common.blobstore.OperationPurpose; - -import java.util.EnumMap; -import java.util.Map; -import java.util.concurrent.atomic.LongAdder; -import java.util.stream.Collectors; - -final class GoogleCloudStorageOperationsStats { - - private final String bucketName; - private final StatsTracker tracker; - - GoogleCloudStorageOperationsStats(String bucketName, boolean isStateless) { - this.bucketName = bucketName; - if (isStateless) { - this.tracker = new ServerlessTracker(bucketName); - } else { - this.tracker = new StatefulTracker(); - } - } - - GoogleCloudStorageOperationsStats(String bucketName) { - this(bucketName, false); - } - - public String bucketName() { - return bucketName; - } - - public StatsTracker tracker() { - return tracker; - } - - public enum Operation { - GET_OBJECT("GetObject"), - LIST_OBJECTS("ListObjects"), - INSERT_OBJECT("InsertObject"); - - private final String key; - - Operation(String key) { - this.key = key; - } - - public String key() { - return key; - } - } - - sealed interface StatsTracker permits ServerlessTracker, StatefulTracker { - void trackRequest(OperationPurpose purpose, Operation operation); - - void trackOperation(OperationPurpose purpose, Operation operation); - - Map toMap(); - - default void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { - trackOperation(purpose, operation); - trackRequest(purpose, operation); - } - } - - /** - * Stateful tracker is single dimension: Operation only. The OperationPurpose is ignored. - */ - static final class StatefulTracker implements StatsTracker { - - private final EnumMap counters; - - StatefulTracker() { - this.counters = new EnumMap<>(Operation.class); - for (var operation : Operation.values()) { - counters.put(operation, new Counters(operation.key())); - } - } - - @Override - public void trackRequest(OperationPurpose purpose, Operation operation) { - // dont track requests, only operations - } - - @Override - public void trackOperation(OperationPurpose purpose, Operation operation) { - counters.get(operation).operations().add(1); - } - - @Override - public Map toMap() { - return counters.values().stream().collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> { - var ops = c.operations().sum(); - return new BlobStoreActionStats(ops, ops); - })); - } - - } - - /** - * Serverless tracker is 2-dimensional: OperationPurpose and Operations. - * Every combination of these has own set of counters: number of operations and number of http requests. - * A single operation might have multiple HTTP requests, for example a single ResumableUpload operation - * can perform a series of HTTP requests with size up to {@link GoogleCloudStorageBlobStore#SDK_DEFAULT_CHUNK_SIZE}. - *
-     * {@code
-     * | Purpose      | Operation       | OperationsCnt | RequestCnt |
-     * |--------------+-----------------+---------------+------------|
-     * | SnapshotData | GetObject       |             1 |          1 |
-     * | SnapshotData | ListObjects     |             2 |          2 |
-     * | SnapshotData | ResumableUpload |             1 |         10 |
-     * | SnapshotData | ...             |               |            |
-     * | Translog     | GetObject       |             5 |          5 |
-     * | ...          |                 |               |            |
-     * }
-     * 
- */ - static final class ServerlessTracker implements StatsTracker { - private final EnumMap> counters; - - ServerlessTracker(String bucketName) { - this.counters = new EnumMap<>(OperationPurpose.class); - for (var purpose : OperationPurpose.values()) { - var operations = new EnumMap(Operation.class); - for (var operation : Operation.values()) { - operations.put(operation, new Counters(purpose.getKey() + "_" + operation.key())); - } - counters.put(purpose, operations); - } - } - - @Override - public void trackOperation(OperationPurpose purpose, Operation operation) { - counters.get(purpose).get(operation).operations.add(1); - } - - @Override - public void trackRequest(OperationPurpose purpose, Operation operation) { - counters.get(purpose).get(operation).requests.add(1); - } - - @Override - public void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { - var c = counters.get(purpose).get(operation); - c.requests.add(1); - c.operations.add(1); - } - - @Override - public Map toMap() { - return counters.values() - .stream() - .flatMap(ops -> ops.values().stream()) - .collect( - Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())) - ); - } - - } - - private record Counters(String name, LongAdder operations, LongAdder requests) { - Counters(String name) { - this(name, new LongAdder(), new LongAdder()); - } - } -} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 81ebe26b1e058..3f7d3ae4825ff 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -9,6 +9,7 @@ package org.elasticsearch.repositories.gcs; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -34,14 +35,15 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin @SuppressWarnings("this-escape") public GoogleCloudStoragePlugin(final Settings settings) { - this.storageService = createStorageService(settings); + var isServerless = DiscoveryNode.isStateless(settings); + this.storageService = createStorageService(isServerless); // eagerly load client settings so that secure settings are readable (not closed) reload(settings); } // overridable for tests - protected GoogleCloudStorageService createStorageService(Settings settings) { - return new GoogleCloudStorageService(settings); + protected GoogleCloudStorageService createStorageService(boolean isServerless) { + return new GoogleCloudStorageService(isServerless); } @Override @@ -61,7 +63,8 @@ public Map getRepositories( this.storageService, clusterService, bigArrays, - recoverySettings + recoverySettings, + new GcsRepositoryStatsCollector(clusterService.threadPool(), metadata, repositoriesMetrics) ) ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 16233d3b391d7..8eff8be0762e3 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -85,6 +85,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { private final TimeValue retryThrottledCasDelayIncrement; private final int retryThrottledCasMaxNumberOfRetries; private final TimeValue retryThrottledCasMaxDelay; + private final GcsRepositoryStatsCollector statsCollector; GoogleCloudStorageRepository( final RepositoryMetadata metadata, @@ -92,7 +93,8 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { final GoogleCloudStorageService storageService, final ClusterService clusterService, final BigArrays bigArrays, - final RecoverySettings recoverySettings + final RecoverySettings recoverySettings, + final GcsRepositoryStatsCollector statsCollector ) { super( metadata, @@ -104,13 +106,13 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { buildLocation(metadata) ); this.storageService = storageService; - this.chunkSize = getSetting(CHUNK_SIZE, metadata); this.bucket = getSetting(BUCKET, metadata); this.clientName = CLIENT_NAME.get(metadata.settings()); this.retryThrottledCasDelayIncrement = RETRY_THROTTLED_CAS_DELAY_INCREMENT.get(metadata.settings()); this.retryThrottledCasMaxNumberOfRetries = RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES.get(metadata.settings()); this.retryThrottledCasMaxDelay = RETRY_THROTTLED_CAS_MAXIMUM_DELAY.get(metadata.settings()); + this.statsCollector = statsCollector; logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath(), chunkSize, isCompress()); } @@ -140,7 +142,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() { storageService, bigArrays, bufferSize, - BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay) + BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay), + statsCollector ); } @@ -149,6 +152,10 @@ protected ByteSizeValue chunkSize() { return chunkSize; } + GcsRepositoryStatsCollector statsCollector() { + return statsCollector; + } + /** * Get a given setting from the repository settings, throwing a {@link RepositoryException} if the setting does not exist or is empty. */ diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 9777c542bc605..1f96f54c09ba5 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,33 +9,24 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; -import com.google.api.services.storage.Storage.Objects.Get; import com.google.cloud.BaseService; import com.google.cloud.RetryHelper; import com.google.cloud.storage.BlobId; -import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; -import com.google.cloud.storage.spi.v1.HttpStorageRpc; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Field; import java.nio.file.NoSuchFileException; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; -import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -50,94 +41,56 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - private final Storage client; - private final com.google.api.services.storage.Storage storage; - + private final OperationPurpose purpose; + private final MeteredStorage client; private final BlobId blobId; - private final long start; private final long end; - private final int maxAttempts; - private InputStream currentStream; private int attempt = 1; private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); private long currentOffset; private boolean closed; - GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException { - this(client, blobId, 0, Long.MAX_VALUE - 1); - } - - // both start and end are inclusive bounds, following the definition in https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long end) throws IOException { - this(client, () -> getStorage(client), blobId, start, end); - } - // Used for testing only - GoogleCloudStorageRetryingInputStream( - com.google.cloud.storage.Storage client, - Supplier storage, - BlobId blobId - ) throws IOException { - this(client, storage, blobId, 0, Long.MAX_VALUE - 1); + GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException { + this(purpose, client, blobId, 0, Long.MAX_VALUE - 1); } // Used for testing only - GoogleCloudStorageRetryingInputStream( - com.google.cloud.storage.Storage client, - Supplier storage, - BlobId blobId, - long start, - long end - ) throws IOException { + GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId, long start, long end) + throws IOException { if (start < 0L) { throw new IllegalArgumentException("start must be non-negative"); } if (end < start || end == Long.MAX_VALUE) { throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); } + this.purpose = purpose; this.client = client; this.blobId = blobId; this.start = start; this.end = end; this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts(); - SpecialPermission.check(); - this.storage = storage.get(); // to bypass static init for unit testing this.currentStream = openStream(); } - @SuppressForbidden(reason = "need access to storage client") - private static com.google.api.services.storage.Storage getStorage(Storage client) { - return AccessController.doPrivileged((PrivilegedAction) () -> { - assert client.getOptions().getRpc() instanceof HttpStorageRpc; - assert Stream.of(client.getOptions().getRpc().getClass().getDeclaredFields()).anyMatch(f -> f.getName().equals("storage")); - try { - final Field storageField = client.getOptions().getRpc().getClass().getDeclaredField("storage"); - storageField.setAccessible(true); - return (com.google.api.services.storage.Storage) storageField.get(client.getOptions().getRpc()); - } catch (Exception e) { - throw new IllegalStateException("storage could not be set up", e); - } - }); - } - private InputStream openStream() throws IOException { try { try { return RetryHelper.runWithRetries(() -> { try { return SocketAccess.doPrivilegedIOException(() -> { - final Get get = storage.objects().get(blobId.getBucket(), blobId.getName()); - get.setReturnRawInputStream(true); + final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); + meteredGet.setReturnRawInputStream(true); if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { - if (get.getRequestHeaders() != null) { - get.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); + if (meteredGet.getRequestHeaders() != null) { + meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); } } - final HttpResponse resp = get.executeMedia(); + final HttpResponse resp = meteredGet.executeMedia(); final Long contentLength = resp.getHeaders().getContentLength(); InputStream content = resp.getContent(); if (contentLength != null) { diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index 14c5be8c9ca9f..bb83b767abb4c 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -19,17 +19,13 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.http.HttpTransportOptions; -import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRetryStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -58,21 +54,26 @@ public class GoogleCloudStorageService { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageService.class); private volatile Map clientSettings = emptyMap(); - private final boolean isStateless; - public GoogleCloudStorageService(Settings settings) { - this.isStateless = DiscoveryNode.isStateless(settings); + private final boolean isServerless; + + public GoogleCloudStorageService() { + this.isServerless = false; + } + + public GoogleCloudStorageService(boolean isServerless) { + this.isServerless = isServerless; } - private record ClientKey(OperationPurpose purpose, String repositoryName) {} + public boolean isServerless() { + return isServerless; + } /** * Dictionary of client instances. Client instances are built lazily from the - * latest settings. Clients are cached by a composite OperationPurpose/repositoryName - * key. - * @see ClientKey + * latest settings. Clients are cached by a composite repositoryName key. */ - private volatile Map clientCache = emptyMap(); + private volatile Map clientCache = emptyMap(); /** * Refreshes the client settings and clears the client cache. Subsequent calls to @@ -95,26 +96,19 @@ public synchronized void refreshAndClearCache(Map format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); - final Storage storage = createClient(settings, stats, operationPurpose); - clientCache = Maps.copyMapWithAddedEntry(clientCache, clientKey, storage); + final MeteredStorage storage = createClient(settings, statsCollector); + clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); return storage; } } - boolean isStateless() { - return isStateless; - } - synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() - .filter(entry -> entry.getKey().repositoryName().equals(repositoryName) == false) + .filter(entry -> entry.getKey().equals(repositoryName) == false) .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } @@ -153,16 +143,11 @@ synchronized void closeRepositoryClients(String repositoryName) { * Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe. * * @param gcsClientSettings client settings to use, including secure settings - * @param stats the stats collector to use by the underlying SDK - * @param operationPurpose the purpose this client will be used for * @return a new client storage instance that can be used to manage objects * (blobs) */ - private Storage createClient( - GoogleCloudStorageClientSettings gcsClientSettings, - GoogleCloudStorageOperationsStats stats, - OperationPurpose operationPurpose - ) throws IOException { + private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) + throws IOException { final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); // requires java.lang.RuntimePermission "setFactory" @@ -182,8 +167,6 @@ private Storage createClient( return builder.build(); }); - final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector(stats, operationPurpose); - final HttpTransportOptions httpTransportOptions = new HttpTransportOptions( HttpTransportOptions.newBuilder() .setConnectTimeout(toTimeout(gcsClientSettings.getConnectTimeout())) @@ -197,13 +180,13 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser return (httpRequest) -> { if (requestInitializer != null) requestInitializer.initialize(httpRequest); - httpRequest.setResponseInterceptor(httpStatsCollector); + httpRequest.setResponseInterceptor(GcsRepositoryStatsCollector.METERING_INTERCEPTOR); }; } }; final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions); - return storageOptions.getService(); + return new MeteredStorage(storageOptions.getService(), statsCollector); } StorageOptions createStorageOptions( diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java new file mode 100644 index 0000000000000..dda4602ecb5e3 --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java @@ -0,0 +1,333 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponse; +import com.google.api.gax.paging.Page; +import com.google.cloud.ReadChannel; +import com.google.cloud.RestorableState; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.spi.v1.HttpStorageRpc; + +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.core.SuppressForbidden; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Iterator; +import java.util.stream.Stream; + +import static org.elasticsearch.repositories.gcs.StorageOperation.GET; +import static org.elasticsearch.repositories.gcs.StorageOperation.INSERT; +import static org.elasticsearch.repositories.gcs.StorageOperation.LIST; + +/** + * A wrapper for GCP {@link Storage} client. Provides metering and telemetry. + * It's mostly boilerplate code that wraps Storage calls with thread-local metrics. + * And special cases, such as Paginated List calls, WriteChannel, ReadChannel. + */ +public class MeteredStorage { + private final Storage storage; + private final com.google.api.services.storage.Storage storageRpc; + private final GcsRepositoryStatsCollector statsCollector; + + public MeteredStorage(Storage storage, GcsRepositoryStatsCollector statsCollector) { + this.storage = storage; + SpecialPermission.check(); + this.storageRpc = getStorageRpc(storage); + this.statsCollector = statsCollector; + } + + MeteredStorage(Storage storage, com.google.api.services.storage.Storage storageRpc, GcsRepositoryStatsCollector statsCollector) { + this.storage = storage; + this.storageRpc = storageRpc; + this.statsCollector = statsCollector; + } + + @SuppressForbidden(reason = "need access to storage client") + private static com.google.api.services.storage.Storage getStorageRpc(Storage client) { + return AccessController.doPrivileged((PrivilegedAction) () -> { + assert client.getOptions().getRpc() instanceof HttpStorageRpc; + assert Stream.of(client.getOptions().getRpc().getClass().getDeclaredFields()).anyMatch(f -> f.getName().equals("storage")); + try { + final Field storageField = client.getOptions().getRpc().getClass().getDeclaredField("storage"); + storageField.setAccessible(true); + return (com.google.api.services.storage.Storage) storageField.get(client.getOptions().getRpc()); + } catch (Exception e) { + throw new IllegalStateException("storage could not be set up", e); + } + }); + } + + public MeteredBlobPage meteredList(OperationPurpose purpose, String bucket, Storage.BlobListOption... options) throws IOException { + var pages = statsCollector.collectSupplier(purpose, LIST, () -> storage.list(bucket, options)); + return new MeteredBlobPage(statsCollector, purpose, pages); + } + + public Blob meteredGet(OperationPurpose purpose, BlobId blobId) throws IOException { + return statsCollector.collectIOSupplier(purpose, GET, () -> storage.get(blobId)); + } + + public void meteredCreate( + OperationPurpose purpose, + BlobInfo blobInfo, + byte[] buffer, + int offset, + int blobSize, + Storage.BlobTargetOption... targetOptions + ) throws IOException { + statsCollector.collectIOSupplier(purpose, INSERT, () -> storage.create(blobInfo, buffer, offset, blobSize, targetOptions)); + } + + public StorageBatch batch() { + return storage.batch(); + } + + public StorageOptions getOptions() { + return storage.getOptions(); + } + + public MeteredObjectsGetRequest meteredObjectsGet(OperationPurpose purpose, String bucket, String blob) throws IOException { + return new MeteredObjectsGetRequest(statsCollector, purpose, storageRpc.objects().get(bucket, blob)); + } + + public MeteredWriteChannel meteredWriter(OperationPurpose purpose, BlobInfo blobInfo, Storage.BlobWriteOption... writeOptions) + throws IOException { + var initStats = new OperationStats(purpose, INSERT); + return statsCollector.continueWithStats( + initStats, + () -> new MeteredWriteChannel(statsCollector, initStats, storage.writer(blobInfo, writeOptions)) + ); + } + + public MeteredReadChannel meteredReader(OperationPurpose purpose, BlobId blobId, Storage.BlobSourceOption... options) { + return new MeteredReadChannel(purpose, statsCollector, storage.reader(blobId, options)); + } + + /** + * A delegating Objects.Get requests with metrics collection + */ + public static class MeteredObjectsGetRequest { + private final GcsRepositoryStatsCollector statsCollector; + private final OperationPurpose purpose; + private final com.google.api.services.storage.Storage.Objects.Get get; + + MeteredObjectsGetRequest( + GcsRepositoryStatsCollector statsCollector, + OperationPurpose purpose, + com.google.api.services.storage.Storage.Objects.Get get + ) { + this.statsCollector = statsCollector; + this.purpose = purpose; + this.get = get; + } + + public void setReturnRawInputStream(boolean b) { + get.setReturnRawInputStream(b); + } + + public HttpHeaders getRequestHeaders() { + return get.getRequestHeaders(); + } + + public HttpResponse executeMedia() throws IOException { + return statsCollector.collectIOSupplier(purpose, GET, get::executeMedia); + } + } + + /** + * A delegating WriteChannel. Write channel performs at most one operation for all writes. + * This is implication of GCS billing, all insert operations, even resumable, are counted as one, + * despite number of parts. It's different from ReadChannel and BlobPage, where every read/list + * call is counted as separate operation. + */ + @SuppressForbidden(reason = "wraps GCS channel") + public static class MeteredWriteChannel implements WriteChannel { + private final GcsRepositoryStatsCollector statsCollector; + private final WriteChannel writeChannel; + private final OperationStats stats; + + public MeteredWriteChannel(GcsRepositoryStatsCollector statsCollector, OperationStats initStats, WriteChannel readChannel) { + this.statsCollector = statsCollector; + this.writeChannel = readChannel; + this.stats = initStats; + } + + @Override + public void setChunkSize(int chunkSize) { + writeChannel.setChunkSize(chunkSize); + } + + @Override + public RestorableState capture() { + return () -> new MeteredWriteChannel(statsCollector, stats, writeChannel.capture().restore()); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return statsCollector.continueWithStats(stats, () -> writeChannel.write(src)); + } + + @Override + public boolean isOpen() { + return writeChannel.isOpen(); + } + + @Override + public void close() throws IOException { + statsCollector.finishRunnable(stats, writeChannel::close); + } + } + + /** + * A delegating ReadChannel. Each method call is at most one storage operation, or none. + */ + @SuppressForbidden(reason = "wraps GCS channel") + public static class MeteredReadChannel implements ReadChannel { + private final GcsRepositoryStatsCollector statsCollector; + private final ReadChannel readChannel; + private final OperationPurpose purpose; + + MeteredReadChannel(OperationPurpose purpose, GcsRepositoryStatsCollector statsCollector, ReadChannel readChannel) { + this.statsCollector = statsCollector; + this.readChannel = readChannel; + this.purpose = purpose; + } + + @Override + public void close() { + statsCollector.collectRunnable(purpose, GET, readChannel::close); + } + + @Override + public void seek(long position) throws IOException { + statsCollector.collectIORunnable(purpose, GET, () -> readChannel.seek(position)); + } + + @Override + public void setChunkSize(int chunkSize) { + readChannel.setChunkSize(chunkSize); + } + + @Override + public RestorableState capture() { + return () -> new MeteredReadChannel(purpose, statsCollector, readChannel.capture().restore()); + } + + @Override + public ReadChannel limit(long limit) { + readChannel.limit(limit); + return this; + } + + @Override + public long limit() { + return readChannel.limit(); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return statsCollector.collectIOSupplier(purpose, GET, () -> readChannel.read(dst)); + } + + @Override + public boolean isOpen() { + return readChannel.isOpen(); + } + + } + + /** + * A delegating paginated blob list. Each list operation is at most one storage operation, or none. + */ + public static class MeteredBlobPage implements Page { + private final GcsRepositoryStatsCollector statsCollector; + private final OperationPurpose purpose; + private final Page pages; + + public MeteredBlobPage(GcsRepositoryStatsCollector statsCollector, OperationPurpose purpose, Page pages) { + this.statsCollector = statsCollector; + this.purpose = purpose; + this.pages = pages; + } + + @Override + public boolean hasNextPage() { + return pages.hasNextPage(); + } + + @Override + public String getNextPageToken() { + return pages.getNextPageToken(); + } + + @Override + public MeteredBlobPage getNextPage() { + var nextPage = statsCollector.collectSupplier(purpose, LIST, pages::getNextPage); + if (nextPage != null) { + return new MeteredBlobPage(statsCollector, purpose, nextPage); + } else { + return null; + } + } + + @Override + public MeteredIterableBlob iterateAll() { + return new MeteredIterableBlob(pages.iterateAll()); + } + + @Override + public MeteredIterableBlob getValues() { + return new MeteredIterableBlob(pages.getValues()); + } + + public class MeteredIterator implements Iterator { + final Iterator iterator; + + MeteredIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return statsCollector.collectSupplier(purpose, LIST, iterator::hasNext); + } + + @Override + public Blob next() { + return statsCollector.collectSupplier(purpose, LIST, iterator::next); + } + } + + public class MeteredIterableBlob implements Iterable { + final Iterable iterable; + + MeteredIterableBlob(Iterable iterable) { + this.iterable = iterable; + } + + @Override + public Iterator iterator() { + return new MeteredIterator(iterable.iterator()); + } + } + } +} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/OperationStats.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/OperationStats.java new file mode 100644 index 0000000000000..5d43a003f149f --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/OperationStats.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +import org.elasticsearch.common.blobstore.OperationPurpose; + +public class OperationStats { + + final OperationPurpose purpose; + final StorageOperation operation; + + /** + * total time taken for the operation + */ + long totalDuration; + + /** + * true if last request is completed successfully + */ + boolean isLastRequestSucceed; + + /** + * request attempts including retires and multi part requests + */ + int requestAttempts; + + /** + * request errors, all unsuccessful request attempts {@code reqErr<=reqAtt} + */ + int requestError; + + /** + * request throttles (429), {@code reqErrThrottle<=reqErr} + */ + int requestThrottle; + + /** + * request range not satisfied error(416), only applicable for GetObject operations, {@code reqErrRange<=reqErr} + */ + int requestRangeError; + + OperationStats(OperationPurpose purpose, StorageOperation operation) { + this.purpose = purpose; + this.operation = operation; + } + + @Override + public String toString() { + return "OperationStats{" + + "purpose=" + + purpose + + ", operation=" + + operation + + ", totalDuration=" + + totalDuration + + ", isLastReqSuccess=" + + isLastRequestSucceed + + ", reqAtt=" + + requestAttempts + + ", reqErr=" + + requestError + + ", reqErrThrottle=" + + requestThrottle + + ", reqErrRange=" + + requestRangeError + + '}'; + } +} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/StorageOperation.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/StorageOperation.java new file mode 100644 index 0000000000000..0da43c874524d --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/StorageOperation.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +public enum StorageOperation { + INSERT("InsertObject"), + GET("GetObject"), + LIST("ListObjects"); + + final String key; + + public String key() { + return key; + } + + StorageOperation(String key) { + this.key = key; + } +} diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 10a62842a766f..1acde0be845d2 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -141,7 +141,7 @@ protected BlobContainer createBlobContainer( secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); clientSettings.setSecureSettings(secureSettings); - final GoogleCloudStorageService service = new GoogleCloudStorageService(Settings.EMPTY) { + final GoogleCloudStorageService service = new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, @@ -203,7 +203,8 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser service, BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024, - BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)) + BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)), + new GcsRepositoryStatsCollector() ); return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index 971e072a18f65..01149709724ea 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -47,16 +47,14 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.GET_OBJECT; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT; -import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.LIST_OBJECTS; +import static org.elasticsearch.repositories.gcs.StorageOperation.GET; +import static org.elasticsearch.repositories.gcs.StorageOperation.INSERT; +import static org.elasticsearch.repositories.gcs.StorageOperation.LIST; @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase { @@ -66,11 +64,6 @@ public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase { private HttpServer httpServer; private ThreadPool threadPool; - // When isServerless is set to true, test suite will collect - // 2-dimensional metrics OperationPurpose/Operation. - // Otherwise only Operations. - private boolean isServerless; - private GoogleCloudStorageService googleCloudStorageService; private GoogleCloudStorageHttpHandler googleCloudStorageHttpHandler; private ContainerAndBlobStore containerAndStore; @@ -111,14 +104,7 @@ public void createStorageService() throws Exception { threadPool = new TestThreadPool(getTestClass().getName()); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); - isServerless = randomBoolean(); - Settings settings; - if (isServerless) { - settings = Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, true).build(); - } else { - settings = Settings.EMPTY; - } - googleCloudStorageService = new GoogleCloudStorageService(settings); + googleCloudStorageService = new GoogleCloudStorageService(); googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET); httpServer.createContext("/", googleCloudStorageHttpHandler); httpServer.createContext("/token", new FakeOAuth2HttpHandler()); @@ -144,11 +130,11 @@ public void testSingleMultipartWrite() throws Exception { container.writeBlob(purpose, blobName, blobContents, true); final StatsMap wantStats = new StatsMap(purpose); - assertStatsEquals(wantStats.add(INSERT_OBJECT, 1, 1), store.stats()); + assertStatsEquals(wantStats.add(INSERT, 1), store.stats()); try (InputStream is = container.readBlob(purpose, blobName)) { assertEquals(blobContents, Streams.readFully(is)); } - assertStatsEquals(wantStats.add(GET_OBJECT, 1, 1), store.stats()); + assertStatsEquals(wantStats.add(GET, 1), store.stats()); } @Test @@ -169,12 +155,12 @@ public void testResumableWrite() throws Exception { // the +1 means a POST request with metadata without PAYLOAD final int totalRequests = parts + 1; final StatsMap wantStats = new StatsMap(purpose); - assertStatsEquals(wantStats.add(INSERT_OBJECT, 1, totalRequests), store.stats()); + assertStatsEquals(wantStats.add(INSERT, 1, totalRequests), store.stats()); try (InputStream is = container.readBlob(purpose, blobName)) { assertEquals(blobContents, Streams.readFully(is)); } - assertStatsEquals(wantStats.add(GET_OBJECT, 1, 1), store.stats()); + assertStatsEquals(wantStats.add(GET, 1), store.stats()); } @Test @@ -190,11 +176,11 @@ public void testDeleteDirectory() throws Exception { container.writeBlob(purpose, String.format("%s/file_%d", directoryName, i), contents, true); } final StatsMap wantStats = new StatsMap(purpose); - assertStatsEquals(wantStats.add(INSERT_OBJECT, numberOfFiles, numberOfFiles), store.stats()); + assertStatsEquals(wantStats.add(INSERT, numberOfFiles), store.stats()); container.delete(purpose); // We only count the list because we can't track the bulk delete - assertStatsEquals(wantStats.add(LIST_OBJECTS, 1, 1), store.stats()); + assertStatsEquals(wantStats.add(LIST, 1), store.stats()); } @Test @@ -212,13 +198,13 @@ public void testListBlobsAccountsForPaging() throws Exception { container.writeBlob(purpose, String.format("file_%d", i), contents, true); } final StatsMap wantStats = new StatsMap(purpose); - assertStatsEquals(wantStats.add(INSERT_OBJECT, numberOfObjects, numberOfObjects), store.stats()); + assertStatsEquals(wantStats.add(INSERT, numberOfObjects), store.stats()); final Map stringBlobMetadataMap = container.listBlobs(purpose); assertEquals(numberOfObjects, stringBlobMetadataMap.size()); // There should be {numberOfPages} pages of blobs - assertStatsEquals(wantStats.add(LIST_OBJECTS, numberOfPages, numberOfPages), store.stats()); + assertStatsEquals(wantStats.add(LIST, numberOfPages), store.stats()); } public void testCompareAndSetRegister() { @@ -231,12 +217,12 @@ public void testCompareAndSetRegister() { final OperationPurpose purpose = randomPurpose(); assertTrue(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, BytesArray.EMPTY, contents, l))); final StatsMap wantStat = new StatsMap(purpose); - assertStatsEquals(wantStat.add(GET_OBJECT, 1, 1).add(INSERT_OBJECT, 1, 1), store.stats()); + assertStatsEquals(wantStat.add(GET, 1).add(INSERT, 1), store.stats()); // successful update from non-null (adds two gets, one insert) final BytesArray nextContents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)); assertTrue(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, contents, nextContents, l))); - assertStatsEquals(wantStat.add(GET_OBJECT, 2, 2).add(INSERT_OBJECT, 1, 1), store.stats()); + assertStatsEquals(wantStat.add(GET, 2).add(INSERT, 1), store.stats()); // failed update (adds two gets, zero inserts) final BytesArray wrongContents = randomValueOtherThan( @@ -244,7 +230,7 @@ public void testCompareAndSetRegister() { () -> new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)) ); assertFalse(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, wrongContents, contents, l))); - assertStatsEquals(wantStat.add(GET_OBJECT, 2, 2), store.stats()); + assertStatsEquals(wantStat.add(GET, 2), store.stats()); } private ContainerAndBlobStore createBlobContainer(final String repositoryName) throws Exception { @@ -271,7 +257,8 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t googleCloudStorageService, BigArrays.NON_RECYCLING_INSTANCE, Math.toIntExact(BUFFER_SIZE.getBytes()), - BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10) + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10), + new GcsRepositoryStatsCollector() ); final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer( BlobPath.EMPTY, @@ -300,33 +287,25 @@ class StatsMap extends HashMap { StatsMap(OperationPurpose purpose) { this.purpose = purpose; - if (isServerless) { - for (var p : OperationPurpose.values()) { - for (var o : Operation.values()) { - put(p.getKey() + "_" + o.key(), new BlobStoreActionStats(0, 0)); - } - } - } else { - for (var o : Operation.values()) { - put(o.key(), new BlobStoreActionStats(0, 0)); - } + for (var o : StorageOperation.values()) { + put(o.key(), new BlobStoreActionStats(0, 0)); } } - StatsMap add(Operation operation, long ops, long reqs) { - var key = isServerless ? purpose.getKey() + "_" + operation.key() : operation.key(); - compute(key, (k, v) -> { - BlobStoreActionStats stats; + StatsMap add(StorageOperation operation, long ops) { + compute(operation.key(), (k, v) -> { assert v != null; - if (isServerless) { - stats = new BlobStoreActionStats(v.operations() + ops, v.requests() + reqs); - } else { - stats = new BlobStoreActionStats(v.operations() + ops, v.operations() + ops); - } - return stats; + return new BlobStoreActionStats(v.operations() + ops, v.requests() + ops); }); return this; } + StatsMap add(StorageOperation operation, long ops, long reqs) { + compute(operation.key(), (k, v) -> { + assert v != null; + return new BlobStoreActionStats(v.operations() + ops, v.requests() + reqs); + }); + return this; + } } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index c3693d70c8469..881f4e05667c5 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; @@ -78,16 +77,13 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti final Storage storage = mock(Storage.class); when(storage.get("bucket")).thenReturn(mock(Bucket.class)); when(storage.batch()).thenReturn(batch); + final com.google.api.services.storage.Storage storageRpc = mock(com.google.api.services.storage.Storage.class); + final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector()); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); - when( - storageService.client( - any(String.class), - any(String.class), - any(OperationPurpose.class), - any(GoogleCloudStorageOperationsStats.class) - ) - ).thenReturn(storage); + when(storageService.client(any(String.class), any(String.class), any(GcsRepositoryStatsCollector.class))).thenReturn( + meteredStorage + ); try ( BlobStore store = new GoogleCloudStorageBlobStore( @@ -97,7 +93,8 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti storageService, BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024, - BackoffPolicy.noBackoff() + BackoffPolicy.noBackoff(), + new GcsRepositoryStatsCollector() ) ) { final BlobContainer container = store.blobContainer(BlobPath.EMPTY); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java index 2f632a32261ed..91ea4930c7421 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java @@ -23,6 +23,7 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -47,26 +48,28 @@ public class GoogleCloudStorageRetryingInputStreamTests extends ESTestCase { private final BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME); - private com.google.api.services.storage.Storage storage; - private com.google.cloud.storage.Storage client; + private com.google.api.services.storage.Storage storageRpc; + private com.google.cloud.storage.Storage storage; private com.google.api.services.storage.Storage.Objects.Get get; + private MeteredStorage meteredStorage; @Before public void init() throws IOException { - storage = mock(com.google.api.services.storage.Storage.class); + storageRpc = mock(com.google.api.services.storage.Storage.class); com.google.api.services.storage.Storage.Objects objects = mock(com.google.api.services.storage.Storage.Objects.class); - when(storage.objects()).thenReturn(objects); + when(storageRpc.objects()).thenReturn(objects); get = mock(com.google.api.services.storage.Storage.Objects.Get.class); when(objects.get(BUCKET_NAME, BLOB_NAME)).thenReturn(get); - client = mock(com.google.cloud.storage.Storage.class); - when(client.getOptions()).thenReturn( + storage = mock(com.google.cloud.storage.Storage.class); + when(storage.getOptions()).thenReturn( StorageOptions.newBuilder() .setProjectId("ignore") .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(randomIntBetween(1, 3)).build()) .build() ); + meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector()); } public void testReadWithinBlobLength() throws IOException { @@ -129,7 +132,7 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d HttpResponse httpResponse = httpRequest.execute(); when(get.executeMedia()).thenReturn(httpResponse); - return new GoogleCloudStorageRetryingInputStream(client, () -> storage, blobId); + return new GoogleCloudStorageRetryingInputStream(OperationPurpose.SNAPSHOT_DATA, meteredStorage, blobId); } private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] data, int position, int length) throws IOException { @@ -145,7 +148,13 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d when(get.executeMedia()).thenReturn(httpResponse); } - return new GoogleCloudStorageRetryingInputStream(client, () -> storage, blobId, position, position + length - 1); + return new GoogleCloudStorageRetryingInputStream( + OperationPurpose.SNAPSHOT_DATA, + meteredStorage, + blobId, + position, + position + length - 1 + ); } private static HttpTransport getMockHttpTransport(byte[] data, int position, int length) { diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index 904deb710f24f..b888a9e97f76f 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.repositories.gcs; import com.google.cloud.http.HttpTransportOptions; -import com.google.cloud.storage.Storage; import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; @@ -18,13 +17,11 @@ import org.apache.http.entity.StringEntity; import org.apache.http.protocol.HttpContext; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.hamcrest.Matchers; @@ -38,7 +35,6 @@ import java.util.Locale; import java.util.UUID; -import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -75,23 +71,23 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) .build(); SetOnce proxy = new SetOnce<>(); - final GoogleCloudStorageService service = new GoogleCloudStorageService(Settings.EMPTY) { + final GoogleCloudStorageService service = new GoogleCloudStorageService() { @Override void notifyProxyIsSet(Proxy p) { proxy.set(p); } }; service.refreshAndClearCache(GoogleCloudStorageClientSettings.load(settings)); - GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); + var statsCollector = new GcsRepositoryStatsCollector(); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> service.client("another_client", "repo", randomPurpose(), statsCollector) + () -> service.client("another_client", "repo", statsCollector) ); assertThat(e.getMessage(), Matchers.startsWith("Unknown client name")); assertSettingDeprecationsAndWarnings( new Setting[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) } ); - final Storage storage = service.client(clientName, "repo", randomPurpose(), statsCollector); + final var storage = service.client(clientName, "repo", statsCollector); assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName)); assertThat(storage.getOptions().getHost(), Matchers.is(endpoint)); assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName)); @@ -118,15 +114,15 @@ public void testReinitClientSettings() throws Exception { final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { final GoogleCloudStorageService storageService = plugin.storageService; - GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); - final Storage client11 = storageService.client("gcs1", "repo1", randomPurpose(), statsCollector); + var statsCollector = new GcsRepositoryStatsCollector(); + final var client11 = storageService.client("gcs1", "repo1", statsCollector); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); - final Storage client12 = storageService.client("gcs2", "repo2", randomPurpose(), statsCollector); + final var client12 = storageService.client("gcs2", "repo2", statsCollector); assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // client 3 is missing final IllegalArgumentException e1 = expectThrows( IllegalArgumentException.class, - () -> storageService.client("gcs3", "repo3", randomPurpose(), statsCollector) + () -> storageService.client("gcs3", "repo3", statsCollector) ); assertThat(e1.getMessage(), containsString("Unknown client name [gcs3].")); // update client settings @@ -134,18 +130,18 @@ public void testReinitClientSettings() throws Exception { // old client 1 not changed assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); // new client 1 is changed - final Storage client21 = storageService.client("gcs1", "repo1", randomPurpose(), statsCollector); + final var client21 = storageService.client("gcs1", "repo1", statsCollector); assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21")); // old client 2 not changed assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // new client2 is gone final IllegalArgumentException e2 = expectThrows( IllegalArgumentException.class, - () -> storageService.client("gcs2", "repo2", randomPurpose(), statsCollector) + () -> storageService.client("gcs2", "repo2", statsCollector) ); assertThat(e2.getMessage(), containsString("Unknown client name [gcs2].")); // client 3 emerged - final Storage client23 = storageService.client("gcs3", "repo3", randomPurpose(), statsCollector); + final var client23 = storageService.client("gcs3", "repo3", statsCollector); assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23")); } } @@ -157,35 +153,11 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) { final GoogleCloudStorageService storageService = plugin.storageService; - final OperationPurpose operationPurpose = randomPurpose(); - final OperationPurpose differentOperationPurpose = randomValueOtherThan(operationPurpose, BlobStoreTestUtil::randomPurpose); - final Storage repo1Client = storageService.client( - "gcs1", - "repo1", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo1ClientOtherPurpose = storageService.client( - "gcs1", - "repo1", - differentOperationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo2Client = storageService.client( - "gcs1", - "repo2", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); - final Storage repo1ClientSecondInstance = storageService.client( - "gcs1", - "repo1", - operationPurpose, - new GoogleCloudStorageOperationsStats("bucket") - ); + final MeteredStorage repo1Client = storageService.client("gcs1", "repo1", new GcsRepositoryStatsCollector()); + final MeteredStorage repo2Client = storageService.client("gcs1", "repo2", new GcsRepositoryStatsCollector()); + final MeteredStorage repo1ClientSecondInstance = storageService.client("gcs1", "repo1", new GcsRepositoryStatsCollector()); assertNotSame(repo1Client, repo2Client); - assertNotSame(repo1Client, repo1ClientOtherPurpose); assertSame(repo1Client, repo1ClientSecondInstance); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java index a4185a12d7c3b..9c7511279adf5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java @@ -122,21 +122,8 @@ public RepositoriesMetrics(MeterRegistry meterRegistry) { /** * Create the map of attributes we expect to see on repository metrics */ - public static Map createAttributesMap( - RepositoryMetadata repositoryMetadata, - OperationPurpose purpose, - String operation - ) { - return Map.of( - "repo_type", - repositoryMetadata.type(), - "repo_name", - repositoryMetadata.name(), - "operation", - operation, - "purpose", - purpose.getKey() - ); + public static Map createAttributesMap(RepositoryMetadata meta, OperationPurpose purpose, String operation) { + return Map.of("repo_type", meta.type(), "repo_name", meta.name(), "operation", operation, "purpose", purpose.getKey()); } }