Skip to content

Commit 4862340

Browse files
committed
change operations and more test fixes
1 parent 4f3d728 commit 4862340

File tree

9 files changed

+216
-118
lines changed

9 files changed

+216
-118
lines changed

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
6464
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
6565
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
66+
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
6667
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH;
6768
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
6869
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
@@ -212,8 +213,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
212213
}
213214

214215
@Override
215-
protected GoogleCloudStorageService createStorageService() {
216-
return new GoogleCloudStorageService() {
216+
protected GoogleCloudStorageService createStorageService(Settings settings) {
217+
return new GoogleCloudStorageService(settings) {
217218
@Override
218219
StorageOptions createStorageOptions(
219220
final GoogleCloudStorageClientSettings gcsClientSettings,
@@ -346,19 +347,17 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
346347

347348
@Override
348349
public void maybeTrack(final String request, Headers requestHeaders) {
349-
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
350-
trackRequest("GetObject");
350+
if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) {
351+
trackRequest(Operation.GET_OBJECT.key());
351352
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
352-
trackRequest("ListObjects");
353-
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
354-
trackRequest("GetObject");
353+
trackRequest(Operation.LIST_OBJECTS.key());
355354
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) {
356355
// Resumable uploads are billed as a single operation, that's the reason we're tracking
357356
// the request only when it's the last part.
358357
// See https://cloud.google.com/storage/docs/resumable-uploads#introduction
359-
trackRequest("InsertObject");
358+
trackRequest(Operation.INSERT_OBJECT.key());
360359
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
361-
trackRequest("InsertObject");
360+
trackRequest(Operation.INSERT_OBJECT.key());
362361
}
363362
}
364363

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
130130
this.repositoryName = repositoryName;
131131
this.storageService = storageService;
132132
this.bigArrays = bigArrays;
133-
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
133+
this.stats = new GoogleCloudStorageOperationsStats(bucketName, storageService.isStateless());
134134
this.bufferSize = bufferSize;
135135
this.casBackoffPolicy = casBackoffPolicy;
136136
}
@@ -397,7 +397,7 @@ public void write(byte[] b, int off, int len) throws IOException {
397397
final WritableByteChannel writeChannel = channelRef.get();
398398
if (writeChannel != null) {
399399
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
400-
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
400+
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
401401
} else {
402402
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
403403
}
@@ -467,7 +467,7 @@ private void writeBlobResumable(
467467
// we do with the GET/LIST operations since this operations
468468
// can trigger multiple underlying http requests but only one
469469
// operation is billed.
470-
stats.trackOperation(purpose, Operation.RESUMABLE_UPLOAD);
470+
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
471471
return;
472472
} catch (final StorageException se) {
473473
final int errorCode = se.getCode();
@@ -519,7 +519,7 @@ private void writeBlobMultipart(
519519
// we do with the GET/LIST operations since this operations
520520
// can trigger multiple underlying http requests but only one
521521
// operation is billed.
522-
stats.trackOperation(purpose, Operation.MULTIPART_UPLOAD);
522+
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
523523
} catch (final StorageException se) {
524524
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
525525
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
@@ -638,7 +638,7 @@ private static String buildKey(String keyPath, String s) {
638638

639639
@Override
640640
public Map<String, BlobStoreActionStats> stats() {
641-
return stats.toMap();
641+
return stats.tracker().toMap();
642642
}
643643

644644
private static final class WritableBlobChannel implements WritableByteChannel {
@@ -749,7 +749,7 @@ OptionalBytesReference compareAndExchangeRegister(
749749
Storage.BlobTargetOption.generationMatch()
750750
)
751751
);
752-
stats.trackOperation(purpose, Operation.MULTIPART_UPLOAD);
752+
stats.tracker().trackOperation(purpose, Operation.INSERT_OBJECT);
753753
return OptionalBytesReference.of(expected);
754754
} catch (Exception e) {
755755
final var serviceException = unwrapServiceException(e);

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717
import java.util.regex.Pattern;
1818

1919
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.Operation;
20+
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageOperationsStats.StatsTracker;
2021

2122
final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {
22-
private final GoogleCloudStorageOperationsStats stats;
23+
private final StatsTracker stats;
2324
private final OperationPurpose purpose;
2425
private final Pattern getObjPattern;
2526
private final Pattern insertObjPattern;
2627
private final Pattern listObjPattern;
2728

2829
GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats stats, OperationPurpose purpose) {
29-
this.stats = stats;
30+
this.stats = stats.tracker();
3031
this.purpose = purpose;
3132
var bucket = stats.bucketName();
3233

@@ -42,24 +43,25 @@ private void trackRequest(Operation operation) {
4243
}
4344

4445
private void trackRequestAndOperation(Operation operation) {
45-
stats.trackRequestAndOperation(purpose, operation);
46+
stats.trackOperationAndRequest(purpose, operation);
4647
}
4748

4849
@Override
4950
public void interceptResponse(final HttpResponse response) {
51+
var respCode = response.getStatusCode();
52+
// Some of the intermediate and error codes are still counted as "good" requests
53+
if (((respCode >= 200 && respCode < 300) || respCode == 308 || respCode == 404) == false) {
54+
return;
55+
}
5056
var request = response.getRequest();
57+
5158
var path = request.getUrl().getRawPath();
5259
var ignored = false;
5360
switch (request.getRequestMethod()) {
5461
case "GET" -> {
5562
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
5663
if (getObjPattern.matcher(path).matches()) {
57-
// Retrieves object metadata. When alt=media is included as a query parameter, retrieves object data.
58-
if ("media".equals(request.getUrl().getFirst("alt"))) {
59-
trackRequestAndOperation(Operation.GET_OBJECT);
60-
} else {
61-
trackRequestAndOperation(Operation.GET_METADATA);
62-
}
64+
trackRequestAndOperation(Operation.GET_OBJECT);
6365
} else if (listObjPattern.matcher(path).matches()) {
6466
trackRequestAndOperation(Operation.LIST_OBJECTS);
6567
} else {
@@ -69,18 +71,7 @@ public void interceptResponse(final HttpResponse response) {
6971
case "POST", "PUT" -> {
7072
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
7173
if (insertObjPattern.matcher(path).matches()) {
72-
var obj = request.getUrl().getFirst("uploadType");
73-
if (obj instanceof String uploadType) {
74-
switch (uploadType) {
75-
// We dont track insert operations here, only requests. The reason is billing impact.
76-
// Any insert, including multipart or resumable parts, are counted as one operation.
77-
case "multipart" -> trackRequest(Operation.MULTIPART_UPLOAD);
78-
case "resumable" -> trackRequest(Operation.RESUMABLE_UPLOAD);
79-
default -> ignored = true;
80-
}
81-
} else {
82-
ignored = true;
83-
}
74+
trackRequest(Operation.INSERT_OBJECT);
8475
} else {
8576
ignored = true;
8677
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java

Lines changed: 126 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,98 @@
1919

2020
final class GoogleCloudStorageOperationsStats {
2121

22+
private final String bucketName;
23+
private final StatsTracker tracker;
24+
25+
GoogleCloudStorageOperationsStats(String bucketName, boolean isStateless) {
26+
this.bucketName = bucketName;
27+
if (isStateless) {
28+
this.tracker = new ServerlessTracker(bucketName);
29+
} else {
30+
this.tracker = new StatefulTracker();
31+
}
32+
}
33+
34+
GoogleCloudStorageOperationsStats(String bucketName) {
35+
this(bucketName, false);
36+
}
37+
38+
public String bucketName() {
39+
return bucketName;
40+
}
41+
42+
public StatsTracker tracker() {
43+
return tracker;
44+
}
45+
46+
public enum Operation {
47+
GET_OBJECT("GetObject"),
48+
LIST_OBJECTS("ListObjects"),
49+
INSERT_OBJECT("InsertObject");
50+
51+
private final String key;
52+
53+
Operation(String key) {
54+
this.key = key;
55+
}
56+
57+
public String key() {
58+
return key;
59+
}
60+
}
61+
62+
sealed interface StatsTracker permits ServerlessTracker, StatefulTracker {
63+
void trackRequest(OperationPurpose purpose, Operation operation);
64+
65+
void trackOperation(OperationPurpose purpose, Operation operation);
66+
67+
Map<String, BlobStoreActionStats> toMap();
68+
69+
default void trackOperationAndRequest(OperationPurpose purpose, Operation operation) {
70+
trackOperation(purpose, operation);
71+
trackRequest(purpose, operation);
72+
}
73+
}
74+
75+
/**
76+
* Stateful tracker is single dimension: Operation only. The OperationPurpose is ignored.
77+
*/
78+
static final class StatefulTracker implements StatsTracker {
79+
80+
private final EnumMap<Operation, Counters> counters;
81+
82+
StatefulTracker() {
83+
this.counters = new EnumMap<>(Operation.class);
84+
for (var operation : Operation.values()) {
85+
counters.put(operation, new Counters(operation.key()));
86+
}
87+
}
88+
89+
@Override
90+
public void trackRequest(OperationPurpose purpose, Operation operation) {
91+
// dont track requests, only operations
92+
}
93+
94+
@Override
95+
public void trackOperation(OperationPurpose purpose, Operation operation) {
96+
counters.get(operation).operations().add(1);
97+
}
98+
99+
@Override
100+
public Map<String, BlobStoreActionStats> toMap() {
101+
return counters.values().stream().collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> {
102+
var ops = c.operations().sum();
103+
return new BlobStoreActionStats(ops, ops);
104+
}));
105+
}
106+
107+
}
108+
22109
/**
23-
* Every operation purpose and operation has a set of counters.
24-
* Represented by {@code Map<Purpose,Map<Operation,Counters>>}
110+
* Serverless tracker is 2-dimensional: OperationPurpose and Operations.
111+
* Every combination of these has own set of counters: number of operations and number of http requests.
112+
* A single operation might have multiple HTTP requests, for example a single ResumableUpload operation
113+
* can perform a series of HTTP requests with size up to {@link GoogleCloudStorageBlobStore#SDK_DEFAULT_CHUNK_SIZE}.
25114
* <pre>
26115
* {@code
27116
* | Purpose | Operation | OperationsCnt | RequestCnt |
@@ -35,63 +124,52 @@ final class GoogleCloudStorageOperationsStats {
35124
* }
36125
* </pre>
37126
*/
38-
private final EnumMap<OperationPurpose, EnumMap<Operation, Counters>> counters;
39-
private final String bucketName;
127+
static final class ServerlessTracker implements StatsTracker {
128+
private final EnumMap<OperationPurpose, EnumMap<Operation, Counters>> counters;
40129

41-
GoogleCloudStorageOperationsStats(String bucketName) {
42-
this.bucketName = bucketName;
43-
this.counters = new EnumMap<>(OperationPurpose.class);
44-
for (var purpose : OperationPurpose.values()) {
45-
var operations = new EnumMap<Operation, Counters>(Operation.class);
46-
for (var operation : Operation.values()) {
47-
operations.put(operation, new Counters(purpose, operation));
130+
ServerlessTracker(String bucketName) {
131+
this.counters = new EnumMap<>(OperationPurpose.class);
132+
for (var purpose : OperationPurpose.values()) {
133+
var operations = new EnumMap<Operation, Counters>(Operation.class);
134+
for (var operation : Operation.values()) {
135+
operations.put(operation, new Counters(purpose.getKey() + "_" + operation.key()));
136+
}
137+
counters.put(purpose, operations);
48138
}
49-
counters.put(purpose, operations);
50139
}
51-
}
52140

53-
public String bucketName() {
54-
return bucketName;
55-
}
56-
57-
void trackOperation(OperationPurpose purpose, Operation operation) {
58-
counters.get(purpose).get(operation).operations.add(1);
59-
}
60-
61-
void trackRequest(OperationPurpose purpose, Operation operation) {
62-
counters.get(purpose).get(operation).requests.add(1);
63-
}
64-
65-
void trackRequestAndOperation(OperationPurpose purpose, Operation operation) {
66-
var c = counters.get(purpose).get(operation);
67-
c.requests.add(1);
68-
c.operations.add(1);
69-
}
70-
71-
Map<String, BlobStoreActionStats> toMap() {
72-
return counters.values()
73-
.stream()
74-
.flatMap(ops -> ops.values().stream())
75-
.collect(Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum())));
76-
}
141+
@Override
142+
public void trackOperation(OperationPurpose purpose, Operation operation) {
143+
counters.get(purpose).get(operation).operations.add(1);
144+
}
77145

78-
public enum Operation {
79-
GET_METADATA("GetMetadata"),
80-
GET_OBJECT("GetObject"),
81-
LIST_OBJECTS("ListObjects"),
82-
MULTIPART_UPLOAD("MultipartUpload"),
83-
RESUMABLE_UPLOAD("ResumableUpload");
146+
@Override
147+
public void trackRequest(OperationPurpose purpose, Operation operation) {
148+
counters.get(purpose).get(operation).requests.add(1);
149+
}
84150

85-
final String name;
151+
@Override
152+
public void trackOperationAndRequest(OperationPurpose purpose, Operation operation) {
153+
var c = counters.get(purpose).get(operation);
154+
c.requests.add(1);
155+
c.operations.add(1);
156+
}
86157

87-
Operation(String name) {
88-
this.name = name;
158+
@Override
159+
public Map<String, BlobStoreActionStats> toMap() {
160+
return counters.values()
161+
.stream()
162+
.flatMap(ops -> ops.values().stream())
163+
.collect(
164+
Collectors.toUnmodifiableMap(Counters::name, (c) -> new BlobStoreActionStats(c.operations.sum(), c.requests.sum()))
165+
);
89166
}
167+
90168
}
91169

92170
private record Counters(String name, LongAdder operations, LongAdder requests) {
93-
Counters(OperationPurpose purpose, Operation operation) {
94-
this(purpose.name() + '_' + operation.name(), new LongAdder(), new LongAdder());
171+
Counters(String name) {
172+
this(name, new LongAdder(), new LongAdder());
95173
}
96174
}
97175
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin
3434

3535
@SuppressWarnings("this-escape")
3636
public GoogleCloudStoragePlugin(final Settings settings) {
37-
this.storageService = createStorageService();
37+
this.storageService = createStorageService(settings);
3838
// eagerly load client settings so that secure settings are readable (not closed)
3939
reload(settings);
4040
}
4141

4242
// overridable for tests
43-
protected GoogleCloudStorageService createStorageService() {
44-
return new GoogleCloudStorageService();
43+
protected GoogleCloudStorageService createStorageService(Settings settings) {
44+
return new GoogleCloudStorageService(settings);
4545
}
4646

4747
@Override

0 commit comments

Comments
 (0)