Skip to content

Commit 0b4f11b

Browse files
committed
Metrics collection
1 parent 328036a commit 0b4f11b

File tree

3 files changed

+133
-166
lines changed

3 files changed

+133
-166
lines changed

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public void testMetrics() throws Exception {
285285
final BlobStore blobStore = blobStoreRepository.blobStore();
286286
final BlobStore delegateBlobStore = ((BlobStoreWrapper) blobStore).delegate();
287287
final S3BlobStore s3BlobStore = (S3BlobStore) delegateBlobStore;
288-
final Map<S3BlobStore.StatsKey, S3BlobStore.IgnoreNoResponseMetricsPublisher> statsCollectors = s3BlobStore
288+
final Map<S3BlobStore.StatsKey, S3BlobStore.ElasticsearchS3MetricsCollector> statsCollectors = s3BlobStore
289289
.getStatsCollectors().collectors;
290290

291291
final var plugins = internalCluster().getInstance(PluginsService.class, nodeName)

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 116 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import software.amazon.awssdk.core.exception.SdkException;
1414
import software.amazon.awssdk.core.metrics.CoreMetric;
1515
import software.amazon.awssdk.core.retry.RetryUtils;
16+
import software.amazon.awssdk.http.HttpMetric;
1617
import software.amazon.awssdk.metrics.MetricCollection;
1718
import software.amazon.awssdk.metrics.MetricPublisher;
1819
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
@@ -38,7 +39,9 @@
3839
import org.elasticsearch.common.util.BigArrays;
3940
import org.elasticsearch.core.TimeValue;
4041
import org.elasticsearch.repositories.RepositoriesMetrics;
42+
import org.elasticsearch.rest.RestStatus;
4143
import org.elasticsearch.threadpool.ThreadPool;
44+
import org.elasticsearch.xcontent.XContentBuilder;
4245

4346
import java.io.IOException;
4447
import java.util.ArrayList;
@@ -50,6 +53,7 @@
5053
import java.util.Objects;
5154
import java.util.concurrent.ConcurrentHashMap;
5255
import java.util.concurrent.Executor;
56+
import java.util.concurrent.TimeUnit;
5357
import java.util.concurrent.atomic.LongAdder;
5458
import java.util.stream.Collectors;
5559

@@ -139,16 +143,14 @@ public TimeValue getCompareAndExchangeAntiContentionDelay() {
139143
return service.compareAndExchangeAntiContentionDelay;
140144
}
141145

142-
// metrics collector that ignores null responses that we interpret as the request not reaching the S3 endpoint due to a network
143-
// issue
144-
class IgnoreNoResponseMetricsPublisher implements MetricPublisher {
146+
class ElasticsearchS3MetricsCollector implements MetricPublisher {
145147

146148
final LongAdder requests = new LongAdder();
147149
final LongAdder operations = new LongAdder();
148150
private final Operation operation;
149151
private final Map<String, Object> attributes;
150152

151-
private IgnoreNoResponseMetricsPublisher(Operation operation, OperationPurpose purpose) {
153+
private ElasticsearchS3MetricsCollector(Operation operation, OperationPurpose purpose) {
152154
this.operation = operation;
153155
this.attributes = RepositoriesMetrics.createAttributesMap(repositoryMetadata, purpose, operation.getKey());
154156
}
@@ -159,6 +161,83 @@ BlobStoreActionStats getEndpointStats() {
159161

160162
@Override
161163
public void publish(MetricCollection metricCollection) {
164+
logPublication(metricCollection);
165+
assert operation.assertConsistentOperationName(metricCollection);
166+
167+
boolean overallSuccess = false;
168+
for (final var successMetricValue : metricCollection.metricValues(CoreMetric.API_CALL_SUCCESSFUL)) {
169+
if (Boolean.TRUE.equals(successMetricValue)) {
170+
overallSuccess = true; // but keep checking just in case
171+
} else {
172+
overallSuccess = false;
173+
break;
174+
}
175+
}
176+
177+
long totalTimeNanoseconds = 0;
178+
for (final var durationMetricValue : metricCollection.metricValues(CoreMetric.API_CALL_DURATION)) {
179+
totalTimeNanoseconds += durationMetricValue.toNanos();
180+
}
181+
182+
long requestCount = 0;
183+
long responseCount = 0;
184+
long awsErrorCount = 0;
185+
long throttleCount = 0;
186+
long http416ResponseCount = 0;
187+
for (final var apiCallAttemptMetrics : metricCollection.children()) {
188+
if ("ApiCallAttempt".equals(apiCallAttemptMetrics.name()) == false) {
189+
continue;
190+
}
191+
requestCount += 1;
192+
final var errorTypes = apiCallAttemptMetrics.metricValues(CoreMetric.ERROR_TYPE);
193+
if (errorTypes != null && errorTypes.size() > 0) {
194+
awsErrorCount += 1;
195+
if (errorTypes.contains("Throttling")) {
196+
throttleCount += 1;
197+
}
198+
}
199+
200+
final var httpResponses = apiCallAttemptMetrics.metricValues(HttpMetric.HTTP_STATUS_CODE);
201+
if (httpResponses != null && httpResponses.size() > 0) {
202+
responseCount += 1;
203+
if (httpResponses.contains(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus())) {
204+
http416ResponseCount += 1;
205+
}
206+
}
207+
}
208+
209+
// See https://github.com/elastic/elasticsearch/pull/71406 and https://elasticco.atlassian.net/browse/ES-10223
210+
requests.add(responseCount); // requests that didn't get a HTTP status code assumed not to have reached S3 at all
211+
s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes);
212+
operations.increment();
213+
if (overallSuccess) {
214+
s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes);
215+
}
216+
217+
s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes);
218+
if (awsErrorCount > 0) {
219+
s3RepositoriesMetrics.common().exceptionCounter().incrementBy(awsErrorCount, attributes);
220+
s3RepositoriesMetrics.common().exceptionHistogram().record(awsErrorCount, attributes);
221+
}
222+
if (throttleCount > 0) {
223+
s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes);
224+
s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes);
225+
}
226+
if (http416ResponseCount > 0) {
227+
s3RepositoriesMetrics.common().requestRangeNotSatisfiedExceptionCounter().incrementBy(http416ResponseCount, attributes);
228+
}
229+
230+
if (totalTimeNanoseconds > 0) {
231+
s3RepositoriesMetrics.common()
232+
.httpRequestTimeInMillisHistogram()
233+
.record(TimeUnit.NANOSECONDS.toMillis(totalTimeNanoseconds), attributes);
234+
}
235+
}
236+
237+
/**
238+
* TODO NOMERGE remove this
239+
*/
240+
private void logPublication(MetricCollection metricCollection) {
162241
logger.info("--> MetricPublisher#publish called:\n{}", Strings.toString((builder, params) -> {
163242
builder.startObject("publish");
164243
builder.field("operation", operation.toString());
@@ -167,175 +246,47 @@ public void publish(MetricCollection metricCollection) {
167246
builder.field(attributesEntry.getKey(), attributesEntry.getValue().toString());
168247
}
169248
builder.endObject();
170-
builder.field("name", metricCollection.name());
171-
builder.field("creationTime", metricCollection.creationTime().toString());
172-
builder.startArray("records");
173-
for (final var metricRecord : metricCollection) {
174-
builder.startObject();
175-
builder.field("name", metricRecord.metric().name());
176-
builder.field("level", metricRecord.metric().level().toString());
177-
builder.startArray("categories");
178-
for (final var category : metricRecord.metric().categories()) {
179-
builder.value(category.toString());
180-
}
181-
builder.endArray();
182-
builder.field("valueClass", metricRecord.metric().valueClass().getCanonicalName());
183-
builder.field("value", metricRecord.value().toString());
184-
builder.endObject();
185-
}
186-
builder.endArray();
249+
renderMetricCollection(metricCollection, builder);
187250
builder.field("stack trace", ExceptionsHelper.stackTrace(new ElasticsearchException("stack trace")));
188251
builder.endObject();
189252
return builder;
190253
}, false, true));
191-
192-
assert operation.assertConsistentOperationName(metricCollection);
193-
194-
// TODO NOMERGE metrics collection
195254
}
196255

197-
// TODO NOMERGE metrics collection
198-
// @Override
199-
// public final void collectMetrics(Request<?> request, Response<?> response) {
200-
// assert assertConsistencyBetweenHttpRequestAndOperation(request, operation);
201-
// final AWSRequestMetrics awsRequestMetrics = request.getAWSRequestMetrics();
202-
// final TimingInfo timingInfo = awsRequestMetrics.getTimingInfo();
203-
// final long requestCount = getCountForMetric(timingInfo, AWSRequestMetrics.Field.RequestCount);
204-
// final long exceptionCount = getCountForMetric(timingInfo, AWSRequestMetrics.Field.Exception);
205-
// final long throttleCount = getCountForMetric(timingInfo, AWSRequestMetrics.Field.ThrottleException);
206-
//
207-
// // For stats reported by API, do not collect stats for null response for BWC.
208-
// // See https://github.com/elastic/elasticsearch/pull/71406
209-
// // TODO Is this BWC really necessary?
210-
// // This behaviour needs to be updated, see https://elasticco.atlassian.net/browse/ES-10223
211-
// if (response != null) {
212-
// requests.add(requestCount);
213-
// }
214-
//
215-
// // We collect all metrics regardless whether response is null
216-
// // There are many situations other than network where a null response can be returned.
217-
// // In addition, we are interested in the stats when there is a network outage.
218-
// final int numberOfAwsErrors = Optional.ofNullable(awsRequestMetrics.getProperty(AWSRequestMetrics.Field.AWSErrorCode))
219-
// .map(List::size)
220-
// .orElse(0);
221-
//
222-
// if (exceptionCount > 0) {
223-
// final List<Object> statusCodes = Objects.requireNonNullElse(
224-
// awsRequestMetrics.getProperty(AWSRequestMetrics.Field.StatusCode),
225-
// List.of()
226-
// );
227-
// // REQUESTED_RANGE_NOT_SATISFIED errors are expected errors due to RCO
228-
// // TODO Add more expected client error codes?
229-
// final long amountOfRequestRangeNotSatisfiedErrors = statusCodes.stream()
230-
// .filter(e -> (Integer) e == REQUESTED_RANGE_NOT_SATISFIED.getStatus())
231-
// .count();
232-
// if (amountOfRequestRangeNotSatisfiedErrors > 0) {
233-
// s3RepositoriesMetrics.common()
234-
// .requestRangeNotSatisfiedExceptionCounter()
235-
// .incrementBy(amountOfRequestRangeNotSatisfiedErrors, attributes);
236-
// }
237-
// }
238-
//
239-
// s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes);
240-
// operations.increment();
241-
// if (numberOfAwsErrors == requestCount) {
242-
// s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes);
243-
// }
244-
//
245-
// s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes);
246-
// if (exceptionCount > 0) {
247-
// s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes);
248-
// s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes);
249-
// }
250-
// if (throttleCount > 0) {
251-
// s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes);
252-
// s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes);
253-
// }
254-
// maybeRecordHttpRequestTime(request);
255-
// }
256-
//
257-
// /**
258-
// * Used for APM style metrics to measure statics about performance. This is not for billing.
259-
// */
260-
// private void maybeRecordHttpRequestTime(Request<?> request) {
261-
// final List<TimingInfo> requestTimesIncludingRetries = request.getAWSRequestMetrics()
262-
// .getTimingInfo()
263-
// .getAllSubMeasurements(AWSRequestMetrics.Field.HttpRequestTime.name());
264-
// // It can be null if the request did not reach the server for some reason
265-
// if (requestTimesIncludingRetries == null) {
266-
// return;
267-
// }
268-
//
269-
// final long totalTimeInNanos = getTotalTimeInNanos(requestTimesIncludingRetries);
270-
// if (totalTimeInNanos == 0) {
271-
// logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);
272-
// } else {
273-
// s3RepositoriesMetrics.common()
274-
// .httpRequestTimeInMillisHistogram()
275-
// .record(TimeUnit.NANOSECONDS.toMillis(totalTimeInNanos), attributes);
276-
// }
277-
// }
278-
//
279-
// private boolean assertConsistencyBetweenHttpRequestAndOperation(S3Request request, Operation operation) {
280-
// switch (operation) {
281-
// case HEAD_OBJECT -> {
282-
// return request.getHttpMethod().name().equals("HEAD");
283-
// }
284-
// case GET_OBJECT, LIST_OBJECTS -> {
285-
// return request.getHttpMethod().name().equals("GET");
286-
// }
287-
// case PUT_OBJECT -> {
288-
// return request.getHttpMethod().name().equals("PUT");
289-
// }
290-
// case PUT_MULTIPART_OBJECT -> {
291-
// return request.getHttpMethod().name().equals("PUT") || request.getHttpMethod().name().equals("POST");
292-
// }
293-
// case DELETE_OBJECTS -> {
294-
// return request.getHttpMethod().name().equals("POST");
295-
// }
296-
// case ABORT_MULTIPART_OBJECT -> {
297-
// return request.getHttpMethod().name().equals("DELETE");
298-
// }
299-
// default -> throw new AssertionError("unknown operation [" + operation + "]");
300-
// }
301-
// }
256+
/**
257+
* TODO NOMERGE remove this
258+
*/
259+
private void renderMetricCollection(MetricCollection metricCollection, XContentBuilder builder) throws IOException {
260+
builder.field("name", metricCollection.name());
261+
builder.field("creationTime", metricCollection.creationTime().toString());
262+
builder.startArray("records");
263+
for (final var metricRecord : metricCollection) {
264+
builder.startObject();
265+
builder.field("name", metricRecord.metric().name());
266+
builder.field("level", metricRecord.metric().level().toString());
267+
builder.startArray("categories");
268+
for (final var category : metricRecord.metric().categories()) {
269+
builder.value(category.toString());
270+
}
271+
builder.endArray();
272+
builder.field("valueClass", metricRecord.metric().valueClass().getCanonicalName());
273+
builder.field("value", metricRecord.value().toString());
274+
builder.endObject();
275+
}
276+
builder.endArray();
277+
builder.startArray("children");
278+
for (final var child : metricCollection.children()) {
279+
builder.startObject();
280+
renderMetricCollection(child, builder);
281+
builder.endObject();
282+
}
283+
builder.endArray();
284+
}
302285

303286
@Override
304287
public void close() {}
305288
}
306289

307-
// TODO NOMERGE metrics collection
308-
// private static long getCountForMetric(TimingInfo info, AWSRequestMetrics.Field field) {
309-
// var count = info.getCounter(field.name());
310-
// if (count == null) {
311-
// // This can be null if the thread was interrupted
312-
// if (field == AWSRequestMetrics.Field.RequestCount && Thread.currentThread().isInterrupted() == false) {
313-
// final String message = "Expected request count to be tracked but found not count.";
314-
// assert false : message;
315-
// logger.warn(message);
316-
// }
317-
// return 0L;
318-
// } else {
319-
// return count.longValue();
320-
// }
321-
// }
322-
//
323-
// private static long getTotalTimeInNanos(List<TimingInfo> requestTimesIncludingRetries) {
324-
// // Here we calculate the timing in Nanoseconds for the sum of the individual subMeasurements with the goal of deriving the TTFB
325-
// // (time to first byte). We use high precision time here to tell from the case when request time metric is missing (0).
326-
// // The time is converted to milliseconds for later use with an APM style counter (exposed as a long), rather than using the
327-
// // default double exposed by getTimeTakenMillisIfKnown().
328-
// // We don't need sub-millisecond precision. So no need perform the data type castings.
329-
// long totalTimeInNanos = 0;
330-
// for (TimingInfo timingInfo : requestTimesIncludingRetries) {
331-
// var endTimeInNanos = timingInfo.getEndTimeNanoIfKnown();
332-
// if (endTimeInNanos != null) {
333-
// totalTimeInNanos += endTimeInNanos - timingInfo.getStartTimeNano();
334-
// }
335-
// }
336-
// return totalTimeInNanos;
337-
// }
338-
339290
@Override
340291
public String toString() {
341292
return bucket;
@@ -627,7 +578,7 @@ public String toString() {
627578
}
628579

629580
class StatsCollectors {
630-
final Map<StatsKey, IgnoreNoResponseMetricsPublisher> collectors = new ConcurrentHashMap<>();
581+
final Map<StatsKey, ElasticsearchS3MetricsCollector> collectors = new ConcurrentHashMap<>();
631582

632583
MetricPublisher getMetricPublisher(Operation operation, OperationPurpose purpose) {
633584
return collectors.computeIfAbsent(new StatsKey(operation, purpose), k -> buildMetricPublisher(k.operation(), k.purpose()));
@@ -650,8 +601,8 @@ Map<String, BlobStoreActionStats> statsMap(boolean isStateless) {
650601
}
651602
}
652603

653-
IgnoreNoResponseMetricsPublisher buildMetricPublisher(Operation operation, OperationPurpose purpose) {
654-
return new IgnoreNoResponseMetricsPublisher(operation, purpose);
604+
ElasticsearchS3MetricsCollector buildMetricPublisher(Operation operation, OperationPurpose purpose) {
605+
return new ElasticsearchS3MetricsCollector(operation, purpose);
655606
}
656607
}
657608

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.logging.Logger;
2828
import org.elasticsearch.rest.RestStatus;
2929
import org.elasticsearch.rest.RestUtils;
30+
import org.elasticsearch.test.ESTestCase;
3031
import org.elasticsearch.test.fixture.HttpHeaderParser;
3132

3233
import java.io.IOException;
@@ -93,6 +94,21 @@ public void handle(final HttpExchange exchange) throws IOException {
9394
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
9495
}
9596
try {
97+
// TODO NOMERGE remove this
98+
// if (ESTestCase.randomBoolean()) {
99+
// final var responseBody = """
100+
// <?xml version="1.0" encoding="UTF-8"?>
101+
// <Error>
102+
// <Code>SlowDown</Code>
103+
// <Message>Test throttling</Message>
104+
// <Resource>/mybucket/myfoto.jpg</Resource>\s
105+
// <RequestId>4442587FB7D0A2F9</RequestId>
106+
// </Error>""".getBytes(UTF_8);
107+
// exchange.sendResponseHeaders(RestStatus.SERVICE_UNAVAILABLE.getStatus(), responseBody.length);
108+
// exchange.getResponseBody().write(responseBody);
109+
// return;
110+
// }
111+
96112
if (Regex.simpleMatch("HEAD /" + path + "/*", request)) {
97113
final BytesReference blob = blobs.get(requestComponents.path);
98114
if (blob == null) {

0 commit comments

Comments
 (0)