diff --git a/docs/changelog/122991.yaml b/docs/changelog/122991.yaml new file mode 100644 index 0000000000000..e038bca0a86bc --- /dev/null +++ b/docs/changelog/122991.yaml @@ -0,0 +1,5 @@ +pr: 122991 +summary: "GCS blob store: add `OperationPurpose/Operation` stats counters" +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 1adc380216529..9ef5289b7accb 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -63,6 +63,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME; @@ -212,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) { } @Override - protected GoogleCloudStorageService createStorageService() { - return new GoogleCloudStorageService() { + protected GoogleCloudStorageService createStorageService(Settings settings) { + return new GoogleCloudStorageService(settings) { @Override StorageOptions 
createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, @@ -346,19 +347,17 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta @Override public void maybeTrack(final String request, Headers requestHeaders) { - if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) { - trackRequest("GetObject"); + if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) { + trackRequest(Operation.GET_OBJECT.key()); } else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) { - trackRequest("ListObjects"); - } else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) { - trackRequest("GetObject"); + trackRequest(Operation.LIST_OBJECTS.key()); } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) { // Resumable uploads are billed as a single operation, that's the reason we're tracking // the request only when it's the last part. // See https://cloud.google.com/storage/docs/resumable-uploads#introduction - trackRequest("InsertObject"); + trackRequest(Operation.INSERT_OBJECT.key()); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) { - trackRequest("InsertObject"); + trackRequest(Operation.INSERT_OBJECT.key()); } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index e09c601050a8e..7c2b1d55ad732 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -44,6 +44,7 @@ import org.elasticsearch.core.Streams; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; +import 
org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayInputStream; @@ -73,6 +74,11 @@ class GoogleCloudStorageBlobStore implements BlobStore { + /** + * see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE + */ + static final int SDK_DEFAULT_CHUNK_SIZE = 60 * 256 * 1024; + private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class); // The recommended maximum size of a blob that should be uploaded in a single @@ -124,7 +130,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { this.repositoryName = repositoryName; this.storageService = storageService; this.bigArrays = bigArrays; - this.stats = new GoogleCloudStorageOperationsStats(bucketName); + this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless()); this.bufferSize = bufferSize; this.casBackoffPolicy = casBackoffPolicy; } @@ -378,9 +384,7 @@ private void initResumableStream() throws IOException { public void write(byte[] b, int off, int len) throws IOException { int written = 0; while (written < len) { - // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK - // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE - final int toWrite = Math.min(len - written, 60 * 256 * 1024); + final int toWrite = Math.min(len - written, SDK_DEFAULT_CHUNK_SIZE); out.write(b, off + written, toWrite); written += toWrite; } @@ -393,7 +397,7 @@ public void write(byte[] b, int off, int len) throws IOException { final WritableByteChannel writeChannel = channelRef.get(); if (writeChannel != null) { SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - stats.trackPutOperation(); + stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); } else { writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); } @@ -463,7 +467,7 @@ private void writeBlobResumable( // we do with the GET/LIST 
operations since this operations // can trigger multiple underlying http requests but only one // operation is billed. - stats.trackPutOperation(); + stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); return; } catch (final StorageException se) { final int errorCode = se.getCode(); @@ -515,7 +519,7 @@ private void writeBlobMultipart( // we do with the GET/LIST operations since this operations // can trigger multiple underlying http requests but only one // operation is billed. - stats.trackPostOperation(); + stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); } catch (final StorageException se) { if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); @@ -634,7 +638,7 @@ private static String buildKey(String keyPath, String s) { @Override public Map stats() { - return stats.toMap(); + return stats.tracker().toMap(); } private static final class WritableBlobChannel implements WritableByteChannel { @@ -745,7 +749,7 @@ OptionalBytesReference compareAndExchangeRegister( Storage.BlobTargetOption.generationMatch() ) ); - stats.trackPostOperation(); + stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT); return OptionalBytesReference.of(expected); } catch (Exception e) { final var serviceException = unwrapServiceException(e); diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java index 9cd657f34c9fe..669a228535c63 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java @@ -9,115 +9,88 @@ package org.elasticsearch.repositories.gcs; -import 
com.google.api.client.http.GenericUrl; -import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseInterceptor; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; -import java.util.List; -import java.util.Locale; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.regex.Pattern; -import static java.lang.String.format; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker; final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor { - // The specification for the current API (v1) endpoints can be found at: - // https://cloud.google.com/storage/docs/json_api/v1 - private static final List> trackerFactories = List.of( - (bucket) -> HttpRequestTracker.get( - format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket), - GoogleCloudStorageOperationsStats::trackGetOperation - ), - (bucket) -> HttpRequestTracker.get( - format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket), - GoogleCloudStorageOperationsStats::trackGetOperation - ), + private static final Logger logger = LogManager.getLogger("GcpHttpStats"); - (bucket) -> HttpRequestTracker.get( - format(Locale.ROOT, "/storage/v1/b/%s/o", bucket), - GoogleCloudStorageOperationsStats::trackListOperation - ) - ); + private final StatsTracker stats; + private final OperationPurpose purpose; + private final Pattern getObjPattern; + private final Pattern insertObjPattern; + private final Pattern listObjPattern; - private final GoogleCloudStorageOperationsStats gcsOperationStats; - private final OperationPurpose operationPurpose; - private final List trackers; + GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, 
OperationPurpose purpose) { + this.stats = stats.tracker(); + this.purpose = purpose; + var bucket = stats.bucketName(); - GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats, OperationPurpose operationPurpose) { - this.gcsOperationStats = gcsOperationStats; - this.operationPurpose = operationPurpose; - this.trackers = trackerFactories.stream() - .map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket())) - .toList(); + // The specification for the current API (v1) endpoints can be found at: + // https://cloud.google.com/storage/docs/json_api/v1 + this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+"); + this.insertObjPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o"); + this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o"); } - @Override - public void interceptResponse(final HttpResponse response) { - // TODO keep track of unsuccessful requests in different entries - if (response.isSuccessStatusCode() == false) return; - - final HttpRequest request = response.getRequest(); - for (HttpRequestTracker tracker : trackers) { - if (tracker.track(request, gcsOperationStats)) { - return; - } - } + private void trackRequest(Operation operation) { + stats.trackRequest(purpose, operation); } - /** - * Http request tracker that allows to track certain HTTP requests based on the following criteria: - *
-     * <ul>
-     *     <li>The HTTP request method</li>
-     *     <li>An URI path regex expression</li>
-     * </ul>
- * - * The requests that match the previous criteria are tracked using the {@code statsTracker} function. - */ - private static final class HttpRequestTracker { - private final String method; - private final Pattern pathPattern; - private final Consumer statsTracker; - - private HttpRequestTracker( - final String method, - final String pathPattern, - final Consumer statsTracker - ) { - this.method = method; - this.pathPattern = Pattern.compile(pathPattern); - this.statsTracker = statsTracker; - } - - private static HttpRequestTracker get(final String pathPattern, final Consumer statsConsumer) { - return new HttpRequestTracker("GET", pathPattern, statsConsumer); - } - - /** - * Tracks the provided http request if it matches the criteria defined by this tracker. - * - * @param httpRequest the http request to be tracked - * @param stats the operation tracker - * - * @return {@code true} if the http request was tracked, {@code false} otherwise. - */ - private boolean track(final HttpRequest httpRequest, final GoogleCloudStorageOperationsStats stats) { - if (matchesCriteria(httpRequest) == false) return false; + private void trackRequestAndOperation(Operation operation) { + stats.trackOperationAndRequest(purpose, operation); + } - statsTracker.accept(stats); - return true; + @Override + public void interceptResponse(final HttpResponse response) { + var respCode = response.getStatusCode(); + // Some of the intermediate and error codes are still counted as "good" requests + if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) { + return; } - - private boolean matchesCriteria(final HttpRequest httpRequest) { - return method.equalsIgnoreCase(httpRequest.getRequestMethod()) && pathMatches(httpRequest.getUrl()); + var request = response.getRequest(); + + var path = request.getUrl().getRawPath(); + var ignored = false; + switch (request.getRequestMethod()) { + case "GET" -> { + // 
https://cloud.google.com/storage/docs/json_api/v1/objects/get + if (getObjPattern.matcher(path).matches()) { + trackRequestAndOperation(Operation.GET_OBJECT); + } else if (listObjPattern.matcher(path).matches()) { + trackRequestAndOperation(Operation.LIST_OBJECTS); + } else { + ignored = true; + } + } + case "POST", "PUT" -> { + // https://cloud.google.com/storage/docs/json_api/v1/objects/insert + if (insertObjPattern.matcher(path).matches()) { + trackRequest(Operation.INSERT_OBJECT); + } else { + ignored = true; + } + } + default -> ignored = true; } - - private boolean pathMatches(final GenericUrl url) { - return pathPattern.matcher(url.getRawPath()).matches(); + if (ignored) { + logger.debug( + "not handling request:{} {} response:{} {}", + request.getRequestMethod(), + path, + response.getStatusCode(), + response.getStatusMessage() + ); } } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java index 1859d7041115d..06878a17a97b6 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java @@ -10,53 +10,166 @@ package org.elasticsearch.repositories.gcs; import org.elasticsearch.common.blobstore.BlobStoreActionStats; +import org.elasticsearch.common.blobstore.OperationPurpose; -import java.util.HashMap; +import java.util.EnumMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; final class GoogleCloudStorageOperationsStats { - private final AtomicLong getCount = new AtomicLong(); - private final AtomicLong listCount = new AtomicLong(); - private final AtomicLong putCount = new AtomicLong(); - private 
final AtomicLong postCount = new AtomicLong(); - private final String bucketName; + private final StatsTracker tracker; - GoogleCloudStorageOperationsStats(String bucketName) { + GoogleCloudStorageOperationsStats(String bucketName, boolean isStateless) { this.bucketName = bucketName; + if (isStateless) { + this.tracker = new ServerlessTracker(bucketName); + } else { + this.tracker = new StatefulTracker(); + } } - void trackGetOperation() { - getCount.incrementAndGet(); + GoogleCloudStorageOperationsStats(String bucketName) { + this(bucketName, false); } - void trackPutOperation() { - putCount.incrementAndGet(); + public String bucketName() { + return bucketName; } - void trackPostOperation() { - postCount.incrementAndGet(); + public StatsTracker tracker() { + return tracker; + } + + public enum Operation { + GET_OBJECT("GetObject"), + LIST_OBJECTS("ListObjects"), + INSERT_OBJECT("InsertObject"); + + private final String key; + + Operation(String key) { + this.key = key; + } + + public String key() { + return key; + } } - void trackListOperation() { - listCount.incrementAndGet(); + sealed interface StatsTracker permits ServerlessTracker, StatefulTracker { + void trackRequest(OperationPurpose purpose, Operation operation); + + void trackOperation(OperationPurpose purpose, Operation operation); + + Map toMap(); + + default void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { + trackOperation(purpose, operation); + trackRequest(purpose, operation); + } } - String getTrackedBucket() { - return bucketName; + /** + * Stateful tracker is single dimension: Operation only. The OperationPurpose is ignored. 
+ */ + static final class StatefulTracker implements StatsTracker { + + private final EnumMap counters; + + StatefulTracker() { + this.counters = new EnumMap<>(Operation.class); + for (var operation : Operation.values()) { + counters.put(operation, new Counters(operation.key())); + } + } + + @Override + public void trackRequest(OperationPurpose purpose, Operation operation) { + // dont track requests, only operations + } + + @Override + public void trackOperation(OperationPurpose purpose, Operation operation) { + counters.get(operation).operations().add(1); + } + + @Override + public Map toMap() { + return counters.values().stream().collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> { + var ops = c.operations().sum(); + return new BlobStoreActionStats(ops, ops); + })); + } + + } + + /** + * Serverless tracker is 2-dimensional: OperationPurpose and Operations. + * Every combination of these has own set of counters: number of operations and number of http requests. + * A single operation might have multiple HTTP requests, for example a single ResumableUpload operation + * can perform a series of HTTP requests with size up to {@link GoogleCloudStorageBlobStore#SDK_DEFAULT_CHUNK_SIZE}. + *
+     * <pre>{@code
+     * | Purpose      | Operation       | OperationsCnt | RequestCnt |
+     * |--------------+-----------------+---------------+------------|
+     * | SnapshotData | GetObject       |             1 |          1 |
+     * | SnapshotData | ListObjects     |             2 |          2 |
+     * | SnapshotData | InsertObject    |             1 |         10 |
+     * | SnapshotData | ...             |               |            |
+     * | Translog     | GetObject       |             5 |          5 |
+     * | ...          |                 |               |            |
+     * }
+     * </pre>
+ */ + static final class ServerlessTracker implements StatsTracker { + private final EnumMap> counters; + + ServerlessTracker(String bucketName) { + this.counters = new EnumMap<>(OperationPurpose.class); + for (var purpose : OperationPurpose.values()) { + var operations = new EnumMap(Operation.class); + for (var operation : Operation.values()) { + operations.put(operation, new Counters(purpose.getKey() + "_" + operation.key())); + } + counters.put(purpose, operations); + } + } + + @Override + public void trackOperation(OperationPurpose purpose, Operation operation) { + counters.get(purpose).get(operation).operations.add(1); + } + + @Override + public void trackRequest(OperationPurpose purpose, Operation operation) { + counters.get(purpose).get(operation).requests.add(1); + } + + @Override + public void trackOperationAndRequest(OperationPurpose purpose, Operation operation) { + var c = counters.get(purpose).get(operation); + c.requests.add(1); + c.operations.add(1); + } + + @Override + public Map toMap() { + return counters.values() + .stream() + .flatMap(ops -> ops.values().stream()) + .collect( + Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())) + ); + } + } - // TODO: actually track requests and operations separately (see https://elasticco.atlassian.net/browse/ES-10213) - Map toMap() { - final Map results = new HashMap<>(); - final long getOperations = getCount.get(); - results.put("GetObject", new BlobStoreActionStats(getOperations, getOperations)); - final long listOperations = listCount.get(); - results.put("ListObjects", new BlobStoreActionStats(listOperations, listOperations)); - final long insertOperations = postCount.get() + putCount.get(); - results.put("InsertObject", new BlobStoreActionStats(insertOperations, insertOperations)); - return results; + private record Counters(String name, LongAdder operations, LongAdder requests) { + Counters(String name) { + this(name, new LongAdder(), new 
LongAdder()); + } } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 14281b7288067..81ebe26b1e058 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -34,14 +34,14 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin @SuppressWarnings("this-escape") public GoogleCloudStoragePlugin(final Settings settings) { - this.storageService = createStorageService(); + this.storageService = createStorageService(settings); // eagerly load client settings so that secure settings are readable (not closed) reload(settings); } // overridable for tests - protected GoogleCloudStorageService createStorageService() { - return new GoogleCloudStorageService(); + protected GoogleCloudStorageService createStorageService(Settings settings) { + return new GoogleCloudStorageService(settings); } @Override diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index befd67b5aa565..ed7294d7897ce 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -24,8 +24,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -52,6 +54,11 @@ public class GoogleCloudStorageService { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageService.class); private volatile Map clientSettings = emptyMap(); + private final boolean isStateless; + + public GoogleCloudStorageService(Settings settings) { + this.isStateless = DiscoveryNode.isStateless(settings); + } private record ClientKey(OperationPurpose purpose, String repositoryName) {} @@ -127,6 +134,10 @@ public Storage client( } } + boolean isStateless() { + return isStateless; + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() @@ -182,7 +193,6 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser return (httpRequest) -> { if (requestInitializer != null) requestInitializer.initialize(httpRequest); - httpRequest.setResponseInterceptor(httpStatsCollector); }; } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index cb7049952b002..c5ae366ac80e2 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -130,7 +130,7 @@ protected BlobContainer createBlobContainer( secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); clientSettings.setSecureSettings(secureSettings); - final GoogleCloudStorageService service = new GoogleCloudStorageService() { + final GoogleCloudStorageService service = new 
GoogleCloudStorageService(Settings.EMPTY) { @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index c028c89ff8a65..774dba59c99a2 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreActionStats; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.blobstore.support.BlobContainerUtils; import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.bytes.BytesArray; @@ -42,14 +43,20 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; +import static 
org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.GET_OBJECT; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.INSERT_OBJECT; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation.LIST_OBJECTS; @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase { @@ -58,16 +65,60 @@ public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase { private HttpServer httpServer; private ThreadPool threadPool; + + // When isServerless is true, the test suite collects + // 2-dimensional OperationPurpose/Operation metrics. + // Otherwise it collects per-Operation metrics only. + private boolean isServerless; + private GoogleCloudStorageService googleCloudStorageService; private GoogleCloudStorageHttpHandler googleCloudStorageHttpHandler; private ContainerAndBlobStore containerAndStore; + // A utility method that prints nicer diff messages, rather than dumping the entire map, like this: + // Stats counts do not match: + // {SNAPSHOT_DATA_RESUMABLE_UPLOAD=Diff[wantOps=1, gotOps=1, diffOps=0, wantReqs=3, gotReqs=2, diffReqs=1]} + static void assertStatsEquals(Map<String, BlobStoreActionStats> want, Map<String, BlobStoreActionStats> got) { + assert want.keySet().equals(got.keySet()); + record Diff(long wantOps, long gotOps, long diffOps, long wantReqs, long gotReqs, long diffReqs) {} + var diff = new HashMap<String, Diff>(); + for (var wkey : want.keySet()) { + if (got.containsKey(wkey)) { + var gotStat = got.get(wkey); + var wantStat = want.get(wkey); + if (gotStat.equals(wantStat) == false) { + diff.put( + wkey, + new Diff( + wantStat.operations(), + gotStat.operations(), + wantStat.operations() - gotStat.operations(), + wantStat.requests(), + gotStat.requests(), + wantStat.requests() - gotStat.requests() + ) + ); + } + } + } + if (diff.size() > 0) { + 
fail("Stats counts do not match:\n" + diff); + } + } + @Before public void createStorageService() throws Exception { threadPool = new TestThreadPool(getTestClass().getName()); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); - googleCloudStorageService = new GoogleCloudStorageService(); + isServerless = randomBoolean(); + Settings settings; + if (isServerless) { + settings = Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, true).build(); + } else { + settings = Settings.EMPTY; + } + googleCloudStorageService = new GoogleCloudStorageService(settings); googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET); httpServer.createContext("/", googleCloudStorageHttpHandler); httpServer.createContext("/token", new FakeOAuth2HttpHandler()); @@ -86,16 +137,18 @@ public void testSingleMultipartWrite() throws Exception { final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer(); final GoogleCloudStorageBlobStore store = containerAndStore.blobStore(); + final OperationPurpose purpose = randomPurpose(); final String blobName = randomIdentifier(); final int blobLength = randomIntBetween(1, (int) store.getLargeBlobThresholdInBytes() - 1); final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(blobLength)); - container.writeBlob(randomPurpose(), blobName, blobContents, true); - assertEquals(createStats(1, 0, 0), store.stats()); + container.writeBlob(purpose, blobName, blobContents, true); - try (InputStream is = container.readBlob(randomPurpose(), blobName)) { + final StatsMap wantStats = new StatsMap(purpose); + assertStatsEquals(wantStats.add(INSERT_OBJECT, 1, 1), store.stats()); + try (InputStream is = container.readBlob(purpose, blobName)) { assertEquals(blobContents, Streams.readFully(is)); } - assertEquals(createStats(1, 0, 1), store.stats()); + assertStatsEquals(wantStats.add(GET_OBJECT, 1, 1), store.stats()); } @Test @@ -103,16 +156,25 
@@ public void testResumableWrite() throws Exception { final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer(); final GoogleCloudStorageBlobStore store = containerAndStore.blobStore(); + final OperationPurpose purpose = randomPurpose(); final String blobName = randomIdentifier(); - final int size = randomIntBetween((int) store.getLargeBlobThresholdInBytes(), (int) store.getLargeBlobThresholdInBytes() * 2); + final int parts = between(1, 10); + final int maxPartSize = GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE; + final int size = (parts - 1) * maxPartSize + between(1, maxPartSize); + assert size >= store.getLargeBlobThresholdInBytes(); final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(size)); - container.writeBlob(randomPurpose(), blobName, blobContents, true); - assertEquals(createStats(1, 0, 0), store.stats()); + container.writeBlob(purpose, blobName, blobContents, true); + + // a resumable upload sends at least 2 requests, a POST with metadata only and multiple PUTs with SDK_DEFAULT_CHUNK_SIZE + // the +1 means a POST request with metadata without PAYLOAD + final int totalRequests = parts + 1; + final StatsMap wantStats = new StatsMap(purpose); + assertStatsEquals(wantStats.add(INSERT_OBJECT, 1, totalRequests), store.stats()); - try (InputStream is = container.readBlob(randomPurpose(), blobName)) { + try (InputStream is = container.readBlob(purpose, blobName)) { assertEquals(blobContents, Streams.readFully(is)); } - assertEquals(createStats(1, 0, 1), store.stats()); + assertStatsEquals(wantStats.add(GET_OBJECT, 1, 1), store.stats()); } @Test @@ -123,14 +185,16 @@ public void testDeleteDirectory() throws Exception { final String directoryName = randomIdentifier(); final BytesArray contents = new BytesArray(randomByteArrayOfLength(50)); final int numberOfFiles = randomIntBetween(1, 20); + final OperationPurpose purpose = randomPurpose(); for (int i = 0; i < numberOfFiles; i++) { - 
container.writeBlob(randomPurpose(), String.format("%s/file_%d", directoryName, i), contents, true); + container.writeBlob(purpose, String.format("%s/file_%d", directoryName, i), contents, true); } - assertEquals(createStats(numberOfFiles, 0, 0), store.stats()); + final StatsMap wantStats = new StatsMap(purpose); + assertStatsEquals(wantStats.add(INSERT_OBJECT, numberOfFiles, numberOfFiles), store.stats()); - container.delete(randomPurpose()); + container.delete(purpose); // We only count the list because we can't track the bulk delete - assertEquals(createStats(numberOfFiles, 1, 0), store.stats()); + assertStatsEquals(wantStats.add(LIST_OBJECTS, 1, 1), store.stats()); } @Test @@ -143,15 +207,18 @@ public void testListBlobsAccountsForPaging() throws Exception { final int numberOfPages = randomIntBetween(1, 10); final int numberOfObjects = randomIntBetween((numberOfPages - 1) * pageSize, numberOfPages * pageSize - 1); final BytesArray contents = new BytesArray(randomByteArrayOfLength(50)); + final OperationPurpose purpose = randomPurpose(); for (int i = 0; i < numberOfObjects; i++) { - container.writeBlob(randomPurpose(), String.format("file_%d", i), contents, true); + container.writeBlob(purpose, String.format("file_%d", i), contents, true); } - assertEquals(createStats(numberOfObjects, 0, 0), store.stats()); + final StatsMap wantStats = new StatsMap(purpose); + assertStatsEquals(wantStats.add(INSERT_OBJECT, numberOfObjects, numberOfObjects), store.stats()); - final Map stringBlobMetadataMap = container.listBlobs(randomPurpose()); + final Map stringBlobMetadataMap = container.listBlobs(purpose); assertEquals(numberOfObjects, stringBlobMetadataMap.size()); + // There should be {numberOfPages} pages of blobs - assertEquals(createStats(numberOfObjects, numberOfPages, 0), store.stats()); + assertStatsEquals(wantStats.add(LIST_OBJECTS, numberOfPages, numberOfPages), store.stats()); } public void testCompareAndSetRegister() { @@ -161,42 +228,23 @@ public void 
testCompareAndSetRegister() { - // update from empty (adds a single insert) + // update from empty (adds one get and one insert) final BytesArray contents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)); final String registerName = randomIdentifier(); - assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, BytesArray.EMPTY, contents, l))); - assertEquals(createStats(1, 0, 0), store.stats()); + final OperationPurpose purpose = randomPurpose(); + assertTrue(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, BytesArray.EMPTY, contents, l))); + final StatsMap wantStat = new StatsMap(purpose); + assertStatsEquals(wantStat.add(GET_OBJECT, 1, 1).add(INSERT_OBJECT, 1, 1), store.stats()); // successful update from non-null (adds two gets, one insert) final BytesArray nextContents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)); - assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, contents, nextContents, l))); - assertEquals(createStats(2, 0, 2), store.stats()); + assertTrue(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, contents, nextContents, l))); + assertStatsEquals(wantStat.add(GET_OBJECT, 2, 2).add(INSERT_OBJECT, 1, 1), store.stats()); // failed update (adds two gets, zero inserts) final BytesArray wrongContents = randomValueOtherThan( nextContents, () -> new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)) ); - assertFalse(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, wrongContents, contents, l))); - assertEquals(createStats(2, 0, 4), store.stats()); - } - - private Map createStats(int insertCount, int listCount, int getCount) { - return Map.of( - "GetObject", - new BlobStoreActionStats(getCount, getCount), - "ListObjects", - new BlobStoreActionStats(listCount, listCount), - "InsertObject", - new 
BlobStoreActionStats(insertCount, insertCount) - ); - } - - 
private record ContainerAndBlobStore(GoogleCloudStorageBlobContainer blobContainer, GoogleCloudStorageBlobStore blobStore) - implements - Closeable { - - @Override - public void close() { - blobStore.close(); - } + assertFalse(safeAwait(l -> container.compareAndSetRegister(purpose, registerName, wrongContents, contents, l))); + assertStatsEquals(wantStat.add(GET_OBJECT, 2, 2), store.stats()); } private ContainerAndBlobStore createBlobContainer(final String repositoryName) throws Exception { @@ -236,4 +284,49 @@ protected String getEndpointForServer(final HttpServer server) { final InetSocketAddress address = server.getAddress(); return "http://" + address.getHostString() + ":" + address.getPort(); } + + private record ContainerAndBlobStore(GoogleCloudStorageBlobContainer blobContainer, GoogleCloudStorageBlobStore blobStore) + implements + Closeable { + + @Override + public void close() { + blobStore.close(); + } + } + + class StatsMap extends HashMap<String, BlobStoreActionStats> { + private final OperationPurpose purpose; + + StatsMap(OperationPurpose purpose) { + this.purpose = purpose; + if (isServerless) { + for (var p : OperationPurpose.values()) { + for (var o : Operation.values()) { + put(p.getKey() + "_" + o.key(), new BlobStoreActionStats(0, 0)); + } + } + } else { + for (var o : Operation.values()) { + put(o.key(), new BlobStoreActionStats(0, 0)); + } + } + } + + StatsMap add(Operation operation, long ops, long reqs) { + var key = isServerless ? 
purpose.getKey() + "_" + operation.key() : operation.key(); + compute(key, (k, v) -> { + BlobStoreActionStats stats; + assert v != null; + if (isServerless) { + stats = new BlobStoreActionStats(v.operations() + ops, v.requests() + reqs); + } else { + stats = new BlobStoreActionStats(v.operations() + ops, v.operations() + ops); + } + return stats; + }); + return this; + } + + } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index 8c7e01188f826..904deb710f24f 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -75,7 +75,7 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) .build(); SetOnce proxy = new SetOnce<>(); - final GoogleCloudStorageService service = new GoogleCloudStorageService() { + final GoogleCloudStorageService service = new GoogleCloudStorageService(Settings.EMPTY) { @Override void notifyProxyIsSet(Proxy p) { proxy.set(p);