Skip to content

Commit a3d427b

Browse files
committed
wip
1 parent 7731316 commit a3d427b

File tree

5 files changed

+255
-144
lines changed

5 files changed

+255
-144
lines changed

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.core.Streams;
4545
import org.elasticsearch.core.SuppressForbidden;
4646
import org.elasticsearch.core.TimeValue;
47+
import org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
4748
import org.elasticsearch.rest.RestStatus;
4849

4950
import java.io.ByteArrayInputStream;
@@ -73,6 +74,11 @@
7374

7475
class GoogleCloudStorageBlobStore implements BlobStore {
7576

77+
/**
78+
* see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
79+
*/
80+
static final int SDK_DEFAULT_CHUNK_SIZE = 60 * 256 * 1024;
81+
7682
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
7783

7884
// The recommended maximum size of a blob that should be uploaded in a single
@@ -380,7 +386,7 @@ public void write(byte[] b, int off, int len) throws IOException {
380386
while (written < len) {
381387
// at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
382388
// see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
383-
final int toWrite = Math.min(len - written, 60 * 256 * 1024);
389+
final int toWrite = Math.min(len - written, SDK_DEFAULT_CHUNK_SIZE);
384390
out.write(b, off + written, toWrite);
385391
written += toWrite;
386392
}
@@ -393,7 +399,7 @@ public void write(byte[] b, int off, int len) throws IOException {
393399
final WritableByteChannel writeChannel = channelRef.get();
394400
if (writeChannel != null) {
395401
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
396-
stats.trackPutOperation();
402+
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
397403
} else {
398404
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
399405
}
@@ -463,7 +469,7 @@ private void writeBlobResumable(
463469
// we do with the GET/LIST operations since this operations
464470
// can trigger multiple underlying http requests but only one
465471
// operation is billed.
466-
stats.trackPutOperation();
472+
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
467473
return;
468474
} catch (final StorageException se) {
469475
final int errorCode = se.getCode();
@@ -515,7 +521,7 @@ private void writeBlobMultipart(
515521
// we do with the GET/LIST operations since this operations
516522
// can trigger multiple underlying http requests but only one
517523
// operation is billed.
518-
stats.trackPostOperation();
524+
stats.trackOperation(purpose, Operation.MULTIPART_UPLOAD);
519525
} catch (final StorageException se) {
520526
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
521527
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
@@ -745,7 +751,7 @@ OptionalBytesReference compareAndExchangeRegister(
745751
Storage.BlobTargetOption.generationMatch()
746752
)
747753
);
748-
stats.trackPostOperation();
754+
stats.trackOperation(OperationPurpose.SNAPSHOT_DATA, Operation.MULTIPART_UPLOAD);
749755
return OptionalBytesReference.of(expected);
750756
} catch (Exception e) {
751757
final var serviceException = unwrapServiceException(e);

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java

Lines changed: 60 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -9,115 +9,82 @@
99

1010
package org.elasticsearch.repositories.gcs;
1111

12-
import com.google.api.client.http.GenericUrl;
13-
import com.google.api.client.http.HttpRequest;
1412
import com.google.api.client.http.HttpResponse;
1513
import com.google.api.client.http.HttpResponseInterceptor;
1614

1715
import org.elasticsearch.common.blobstore.OperationPurpose;
1816

19-
import java.util.List;
20-
import java.util.Locale;
21-
import java.util.function.Consumer;
22-
import java.util.function.Function;
2317
import java.util.regex.Pattern;
2418

25-
import static java.lang.String.format;
19+
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
2620

2721
final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {
28-
// The specification for the current API (v1) endpoints can be found at:
29-
// https://cloud.google.com/storage/docs/json_api/v1
30-
private static final List<Function<String, HttpRequestTracker>> trackerFactories = List.of(
31-
(bucket) -> HttpRequestTracker.get(
32-
format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
33-
GoogleCloudStorageOperationsStats::trackGetOperation
34-
),
35-
36-
(bucket) -> HttpRequestTracker.get(
37-
format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
38-
GoogleCloudStorageOperationsStats::trackGetOperation
39-
),
40-
41-
(bucket) -> HttpRequestTracker.get(
42-
format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
43-
GoogleCloudStorageOperationsStats::trackListOperation
44-
)
45-
);
22+
private final GoogleCloudStorageOperationsStats stats;
23+
private final OperationPurpose purpose;
24+
private final Pattern getObjPattern;
25+
private final Pattern insertObPattern;
26+
private final Pattern listObjPattern;
27+
28+
GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) {
29+
this.stats = stats;
30+
this.purpose = purpose;
31+
var bucket = stats.bucketName();
32+
33+
// The specification for the current API (v1) endpoints can be found at:
34+
// https://cloud.google.com/storage/docs/json_api/v1
35+
this.getObjPattern = Pattern.compile("(/download)?/storage/v1/b/" + bucket + "/o/.+");
36+
this.insertObPattern = Pattern.compile("(/upload)?/storage/v1/b/" + bucket + "/o");
37+
this.listObjPattern = Pattern.compile("/storage/v1/b/" + bucket + "/o");
38+
}
4639

47-
private final GoogleCloudStorageOperationsStats gcsOperationStats;
48-
private final OperationPurpose operationPurpose;
49-
private final List<HttpRequestTracker> trackers;
40+
private void trackRequest(Operation operation) {
41+
stats.trackRequest(purpose, operation);
42+
}
5043

51-
GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats, OperationPurpose operationPurpose) {
52-
this.gcsOperationStats = gcsOperationStats;
53-
this.operationPurpose = operationPurpose;
54-
this.trackers = trackerFactories.stream()
55-
.map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket()))
56-
.toList();
44+
private void trackRequestAndOperation(Operation operation) {
45+
stats.trackRequestAndOperation(purpose, operation);
5746
}
5847

5948
@Override
6049
public void interceptResponse(final HttpResponse response) {
61-
// TODO keep track of unsuccessful requests in different entries
62-
if (response.isSuccessStatusCode() == false) return;
63-
64-
final HttpRequest request = response.getRequest();
65-
for (HttpRequestTracker tracker : trackers) {
66-
if (tracker.track(request, gcsOperationStats)) {
67-
return;
50+
var request = response.getRequest();
51+
var path = request.getUrl().getRawPath();
52+
switch (request.getRequestMethod()) {
53+
case "GET" -> {
54+
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
55+
if (getObjPattern.matcher(path).matches()) {
56+
// Retrieves object metadata. When alt=media is included as a query parameter, retrieves object data.
57+
if (request.getUrl().getFirst("alt").equals("media")) {
58+
trackRequestAndOperation(Operation.GET_OBJECT);
59+
} else {
60+
trackRequestAndOperation(Operation.GET_METADATA);
61+
}
62+
} else if (listObjPattern.matcher(path).matches()) {
63+
trackRequestAndOperation(Operation.LIST_OBJECTS);
64+
}
65+
// ignore other get requests
66+
}
67+
case "POST", "PUT" -> {
68+
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
69+
if (insertObPattern.matcher(path).matches()) {
70+
var obj = request.getUrl().getFirst("uploadType");
71+
if (obj instanceof String uploadType) {
72+
switch (uploadType) {
73+
// We dont track insert operations here, only requests. The reason is billing impact.
74+
// Any insert, including multipart or resumable parts, are counted as one operation.
75+
case "multipart" -> trackRequest(Operation.MULTIPART_UPLOAD);
76+
case "resumable" -> trackRequest(Operation.RESUMABLE_UPLOAD);
77+
default -> {
78+
// ignore "media" - Data-only upload. Upload the object data only, without any metadata.
79+
}
80+
}
81+
}
82+
}
83+
// ignore other post requests
84+
}
85+
default -> {
86+
// ignore other http methods
6887
}
69-
}
70-
}
71-
72-
/**
73-
* Http request tracker that allows to track certain HTTP requests based on the following criteria:
74-
* <ul>
75-
* <li>The HTTP request method</li>
76-
* <li>An URI path regex expression</li>
77-
* </ul>
78-
*
79-
* The requests that match the previous criteria are tracked using the {@code statsTracker} function.
80-
*/
81-
private static final class HttpRequestTracker {
82-
private final String method;
83-
private final Pattern pathPattern;
84-
private final Consumer<GoogleCloudStorageOperationsStats> statsTracker;
85-
86-
private HttpRequestTracker(
87-
final String method,
88-
final String pathPattern,
89-
final Consumer<GoogleCloudStorageOperationsStats> statsTracker
90-
) {
91-
this.method = method;
92-
this.pathPattern = Pattern.compile(pathPattern);
93-
this.statsTracker = statsTracker;
94-
}
95-
96-
private static HttpRequestTracker get(final String pathPattern, final Consumer<GoogleCloudStorageOperationsStats> statsConsumer) {
97-
return new HttpRequestTracker("GET", pathPattern, statsConsumer);
98-
}
99-
100-
/**
101-
* Tracks the provided http request if it matches the criteria defined by this tracker.
102-
*
103-
* @param httpRequest the http request to be tracked
104-
* @param stats the operation tracker
105-
*
106-
* @return {@code true} if the http request was tracked, {@code false} otherwise.
107-
*/
108-
private boolean track(final HttpRequest httpRequest, final GoogleCloudStorageOperationsStats stats) {
109-
if (matchesCriteria(httpRequest) == false) return false;
110-
111-
statsTracker.accept(stats);
112-
return true;
113-
}
114-
115-
private boolean matchesCriteria(final HttpRequest httpRequest) {
116-
return method.equalsIgnoreCase(httpRequest.getRequestMethod()) && pathMatches(httpRequest.getUrl());
117-
}
118-
119-
private boolean pathMatches(final GenericUrl url) {
120-
return pathPattern.matcher(url.getRawPath()).matches();
12188
}
12289
}
12390
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,53 +10,87 @@
1010
package org.elasticsearch.repositories.gcs;
1111

1212
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
13+
import org.elasticsearch.common.blobstore.OperationPurpose;
1314

14-
import java.util.HashMap;
15+
import java.util.EnumMap;
1516
import java.util.Map;
16-
import java.util.concurrent.atomic.AtomicLong;
17+
import java.util.concurrent.atomic.LongAdder;
18+
import java.util.stream.Collectors;
1719

1820
final class GoogleCloudStorageOperationsStats {
1921

20-
private final AtomicLong getCount = new AtomicLong();
21-
private final AtomicLong listCount = new AtomicLong();
22-
private final AtomicLong putCount = new AtomicLong();
23-
private final AtomicLong postCount = new AtomicLong();
24-
22+
/**
23+
* Every operation purpose and operation has a set of counters.
24+
* Represented by {@code Map<Purpose,Map<Operation,Counters>>}
25+
* <pre>
26+
* {@code
27+
* | Purpose | Operation | OperationsCnt | RequestCnt |
28+
* |--------------+-------------+---------------+-------------|
29+
* | SnapshotData | GetObject | 10 | 10 |
30+
* | SnapshotData | ListObjects | 20 | 21(1 retry) |
31+
* | SnapshotData | ... | | |
32+
* | Translog | GetObject | 5 | 5 |
33+
* | ... | | | |
34+
* }
35+
* </pre>
36+
*/
37+
private final EnumMap<OperationPurpose, EnumMap<Operation, Counters>> counters;
2538
private final String bucketName;
2639

2740
GoogleCloudStorageOperationsStats(String bucketName) {
2841
this.bucketName = bucketName;
42+
this.counters = new EnumMap<>(OperationPurpose.class);
43+
for (var purpose : OperationPurpose.values()) {
44+
var operations = new EnumMap<Operation, Counters>(Operation.class);
45+
for (var operation : Operation.values()) {
46+
operations.put(operation, new Counters(purpose, operation));
47+
}
48+
counters.put(purpose, operations);
49+
}
2950
}
3051

31-
void trackGetOperation() {
32-
getCount.incrementAndGet();
52+
public String bucketName() {
53+
return bucketName;
3354
}
3455

35-
void trackPutOperation() {
36-
putCount.incrementAndGet();
56+
void trackOperation(OperationPurpose purpose, Operation operation) {
57+
counters.get(purpose).get(operation).operations.add(1);
3758
}
3859

39-
void trackPostOperation() {
40-
postCount.incrementAndGet();
60+
void trackRequest(OperationPurpose purpose, Operation operation) {
61+
counters.get(purpose).get(operation).requests.add(1);
4162
}
4263

43-
void trackListOperation() {
44-
listCount.incrementAndGet();
64+
void trackRequestAndOperation(OperationPurpose purpose, Operation operation) {
65+
var c = counters.get(purpose).get(operation);
66+
c.requests.add(1);
67+
c.operations.add(1);
4568
}
4669

47-
String getTrackedBucket() {
48-
return bucketName;
70+
Map<String, BlobStoreActionStats> toMap() {
71+
return counters.values()
72+
.stream()
73+
.flatMap(ops -> ops.values().stream())
74+
.collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())));
4975
}
5076

51-
// TODO: actually track requests and operations separately (see https://elasticco.atlassian.net/browse/ES-10213)
52-
Map<String, BlobStoreActionStats> toMap() {
53-
final Map<String, BlobStoreActionStats> results = new HashMap<>();
54-
final long getOperations = getCount.get();
55-
results.put("GetObject", new BlobStoreActionStats(getOperations, getOperations));
56-
final long listOperations = listCount.get();
57-
results.put("ListObjects", new BlobStoreActionStats(listOperations, listOperations));
58-
final long insertOperations = postCount.get() + putCount.get();
59-
results.put("InsertObject", new BlobStoreActionStats(insertOperations, insertOperations));
60-
return results;
77+
public enum Operation {
78+
GET_METADATA("GetMetadata"),
79+
GET_OBJECT("GetObject"),
80+
LIST_OBJECTS("ListObjects"),
81+
MULTIPART_UPLOAD("MultipartUpload"),
82+
RESUMABLE_UPLOAD("ResumableUpload");
83+
84+
final String name;
85+
86+
Operation(String name) {
87+
this.name = name;
88+
}
89+
}
90+
91+
private record Counters(String name, LongAdder operations, LongAdder requests) {
92+
Counters(OperationPurpose purpose, Operation operation) {
93+
this(purpose.name() + '_' + operation.name(), new LongAdder(), new LongAdder());
94+
}
6195
}
6296
}

0 commit comments

Comments
 (0)