Skip to content

Commit 90e7061

Browse files
committed
Test that requests and operations are tracked separately
1 parent 30968f5 commit 90e7061

File tree

3 files changed

+204
-113
lines changed

3 files changed

+204
-113
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java

Lines changed: 13 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.repositories.azure;
1111

12-
import com.sun.net.httpserver.Headers;
1312
import com.sun.net.httpserver.HttpExchange;
1413
import com.sun.net.httpserver.HttpHandler;
1514

@@ -34,7 +33,6 @@
3433

3534
import java.io.IOException;
3635
import java.nio.ByteBuffer;
37-
import java.nio.charset.StandardCharsets;
3836
import java.util.ArrayList;
3937
import java.util.List;
4038
import java.util.Map;
@@ -48,6 +46,7 @@
4846
import java.util.stream.IntStream;
4947

5048
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
49+
import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
5150
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
5251
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5352
import static org.hamcrest.Matchers.hasSize;
@@ -61,7 +60,7 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
6160
);
6261
private static final int MAX_RETRIES = 3;
6362

64-
private final Queue<RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
63+
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
6564

6665
@Override
6766
protected Map<String, HttpHandler> createHttpHandlers() {
@@ -106,7 +105,8 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {
106105

107106
// Queue up some throttle responses
108107
final int numThrottles = randomIntBetween(1, MAX_RETRIES);
109-
IntStream.range(0, numThrottles).forEach(i -> requestHandlers.offer(new FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
108+
IntStream.range(0, numThrottles)
109+
.forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
110110

111111
// Check that the blob exists
112112
blobContainer.blobExists(purpose, blobName);
@@ -131,7 +131,13 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {
131131
clearMetrics(dataNodeName);
132132

133133
// Queue up a range-not-satisfied error
134-
requestHandlers.offer(new FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE));
134+
requestHandlers.offer(
135+
new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
136+
RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
137+
null,
138+
GET_BLOB_REQUEST_PREDICATE
139+
)
140+
);
135141

136142
// Attempt to read the blob
137143
assertThrows(RequestedRangeNotSatisfiedException.class, () -> blobContainer.readBlob(purpose, blobName));
@@ -163,7 +169,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
163169
if (status == RestStatus.TOO_MANY_REQUESTS) {
164170
throttles.incrementAndGet();
165171
}
166-
requestHandlers.offer(new FixedRequestHandler(status));
172+
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
167173
});
168174

169175
// Check that the blob exists
@@ -259,7 +265,7 @@ public void testBatchDeleteFailure() throws IOException {
259265
clearMetrics(dataNodeName);
260266

261267
// Handler will fail one or more of the batch requests
262-
final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
268+
final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
263269

264270
// Exhaust the retries
265271
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
@@ -287,35 +293,6 @@ private long getLongCounterTotal(String dataNodeName, String metricKey) {
287293
.reduce(0L, Long::sum);
288294
}
289295

290-
/**
291-
* Creates a {@link RequestHandler} that will persistently fail the first <code>numberToFail</code> distinct requests
292-
* it sees. Any other requests are passed through to the delegate.
293-
*
294-
* @param numberToFail The number of requests to fail
295-
* @return the handler
296-
*/
297-
private static RequestHandler createFailNRequestsHandler(int numberToFail) {
298-
final List<String> requestsToFail = new ArrayList<>(numberToFail);
299-
return (exchange, delegate) -> {
300-
final Headers requestHeaders = exchange.getRequestHeaders();
301-
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
302-
boolean failRequest = false;
303-
synchronized (requestsToFail) {
304-
if (requestsToFail.contains(requestId)) {
305-
failRequest = true;
306-
} else if (requestsToFail.size() < numberToFail) {
307-
requestsToFail.add(requestId);
308-
failRequest = true;
309-
}
310-
}
311-
if (failRequest) {
312-
exchange.sendResponseHeaders(500, -1);
313-
} else {
314-
delegate.handle(exchange);
315-
}
316-
};
317-
}
318-
319296
private void clearMetrics(String discoveryNode) {
320297
internalCluster().getInstance(PluginsService.class, discoveryNode)
321298
.filterPlugins(TestTelemetryPlugin.class)
@@ -480,80 +457,4 @@ private void assertMatchingMetricRecorded(MetricType metricType, String metricNa
480457
assertion.accept(measurement);
481458
}
482459
}
483-
484-
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
485-
private static class ResponseInjectingAzureHttpHandler implements DelegatingHttpHandler {
486-
487-
private final HttpHandler delegate;
488-
private final Queue<RequestHandler> requestHandlerQueue;
489-
490-
ResponseInjectingAzureHttpHandler(Queue<RequestHandler> requestHandlerQueue, HttpHandler delegate) {
491-
this.delegate = delegate;
492-
this.requestHandlerQueue = requestHandlerQueue;
493-
}
494-
495-
@Override
496-
public void handle(HttpExchange exchange) throws IOException {
497-
RequestHandler nextHandler = requestHandlerQueue.peek();
498-
if (nextHandler != null && nextHandler.matchesRequest(exchange)) {
499-
requestHandlerQueue.poll().writeResponse(exchange, delegate);
500-
} else {
501-
delegate.handle(exchange);
502-
}
503-
}
504-
505-
@Override
506-
public HttpHandler getDelegate() {
507-
return delegate;
508-
}
509-
}
510-
511-
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
512-
@FunctionalInterface
513-
private interface RequestHandler {
514-
void writeResponse(HttpExchange exchange, HttpHandler delegate) throws IOException;
515-
516-
default boolean matchesRequest(HttpExchange exchange) {
517-
return true;
518-
}
519-
}
520-
521-
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
522-
private static class FixedRequestHandler implements RequestHandler {
523-
524-
private final RestStatus status;
525-
private final String responseBody;
526-
private final Predicate<HttpExchange> requestMatcher;
527-
528-
FixedRequestHandler(RestStatus status) {
529-
this(status, null, req -> true);
530-
}
531-
532-
/**
533-
* Create a handler that only gets executed for requests that match the supplied predicate. Note
534-
* that because the errors are stored in a queue this will prevent any subsequently queued errors from
535-
* being returned until after it returns.
536-
*/
537-
FixedRequestHandler(RestStatus status, String responseBody, Predicate<HttpExchange> requestMatcher) {
538-
this.status = status;
539-
this.responseBody = responseBody;
540-
this.requestMatcher = requestMatcher;
541-
}
542-
543-
@Override
544-
public boolean matchesRequest(HttpExchange exchange) {
545-
return requestMatcher.test(exchange);
546-
}
547-
548-
@Override
549-
public void writeResponse(HttpExchange exchange, HttpHandler delegateHandler) throws IOException {
550-
if (responseBody != null) {
551-
byte[] responseBytes = responseBody.getBytes(StandardCharsets.UTF_8);
552-
exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
553-
exchange.getResponseBody().write(responseBytes);
554-
} else {
555-
exchange.sendResponseHeaders(status.getStatus(), -1);
556-
}
557-
}
558-
}
559460
}

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,78 @@
1616
import org.elasticsearch.common.blobstore.OperationPurpose;
1717
import org.elasticsearch.common.bytes.BytesReference;
1818
import org.elasticsearch.core.SuppressForbidden;
19+
import org.elasticsearch.rest.RestStatus;
1920
import org.junit.Before;
2021

2122
import java.io.IOException;
2223
import java.nio.ByteBuffer;
24+
import java.util.Arrays;
25+
import java.util.HashMap;
2326
import java.util.List;
2427
import java.util.Map;
28+
import java.util.Queue;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
30+
31+
import static org.elasticsearch.repositories.azure.AzureBlobStore.Operation.BLOB_BATCH;
32+
import static org.elasticsearch.repositories.azure.AzureBlobStore.Operation.LIST_BLOBS;
33+
import static org.elasticsearch.repositories.azure.AzureBlobStore.Operation.PUT_BLOB;
2534

2635
public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
2736

37+
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
38+
2839
@SuppressForbidden(reason = "use a http server")
2940
@Before
3041
public void configureAzureHandler() {
31-
httpServer.createContext("/", new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE));
42+
httpServer.createContext(
43+
"/",
44+
new ResponseInjectingAzureHttpHandler(
45+
requestHandlers,
46+
new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE)
47+
)
48+
);
49+
}
50+
51+
public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
52+
serverlessMode = true;
53+
final AzureBlobContainer blobContainer = asInstanceOf(AzureBlobContainer.class, createBlobContainer(between(1, 3)));
54+
final AzureBlobStore blobStore = blobContainer.getBlobStore();
55+
final OperationPurpose purpose = randomFrom(OperationPurpose.values());
56+
57+
// Just a sample of the easy operations to test
58+
final List<AzureBlobStore.Operation> supportedOperations = Arrays.asList(PUT_BLOB, LIST_BLOBS, BLOB_BATCH);
59+
final Map<AzureBlobStore.Operation, BlobStoreActionStats> expectedActionStats = new HashMap<>();
60+
supportedOperations.forEach(operation -> expectedActionStats.put(operation, BlobStoreActionStats.ZERO));
61+
62+
for (int i = 0; i < randomIntBetween(10, 50); i++) {
63+
final boolean triggerRetry = randomBoolean();
64+
if (triggerRetry) {
65+
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
66+
}
67+
final AzureBlobStore.Operation operation = randomFrom(supportedOperations);
68+
switch (operation) {
69+
case PUT_BLOB -> blobStore.writeBlob(
70+
purpose,
71+
randomIdentifier(),
72+
BytesReference.fromByteBuffer(ByteBuffer.wrap(randomBlobContent())),
73+
false
74+
);
75+
case LIST_BLOBS -> blobStore.listBlobsByPrefix(purpose, randomIdentifier(), randomIdentifier());
76+
case BLOB_BATCH -> blobStore.deleteBlobsIgnoringIfNotExists(
77+
purpose,
78+
List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator()
79+
);
80+
}
81+
expectedActionStats.computeIfPresent(operation, (op, stats) -> stats.add(new BlobStoreActionStats(1, triggerRetry ? 2 : 1)));
82+
}
83+
84+
final Map<String, BlobStoreActionStats> stats = blobStore.stats();
85+
expectedActionStats.forEach((operation, value) -> {
86+
if (value.isZero() == false) {
87+
String key = statsKey(purpose, operation);
88+
assertEquals(key, stats.get(key), value);
89+
}
90+
});
3291
}
3392

3493
public void testOperationPurposeIsReflectedInBlobStoreStats() throws IOException {

0 commit comments

Comments
 (0)