Skip to content

Commit 0d64aab

Browse files
authored
Clean up request parsing in S3HttpHandler (#126034)
The `METHOD /path/components?and=query` string representation of a request is becoming increasingly difficult to parse, with slight variations in parsing between the implementation in `S3HttpHandler` and the various other implementations. This commit gets rid of the string-concatenate-and-split behaviour in favour of a proper object that has predicates for testing all the different kinds of request that might be made against S3.
1 parent b8c70ae commit 0d64aab

File tree

6 files changed

+294
-230
lines changed

6 files changed

+294
-230
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import com.azure.storage.common.policy.RequestRetryOptions;
1515
import com.azure.storage.common.policy.RetryPolicyType;
16-
import com.sun.net.httpserver.Headers;
1716
import com.sun.net.httpserver.HttpExchange;
1817
import com.sun.net.httpserver.HttpHandler;
1918

@@ -246,7 +245,8 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
246245
}
247246

248247
@Override
249-
protected void maybeTrack(String request, Headers headers) {
248+
protected void maybeTrack(HttpExchange exchange) {
249+
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
250250
if (GET_BLOB_PATTERN.test(request)) {
251251
trackRequest("GetBlob");
252252
} else if (Regex.simpleMatch("HEAD /*/*/*", request)) {

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,9 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
346346
}
347347

348348
@Override
349-
public void maybeTrack(final String request, Headers requestHeaders) {
349+
public void maybeTrack(HttpExchange exchange) {
350+
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
351+
final Headers requestHeaders = exchange.getRequestHeaders();
350352
if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) {
351353
trackRequest(Operation.GET_OBJECT.key());
352354
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
1515
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
1616
import com.amazonaws.services.s3.model.MultipartUpload;
17-
import com.sun.net.httpserver.Headers;
1817
import com.sun.net.httpserver.HttpExchange;
1918
import com.sun.net.httpserver.HttpHandler;
2019

@@ -692,16 +691,17 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
692691
super(delegate);
693692
}
694693

694+
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
695+
return new S3HttpHandler("bucket").parseRequest(exchange);
696+
}
697+
695698
@Override
696699
public void handle(HttpExchange exchange) throws IOException {
697-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
698-
S3HttpHandler.getRawRequestString(exchange)
699-
);
700-
if (false == requestComponents.request().startsWith("HEAD ")) {
701-
assertThat(requestComponents.customQueryParameters(), hasKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
700+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
701+
if ("HEAD".equals(s3Request.method())) {
702+
assertTrue(s3Request.hasQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
702703
}
703-
final String request = requestComponents.request();
704-
if (shouldFailCompleteMultipartUploadRequest.get() && Regex.simpleMatch("POST /*/*?uploadId=*", request)) {
704+
if (shouldFailCompleteMultipartUploadRequest.get() && s3Request.isCompleteMultipartUploadRequest()) {
705705
try (exchange) {
706706
drainInputStream(exchange.getRequestBody());
707707
exchange.sendResponseHeaders(
@@ -715,20 +715,17 @@ public void handle(HttpExchange exchange) throws IOException {
715715
}
716716

717717
@Override
718-
public void maybeTrack(final String rawRequest, Headers requestHeaders) {
719-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(rawRequest);
720-
final String request = requestComponents.request();
721-
final OperationPurpose purpose = OperationPurpose.parse(
722-
requestComponents.customQueryParameters().get(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE).get(0)
723-
);
724-
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
718+
public void maybeTrack(HttpExchange exchange) {
719+
final S3HttpHandler.S3Request request = parseRequest(exchange);
720+
final OperationPurpose purpose = OperationPurpose.parse(request.getQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
721+
if (request.isListObjectsRequest()) {
725722
trackRequest("ListObjects");
726723
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong())
727724
.incrementAndGet();
728-
} else if (Regex.simpleMatch("GET /*/?uploads&*", request)) {
725+
} else if (request.isListMultipartUploadsRequest()) {
729726
// TODO track ListMultipartUploads requests
730727
logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey());
731-
} else if (Regex.simpleMatch("GET /*/*", request)) {
728+
} else if (request.isGetObjectRequest()) {
732729
trackRequest("GetObject");
733730
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())
734731
.incrementAndGet();
@@ -738,21 +735,21 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
738735
new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_MULTIPART_OBJECT, purpose),
739736
k -> new AtomicLong()
740737
).incrementAndGet();
741-
} else if (Regex.simpleMatch("PUT /*/*", request)) {
738+
} else if (request.isPutObjectRequest()) {
742739
trackRequest("PutObject");
743740
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
744741
.incrementAndGet();
745-
} else if (Regex.simpleMatch("POST /*/?delete", request)) {
742+
} else if (request.isMultiObjectDeleteRequest()) {
746743
trackRequest("DeleteObjects");
747744
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())
748745
.incrementAndGet();
749-
} else if (Regex.simpleMatch("DELETE /*/*?uploadId=*", request)) {
746+
} else if (request.isAbortMultipartUploadRequest()) {
750747
trackRequest("AbortMultipartObject");
751748
metricsCount.computeIfAbsent(
752749
new S3BlobStore.StatsKey(S3BlobStore.Operation.ABORT_MULTIPART_OBJECT, purpose),
753750
k -> new AtomicLong()
754751
).incrementAndGet();
755-
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
752+
} else if (request.isHeadObjectRequest()) {
756753
trackRequest("HeadObject");
757754
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.HEAD_OBJECT, purpose), k -> new AtomicLong())
758755
.incrementAndGet();
@@ -765,10 +762,10 @@ Map<S3BlobStore.StatsKey, AtomicLong> getMetricsCount() {
765762
return metricsCount;
766763
}
767764

768-
private boolean isMultiPartUpload(String request) {
769-
return Regex.simpleMatch("POST /*/*?uploads", request)
770-
|| Regex.simpleMatch("POST /*/*?*uploadId=*", request)
771-
|| Regex.simpleMatch("PUT /*/*?*uploadId=*", request);
765+
private boolean isMultiPartUpload(S3HttpHandler.S3Request s3Request) {
766+
return s3Request.isInitiateMultipartUploadRequest()
767+
|| s3Request.isUploadPartRequest()
768+
|| s3Request.isCompleteMultipartUploadRequest();
772769
}
773770
}
774771
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 69 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,10 @@ protected long getRetryDelayInMillis() {
257257
};
258258
}
259259

260+
private static S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
261+
return new S3HttpHandler("bucket").parseRequest(exchange);
262+
}
263+
260264
public void testWriteBlobWithRetries() throws Exception {
261265
final int maxRetries = randomInt(5);
262266
final CountDown countDown = new CountDown(maxRetries + 1);
@@ -265,10 +269,8 @@ public void testWriteBlobWithRetries() throws Exception {
265269

266270
final byte[] bytes = randomBlobContent();
267271
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
268-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
269-
S3HttpHandler.getRawRequestString(exchange)
270-
);
271-
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
272+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
273+
if (s3Request.isPutObjectRequest()) {
272274
if (countDown.countDown()) {
273275
final BytesReference body = Streams.readFully(exchange.getRequestBody());
274276
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
@@ -353,8 +355,8 @@ public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception {
353355

354356
var uploadedBytes = new AtomicReference<BytesReference>();
355357
httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> {
356-
var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange));
357-
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
358+
var requestComponents = parseRequest(exchange);
359+
if (requestComponents.isPutObjectRequest()) {
358360
var body = Streams.readFully(exchange.getRequestBody());
359361
if (uploadedBytes.compareAndSet(null, body)) {
360362
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
@@ -408,12 +410,10 @@ public void testWriteLargeBlob() throws Exception {
408410
final CountDown countDownComplete = new CountDown(nbErrors);
409411

410412
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob"), exchange -> {
411-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
412-
S3HttpHandler.getRawRequestString(exchange)
413-
);
413+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
414414
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
415415

416-
if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
416+
if (s3Request.isInitiateMultipartUploadRequest()) {
417417
// initiate multipart upload request
418418
if (countDownInitiate.countDown()) {
419419
byte[] response = ("""
@@ -429,39 +429,37 @@ public void testWriteLargeBlob() throws Exception {
429429
exchange.close();
430430
return;
431431
}
432-
} else if ("PUT".equals(requestComponents.method())
433-
&& requestComponents.query().contains("uploadId=TEST")
434-
&& requestComponents.query().contains("partNumber=")) {
435-
// upload part request
436-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
437-
BytesReference bytes = Streams.readFully(md5);
438-
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
439-
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
440-
441-
if (countDownUploads.decrementAndGet() % 2 == 0) {
442-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
443-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
444-
exchange.close();
445-
return;
446-
}
432+
} else if (s3Request.isUploadPartRequest()) {
433+
// upload part request
434+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
435+
BytesReference bytes = Streams.readFully(md5);
436+
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
437+
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
438+
439+
if (countDownUploads.decrementAndGet() % 2 == 0) {
440+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
441+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
442+
exchange.close();
443+
return;
444+
}
447445

448-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
449-
// complete multipart upload request
450-
if (countDownComplete.countDown()) {
451-
Streams.readFully(exchange.getRequestBody());
452-
byte[] response = ("""
453-
<?xml version="1.0" encoding="UTF-8"?>
454-
<CompleteMultipartUploadResult>
455-
<Bucket>bucket</Bucket>
456-
<Key>write_large_blob</Key>
457-
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
458-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
459-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
460-
exchange.getResponseBody().write(response);
461-
exchange.close();
462-
return;
463-
}
446+
} else if (s3Request.isCompleteMultipartUploadRequest()) {
447+
// complete multipart upload request
448+
if (countDownComplete.countDown()) {
449+
Streams.readFully(exchange.getRequestBody());
450+
byte[] response = ("""
451+
<?xml version="1.0" encoding="UTF-8"?>
452+
<CompleteMultipartUploadResult>
453+
<Bucket>bucket</Bucket>
454+
<Key>write_large_blob</Key>
455+
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
456+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
457+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
458+
exchange.getResponseBody().write(response);
459+
exchange.close();
460+
return;
464461
}
462+
}
465463

466464
// sends an error back or let the request time out
467465
if (useTimeout == false) {
@@ -510,12 +508,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
510508
final CountDown countDownComplete = new CountDown(nbErrors);
511509

512510
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob_streaming"), exchange -> {
513-
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
514-
S3HttpHandler.getRawRequestString(exchange)
515-
);
511+
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
516512
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
517513

518-
if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
514+
if (s3Request.isInitiateMultipartUploadRequest()) {
519515
// initiate multipart upload request
520516
if (countDownInitiate.countDown()) {
521517
byte[] response = ("""
@@ -531,38 +527,36 @@ public void testWriteLargeBlobStreaming() throws Exception {
531527
exchange.close();
532528
return;
533529
}
534-
} else if ("PUT".equals(requestComponents.method())
535-
&& requestComponents.query().contains("uploadId=TEST")
536-
&& requestComponents.query().contains("partNumber=")) {
537-
// upload part request
538-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
539-
BytesReference bytes = Streams.readFully(md5);
540-
541-
if (counterUploads.incrementAndGet() % 2 == 0) {
542-
bytesReceived.addAndGet(bytes.length());
543-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
544-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
545-
exchange.close();
546-
return;
547-
}
530+
} else if (s3Request.isUploadPartRequest()) {
531+
// upload part request
532+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
533+
BytesReference bytes = Streams.readFully(md5);
534+
535+
if (counterUploads.incrementAndGet() % 2 == 0) {
536+
bytesReceived.addAndGet(bytes.length());
537+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
538+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
539+
exchange.close();
540+
return;
541+
}
548542

549-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
550-
// complete multipart upload request
551-
if (countDownComplete.countDown()) {
552-
Streams.readFully(exchange.getRequestBody());
553-
byte[] response = ("""
554-
<?xml version="1.0" encoding="UTF-8"?>
555-
<CompleteMultipartUploadResult>
556-
<Bucket>bucket</Bucket>
557-
<Key>write_large_blob_streaming</Key>
558-
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
559-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
560-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
561-
exchange.getResponseBody().write(response);
562-
exchange.close();
563-
return;
564-
}
543+
} else if (s3Request.isCompleteMultipartUploadRequest()) {
544+
// complete multipart upload request
545+
if (countDownComplete.countDown()) {
546+
Streams.readFully(exchange.getRequestBody());
547+
byte[] response = ("""
548+
<?xml version="1.0" encoding="UTF-8"?>
549+
<CompleteMultipartUploadResult>
550+
<Bucket>bucket</Bucket>
551+
<Key>write_large_blob_streaming</Key>
552+
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
553+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
554+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
555+
exchange.getResponseBody().write(response);
556+
exchange.close();
557+
return;
565558
}
559+
}
566560

567561
// sends an error back or let the request time out
568562
if (useTimeout == false) {

0 commit comments

Comments
 (0)