From a0cf1baeea2bb7981af32af473cccda11f59b49c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 20 Feb 2025 18:29:33 +1100 Subject: [PATCH 1/3] Propagate OperationPurpose --- .../gcs/GoogleCloudStorageBlobContainer.java | 29 +++-- .../gcs/GoogleCloudStorageBlobStore.java | 115 ++++++++++++------ 2 files changed, 91 insertions(+), 53 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 3ed492881afa9..22c600445c609 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -41,7 +41,7 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { @Override public boolean blobExists(OperationPurpose purpose, String blobName) { try { - return blobStore.blobExists(buildKey(blobName)); + return blobStore.blobExists(purpose, buildKey(blobName)); } catch (Exception e) { throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e); } @@ -49,39 +49,39 @@ public boolean blobExists(OperationPurpose purpose, String blobName) { @Override public Map listBlobs(OperationPurpose purpose) throws IOException { - return blobStore.listBlobs(path); + return blobStore.listBlobs(purpose, path); } @Override public Map children(OperationPurpose purpose) throws IOException { - return blobStore.listChildren(path()); + return blobStore.listChildren(purpose, path()); } @Override public Map listBlobsByPrefix(OperationPurpose purpose, String prefix) throws IOException { - return blobStore.listBlobsByPrefix(path, prefix); + return blobStore.listBlobsByPrefix(purpose, path, prefix); } @Override public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return blobStore.readBlob(buildKey(blobName)); + return blobStore.readBlob(purpose, buildKey(blobName)); } @Override public InputStream readBlob(OperationPurpose purpose, final String blobName, final long position, final long length) throws IOException { - return blobStore.readBlob(buildKey(blobName), position, length); + return blobStore.readBlob(purpose, buildKey(blobName), position, length); } @Override public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); + blobStore.writeBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } @Override public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { - blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists); + blobStore.writeBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists); } @Override @@ -92,7 +92,7 @@ public void writeMetadataBlob( boolean atomic, CheckedConsumer writer ) throws IOException { - blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer); + blobStore.writeBlob(purpose, buildKey(blobName), failIfAlreadyExists, writer); } @Override @@ -114,12 +114,12 @@ public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRefe @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { - return blobStore.deleteDirectory(path().buildAsString()); + return blobStore.deleteDirectory(purpose, path().buildAsString()); } @Override public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator blobNames) throws IOException { - blobStore.deleteBlobs(new Iterator<>() { + blobStore.deleteBlobs(purpose, new Iterator<>() { @Override public boolean hasNext() { return blobNames.hasNext(); @@ -145,11 +145,14 @@ public void compareAndExchangeRegister( BytesReference updated, ActionListener listener ) { - ActionListener.completeWith(listener, () -> blobStore.compareAndExchangeRegister(buildKey(key), path, key, expected, updated)); + ActionListener.completeWith( + listener, + () -> blobStore.compareAndExchangeRegister(purpose, buildKey(key), path, key, expected, updated) + ); } @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { - ActionListener.completeWith(listener, () -> blobStore.getRegister(buildKey(key), path, key)); + ActionListener.completeWith(listener, () -> blobStore.getRegister(purpose, buildKey(key), path, key)); } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 1bb388eaaf34b..1dfb95fa4537d 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStoreActionStats; import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.blobstore.OptionalBytesReference; import org.elasticsearch.common.blobstore.support.BlobContainerUtils; import org.elasticsearch.common.blobstore.support.BlobMetadata; @@ -128,7 +129,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { this.casBackoffPolicy = casBackoffPolicy; } - private Storage client() throws IOException { + private Storage client(OperationPurpose purpose) throws IOException { return storageService.client(clientName, repositoryName, stats); } @@ -145,27 +146,29 @@ public void close() { /** * List blobs in the specific bucket under the specified path. The path root is removed. * + * @param purpose the operation purpose * @param path base path of the blobs to list * @return a map of blob names and their metadata */ - Map listBlobs(String path) throws IOException { - return listBlobsByPrefix(path, ""); + Map listBlobs(OperationPurpose purpose, String path) throws IOException { + return listBlobsByPrefix(purpose, path, ""); } /** * List all blobs in the specific bucket with names prefixed * + * @param purpose the operation purpose * @param path * base path of the blobs to list. This path is removed from the * names of the blobs returned. * @param prefix prefix of the blobs to list. * @return a map of blob names and their metadata. */ - Map listBlobsByPrefix(String path, String prefix) throws IOException { + Map listBlobsByPrefix(OperationPurpose purpose, String path, String prefix) throws IOException { final String pathPrefix = buildKey(path, prefix); final Map mapBuilder = new HashMap<>(); SocketAccess.doPrivilegedVoidIOException( - () -> client().list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix)) + () -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix)) .iterateAll() .forEach(blob -> { assert blob.getName().startsWith(path); @@ -178,11 +181,11 @@ Map listBlobsByPrefix(String path, String prefix) throws I return Map.copyOf(mapBuilder); } - Map listChildren(BlobPath path) throws IOException { + Map listChildren(OperationPurpose purpose, BlobPath path) throws IOException { final String pathStr = path.buildAsString(); final Map mapBuilder = new HashMap<>(); SocketAccess.doPrivilegedVoidIOException( - () -> client().list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)) + () -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)) .iterateAll() .forEach(blob -> { if (blob.isDirectory()) { @@ -202,34 +205,37 @@ Map listChildren(BlobPath path) throws IOException { /** * Returns true if the blob exists in the specific bucket * + * @param purpose the operation purpose * @param blobName name of the blob * @return true iff the blob exists */ - boolean blobExists(String blobName) throws IOException { + boolean blobExists(OperationPurpose purpose, String blobName) throws IOException { final BlobId blobId = BlobId.of(bucketName, blobName); - final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId)); + final Blob blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId)); return blob != null; } /** * Returns an {@link java.io.InputStream} for the given blob name * + * @param purpose the operation purpose * @param blobName name of the blob * @return the InputStream used to read the blob's content */ - InputStream readBlob(String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName)); + InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { + return new GoogleCloudStorageRetryingInputStream(client(purpose), BlobId.of(bucketName, blobName)); } /** * Returns an {@link java.io.InputStream} for the given blob's position and length * + * @param purpose the operation purpose * @param blobName name of the blob * @param position starting position to read from * @param length length of bytes to read * @return the InputStream used to read the blob's content */ - InputStream readBlob(String blobName, long position, long length) throws IOException { + InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) throws IOException { if (position < 0L) { throw new IllegalArgumentException("position must be non-negative"); } @@ -240,7 +246,7 @@ InputStream readBlob(String blobName, long position, long length) throws IOExcep return new ByteArrayInputStream(new byte[0]); } else { return new GoogleCloudStorageRetryingInputStream( - client(), + client(purpose), BlobId.of(bucketName, blobName), position, Math.addExact(position, length - 1) @@ -250,16 +256,18 @@ InputStream readBlob(String blobName, long position, long length) throws IOExcep /** * Writes a blob in the specific bucket + * @param purpose the operation purpose * @param bytes content of the blob to be written * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { if (bytes.length() > getLargeBlobThresholdInBytes()) { // Compute md5 here so #writeBlobResumable forces the integrity check on the resumable upload. // This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not // guaranteed for resumable uploads. final String md5 = Base64.getEncoder().encodeToString(MessageDigests.digest(bytes, MessageDigests.md5())); writeBlobResumable( + purpose, BlobInfo.newBuilder(bucketName, blobName).setMd5(md5).build(), bytes.streamInput(), bytes.length(), @@ -268,30 +276,33 @@ void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExist } else { final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build(); if (bytes.hasArray()) { - writeBlobMultipart(blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists); + writeBlobMultipart(purpose, blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists); } else { - writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo); + writeBlob(purpose, bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo); } } } /** * Writes a blob in the specific bucket + * @param purpose the operation purpose * @param inputStream content of the blob to be written * @param blobSize expected size of the blob to be written * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build()); + void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { + writeBlob(purpose, inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build()); } - private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) throws IOException { + private void writeBlob(OperationPurpose purpose, InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) + throws IOException { if (blobSize > getLargeBlobThresholdInBytes()) { - writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists); + writeBlobResumable(purpose, blobInfo, inputStream, blobSize, failIfAlreadyExists); } else { final byte[] buffer = new byte[Math.toIntExact(blobSize)]; Streams.readFully(inputStream, buffer); - writeBlobMultipart(blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists); + writeBlobMultipart(purpose, blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists); } } @@ -308,7 +319,12 @@ long getLargeBlobThresholdInBytes() { Storage.BlobWriteOption.md5Match() }; private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = { Storage.BlobWriteOption.md5Match() }; - void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer writer) throws IOException { + void writeBlob( + OperationPurpose purpose, + String blobName, + boolean failIfAlreadyExists, + CheckedConsumer writer + ) throws IOException { final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build(); final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5; @@ -354,7 +370,7 @@ public void write(byte[] b, int off, int len) throws IOException { private void initResumableStream() throws IOException { final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( - () -> client().writer(blobInfo, writeOptions) + () -> client(purpose).writer(blobInfo, writeOptions) ); channelRef.set(writeChannel); resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) { @@ -379,7 +395,7 @@ public void write(byte[] b, int off, int len) throws IOException { SocketAccess.doPrivilegedVoidIOException(writeChannel::close); stats.trackPutOperation(); } else { - writeBlob(blobName, buffer.bytes(), failIfAlreadyExists); + writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); } return; } catch (final StorageException se) { @@ -405,12 +421,19 @@ public void write(byte[] b, int off, int len) throws IOException { * Uploads a blob using the "resumable upload" method (multiple requests, which * can be independently retried in case of failure, see * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload + * @param purpose the operation purpose * @param blobInfo the info for the blob to be uploaded * @param inputStream the stream containing the blob data * @param size expected size of the blob to be written * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long size, boolean failIfAlreadyExists) throws IOException { + private void writeBlobResumable( + OperationPurpose purpose, + BlobInfo blobInfo, + InputStream inputStream, + long size, + boolean failIfAlreadyExists + ) throws IOException { // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and // needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice. assert inputStream.markSupported(); @@ -427,7 +450,9 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long } for (int retry = 0; retry < 3; ++retry) { try { - final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); + final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( + () -> client(purpose).writer(blobInfo, writeOptions) + ); /* * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy * is in the stacktrace and is not granted the permissions needed to close and write the channel. @@ -465,20 +490,27 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long * 'multipart/related' request containing both data and metadata. The request is * gziped), see: * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload + * @param purpose the operation purpose * @param blobInfo the info for the blob to be uploaded * @param buffer the byte array containing the data * @param offset offset at which the blob contents start in the buffer * @param blobSize the size * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, int blobSize, boolean failIfAlreadyExists) - throws IOException { + private void writeBlobMultipart( + OperationPurpose purpose, + BlobInfo blobInfo, + byte[] buffer, + int offset, + int blobSize, + boolean failIfAlreadyExists + ) throws IOException { assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method"; try { final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : new Storage.BlobTargetOption[0]; - SocketAccess.doPrivilegedVoidIOException(() -> client().create(blobInfo, buffer, offset, blobSize, targetOptions)); + SocketAccess.doPrivilegedVoidIOException(() -> client(purpose).create(blobInfo, buffer, offset, blobSize, targetOptions)); // We don't track this operation on the http layer as // we do with the GET/LIST operations since this operations // can trigger multiple underlying http requests but only one @@ -495,17 +527,18 @@ private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, in /** * Deletes the given path and all its children. * + * @param purpose the operation purpose * @param pathStr Name of path to delete */ - DeleteResult deleteDirectory(String pathStr) throws IOException { + DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException { return SocketAccess.doPrivilegedIOException(() -> { DeleteResult deleteResult = DeleteResult.ZERO; - Page page = client().list(bucketName, BlobListOption.prefix(pathStr)); + Page page = client(purpose).list(bucketName, BlobListOption.prefix(pathStr)); do { final AtomicLong blobsDeleted = new AtomicLong(0L); final AtomicLong bytesDeleted = new AtomicLong(0L); final Iterator blobs = page.getValues().iterator(); - deleteBlobs(new Iterator<>() { + deleteBlobs(purpose, new Iterator<>() { @Override public boolean hasNext() { return blobs.hasNext(); @@ -529,9 +562,10 @@ public String next() { /** * Deletes multiple blobs from the specific bucket using a batch request * + * @param purpose the operation purpose * @param blobNames names of the blobs to delete */ - void deleteBlobs(Iterator blobNames) throws IOException { + void deleteBlobs(OperationPurpose purpose, Iterator blobNames) throws IOException { if (blobNames.hasNext() == false) { return; } @@ -550,7 +584,7 @@ public BlobId next() { try { SocketAccess.doPrivilegedVoidIOException(() -> { final AtomicReference ioe = new AtomicReference<>(); - StorageBatch batch = client().batch(); + StorageBatch batch = client(purpose).batch(); int pendingDeletesInBatch = 0; while (blobIdsToDelete.hasNext()) { BlobId blob = blobIdsToDelete.next(); @@ -574,7 +608,7 @@ public void error(StorageException exception) { pendingDeletesInBatch++; if (pendingDeletesInBatch % MAX_DELETES_PER_BATCH == 0) { batch.submit(); - batch = client().batch(); + batch = client(purpose).batch(); pendingDeletesInBatch = 0; } } @@ -628,10 +662,10 @@ public void close() { } } - OptionalBytesReference getRegister(String blobName, String container, String key) throws IOException { + OptionalBytesReference getRegister(OperationPurpose purpose, String blobName, String container, String key) throws IOException { final var blobId = BlobId.of(bucketName, blobName); try ( - var readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId)); + var readChannel = SocketAccess.doPrivilegedIOException(() -> client(purpose).reader(blobId)); var stream = new PrivilegedReadChannelStream(readChannel) ) { return OptionalBytesReference.of(BlobContainerUtils.getRegisterUsingConsistentRead(stream, container, key)); @@ -648,6 +682,7 @@ OptionalBytesReference getRegister(String blobName, String container, String key } OptionalBytesReference compareAndExchangeRegister( + OperationPurpose purpose, String blobName, String container, String key, @@ -657,7 +692,7 @@ OptionalBytesReference compareAndExchangeRegister( BlobContainerUtils.ensureValidRegisterContent(updated); final var blobId = BlobId.of(bucketName, blobName); - final var blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId)); + final var blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId)); final long generation; if (blob == null || blob.getGeneration() == null) { @@ -670,7 +705,7 @@ OptionalBytesReference compareAndExchangeRegister( try ( var stream = new PrivilegedReadChannelStream( SocketAccess.doPrivilegedIOException( - () -> client().reader(blobId, Storage.BlobSourceOption.generationMatch(generation)) + () -> client(purpose).reader(blobId, Storage.BlobSourceOption.generationMatch(generation)) ) ) ) { @@ -702,7 +737,7 @@ OptionalBytesReference compareAndExchangeRegister( while (true) { try { SocketAccess.doPrivilegedVoidIOException( - () -> client().create( + () -> client(purpose).create( blobInfo, bytesRef.bytes, bytesRef.offset, From 49c564059783180ecc4f19eb6631a6267d3f29a6 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 21 Feb 2025 10:11:23 +1100 Subject: [PATCH 2/3] Cache clients by purpose, add purpose to HttpStatsCollector --- .../gcs/GoogleCloudStorageBlobStore.java | 4 +- .../GoogleCloudStorageHttpStatsCollector.java | 6 ++- .../gcs/GoogleCloudStorageService.java | 46 +++++++++++++------ 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 1dfb95fa4537d..e09c601050a8e 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -130,7 +130,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { } private Storage client(OperationPurpose purpose) throws IOException { - return storageService.client(clientName, repositoryName, stats); + return storageService.client(clientName, repositoryName, purpose, stats); } @Override @@ -140,7 +140,7 @@ public BlobContainer blobContainer(BlobPath path) { @Override public void close() { - storageService.closeRepositoryClient(repositoryName); + storageService.closeRepositoryClients(repositoryName); } /** diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java index e7e21360de545..9cd657f34c9fe 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java @@ -14,6 +14,8 @@ import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseInterceptor; +import org.elasticsearch.common.blobstore.OperationPurpose; + import java.util.List; import java.util.Locale; import java.util.function.Consumer; @@ -43,10 +45,12 @@ final class GoogleCloudStorageHttpStatsCollector implements HttpResponseIntercep ); private final GoogleCloudStorageOperationsStats gcsOperationStats; + private final OperationPurpose operationPurpose; private final List trackers; - GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats) { + GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats, OperationPurpose operationPurpose) { this.gcsOperationStats = gcsOperationStats; + this.operationPurpose = operationPurpose; this.trackers = trackerFactories.stream() .map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket())) .toList(); diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index 6a4eeeeabbb6f..befd67b5aa565 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -40,6 +41,7 @@ import java.net.URL; import java.security.KeyStore; import java.util.Map; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptyMap; @@ -51,12 +53,15 @@ public class GoogleCloudStorageService { private volatile Map clientSettings = emptyMap(); + private record ClientKey(OperationPurpose purpose, String repositoryName) {} + /** * Dictionary of client instances. Client instances are built lazily from the - * latest settings. Each repository has its own client instance identified by - * the repository name. + * latest settings. Clients are cached by a composite OperationPurpose/repositoryName + * key. + * @see ClientKey */ - private volatile Map clientCache = emptyMap(); + private volatile Map clientCache = emptyMap(); /** * Refreshes the client settings and clears the client cache. Subsequent calls to @@ -79,20 +84,26 @@ public synchronized void refreshAndClearCache(Map format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); - final Storage storage = createClient(settings, stats); - clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); + final Storage storage = createClient(settings, stats, operationPurpose); + clientCache = Maps.copyMapWithAddedEntry(clientCache, clientKey, storage); return storage; } } - synchronized void closeRepositoryClient(String repositoryName) { - clientCache = Maps.copyMapWithRemovedEntry(clientCache, repositoryName); + synchronized void closeRepositoryClients(String repositoryName) { + clientCache = clientCache.entrySet() + .stream() + .filter(entry -> entry.getKey().repositoryName().equals(repositoryName) == false) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } /** @@ -125,11 +139,15 @@ synchronized void closeRepositoryClient(String repositoryName) { * * @param gcsClientSettings client settings to use, including secure settings * @param stats the stats collector to use by the underlying SDK + * @param operationPurpose the purpose this client will be used for * @return a new client storage instance that can be used to manage objects * (blobs) */ - private Storage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GoogleCloudStorageOperationsStats stats) - throws IOException { + private Storage createClient( + GoogleCloudStorageClientSettings gcsClientSettings, + GoogleCloudStorageOperationsStats stats, + OperationPurpose operationPurpose + ) throws IOException { final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); // requires java.lang.RuntimePermission "setFactory" @@ -149,7 +167,7 @@ private Storage createClient(GoogleCloudStorageClientSettings gcsClientSettings, return builder.build(); }); - final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector(stats); + final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector(stats, operationPurpose); final HttpTransportOptions httpTransportOptions = new HttpTransportOptions( HttpTransportOptions.newBuilder() From 98c475192633544a7996ea627a1a44eaf167a39d Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 21 Feb 2025 10:17:22 +1100 Subject: [PATCH 3/3] Fix tests --- ...leCloudStorageBlobStoreContainerTests.java | 10 +++++- .../gcs/GoogleCloudStorageServiceTests.java | 32 +++++++++++++------ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 81509c7f2183b..c3693d70c8469 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; @@ -79,7 +80,14 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti when(storage.batch()).thenReturn(batch); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); - when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage); + when( + storageService.client( + any(String.class), + any(String.class), + any(OperationPurpose.class), + any(GoogleCloudStorageOperationsStats.class) + ) + ).thenReturn(storage); try ( BlobStore store = new GoogleCloudStorageBlobStore( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index c6f6b0315f595..4dbbb18cdab95 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -36,6 +36,7 @@ import java.util.Locale; import java.util.UUID; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -82,13 +83,13 @@ void notifyProxyIsSet(Proxy p) { GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> service.client("another_client", "repo", statsCollector) + () -> service.client("another_client", "repo", randomPurpose(), statsCollector) ); assertThat(e.getMessage(), Matchers.startsWith("Unknown client name")); assertSettingDeprecationsAndWarnings( new Setting[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) } ); - final Storage storage = service.client(clientName, "repo", statsCollector); + final Storage storage = service.client(clientName, "repo", randomPurpose(), statsCollector); assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName)); assertThat(storage.getOptions().getHost(), Matchers.is(endpoint)); assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName)); @@ -116,14 +117,14 @@ public void testReinitClientSettings() throws Exception { try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { final GoogleCloudStorageService storageService = plugin.storageService; GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket"); - final Storage client11 = storageService.client("gcs1", "repo1", statsCollector); + final Storage client11 = storageService.client("gcs1", "repo1", randomPurpose(), statsCollector); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); - final Storage client12 = storageService.client("gcs2", "repo2", statsCollector); + final Storage client12 = storageService.client("gcs2", "repo2", randomPurpose(), statsCollector); assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // client 3 is missing final IllegalArgumentException e1 = expectThrows( IllegalArgumentException.class, - () -> storageService.client("gcs3", "repo3", statsCollector) + () -> storageService.client("gcs3", "repo3", randomPurpose(), statsCollector) ); assertThat(e1.getMessage(), containsString("Unknown client name [gcs3].")); // update client settings @@ -131,18 +132,18 @@ public void testReinitClientSettings() throws Exception { // old client 1 not changed assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); // new client 1 is changed - final Storage client21 = storageService.client("gcs1", "repo1", statsCollector); + final Storage client21 = storageService.client("gcs1", "repo1", randomPurpose(), statsCollector); assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21")); // old client 2 not changed assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // new client2 is gone final IllegalArgumentException e2 = expectThrows( IllegalArgumentException.class, - () -> storageService.client("gcs2", "repo2", statsCollector) + () -> storageService.client("gcs2", "repo2", randomPurpose(), statsCollector) ); assertThat(e2.getMessage(), containsString("Unknown client name [gcs2].")); // client 3 emerged - final Storage client23 = storageService.client("gcs3", "repo3", statsCollector); + final Storage client23 = storageService.client("gcs3", "repo3", randomPurpose(), statsCollector); assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23")); } } @@ -154,11 +155,22 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) { final GoogleCloudStorageService storageService = plugin.storageService; - final Storage repo1Client = storageService.client("gcs1", "repo1", new GoogleCloudStorageOperationsStats("bucket")); - final Storage repo2Client = storageService.client("gcs1", "repo2", new GoogleCloudStorageOperationsStats("bucket")); + final Storage repo1Client = storageService.client( + "gcs1", + "repo1", + randomPurpose(), + new GoogleCloudStorageOperationsStats("bucket") + ); + final Storage repo2Client = storageService.client( + "gcs1", + "repo2", + randomPurpose(), + new GoogleCloudStorageOperationsStats("bucket") + ); final Storage repo1ClientSecondInstance = storageService.client( "gcs1", "repo1", + randomPurpose(), new GoogleCloudStorageOperationsStats("bucket") );