Skip to content

Commit b650f35

Browse files
authored
Generalise blob download retry metrics (elastic#142090)
1 parent 1dfccd3 commit b650f35

File tree

13 files changed

+315
-131
lines changed

13 files changed

+315
-131
lines changed

modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MetricValidator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ private static class Attributes {
105105

106106
static final Set<String> REPO_ATTRIBUTES = Set.of("operation", "purpose", "repo_name", "repo_type");
107107
static final Set<String> REPO_SNAPSHOT_ATTRIBUTES = Set.of("repo_name", "repo_type", "state", "stage");
108-
static final Set<String> REPO_S3_ATTRIBUTES = Sets.addToCopy(REPO_ATTRIBUTES, "action");
109108

110109
static final Set<String> REINDEX_ATTRIBUTES = Set.of("reindex_source");
111110

@@ -206,9 +205,9 @@ private static class Attributes {
206205
Map.entry("es.repositories.operations.unsuccessful.total", REPO_ATTRIBUTES),
207206
Map.entry("es.repositories.requests.http_request_time.histogram", REPO_ATTRIBUTES),
208207
Map.entry("es.repositories.requests.total", REPO_ATTRIBUTES),
209-
Map.entry("es.repositories.s3.input_stream.retry.attempts.histogram", REPO_S3_ATTRIBUTES),
210-
Map.entry("es.repositories.s3.input_stream.retry.event.total", REPO_S3_ATTRIBUTES),
211-
Map.entry("es.repositories.s3.input_stream.retry.success.total", REPO_S3_ATTRIBUTES),
208+
Map.entry("es.repositories.input_stream.retry.attempts.histogram", REPO_ATTRIBUTES),
209+
Map.entry("es.repositories.input_stream.retry.event.total", REPO_ATTRIBUTES),
210+
Map.entry("es.repositories.input_stream.retry.success.total", REPO_ATTRIBUTES),
212211
Map.entry("es.repositories.snapshots.blobs.uploaded.total", REPO_SNAPSHOT_ATTRIBUTES),
213212
Map.entry("es.repositories.snapshots.by_state.current", REPO_SNAPSHOT_ATTRIBUTES),
214213
Map.entry("es.repositories.snapshots.completed.total", REPO_SNAPSHOT_ATTRIBUTES),

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,14 @@ public void onRequestComplete(Operation operation, OperationPurpose purpose, Azu
10791079
}
10801080
}
10811081

1082+
public RepositoriesMetrics getRepositoriesMetrics() {
1083+
return requestMetricsRecorder.repositoriesMetrics;
1084+
}
1085+
1086+
public RepositoryMetadata getRepositoryMetadata() {
1087+
return repositoryMetadata;
1088+
}
1089+
10821090
// visible for testing
10831091
RequestMetricsRecorder getMetricsRecorder() {
10841092
return requestMetricsRecorder;

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRetryingInputStream.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import java.io.IOException;
2424
import java.nio.file.NoSuchFileException;
25+
import java.util.Map;
2526

2627
public class AzureRetryingInputStream extends RetryingInputStream<String> {
2728

2829
protected AzureRetryingInputStream(AzureBlobStore azureBlobStore, OperationPurpose purpose, String blob, long position, Long length)
2930
throws IOException {
3031
super(
32+
azureBlobStore.getRepositoriesMetrics(),
3133
new AzureBlobStoreServices(azureBlobStore, purpose, blob),
3234
purpose,
3335
position,
@@ -59,16 +61,6 @@ public SingleAttemptInputStream<String> getInputStream(@Nullable String version,
5961
}
6062
}
6163

62-
@Override
63-
public void onRetryStarted(StreamAction action) {
64-
// No metrics for Azure
65-
}
66-
67-
@Override
68-
public void onRetrySucceeded(StreamAction action, long numberOfRetries) {
69-
// No metrics for Azure
70-
}
71-
7264
@Override
7365
public long getMeaningfulProgressSize() {
7466
return Math.max(1L, blobStore.getReadChunkSize() / 100L);
@@ -95,5 +87,21 @@ public String getBlobDescription() {
9587
public boolean isRetryableException(StreamAction action, Exception e) {
9688
return e instanceof AlreadyClosedException == false;
9789
}
90+
91+
@Override
92+
public Map<String, Object> getMetricsAttributes(StreamAction action) {
93+
return Map.of(
94+
"repo_type",
95+
AzureRepository.TYPE,
96+
"repo_name",
97+
blobStore.getRepositoryMetadata().name(),
98+
"operation",
99+
AzureBlobStore.Operation.GET_BLOB.getKey(),
100+
"purpose",
101+
purpose.getKey(),
102+
"es_retry_action",
103+
action.getPastTense()
104+
);
105+
}
98106
}
99107
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsRepositoryStatsCollector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,5 +281,9 @@ public Map<String, BlobStoreActionStats> operationsStats(boolean isServerless) {
281281
return out;
282282
}
283283

284+
public RepositoriesMetrics getRepositoriesMetrics() {
285+
return telemetry;
286+
}
287+
284288
record Collector(LongAdder operations, LongAdder requests) {}
285289
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.core.Streams;
4646
import org.elasticsearch.core.SuppressForbidden;
4747
import org.elasticsearch.core.TimeValue;
48+
import org.elasticsearch.repositories.RepositoriesMetrics;
4849
import org.elasticsearch.rest.RestStatus;
4950

5051
import java.io.ByteArrayInputStream;
@@ -154,6 +155,14 @@ int getMaxRetries() {
154155
return storageService.clientSettings(projectId, clientName).getMaxRetries();
155156
}
156157

158+
RepositoriesMetrics getRepositoriesMetrics() {
159+
return statsCollector.getRepositoriesMetrics();
160+
}
161+
162+
String getRepositoryName() {
163+
return repositoryName;
164+
}
165+
157166
@Override
158167
public BlobContainer blobContainer(BlobPath path) {
159168
return new GoogleCloudStorageBlobContainer(path, this);

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.io.IOException;
2727
import java.io.InputStream;
2828
import java.nio.file.NoSuchFileException;
29+
import java.util.Map;
2930

3031
/**
3132
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
@@ -49,7 +50,7 @@ class GoogleCloudStorageRetryingInputStream extends RetryingInputStream<Long> {
4950
long start,
5051
long end
5152
) throws IOException {
52-
super(new GoogleCloudStorageBlobStoreServices(blobStore, purpose, blobId), purpose, start, end);
53+
super(blobStore.getRepositoriesMetrics(), new GoogleCloudStorageBlobStoreServices(blobStore, purpose, blobId), purpose, start, end);
5354
}
5455

5556
private static class GoogleCloudStorageBlobStoreServices implements BlobStoreServices<Long> {
@@ -123,16 +124,6 @@ public SingleAttemptInputStream<Long> getInputStream(@Nullable Long lastGenerati
123124
}
124125
}
125126

126-
@Override
127-
public void onRetryStarted(StreamAction action) {
128-
// No retry metrics for GCS
129-
}
130-
131-
@Override
132-
public void onRetrySucceeded(StreamAction action, long numberOfRetries) {
133-
// No retry metrics for GCS
134-
}
135-
136127
@Override
137128
public long getMeaningfulProgressSize() {
138129
return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L);
@@ -158,6 +149,22 @@ public boolean isRetryableException(StreamAction action, Exception e) {
158149
case READ -> e instanceof StorageException;
159150
};
160151
}
152+
153+
@Override
154+
public Map<String, Object> getMetricsAttributes(StreamAction action) {
155+
return Map.of(
156+
"repo_type",
157+
GoogleCloudStorageRepository.TYPE,
158+
"repo_name",
159+
blobStore.getRepositoryName(),
160+
"operation",
161+
StorageOperation.GET.key(),
162+
"purpose",
163+
purpose.getKey(),
164+
"es_retry_action",
165+
action.getPastTense()
166+
);
167+
}
161168
}
162169

163170
private static Long parseGenerationHeader(HttpResponse response) {

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,17 @@
1010
package org.elasticsearch.repositories.s3;
1111

1212
import org.elasticsearch.repositories.RepositoriesMetrics;
13-
import org.elasticsearch.telemetry.metric.LongCounter;
1413
import org.elasticsearch.telemetry.metric.LongHistogram;
1514

16-
public record S3RepositoriesMetrics(
17-
RepositoriesMetrics common,
18-
LongCounter retryStartedCounter,
19-
LongCounter retryCompletedCounter,
20-
LongHistogram retryHistogram,
21-
LongHistogram retryDeletesHistogram
22-
) {
15+
public record S3RepositoriesMetrics(RepositoriesMetrics common, LongHistogram retryDeletesHistogram) {
2316

2417
public static final S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP);
2518

26-
public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total";
27-
public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total";
28-
public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram";
2919
public static final String METRIC_DELETE_RETRIES_HISTOGRAM = "es.repositories.s3.delete.retry.attempts.histogram";
3020

3121
public S3RepositoriesMetrics(RepositoriesMetrics common) {
3222
this(
3323
common,
34-
common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"),
35-
common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"),
36-
common.meterRegistry()
37-
.registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit"),
3824
common.meterRegistry().registerLongHistogram(METRIC_DELETE_RETRIES_HISTOGRAM, "s3 delete retry attempts histogram", "unit")
3925
);
4026
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class S3RetryingInputStream extends RetryingInputStream<String> {
4848

4949
// both start and end are inclusive bounds, following the definition in GetObjectRequest.setRange
5050
S3RetryingInputStream(OperationPurpose purpose, S3BlobStore blobStore, String blobKey, long start, long end) throws IOException {
51-
super(new S3BlobStoreServices(blobStore, blobKey, purpose), purpose, start, end);
51+
super(blobStore.getS3RepositoriesMetrics().common(), new S3BlobStoreServices(blobStore, blobKey, purpose), purpose, start, end);
5252
}
5353

5454
private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose)
@@ -103,18 +103,6 @@ public SingleAttemptInputStream<String> getInputStream(@Nullable String version,
103103
}
104104
}
105105

106-
@Override
107-
public void onRetryStarted(StreamAction action) {
108-
blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes(action));
109-
}
110-
111-
@Override
112-
public void onRetrySucceeded(StreamAction action, long numberOfRetries) {
113-
final Map<String, Object> attributes = metricAttributes(action);
114-
blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes);
115-
blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes);
116-
}
117-
118106
@Override
119107
public long getMeaningfulProgressSize() {
120108
return Math.max(1L, blobStore.bufferSizeInBytes() / 100L);
@@ -138,7 +126,8 @@ public boolean isRetryableException(StreamAction action, Exception e) {
138126
};
139127
}
140128

141-
private Map<String, Object> metricAttributes(StreamAction action) {
129+
@Override
130+
public Map<String, Object> getMetricsAttributes(StreamAction action) {
142131
return Map.of(
143132
"repo_type",
144133
S3Repository.TYPE,
@@ -148,7 +137,7 @@ private Map<String, Object> metricAttributes(StreamAction action) {
148137
Operation.GET_OBJECT.getKey(),
149138
"purpose",
150139
purpose.getKey(),
151-
"action",
140+
"es_retry_action",
152141
action.getPastTense()
153142
);
154143
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,24 +1547,35 @@ private long getOperationMeasurements() {
15471547
private List<Measurement> getRetryStartedMeasurements() {
15481548
return Measurement.combine(
15491549
recordingMeterRegistry.getRecorder()
1550-
.getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_EVENT_TOTAL)
1550+
.getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_INPUT_STREAM_RETRY_EVENT_TOTAL)
15511551
);
15521552
}
15531553

15541554
private List<Measurement> getRetryCompletedMeasurements() {
15551555
return Measurement.combine(
15561556
recordingMeterRegistry.getRecorder()
1557-
.getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_SUCCESS_TOTAL)
1557+
.getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_INPUT_STREAM_RETRY_SUCCESS_TOTAL)
15581558
);
15591559
}
15601560

15611561
private List<Measurement> getRetryHistogramMeasurements() {
15621562
return recordingMeterRegistry.getRecorder()
1563-
.getMeasurements(InstrumentType.LONG_HISTOGRAM, S3RepositoriesMetrics.METRIC_RETRY_ATTEMPTS_HISTOGRAM);
1563+
.getMeasurements(InstrumentType.LONG_HISTOGRAM, RepositoriesMetrics.METRIC_INPUT_STREAM_RETRY_ATTEMPTS_HISTOGRAM);
15641564
}
15651565

15661566
private Map<String, Object> metricAttributes(String action) {
1567-
return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action);
1567+
return Map.of(
1568+
"repo_type",
1569+
"s3",
1570+
"repo_name",
1571+
"repository",
1572+
"operation",
1573+
"GetObject",
1574+
"purpose",
1575+
"Indices",
1576+
"es_retry_action",
1577+
action
1578+
);
15681579
}
15691580

15701581
private static boolean isMultiDeleteRequest(HttpExchange exchange) {

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RetryingInputStreamTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ private S3RetryingInputStream createInputStream(final byte[] data, @Nullable fin
239239
when(blobStore.clientReference()).thenReturn(clientReference);
240240
final MetricPublisher metricPublisher = mock(MetricPublisher.class);
241241
when(blobStore.getMetricPublisher(any(S3BlobStore.Operation.class), any(OperationPurpose.class))).thenReturn(metricPublisher);
242+
when(blobStore.getS3RepositoriesMetrics()).thenReturn(S3RepositoriesMetrics.NOOP);
242243

243244
if (position != null && length != null) {
244245
if (data.length <= position) {

0 commit comments

Comments
 (0)