Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -73,6 +74,11 @@

class GoogleCloudStorageBlobStore implements BlobStore {

/**
* see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
*/
static final int SDK_DEFAULT_CHUNK_SIZE = 60 * 256 * 1024;

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);

// The recommended maximum size of a blob that should be uploaded in a single
Expand Down Expand Up @@ -380,7 +386,7 @@ public void write(byte[] b, int off, int len) throws IOException {
while (written < len) {
// at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
// see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
final int toWrite = Math.min(len - written, 60 * 256 * 1024);
final int toWrite = Math.min(len - written, SDK_DEFAULT_CHUNK_SIZE);
out.write(b, off + written, toWrite);
written += toWrite;
}
Expand All @@ -393,7 +399,7 @@ public void write(byte[] b, int off, int len) throws IOException {
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.trackPutOperation();
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
} else {
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
}
Expand Down Expand Up @@ -463,7 +469,7 @@ private void writeBlobResumable(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.trackPutOperation();
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand Down Expand Up @@ -515,7 +521,7 @@ private void writeBlobMultipart(
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
// operation is billed.
stats.trackPostOperation();
stats.trackOperation(purpose, Operation.MULTIPART_UPLOAD);
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
Expand Down Expand Up @@ -745,7 +751,7 @@ OptionalBytesReference compareAndExchangeRegister(
Storage.BlobTargetOption.generationMatch()
)
);
stats.trackPostOperation();
stats.trackOperation(purpose, Operation.MULTIPART_UPLOAD);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,115 +9,82 @@

package org.elasticsearch.repositories.gcs;

import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseInterceptor;

import org.elasticsearch.common.blobstore.OperationPurpose;

import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;

final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {
// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
private static final List<Function<String, HttpRequestTracker>> trackerFactories = List.of(
(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetOperation
),

(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetOperation
),

(bucket) -> HttpRequestTracker.get(
format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
GoogleCloudStorageOperationsStats::trackListOperation
)
);
private final GoogleCloudStorageOperationsStats stats;
private final OperationPurpose purpose;
private final Pattern getObjPattern;
private final Pattern insertObPattern;
private final Pattern listObjPattern;

GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) {
this.stats = stats;
this.purpose = purpose;
var bucket = stats.bucketName();

// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+");
this.insertObPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o");
this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o");
}

private final GoogleCloudStorageOperationsStats gcsOperationStats;
private final OperationPurpose operationPurpose;
private final List<HttpRequestTracker> trackers;
private void trackRequest(Operation operation) {
stats.trackRequest(purpose, operation);
}

GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats, OperationPurpose operationPurpose) {
this.gcsOperationStats = gcsOperationStats;
this.operationPurpose = operationPurpose;
this.trackers = trackerFactories.stream()
.map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket()))
.toList();
private void trackRequestAndOperation(Operation operation) {
stats.trackRequestAndOperation(purpose, operation);
}

@Override
public void interceptResponse(final HttpResponse response) {
// TODO keep track of unsuccessful requests in different entries
if (response.isSuccessStatusCode() == false) return;

final HttpRequest request = response.getRequest();
for (HttpRequestTracker tracker : trackers) {
if (tracker.track(request, gcsOperationStats)) {
return;
var request = response.getRequest();
var path = request.getUrl().getRawPath();
switch (request.getRequestMethod()) {
case "GET" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
if (getObjPattern.matcher(path).matches()) {
// Retrieves object metadata. When alt=media is included as a query parameter, retrieves object data.
if (request.getUrl().getFirst("alt").equals("media")) {
trackRequestAndOperation(Operation.GET_OBJECT);
} else {
trackRequestAndOperation(Operation.GET_METADATA);
}
} else if (listObjPattern.matcher(path).matches()) {
trackRequestAndOperation(Operation.LIST_OBJECTS);
}
// ignore other get requests
}
case "POST", "PUT" -> {
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
if (insertObPattern.matcher(path).matches()) {
var obj = request.getUrl().getFirst("uploadType");
if (obj instanceof String uploadType) {
switch (uploadType) {
// We dont track insert operations here, only requests. The reason is billing impact.
// Any insert, including multipart or resumable parts, are counted as one operation.
case "multipart" -> trackRequest(Operation.MULTIPART_UPLOAD);
case "resumable" -> trackRequest(Operation.RESUMABLE_UPLOAD);
default -> {
// ignore "media" - Data-only upload. Upload the object data only, without any metadata.
}
}
}
}
// ignore other post requests
}
default -> {
// ignore other http methods
}
}
}

/**
* Http request tracker that allows to track certain HTTP requests based on the following criteria:
* <ul>
* <li>The HTTP request method</li>
* <li>An URI path regex expression</li>
* </ul>
*
* The requests that match the previous criteria are tracked using the {@code statsTracker} function.
*/
private static final class HttpRequestTracker {
private final String method;
private final Pattern pathPattern;
private final Consumer<GoogleCloudStorageOperationsStats> statsTracker;

private HttpRequestTracker(
final String method,
final String pathPattern,
final Consumer<GoogleCloudStorageOperationsStats> statsTracker
) {
this.method = method;
this.pathPattern = Pattern.compile(pathPattern);
this.statsTracker = statsTracker;
}

private static HttpRequestTracker get(final String pathPattern, final Consumer<GoogleCloudStorageOperationsStats> statsConsumer) {
return new HttpRequestTracker("GET", pathPattern, statsConsumer);
}

/**
* Tracks the provided http request if it matches the criteria defined by this tracker.
*
* @param httpRequest the http request to be tracked
* @param stats the operation tracker
*
* @return {@code true} if the http request was tracked, {@code false} otherwise.
*/
private boolean track(final HttpRequest httpRequest, final GoogleCloudStorageOperationsStats stats) {
if (matchesCriteria(httpRequest) == false) return false;

statsTracker.accept(stats);
return true;
}

private boolean matchesCriteria(final HttpRequest httpRequest) {
return method.equalsIgnoreCase(httpRequest.getRequestMethod()) && pathMatches(httpRequest.getUrl());
}

private boolean pathMatches(final GenericUrl url) {
return pathPattern.matcher(url.getRawPath()).matches();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,88 @@
package org.elasticsearch.repositories.gcs;

import org.elasticsearch.common.blobstore.BlobStoreActionStats;
import org.elasticsearch.common.blobstore.OperationPurpose;

import java.util.HashMap;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

final class GoogleCloudStorageOperationsStats {

private final AtomicLong getCount = new AtomicLong();
private final AtomicLong listCount = new AtomicLong();
private final AtomicLong putCount = new AtomicLong();
private final AtomicLong postCount = new AtomicLong();

/**
* Every operation purpose and operation has a set of counters.
* Represented by {@code Map<Purpose,Map<Operation,Counters>>}
* <pre>
* {@code
* | Purpose | Operation | OperationsCnt | RequestCnt |
* |--------------+-----------------+---------------+------------|
* | SnapshotData | GetObject | 1 | 1 |
* | SnapshotData | ListObjects | 2 | 2 |
* | SnapshotData | ResumableUpload | 1 | 10 |
* | SnapshotData | ... | | |
* | Translog | GetObject | 5 | 5 |
* | ... | | | |
* }
* </pre>
*/
private final EnumMap<OperationPurpose, EnumMap<Operation, Counters>> counters;
private final String bucketName;

GoogleCloudStorageOperationsStats(String bucketName) {
this.bucketName = bucketName;
this.counters = new EnumMap<>(OperationPurpose.class);
for (var purpose : OperationPurpose.values()) {
var operations = new EnumMap<Operation, Counters>(Operation.class);
for (var operation : Operation.values()) {
operations.put(operation, new Counters(purpose, operation));
}
counters.put(purpose, operations);
}
}

void trackGetOperation() {
getCount.incrementAndGet();
public String bucketName() {
return bucketName;
}

void trackPutOperation() {
putCount.incrementAndGet();
void trackOperation(OperationPurpose purpose, Operation operation) {
counters.get(purpose).get(operation).operations.add(1);
}

void trackPostOperation() {
postCount.incrementAndGet();
void trackRequest(OperationPurpose purpose, Operation operation) {
counters.get(purpose).get(operation).requests.add(1);
}

void trackListOperation() {
listCount.incrementAndGet();
void trackRequestAndOperation(OperationPurpose purpose, Operation operation) {
var c = counters.get(purpose).get(operation);
c.requests.add(1);
c.operations.add(1);
}

String getTrackedBucket() {
return bucketName;
Map<String, BlobStoreActionStats> toMap() {
return counters.values()
.stream()
.flatMap(ops -> ops.values().stream())
.collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())));
}

// TODO: actually track requests and operations separately (see https://elasticco.atlassian.net/browse/ES-10213)
Map<String, BlobStoreActionStats> toMap() {
final Map<String, BlobStoreActionStats> results = new HashMap<>();
final long getOperations = getCount.get();
results.put("GetObject", new BlobStoreActionStats(getOperations, getOperations));
final long listOperations = listCount.get();
results.put("ListObjects", new BlobStoreActionStats(listOperations, listOperations));
final long insertOperations = postCount.get() + putCount.get();
results.put("InsertObject", new BlobStoreActionStats(insertOperations, insertOperations));
return results;
public enum Operation {
GET_METADATA("GetMetadata"),
GET_OBJECT("GetObject"),
LIST_OBJECTS("ListObjects"),
MULTIPART_UPLOAD("MultipartUpload"),
RESUMABLE_UPLOAD("ResumableUpload");

final String name;

Operation(String name) {
this.name = name;
}
}

private record Counters(String name, LongAdder operations, LongAdder requests) {
Counters(OperationPurpose purpose, Operation operation) {
this(purpose.name() + '_' + operation.name(), new LongAdder(), new LongAdder());
}
}
}
Loading