Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122991.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122991
summary: "GCS blob store: add `OperationPurpose/Operation` stats counters"
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
Expand Down Expand Up @@ -212,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService() {
return new GoogleCloudStorageService() {
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings) {
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
Expand Down Expand Up @@ -346,19 +347,17 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta

@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
trackRequest("GetObject");
if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) {
trackRequest(Operation.GET_OBJECT.key());
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
trackRequest("ListObjects");
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
trackRequest("GetObject");
trackRequest(Operation.LIST_OBJECTS.key());
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) {
// Resumable uploads are billed as a single operation, that's the reason we're tracking
// the request only when it's the last part.
// See https://cloud.google.com/storage/docs/resumable-uploads#introduction
trackRequest("InsertObject");
trackRequest(Operation.INSERT_OBJECT.key());
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
trackRequest("InsertObject");
trackRequest(Operation.INSERT_OBJECT.key());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -73,6 +74,11 @@

class GoogleCloudStorageBlobStore implements BlobStore {

/**
* see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
*/
static final int SDK_DEFAULT_CHUNK_SIZE = 60 * 256 * 1024;

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);

// The recommended maximum size of a blob that should be uploaded in a single
Expand Down Expand Up @@ -124,7 +130,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
this.repositoryName = repositoryName;
this.storageService = storageService;
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless());
this.bufferSize = bufferSize;
this.casBackoffPolicy = casBackoffPolicy;
}
Expand Down Expand Up @@ -378,9 +384,7 @@ private void initResumableStream() throws IOException {
public void write(byte[] b, int off, int len) throws IOException {
int written = 0;
while (written < len) {
// at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
// see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
final int toWrite = Math.min(len - written, 60 * 256 * 1024);
final int toWrite = Math.min(len - written, SDK_DEFAULT_CHUNK_SIZE);
out.write(b, off + written, toWrite);
written += toWrite;
}
Expand All @@ -393,7 +397,7 @@ public void write(byte[] b, int off, int len) throws IOException {
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.trackPutOperation();
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
} else {
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
}
Expand Down Expand Up @@ -463,7 +467,7 @@ private void writeBlobResumable(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.trackPutOperation();
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand Down Expand Up @@ -515,7 +519,7 @@ private void writeBlobMultipart(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.trackPostOperation();
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
Expand Down Expand Up @@ -634,7 +638,7 @@ private static String buildKey(String keyPath, String s) {

@Override
public Map<String, BlobStoreActionStats> stats() {
return stats.toMap();
return stats.tracker().toMap();
}

private static final class WritableBlobChannel implements WritableByteChannel {
Expand Down Expand Up @@ -745,7 +749,7 @@ OptionalBytesReference compareAndExchangeRegister(
Storage.BlobTargetOption.generationMatch()
)
);
stats.trackPostOperation();
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,115 +9,75 @@

package org.elasticsearch.repositories.gcs;

import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseInterceptor;

import org.elasticsearch.common.blobstore.OperationPurpose;

import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker;

final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {
// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
private static final List<Function<String, HttpRequestTracker>> trackerFactories = List.of(
(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetOperation
),

(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetOperation
),

(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
GoogleCloudStorageOperationsStats::trackListOperation
)
);
private final StatsTracker stats;
private final OperationPurpose purpose;
private final Pattern getObjPattern;
private final Pattern insertObjPattern;
private final Pattern listObjPattern;

GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) {
this.stats = stats.tracker();
this.purpose = purpose;
var bucket = stats.bucketName();

// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+");
this.insertObjPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o");
this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o");
}

private final GoogleCloudStorageOperationsStats gcsOperationStats;
private final OperationPurpose operationPurpose;
private final List<HttpRequestTracker> trackers;
private void trackRequest(Operation operation) {
stats.trackRequest(purpose, operation);
}

GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats, OperationPurpose operationPurpose) {
this.gcsOperationStats = gcsOperationStats;
this.operationPurpose = operationPurpose;
this.trackers = trackerFactories.stream()
.map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket()))
.toList();
private void trackRequestAndOperation(Operation operation) {
stats.trackOperationAndRequest(purpose, operation);
}

@Override
public void interceptResponse(final HttpResponse response) {
// TODO keep track of unsuccessful requests in different entries
if (response.isSuccessStatusCode() == false) return;

final HttpRequest request = response.getRequest();
for (HttpRequestTracker tracker : trackers) {
if (tracker.track(request, gcsOperationStats)) {
return;
}
}
}

/**
* Http request tracker that allows to track certain HTTP requests based on the following criteria:
* <ul>
* <li>The HTTP request method</li>
* <li>An URI path regex expression</li>
* </ul>
*
* The requests that match the previous criteria are tracked using the {@code statsTracker} function.
*/
private static final class HttpRequestTracker {
private final String method;
private final Pattern pathPattern;
private final Consumer<GoogleCloudStorageOperationsStats> statsTracker;

private HttpRequestTracker(
final String method,
final String pathPattern,
final Consumer<GoogleCloudStorageOperationsStats> statsTracker
) {
this.method = method;
this.pathPattern = Pattern.compile(pathPattern);
this.statsTracker = statsTracker;
var respCode = response.getStatusCode();
// Some of the intermediate and error codes are still counted as "good" requests
if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: for the first condition we could use org.elasticsearch.rest.RestStatus#isSuccessful, that logic was repeated in lots of places. Unless you think that obscures it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more readable in this form since multiple codes are handled. I'm more concerned about codes I dont handle here, but we would like to.

return;
}

private static HttpRequestTracker get(final String pathPattern, final Consumer<GoogleCloudStorageOperationsStats> statsConsumer) {
return new HttpRequestTracker("GET", pathPattern, statsConsumer);
}

/**
* Tracks the provided http request if it matches the criteria defined by this tracker.
*
* @param httpRequest the http request to be tracked
* @param stats the operation tracker
*
* @return {@code true} if the http request was tracked, {@code false} otherwise.
*/
private boolean track(final HttpRequest httpRequest, final GoogleCloudStorageOperationsStats stats) {
if (matchesCriteria(httpRequest) == false) return false;

statsTracker.accept(stats);
return true;
}

private boolean matchesCriteria(final HttpRequest httpRequest) {
return method.equalsIgnoreCase(httpRequest.getRequestMethod()) && pathMatches(httpRequest.getUrl());
}

private boolean pathMatches(final GenericUrl url) {
return pathPattern.matcher(url.getRawPath()).matches();
var request = response.getRequest();

var path = request.getUrl().getRawPath();
var ignored = false;
switch (request.getRequestMethod()) {
case "GET" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
if (getObjPattern.matcher(path).matches()) {
trackRequestAndOperation(Operation.GET_OBJECT);
} else if (listObjPattern.matcher(path).matches()) {
trackRequestAndOperation(Operation.LIST_OBJECTS);
} else {
ignored = true;
}
}
case "POST", "PUT" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
if (insertObjPattern.matcher(path).matches()) {
trackRequest(Operation.INSERT_OBJECT);
} else {
ignored = true;
}
}
default -> ignored = true;
}
assert ignored == false : "must handle response: " + request.getRequestMethod() + " " + path;
}
}
Loading