Skip to content

Commit 1086bc7

Browse files
committed
cleanup
1 parent 111e19b commit 1086bc7

12 files changed

+57
-49
lines changed

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public Map<String, Repository.Factory> getRepositories(
277277
clusterService,
278278
bigArrays,
279279
recoverySettings,
280-
new RepositoryStatsCollector()
280+
new GcsRepositoryStatsCollector()
281281
) {
282282
@Override
283283
protected GoogleCloudStorageBlobStore createBlobStore() {
Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Stats collector class that performs metrics initialization and propagation through GCS client
3232
* calls. This class encapsulates ThreadLocal metrics access.
3333
*/
34-
public class RepositoryStatsCollector {
34+
public class GcsRepositoryStatsCollector {
3535

3636
private static final ThreadLocal<OperationStats> OPERATION_STATS = new ThreadLocal<>();
3737

@@ -53,33 +53,30 @@ public class RepositoryStatsCollector {
5353
}
5454
}
5555
};
56-
5756
/**
5857
* Track operations for billing and REST API
5958
*/
60-
private final EnumMap<StorageOperation, LongAdder> restMetering;
61-
59+
private final EnumMap<StorageOperation, OpsCollector> restMetering;
6260
/**
6361
* Telemetry (APM)
6462
*/
6563
private final RepositoriesMetrics telemetry;
6664
private final EnumMap<OperationPurpose, EnumMap<StorageOperation, Map<String, Object>>> telemetryAttributes;
67-
6865
/**
6966
* track request duration
7067
*/
7168
private final LongSupplier timer;
7269

73-
RepositoryStatsCollector() {
70+
GcsRepositoryStatsCollector() {
7471
this(() -> 0L, new RepositoryMetadata(GoogleCloudStorageRepository.TYPE, "", Settings.EMPTY), RepositoriesMetrics.NOOP);
7572
}
7673

77-
RepositoryStatsCollector(LongSupplier timer, RepositoryMetadata metadata, RepositoriesMetrics repositoriesMetrics) {
74+
GcsRepositoryStatsCollector(LongSupplier timer, RepositoryMetadata metadata, RepositoriesMetrics repositoriesMetrics) {
7875
this.timer = timer;
7976
this.telemetry = repositoriesMetrics;
8077
this.restMetering = new EnumMap<>(StorageOperation.class);
8178
for (var op : StorageOperation.values()) {
82-
restMetering.put(op, new LongAdder());
79+
restMetering.put(op, new OpsCollector(new LongAdder(), new LongAdder()));
8380
}
8481
this.telemetryAttributes = new EnumMap<>(OperationPurpose.class);
8582
if (repositoriesMetrics != RepositoriesMetrics.NOOP) {
@@ -215,7 +212,9 @@ private void collect(OperationStats stats) {
215212
opErr = stats.isSuccess ? 0 : 1;
216213
}
217214
}
218-
restMetering.get(op).add(opOk);
215+
var opStats = restMetering.get(op);
216+
opStats.operations.add(opOk);
217+
opStats.requests.add(stats.reqAtt - stats.reqErr + stats.reqBillableErr);
219218

220219
if (telemetry != RepositoriesMetrics.NOOP) {
221220
var attr = telemetryAttributes.get(stats.purpose).get(stats.operation);
@@ -234,8 +233,11 @@ private void collect(OperationStats stats) {
234233

235234
public Map<String, BlobStoreActionStats> operationsStats() {
236235
return restMetering.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().key, e -> {
237-
var ops = e.getValue().sum();
238-
return new BlobStoreActionStats(ops, ops);
236+
var ops = e.getValue().operations.sum();
237+
var reqs = e.getValue().requests.sum();
238+
return new BlobStoreActionStats(ops, reqs);
239239
}));
240240
}
241+
242+
record OpsCollector(LongAdder operations, LongAdder requests) {}
241243
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
109109
private final String clientName;
110110
private final String repositoryName;
111111
private final GoogleCloudStorageService storageService;
112-
private final RepositoryStatsCollector statsCollector;
112+
private final GcsRepositoryStatsCollector statsCollector;
113113
private final int bufferSize;
114114
private final BigArrays bigArrays;
115115
private final BackoffPolicy casBackoffPolicy;
@@ -122,7 +122,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
122122
BigArrays bigArrays,
123123
int bufferSize,
124124
BackoffPolicy casBackoffPolicy,
125-
RepositoryStatsCollector statsCollector
125+
GcsRepositoryStatsCollector statsCollector
126126
) {
127127
this.bucketName = bucketName;
128128
this.clientName = clientName;
@@ -462,10 +462,6 @@ private void writeBlobResumable(
462462
*/
463463
org.elasticsearch.core.Streams.copy(inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer);
464464
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
465-
// We don't track this operation on the http layer as
466-
// we do with the GET/LIST operations since this operations
467-
// can trigger multiple underlying http requests but only one
468-
// operation is billed.
469465
return;
470466
} catch (final StorageException se) {
471467
final int errorCode = se.getCode();

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Map<String, Repository.Factory> getRepositories(
6262
clusterService,
6363
bigArrays,
6464
recoverySettings,
65-
new RepositoryStatsCollector(() -> clusterService.threadPool().absoluteTimeInMillis(), metadata, repositoriesMetrics)
65+
new GcsRepositoryStatsCollector(() -> clusterService.threadPool().absoluteTimeInMillis(), metadata, repositoriesMetrics)
6666
)
6767
);
6868
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
8585
private final TimeValue retryThrottledCasDelayIncrement;
8686
private final int retryThrottledCasMaxNumberOfRetries;
8787
private final TimeValue retryThrottledCasMaxDelay;
88-
private final RepositoryStatsCollector statsCollector;
88+
private final GcsRepositoryStatsCollector statsCollector;
8989

9090
GoogleCloudStorageRepository(
9191
final RepositoryMetadata metadata,
@@ -94,7 +94,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
9494
final ClusterService clusterService,
9595
final BigArrays bigArrays,
9696
final RecoverySettings recoverySettings,
97-
final RepositoryStatsCollector statsCollector
97+
final GcsRepositoryStatsCollector statsCollector
9898
) {
9999
super(
100100
metadata,
@@ -152,7 +152,7 @@ protected ByteSizeValue chunkSize() {
152152
return chunkSize;
153153
}
154154

155-
RepositoryStatsCollector statsCollector() {
155+
GcsRepositoryStatsCollector statsCollector() {
156156
return statsCollector;
157157
}
158158

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClie
8989
* @return a cached client storage instance that can be used to manage objects
9090
* (blobs)
9191
*/
92-
public MeteredStorage client(final String clientName, final String repositoryName, final RepositoryStatsCollector statsCollector)
92+
public MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector)
9393
throws IOException {
9494
ClientKey clientKey = new ClientKey(repositoryName);
9595
{
@@ -137,7 +137,7 @@ synchronized void closeRepositoryClients(String repositoryName) {
137137
* @return a new client storage instance that can be used to manage objects
138138
* (blobs)
139139
*/
140-
private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, RepositoryStatsCollector statsCollector)
140+
private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector)
141141
throws IOException {
142142
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
143143
final NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
@@ -171,7 +171,7 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
171171

172172
return (httpRequest) -> {
173173
if (requestInitializer != null) requestInitializer.initialize(httpRequest);
174-
httpRequest.setResponseInterceptor(RepositoryStatsCollector.METERING_INTERCEPTOR);
174+
httpRequest.setResponseInterceptor(GcsRepositoryStatsCollector.METERING_INTERCEPTOR);
175175
};
176176
}
177177
};

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,16 @@
4848
public class MeteredStorage {
4949
private final Storage storage;
5050
private final com.google.api.services.storage.Storage storageRpc;
51-
private final RepositoryStatsCollector statsCollector;
51+
private final GcsRepositoryStatsCollector statsCollector;
5252

53-
public MeteredStorage(Storage storage, RepositoryStatsCollector statsCollector) {
53+
public MeteredStorage(Storage storage, GcsRepositoryStatsCollector statsCollector) {
5454
this.storage = storage;
5555
SpecialPermission.check();
5656
this.storageRpc = getStorageRpc(storage);
5757
this.statsCollector = statsCollector;
5858
}
5959

60-
MeteredStorage(Storage storage, com.google.api.services.storage.Storage storageRpc, RepositoryStatsCollector statsCollector) {
60+
MeteredStorage(Storage storage, com.google.api.services.storage.Storage storageRpc, GcsRepositoryStatsCollector statsCollector) {
6161
this.storage = storage;
6262
this.storageRpc = storageRpc;
6363
this.statsCollector = statsCollector;
@@ -128,12 +128,12 @@ public MeteredReadChannel meteredReader(OperationPurpose purpose, BlobId blobId,
128128
* A delegating Objects.Get requests with metrics collection
129129
*/
130130
public static class MeteredObjectsGetRequest {
131-
private final RepositoryStatsCollector statsCollector;
131+
private final GcsRepositoryStatsCollector statsCollector;
132132
private final OperationPurpose purpose;
133133
private final com.google.api.services.storage.Storage.Objects.Get get;
134134

135135
MeteredObjectsGetRequest(
136-
RepositoryStatsCollector statsCollector,
136+
GcsRepositoryStatsCollector statsCollector,
137137
OperationPurpose purpose,
138138
com.google.api.services.storage.Storage.Objects.Get get
139139
) {
@@ -160,11 +160,11 @@ public HttpResponse executeMedia() throws IOException {
160160
*/
161161
@SuppressForbidden(reason = "wraps GCS channel")
162162
public static class MeteredWriteChannel implements WriteChannel {
163-
private final RepositoryStatsCollector statsCollector;
163+
private final GcsRepositoryStatsCollector statsCollector;
164164
private final WriteChannel writeChannel;
165165
private final OperationStats stats;
166166

167-
public MeteredWriteChannel(RepositoryStatsCollector statsCollector, OperationStats initStats, WriteChannel readChannel) {
167+
public MeteredWriteChannel(GcsRepositoryStatsCollector statsCollector, OperationStats initStats, WriteChannel readChannel) {
168168
this.statsCollector = statsCollector;
169169
this.writeChannel = readChannel;
170170
this.stats = initStats;
@@ -201,11 +201,11 @@ public void close() throws IOException {
201201
*/
202202
@SuppressForbidden(reason = "wraps GCS channel")
203203
public static class MeteredReadChannel implements ReadChannel {
204-
private final RepositoryStatsCollector statsCollector;
204+
private final GcsRepositoryStatsCollector statsCollector;
205205
private final ReadChannel readChannel;
206206
private final OperationStats stats;
207207

208-
MeteredReadChannel(RepositoryStatsCollector statsCollector, OperationStats initStats, ReadChannel readChannel) {
208+
MeteredReadChannel(GcsRepositoryStatsCollector statsCollector, OperationStats initStats, ReadChannel readChannel) {
209209
this.statsCollector = statsCollector;
210210
this.readChannel = readChannel;
211211
this.stats = initStats;
@@ -255,11 +255,11 @@ public boolean isOpen() {
255255
}
256256

257257
public static class MeteredBlobPage implements Page<Blob> {
258-
private final RepositoryStatsCollector statsCollector;
258+
private final GcsRepositoryStatsCollector statsCollector;
259259
private final OperationPurpose purpose;
260260
private final Page<Blob> pages;
261261

262-
public MeteredBlobPage(RepositoryStatsCollector statsCollector, OperationPurpose purpose, Page<Blob> pages) {
262+
public MeteredBlobPage(GcsRepositoryStatsCollector statsCollector, OperationPurpose purpose, Page<Blob> pages) {
263263
this.statsCollector = statsCollector;
264264
this.purpose = purpose;
265265
this.pages = pages;

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
204204
BigArrays.NON_RECYCLING_INSTANCE,
205205
randomIntBetween(1, 8) * 1024,
206206
BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)),
207-
new RepositoryStatsCollector()
207+
new GcsRepositoryStatsCollector()
208208
);
209209

210210
return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void testResumableWrite() throws Exception {
155155
// the +1 means a POST request with metadata without PAYLOAD
156156
final int totalRequests = parts + 1;
157157
final StatsMap wantStats = new StatsMap(purpose);
158-
assertStatsEquals(wantStats.add(INSERT, 1), store.stats());
158+
assertStatsEquals(wantStats.add(INSERT, 1, totalRequests), store.stats());
159159

160160
try (InputStream is = container.readBlob(purpose, blobName)) {
161161
assertEquals(blobContents, Streams.readFully(is));
@@ -258,7 +258,7 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t
258258
BigArrays.NON_RECYCLING_INSTANCE,
259259
Math.toIntExact(BUFFER_SIZE.getBytes()),
260260
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10),
261-
new RepositoryStatsCollector()
261+
new GcsRepositoryStatsCollector()
262262
);
263263
final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer(
264264
BlobPath.EMPTY,
@@ -295,7 +295,15 @@ class StatsMap extends HashMap<String, BlobStoreActionStats> {
295295
StatsMap add(StorageOperation operation, long ops) {
296296
compute(operation.key(), (k, v) -> {
297297
assert v != null;
298-
return new BlobStoreActionStats(v.operations() + ops, v.operations() + ops);
298+
return new BlobStoreActionStats(v.operations() + ops, v.requests() + ops);
299+
});
300+
return this;
301+
}
302+
303+
StatsMap add(StorageOperation operation, long ops, long reqs) {
304+
compute(operation.key(), (k, v) -> {
305+
assert v != null;
306+
return new BlobStoreActionStats(v.operations() + ops, v.requests() + reqs);
299307
});
300308
return this;
301309
}

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,12 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
7878
when(storage.get("bucket")).thenReturn(mock(Bucket.class));
7979
when(storage.batch()).thenReturn(batch);
8080
final com.google.api.services.storage.Storage storageRpc = mock(com.google.api.services.storage.Storage.class);
81-
final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new RepositoryStatsCollector());
81+
final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector());
8282

8383
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
84-
when(storageService.client(any(String.class), any(String.class), any(RepositoryStatsCollector.class))).thenReturn(meteredStorage);
84+
when(storageService.client(any(String.class), any(String.class), any(GcsRepositoryStatsCollector.class))).thenReturn(
85+
meteredStorage
86+
);
8587

8688
try (
8789
BlobStore store = new GoogleCloudStorageBlobStore(
@@ -92,7 +94,7 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
9294
BigArrays.NON_RECYCLING_INSTANCE,
9395
randomIntBetween(1, 8) * 1024,
9496
BackoffPolicy.noBackoff(),
95-
new RepositoryStatsCollector()
97+
new GcsRepositoryStatsCollector()
9698
)
9799
) {
98100
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);

0 commit comments

Comments
 (0)