Merged
Changes from 5 commits
5 changes: 5 additions & 0 deletions docs/changelog/125452.yaml
@@ -0,0 +1,5 @@
pr: 125452
summary: Add GCS telemetry with `ThreadLocal`
area: Snapshot/Restore
type: enhancement
issues: []
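Note on the changelog summary: the `ThreadLocal`-based context class it refers to is not part of the hunks shown below, so the following is only a rough, illustrative sketch of that general design — a thread-local "current operation" set around each metered client call so the HTTP layer can attribute requests. The names `OperationContext`, `withContext`, and `current` are hypothetical, not this PR's API.

```java
// Illustrative only: a thread-local "current operation" context of the kind the
// changelog hints at. The real class in this PR is not shown in these hunks;
// OperationContext / withContext / current are hypothetical names.
import java.util.concurrent.Callable;

final class OperationContext {

    record Context(String purpose, String operation) {}

    private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();

    /** Run a client call with the purpose/operation visible to lower layers on this thread. */
    static <T> T withContext(String purpose, String operation, Callable<T> call) throws Exception {
        CURRENT.set(new Context(purpose, operation));
        try {
            return call.call();
        } finally {
            CURRENT.remove(); // always clear, since threads are pooled
        }
    }

    /** Read by an HTTP-level interceptor to attribute the request it is about to send. */
    static Context current() {
        return CURRENT.get();
    }

    public static void main(String[] args) throws Exception {
        String blob = withContext("SnapshotData", "GET", () -> {
            // in the real code this would be the GCS SDK call; an interceptor would
            // look up OperationContext.current() while the request is in flight
            System.out.println("attributed to: " + current());
            return "blob-bytes";
        });
        System.out.println(blob);
    }
}
```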
@@ -63,7 +63,6 @@
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
@@ -213,8 +212,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings) {
protected GoogleCloudStorageService createStorageService() {
return new GoogleCloudStorageService() {
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
@@ -260,7 +259,8 @@ public Map<String, Repository.Factory> getRepositories(
this.storageService,
clusterService,
bigArrays,
recoverySettings
recoverySettings,
repositoriesMetrics
) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
@@ -271,7 +271,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
storageService,
bigArrays,
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff()
BackoffPolicy.noBackoff(),
repositoriesMetrics
) {
@Override
long getLargeBlobThresholdInBytes() {
@@ -348,16 +349,16 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) {
trackRequest(Operation.GET_OBJECT.key());
trackRequest(StorageOperation.GET.key());
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
trackRequest(Operation.LIST_OBJECTS.key());
trackRequest(StorageOperation.LIST.key());
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) {
// Resumable uploads are billed as a single operation, that's the reason we're tracking
// the request only when it's the last part.
// See https://cloud.google.com/storage/docs/resumable-uploads#introduction
trackRequest(Operation.INSERT_OBJECT.key());
trackRequest(StorageOperation.INSERT.key());
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
trackRequest(Operation.INSERT_OBJECT.key());
trackRequest(StorageOperation.INSERT.key());
}
}

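The test handler above maps raw HTTP requests onto the billed `StorageOperation` keys and, for resumable uploads, counts only the final chunk. A standalone sketch of that classification logic follows; it uses hypothetical helper names, plain `String.matches` in place of the fixture's `Regex.simpleMatch`, and an assumed Content-Range check for "last chunk", so it is an illustration of the idea rather than the fixture's actual code.

```java
// Hypothetical, standalone illustration of the classification done by the
// stats-collector handler above: map a request line plus its Content-Range header
// onto one of the billed operation kinds. classify/isLastPart are illustrative names.
final class RequestClassifierSketch {

    enum Op { GET, LIST, INSERT, OTHER }

    static Op classify(String requestLine, String contentRange) {
        if (requestLine.matches("GET .*/storage/v1/b/[^/]+/o/.+")) {
            return Op.GET;   // single-object download
        } else if (requestLine.matches("GET /storage/v1/b/[^/]+/o.*")) {
            return Op.LIST;  // object listing
        } else if (requestLine.startsWith("PUT ")
            && requestLine.contains("uploadType=resumable")
            && isLastPart(contentRange)) {
            return Op.INSERT; // resumable upload is billed once, so count only the final chunk
        } else if (requestLine.startsWith("POST ") && requestLine.contains("uploadType=multipart")) {
            return Op.INSERT; // multipart upload: single request, single operation
        }
        return Op.OTHER;
    }

    // Assumption based on the GCS resumable-upload protocol: the final chunk carries a
    // Content-Range whose total is a number rather than "*", e.g. "bytes 0-1023/1024".
    // The fixture's real isLastPart check may differ.
    static boolean isLastPart(String contentRange) {
        return contentRange != null && contentRange.endsWith("*") == false;
    }

    public static void main(String[] args) {
        System.out.println(classify("GET /storage/v1/b/bucket/o/path%2Fblob", null));      // GET
        System.out.println(classify("PUT /upload/storage/v1/b/bucket/o?uploadType=resumable",
            "bytes 0-1023/1024"));                                                          // INSERT
    }
}
```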
@@ -9,7 +9,6 @@

package org.elasticsearch.repositories.gcs;

import com.google.api.gax.paging.Page;
import com.google.cloud.BaseServiceException;
import com.google.cloud.BatchResult;
import com.google.cloud.WriteChannel;
@@ -44,7 +43,7 @@
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
@@ -111,7 +110,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final String clientName;
private final String repositoryName;
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final RepositoryStatsCollector statsCollector;
private final int bufferSize;
private final BigArrays bigArrays;
private final BackoffPolicy casBackoffPolicy;
@@ -123,20 +122,21 @@ class GoogleCloudStorageBlobStore implements BlobStore {
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize,
BackoffPolicy casBackoffPolicy
BackoffPolicy casBackoffPolicy,
RepositoriesMetrics repositoriesMetrics
) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless());
this.statsCollector = new RepositoryStatsCollector(repositoryName, repositoriesMetrics);
this.bufferSize = bufferSize;
this.casBackoffPolicy = casBackoffPolicy;
}

private Storage client(OperationPurpose purpose) throws IOException {
return storageService.client(clientName, repositoryName, purpose, stats);
private MeteredStorage client() throws IOException {
return storageService.client(clientName, repositoryName, statsCollector);
}

@Override
@@ -174,7 +174,7 @@ Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, String pat
final String pathPrefix = buildKey(path, prefix);
final Map<String, BlobMetadata> mapBuilder = new HashMap<>();
SocketAccess.doPrivilegedVoidIOException(
() -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix))
() -> client().meteredList(purpose, bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathPrefix))
.iterateAll()
.forEach(blob -> {
assert blob.getName().startsWith(path);
@@ -191,7 +191,7 @@ Map<String, BlobContainer> listChildren(OperationPurpose purpose, BlobPath path)
final String pathStr = path.buildAsString();
final Map<String, BlobContainer> mapBuilder = new HashMap<>();
SocketAccess.doPrivilegedVoidIOException(
() -> client(purpose).list(bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr))
() -> client().meteredList(purpose, bucketName, BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr))
.iterateAll()
.forEach(blob -> {
if (blob.isDirectory()) {
@@ -217,7 +217,7 @@ Map<String, BlobContainer> listChildren(OperationPurpose purpose, BlobPath path)
*/
boolean blobExists(OperationPurpose purpose, String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucketName, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId));
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().meteredGet(purpose, blobId));
return blob != null;
}

@@ -229,7 +229,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
return new GoogleCloudStorageRetryingInputStream(client(purpose), BlobId.of(bucketName, blobName));
return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName));
}

/**
@@ -252,7 +252,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l
return new ByteArrayInputStream(new byte[0]);
} else {
return new GoogleCloudStorageRetryingInputStream(
client(purpose),
purpose,
client(),
BlobId.of(bucketName, blobName),
position,
Math.addExact(position, length - 1)
@@ -375,8 +376,8 @@ public void write(byte[] b, int off, int len) throws IOException {
}

private void initResumableStream() throws IOException {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> client(purpose).writer(blobInfo, writeOptions)
final var writeChannel = SocketAccess.doPrivilegedIOException(
() -> client().meteredWriter(purpose, blobInfo, writeOptions)
);
channelRef.set(writeChannel);
resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
@@ -397,7 +398,6 @@ public void write(byte[] b, int off, int len) throws IOException {
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
} else {
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
}
@@ -455,7 +455,7 @@ private void writeBlobResumable(
for (int retry = 0; retry < 3; ++retry) {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> client(purpose).writer(blobInfo, writeOptions)
() -> client().meteredWriter(purpose, blobInfo, writeOptions)
);
/*
* It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
@@ -467,7 +467,6 @@ private void writeBlobResumable(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
@@ -514,12 +513,9 @@ private void writeBlobMultipart(
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists
? new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() }
: new Storage.BlobTargetOption[0];
SocketAccess.doPrivilegedVoidIOException(() -> client(purpose).create(blobInfo, buffer, offset, blobSize, targetOptions));
// We don't track this operation on the http layer as
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
SocketAccess.doPrivilegedVoidIOException(
() -> client().meteredCreate(purpose, blobInfo, buffer, offset, blobSize, targetOptions)
);
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
@@ -537,11 +533,11 @@ private void writeBlobMultipart(
DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Page<Blob> page = client(purpose).list(bucketName, BlobListOption.prefix(pathStr));
MeteredStorage.MeteredBlobPage meteredPage = client().meteredList(purpose, bucketName, BlobListOption.prefix(pathStr));
do {
final AtomicLong blobsDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
final Iterator<Blob> blobs = page.getValues().iterator();
var blobs = meteredPage.getValues().iterator();
deleteBlobs(purpose, new Iterator<>() {
@Override
public boolean hasNext() {
@@ -557,8 +553,8 @@ public String next() {
}
});
deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get());
page = page.getNextPage();
} while (page != null);
meteredPage = meteredPage.getNextPage();
} while (meteredPage != null);
return deleteResult;
});
}
@@ -588,7 +584,7 @@ public BlobId next() {
try {
SocketAccess.doPrivilegedVoidIOException(() -> {
final AtomicReference<StorageException> ioe = new AtomicReference<>();
StorageBatch batch = client(purpose).batch();
StorageBatch batch = client().batch();
int pendingDeletesInBatch = 0;
while (blobIdsToDelete.hasNext()) {
BlobId blob = blobIdsToDelete.next();
@@ -612,7 +608,7 @@ public void error(StorageException exception) {
pendingDeletesInBatch++;
if (pendingDeletesInBatch % MAX_DELETES_PER_BATCH == 0) {
batch.submit();
batch = client(purpose).batch();
batch = client().batch();
pendingDeletesInBatch = 0;
}
}
@@ -638,7 +634,7 @@ private static String buildKey(String keyPath, String s) {

@Override
public Map<String, BlobStoreActionStats> stats() {
return stats.tracker().toMap();
return statsCollector.operationsStats();
}

private static final class WritableBlobChannel implements WritableByteChannel {
@@ -669,8 +665,8 @@ public void close() {
OptionalBytesReference getRegister(OperationPurpose purpose, String blobName, String container, String key) throws IOException {
final var blobId = BlobId.of(bucketName, blobName);
try (
var readChannel = SocketAccess.doPrivilegedIOException(() -> client(purpose).reader(blobId));
var stream = new PrivilegedReadChannelStream(readChannel)
var meteredReadChannel = SocketAccess.doPrivilegedIOException(() -> client().meteredReader(purpose, blobId));
var stream = new PrivilegedReadChannelStream(meteredReadChannel)
) {
return OptionalBytesReference.of(BlobContainerUtils.getRegisterUsingConsistentRead(stream, container, key));
} catch (Exception e) {
@@ -696,7 +692,7 @@ OptionalBytesReference compareAndExchangeRegister(
BlobContainerUtils.ensureValidRegisterContent(updated);

final var blobId = BlobId.of(bucketName, blobName);
final var blob = SocketAccess.doPrivilegedIOException(() -> client(purpose).get(blobId));
final var blob = SocketAccess.doPrivilegedIOException(() -> client().meteredGet(purpose, blobId));
final long generation;

if (blob == null || blob.getGeneration() == null) {
@@ -709,7 +705,7 @@ OptionalBytesReference compareAndExchangeRegister(
try (
var stream = new PrivilegedReadChannelStream(
SocketAccess.doPrivilegedIOException(
() -> client(purpose).reader(blobId, Storage.BlobSourceOption.generationMatch(generation))
() -> client().meteredReader(purpose, blobId, Storage.BlobSourceOption.generationMatch(generation))
)
)
) {
@@ -741,15 +737,15 @@ OptionalBytesReference compareAndExchangeRegister(
while (true) {
try {
SocketAccess.doPrivilegedVoidIOException(
() -> client(purpose).create(
() -> client().meteredCreate(
purpose,
blobInfo,
bytesRef.bytes,
bytesRef.offset,
bytesRef.length,
Storage.BlobTargetOption.generationMatch()
)
);
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
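Taken together, the hunks above replace the explicit `stats.tracker().trackOperation(...)` calls with `metered*` methods on a `MeteredStorage` client plus a `RepositoryStatsCollector` that backs `stats()`. Neither class's implementation appears in these hunks, and per the changelog the real accounting goes through a `ThreadLocal` at the HTTP layer, so the following is only a minimal stand-in for the call shape, not the PR's actual code.

```java
// Minimal sketch of the delegation-plus-counting pattern implied by the diff above.
// MeteredStorage, RepositoryStatsCollector and StorageOperation exist in the PR, but
// their implementations are not shown in these hunks; the shapes below are assumptions.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

final class MeteredClientSketch {

    enum StorageOperation { GET, LIST, INSERT }

    /** Stand-in for RepositoryStatsCollector: per-operation counters keyed by purpose. */
    static final class StatsCollectorSketch {
        private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

        void trackOperation(String purpose, StorageOperation op) {
            counters.computeIfAbsent(purpose + "_" + op, k -> new LongAdder()).increment();
        }

        Map<String, LongAdder> operationsStats() {
            return counters;
        }
    }

    /** Stand-in for the real GCS client; only the call shape matters here. */
    interface RawClient {
        byte[] get(String bucket, String blobName);
    }

    /** Stand-in for MeteredStorage: delegate to the client and record one billed operation per call. */
    record MeteredClientStandIn(RawClient delegate, StatsCollectorSketch stats) {
        byte[] meteredGet(String purpose, String bucket, String blobName) {
            // In this sketch the wrapper counts directly; the PR's real code attributes the
            // operation at the HTTP layer instead, which is why writeBlobResumable and
            // writeBlobMultipart no longer track INSERT_OBJECT themselves.
            stats.trackOperation(purpose, StorageOperation.GET);
            return delegate.get(bucket, blobName);
        }
    }

    public static void main(String[] args) {
        var stats = new StatsCollectorSketch();
        var client = new MeteredClientStandIn((bucket, name) -> new byte[0], stats);
        client.meteredGet("SnapshotData", "my-bucket", "index-0");
        System.out.println(stats.operationsStats()); // {SnapshotData_GET=1}
    }
}
```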