Skip to content

Commit 111e19b

Browse files
committed
update
1 parent 761f358 commit 111e19b

14 files changed

+165
-126
lines changed

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public Map<String, Repository.Factory> getRepositories(
277277
clusterService,
278278
bigArrays,
279279
recoverySettings,
280-
repositoriesMetrics
280+
new RepositoryStatsCollector()
281281
) {
282282
@Override
283283
protected GoogleCloudStorageBlobStore createBlobStore() {
@@ -289,7 +289,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
289289
bigArrays,
290290
randomIntBetween(1, 8) * 1024,
291291
BackoffPolicy.noBackoff(),
292-
repositoriesMetrics
292+
this.statsCollector()
293293
) {
294294
@Override
295295
long getLargeBlobThresholdInBytes() {

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.elasticsearch.core.Streams;
4444
import org.elasticsearch.core.SuppressForbidden;
4545
import org.elasticsearch.core.TimeValue;
46-
import org.elasticsearch.repositories.RepositoriesMetrics;
4746
import org.elasticsearch.rest.RestStatus;
4847

4948
import java.io.ByteArrayInputStream;
@@ -123,14 +122,14 @@ class GoogleCloudStorageBlobStore implements BlobStore {
123122
BigArrays bigArrays,
124123
int bufferSize,
125124
BackoffPolicy casBackoffPolicy,
126-
RepositoriesMetrics repositoriesMetrics
125+
RepositoryStatsCollector statsCollector
127126
) {
128127
this.bucketName = bucketName;
129128
this.clientName = clientName;
130129
this.repositoryName = repositoryName;
131130
this.storageService = storageService;
132131
this.bigArrays = bigArrays;
133-
this.statsCollector = new RepositoryStatsCollector(repositoryName, repositoriesMetrics);
132+
this.statsCollector = statsCollector;
134133
this.bufferSize = bufferSize;
135134
this.casBackoffPolicy = casBackoffPolicy;
136135
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Map<String, Repository.Factory> getRepositories(
6262
clusterService,
6363
bigArrays,
6464
recoverySettings,
65-
repositoriesMetrics
65+
new RepositoryStatsCollector(() -> clusterService.threadPool().absoluteTimeInMillis(), metadata, repositoriesMetrics)
6666
)
6767
);
6868
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.common.util.BigArrays;
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.indices.recovery.RecoverySettings;
25-
import org.elasticsearch.repositories.RepositoriesMetrics;
2625
import org.elasticsearch.repositories.RepositoryException;
2726
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
2827
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -86,7 +85,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
8685
private final TimeValue retryThrottledCasDelayIncrement;
8786
private final int retryThrottledCasMaxNumberOfRetries;
8887
private final TimeValue retryThrottledCasMaxDelay;
89-
private final RepositoriesMetrics repositoriesMetrics;
88+
private final RepositoryStatsCollector statsCollector;
9089

9190
GoogleCloudStorageRepository(
9291
final RepositoryMetadata metadata,
@@ -95,7 +94,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
9594
final ClusterService clusterService,
9695
final BigArrays bigArrays,
9796
final RecoverySettings recoverySettings,
98-
final RepositoriesMetrics repositoriesMetrics
97+
final RepositoryStatsCollector statsCollector
9998
) {
10099
super(
101100
metadata,
@@ -113,7 +112,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
113112
this.retryThrottledCasDelayIncrement = RETRY_THROTTLED_CAS_DELAY_INCREMENT.get(metadata.settings());
114113
this.retryThrottledCasMaxNumberOfRetries = RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES.get(metadata.settings());
115114
this.retryThrottledCasMaxDelay = RETRY_THROTTLED_CAS_MAXIMUM_DELAY.get(metadata.settings());
116-
this.repositoriesMetrics = repositoriesMetrics;
115+
this.statsCollector = statsCollector;
117116
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath(), chunkSize, isCompress());
118117
}
119118

@@ -144,7 +143,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
144143
bigArrays,
145144
bufferSize,
146145
BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay),
147-
repositoriesMetrics
146+
statsCollector
148147
);
149148
}
150149

@@ -153,6 +152,10 @@ protected ByteSizeValue chunkSize() {
153152
return chunkSize;
154153
}
155154

155+
RepositoryStatsCollector statsCollector() {
156+
return statsCollector;
157+
}
158+
156159
/**
157160
* Get a given setting from the repository settings, throwing a {@link RepositoryException} if the setting does not exist or is empty.
158161
*/

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
171171

172172
return (httpRequest) -> {
173173
if (requestInitializer != null) requestInitializer.initialize(httpRequest);
174-
httpRequest.setResponseInterceptor(new MeteringResponseInterceptor());
174+
httpRequest.setResponseInterceptor(RepositoryStatsCollector.METERING_INTERCEPTOR);
175175
};
176176
}
177177
};

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import com.google.cloud.storage.StorageOptions;
2424
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
2525

26+
import org.apache.lucene.util.IORunnable;
2627
import org.elasticsearch.SpecialPermission;
2728
import org.elasticsearch.common.blobstore.OperationPurpose;
28-
import org.elasticsearch.core.CheckedRunnable;
2929
import org.elasticsearch.core.SuppressForbidden;
3030

3131
import java.io.IOException;
@@ -42,6 +42,8 @@
4242

4343
/**
4444
* A wrapper for GCP {@link Storage} client. Provides metering and telemetry.
45+
* It's mostly boilerplate code that wraps Storage calls with thread-local metrics.
46+
* And special cases, such as Paginated List calls, WriteChannel, ReadChannel.
4547
*/
4648
public class MeteredStorage {
4749
private final Storage storage;
@@ -108,13 +110,17 @@ public MeteredObjectsGetRequest meteredObjectsGet(OperationPurpose purpose, Stri
108110
return new MeteredObjectsGetRequest(statsCollector, purpose, storageRpc.objects().get(bucket, blob));
109111
}
110112

111-
public MeteredWriteChannel meteredWriter(OperationPurpose purpose, BlobInfo blobInfo, Storage.BlobWriteOption... writeOptions) {
112-
var initStats = OperationStats.initAndGet(purpose, INSERT);
113-
return new MeteredWriteChannel(statsCollector, initStats, storage.writer(blobInfo, writeOptions));
113+
public MeteredWriteChannel meteredWriter(OperationPurpose purpose, BlobInfo blobInfo, Storage.BlobWriteOption... writeOptions)
114+
throws IOException {
115+
var initStats = new OperationStats(purpose, INSERT);
116+
return statsCollector.continueAndCollect(
117+
initStats,
118+
() -> new MeteredWriteChannel(statsCollector, initStats, storage.writer(blobInfo, writeOptions))
119+
);
114120
}
115121

116122
public MeteredReadChannel meteredReader(OperationPurpose purpose, BlobId blobId, Storage.BlobSourceOption... options) {
117-
var initStats = OperationStats.initAndGet(purpose, GET);
123+
var initStats = new OperationStats(purpose, GET);
118124
return new MeteredReadChannel(statsCollector, initStats, storage.reader(blobId, options));
119125
}
120126

@@ -186,7 +192,7 @@ public boolean isOpen() {
186192

187193
@Override
188194
public void close() throws IOException {
189-
statsCollector.finishAndCollect(stats, (CheckedRunnable<IOException>) writeChannel::close);
195+
statsCollector.finishAndCollect(stats, (IORunnable) writeChannel::close);
190196
}
191197
}
192198

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteringResponseInterceptor.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/OperationStats.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
public class OperationStats {
1515

16-
private static final ThreadLocal<OperationStats> OPERATION_STATS = new ThreadLocal<>();
1716
final OperationPurpose purpose;
1817
final StorageOperation operation;
1918

@@ -58,24 +57,4 @@ public class OperationStats {
5857
this.startTimeMs = 0;
5958
}
6059

61-
static OperationStats initAndGet(OperationPurpose purpose, StorageOperation operation) {
62-
var stats = new OperationStats(purpose, operation);
63-
OPERATION_STATS.set(stats);
64-
return stats;
65-
}
66-
67-
static void set(OperationStats stats) {
68-
OPERATION_STATS.set(stats);
69-
}
70-
71-
static OperationStats get() {
72-
var stats = OPERATION_STATS.get();
73-
assert stats != null : "must initialize operation stats";
74-
return stats;
75-
}
76-
77-
static void clear() {
78-
OPERATION_STATS.remove();
79-
}
80-
8160
}

0 commit comments

Comments
 (0)