Skip to content

Commit ff6a463

Browse files
authored
Clean up request parsing in S3HttpHandler (#126094)
The `METHOD /path/components?and=query` string representation of a request is becoming increasingly difficult to parse, with slight variations in parsing between the implementation in `S3HttpHandler` and the various other implementations. This commit gets rid of the string-concatenate-and-split behaviour in favour of a proper object that has predicates for testing all the different kinds of request that might be made against S3. Backport of #126034 to 8.x
1 parent f01a2ae commit ff6a463

File tree

6 files changed

+293
-228
lines changed

6 files changed

+293
-228
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import com.azure.storage.common.policy.RequestRetryOptions;
1414
import com.azure.storage.common.policy.RetryPolicyType;
15-
import com.sun.net.httpserver.Headers;
1615
import com.sun.net.httpserver.HttpExchange;
1716
import com.sun.net.httpserver.HttpHandler;
1817

@@ -205,7 +204,9 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
205204
}
206205

207206
@Override
208-
protected void maybeTrack(String request, Headers headers) {
207+
protected void maybeTrack(HttpExchange exchange) {
208+
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
209+
final var headers = exchange.getRequestHeaders();
209210
// Same request id is a retry
210211
// https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-ncnbi/817da997-30d2-4cd3-972f-a0073e4e98f7
211212
// Do not count retries since the client side request stats do not track them yet.

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,9 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
343343
}
344344

345345
@Override
346-
public void maybeTrack(final String request, Headers requestHeaders) {
346+
public void maybeTrack(HttpExchange exchange) {
347+
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
348+
final Headers requestHeaders = exchange.getRequestHeaders();
347349
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
348350
trackRequest("GetObject");
349351
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
1515
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
1616
import com.amazonaws.services.s3.model.MultipartUpload;
17-
import com.sun.net.httpserver.Headers;
1817
import com.sun.net.httpserver.HttpExchange;
1918
import com.sun.net.httpserver.HttpHandler;
2019

@@ -671,16 +670,17 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
671670
super(delegate);
672671
}
673672

673+
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
674+
return new S3HttpHandler("bucket").parseRequest(exchange);
675+
}
676+
674677
@Override
675678
public void handle(HttpExchange exchange) throws IOException {
676-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
677-
S3HttpHandler.getRawRequestString(exchange)
678-
);
679-
if (false == requestComponents.request().startsWith("HEAD ")) {
680-
assertThat(requestComponents.customQueryParameters(), hasKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
679+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
680+
if ("HEAD".equals(s3Request.method())) {
681+
assertTrue(s3Request.hasQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
681682
}
682-
final String request = requestComponents.request();
683-
if (shouldFailCompleteMultipartUploadRequest.get() && Regex.simpleMatch("POST /*/*?uploadId=*", request)) {
683+
if (shouldFailCompleteMultipartUploadRequest.get() && s3Request.isCompleteMultipartUploadRequest()) {
684684
try (exchange) {
685685
drainInputStream(exchange.getRequestBody());
686686
exchange.sendResponseHeaders(
@@ -694,20 +694,17 @@ public void handle(HttpExchange exchange) throws IOException {
694694
}
695695

696696
@Override
697-
public void maybeTrack(final String rawRequest, Headers requestHeaders) {
698-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(rawRequest);
699-
final String request = requestComponents.request();
700-
final OperationPurpose purpose = OperationPurpose.parse(
701-
requestComponents.customQueryParameters().get(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE).get(0)
702-
);
703-
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
697+
public void maybeTrack(HttpExchange exchange) {
698+
final S3HttpHandler.S3Request request = parseRequest(exchange);
699+
final OperationPurpose purpose = OperationPurpose.parse(request.getQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
700+
if (request.isListObjectsRequest()) {
704701
trackRequest("ListObjects");
705702
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong())
706703
.incrementAndGet();
707-
} else if (Regex.simpleMatch("GET /*/?uploads&*", request)) {
704+
} else if (request.isListMultipartUploadsRequest()) {
708705
// TODO track ListMultipartUploads requests
709706
logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey());
710-
} else if (Regex.simpleMatch("GET /*/*", request)) {
707+
} else if (request.isGetObjectRequest()) {
711708
trackRequest("GetObject");
712709
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())
713710
.incrementAndGet();
@@ -717,21 +714,21 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
717714
new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_MULTIPART_OBJECT, purpose),
718715
k -> new AtomicLong()
719716
).incrementAndGet();
720-
} else if (Regex.simpleMatch("PUT /*/*", request)) {
717+
} else if (request.isPutObjectRequest()) {
721718
trackRequest("PutObject");
722719
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
723720
.incrementAndGet();
724-
} else if (Regex.simpleMatch("POST /*/?delete", request)) {
721+
} else if (request.isMultiObjectDeleteRequest()) {
725722
trackRequest("DeleteObjects");
726723
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())
727724
.incrementAndGet();
728-
} else if (Regex.simpleMatch("DELETE /*/*?uploadId=*", request)) {
725+
} else if (request.isAbortMultipartUploadRequest()) {
729726
trackRequest("AbortMultipartObject");
730727
metricsCount.computeIfAbsent(
731728
new S3BlobStore.StatsKey(S3BlobStore.Operation.ABORT_MULTIPART_OBJECT, purpose),
732729
k -> new AtomicLong()
733730
).incrementAndGet();
734-
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
731+
} else if (request.isHeadObjectRequest()) {
735732
trackRequest("HeadObject");
736733
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.HEAD_OBJECT, purpose), k -> new AtomicLong())
737734
.incrementAndGet();
@@ -744,10 +741,10 @@ Map<S3BlobStore.StatsKey, AtomicLong> getMetricsCount() {
744741
return metricsCount;
745742
}
746743

747-
private boolean isMultiPartUpload(String request) {
748-
return Regex.simpleMatch("POST /*/*?uploads", request)
749-
|| Regex.simpleMatch("POST /*/*?*uploadId=*", request)
750-
|| Regex.simpleMatch("PUT /*/*?*uploadId=*", request);
744+
private boolean isMultiPartUpload(S3HttpHandler.S3Request s3Request) {
745+
return s3Request.isInitiateMultipartUploadRequest()
746+
|| s3Request.isUploadPartRequest()
747+
|| s3Request.isCompleteMultipartUploadRequest();
751748
}
752749
}
753750
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 67 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ protected long getRetryDelayInMillis() {
242242
};
243243
}
244244

245+
private static S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
246+
return new S3HttpHandler("bucket").parseRequest(exchange);
247+
}
248+
245249
public void testWriteBlobWithRetries() throws Exception {
246250
final int maxRetries = randomInt(5);
247251
final CountDown countDown = new CountDown(maxRetries + 1);
@@ -250,10 +254,8 @@ public void testWriteBlobWithRetries() throws Exception {
250254

251255
final byte[] bytes = randomBlobContent();
252256
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
253-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
254-
S3HttpHandler.getRawRequestString(exchange)
255-
);
256-
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
257+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
258+
if (s3Request.isPutObjectRequest()) {
257259
if (countDown.countDown()) {
258260
final BytesReference body = Streams.readFully(exchange.getRequestBody());
259261
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
@@ -342,12 +344,10 @@ public void testWriteLargeBlob() throws Exception {
342344
final CountDown countDownComplete = new CountDown(nbErrors);
343345

344346
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob"), exchange -> {
345-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
346-
S3HttpHandler.getRawRequestString(exchange)
347-
);
347+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
348348
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
349349

350-
if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
350+
if (s3Request.isInitiateMultipartUploadRequest()) {
351351
// initiate multipart upload request
352352
if (countDownInitiate.countDown()) {
353353
byte[] response = ("""
@@ -363,39 +363,37 @@ public void testWriteLargeBlob() throws Exception {
363363
exchange.close();
364364
return;
365365
}
366-
} else if ("PUT".equals(requestComponents.method())
367-
&& requestComponents.query().contains("uploadId=TEST")
368-
&& requestComponents.query().contains("partNumber=")) {
369-
// upload part request
370-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
371-
BytesReference bytes = Streams.readFully(md5);
372-
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
373-
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
374-
375-
if (countDownUploads.decrementAndGet() % 2 == 0) {
376-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
377-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
378-
exchange.close();
379-
return;
380-
}
366+
} else if (s3Request.isUploadPartRequest()) {
367+
// upload part request
368+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
369+
BytesReference bytes = Streams.readFully(md5);
370+
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
371+
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
372+
373+
if (countDownUploads.decrementAndGet() % 2 == 0) {
374+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
375+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
376+
exchange.close();
377+
return;
378+
}
381379

382-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
383-
// complete multipart upload request
384-
if (countDownComplete.countDown()) {
385-
Streams.readFully(exchange.getRequestBody());
386-
byte[] response = ("""
387-
<?xml version="1.0" encoding="UTF-8"?>
388-
<CompleteMultipartUploadResult>
389-
<Bucket>bucket</Bucket>
390-
<Key>write_large_blob</Key>
391-
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
392-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
393-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
394-
exchange.getResponseBody().write(response);
395-
exchange.close();
396-
return;
397-
}
380+
} else if (s3Request.isCompleteMultipartUploadRequest()) {
381+
// complete multipart upload request
382+
if (countDownComplete.countDown()) {
383+
Streams.readFully(exchange.getRequestBody());
384+
byte[] response = ("""
385+
<?xml version="1.0" encoding="UTF-8"?>
386+
<CompleteMultipartUploadResult>
387+
<Bucket>bucket</Bucket>
388+
<Key>write_large_blob</Key>
389+
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
390+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
391+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
392+
exchange.getResponseBody().write(response);
393+
exchange.close();
394+
return;
398395
}
396+
}
399397

400398
// sends an error back or let the request time out
401399
if (useTimeout == false) {
@@ -444,12 +442,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
444442
final CountDown countDownComplete = new CountDown(nbErrors);
445443

446444
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob_streaming"), exchange -> {
447-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
448-
S3HttpHandler.getRawRequestString(exchange)
449-
);
445+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
450446
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
451447

452-
if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
448+
if (s3Request.isInitiateMultipartUploadRequest()) {
453449
// initiate multipart upload request
454450
if (countDownInitiate.countDown()) {
455451
byte[] response = ("""
@@ -465,38 +461,36 @@ public void testWriteLargeBlobStreaming() throws Exception {
465461
exchange.close();
466462
return;
467463
}
468-
} else if ("PUT".equals(requestComponents.method())
469-
&& requestComponents.query().contains("uploadId=TEST")
470-
&& requestComponents.query().contains("partNumber=")) {
471-
// upload part request
472-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
473-
BytesReference bytes = Streams.readFully(md5);
474-
475-
if (counterUploads.incrementAndGet() % 2 == 0) {
476-
bytesReceived.addAndGet(bytes.length());
477-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
478-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
479-
exchange.close();
480-
return;
481-
}
464+
} else if (s3Request.isUploadPartRequest()) {
465+
// upload part request
466+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
467+
BytesReference bytes = Streams.readFully(md5);
468+
469+
if (counterUploads.incrementAndGet() % 2 == 0) {
470+
bytesReceived.addAndGet(bytes.length());
471+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
472+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
473+
exchange.close();
474+
return;
475+
}
482476

483-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
484-
// complete multipart upload request
485-
if (countDownComplete.countDown()) {
486-
Streams.readFully(exchange.getRequestBody());
487-
byte[] response = ("""
488-
<?xml version="1.0" encoding="UTF-8"?>
489-
<CompleteMultipartUploadResult>
490-
<Bucket>bucket</Bucket>
491-
<Key>write_large_blob_streaming</Key>
492-
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
493-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
494-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
495-
exchange.getResponseBody().write(response);
496-
exchange.close();
497-
return;
498-
}
477+
} else if (s3Request.isCompleteMultipartUploadRequest()) {
478+
// complete multipart upload request
479+
if (countDownComplete.countDown()) {
480+
Streams.readFully(exchange.getRequestBody());
481+
byte[] response = ("""
482+
<?xml version="1.0" encoding="UTF-8"?>
483+
<CompleteMultipartUploadResult>
484+
<Bucket>bucket</Bucket>
485+
<Key>write_large_blob_streaming</Key>
486+
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
487+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
488+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
489+
exchange.getResponseBody().write(response);
490+
exchange.close();
491+
return;
499492
}
493+
}
500494

501495
// sends an error back or let the request time out
502496
if (useTimeout == false) {

0 commit comments

Comments
 (0)