From c6d036f05ac03282ee23987d1e846568190274a3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 1 Apr 2025 11:05:03 +0100 Subject: [PATCH 1/2] Clean up request parsing in `S3HttpHandler` The `METHOD /path/components?and=query` string representation of a request is becoming increasingly difficult to parse, with slight variations in parsing between the implementation in `S3HttpHandler` and the various other implementations. This commit gets rid of the string-concatenate-and-split behaviour in favour of a proper object that has predicates for testing all the different kinds of request that might be made against S3. --- .../azure/AzureBlobStoreRepositoryTests.java | 4 +- ...eCloudStorageBlobStoreRepositoryTests.java | 4 +- .../s3/S3BlobStoreRepositoryTests.java | 47 ++- .../s3/S3BlobContainerRetriesTests.java | 144 ++++---- .../main/java/fixture/s3/S3HttpHandler.java | 309 +++++++++++------- ...ESMockAPIBasedRepositoryIntegTestCase.java | 13 +- 6 files changed, 291 insertions(+), 230 deletions(-) diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index f0242b6c3ef38..878970eb8056c 100644 --- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -13,7 +13,6 @@ import com.azure.storage.common.policy.RequestRetryOptions; import com.azure.storage.common.policy.RetryPolicyType; -import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -246,7 +245,8 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) { } @Override - protected void maybeTrack(String request, Headers headers) { + protected void maybeTrack(HttpExchange exchange) { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); if (GET_BLOB_PATTERN.test(request)) { trackRequest("GetBlob"); } else if (Regex.simpleMatch("HEAD /*/*/*", request)) { diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9ef5289b7accb..95eaf8978b468 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -346,7 +346,9 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta } @Override - public void maybeTrack(final String request, Headers requestHeaders) { + public void maybeTrack(HttpExchange exchange) { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); + final Headers requestHeaders = exchange.getRequestHeaders(); if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) { trackRequest(Operation.GET_OBJECT.key()); } else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index bb25b85f3763b..51064631cb84f 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -14,7 +14,6 @@ import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import com.amazonaws.services.s3.model.MultipartUpload; -import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -692,16 +691,17 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler { super(delegate); } + private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) { + return new S3HttpHandler("bucket").parseRequest(exchange); + } + @Override public void handle(HttpExchange exchange) throws IOException { - final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents( - S3HttpHandler.getRawRequestString(exchange) - ); - if (false == requestComponents.request().startsWith("HEAD ")) { - assertThat(requestComponents.customQueryParameters(), hasKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE)); + final S3HttpHandler.S3Request s3Request = parseRequest(exchange); + if ("HEAD".equals(s3Request.method())) { + assertTrue(s3Request.hasQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE)); } - final String request = requestComponents.request(); - if (shouldFailCompleteMultipartUploadRequest.get() && Regex.simpleMatch("POST /*/*?uploadId=*", request)) { + if (shouldFailCompleteMultipartUploadRequest.get() && s3Request.isCompleteMultipartUploadRequest()) { try (exchange) { drainInputStream(exchange.getRequestBody()); exchange.sendResponseHeaders( @@ -715,20 +715,17 @@ public void handle(HttpExchange exchange) throws IOException { } @Override - public void maybeTrack(final String rawRequest, Headers requestHeaders) { - final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(rawRequest); - final String request = requestComponents.request(); - final OperationPurpose purpose = OperationPurpose.parse( - requestComponents.customQueryParameters().get(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE).get(0) - ); - if (Regex.simpleMatch("GET /*/?prefix=*", request)) { + public void maybeTrack(HttpExchange exchange) { + final S3HttpHandler.S3Request request = parseRequest(exchange); + final OperationPurpose purpose = OperationPurpose.parse(request.getQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE)); + if (request.isListObjectsRequest()) { trackRequest("ListObjects"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong()) .incrementAndGet(); - } else if (Regex.simpleMatch("GET /*/?uploads&*", request)) { + } else if (request.isListMultipartUploadsRequest()) { // TODO track ListMultipartUploads requests logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey()); - } else if (Regex.simpleMatch("GET /*/*", request)) { + } else if (request.isGetObjectRequest()) { trackRequest("GetObject"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong()) .incrementAndGet(); @@ -738,21 +735,21 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) { new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_MULTIPART_OBJECT, purpose), k -> new AtomicLong() ).incrementAndGet(); - } else if (Regex.simpleMatch("PUT /*/*", request)) { + } else if (request.isPutObjectRequest()) { trackRequest("PutObject"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong()) .incrementAndGet(); - } else if (Regex.simpleMatch("POST /*/?delete", request)) { + } else if (request.isMultiObjectDeleteRequest()) { trackRequest("DeleteObjects"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong()) .incrementAndGet(); - } else if (Regex.simpleMatch("DELETE /*/*?uploadId=*", request)) { + } else if (request.isAbortMultipartUploadRequest()) { trackRequest("AbortMultipartObject"); metricsCount.computeIfAbsent( new S3BlobStore.StatsKey(S3BlobStore.Operation.ABORT_MULTIPART_OBJECT, purpose), k -> new AtomicLong() ).incrementAndGet(); - } else if (Regex.simpleMatch("HEAD /*/*", request)) { + } else if (request.isHeadObjectRequest()) { trackRequest("HeadObject"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.HEAD_OBJECT, purpose), k -> new AtomicLong()) .incrementAndGet(); @@ -765,10 +762,10 @@ Map getMetricsCount() { return metricsCount; } - private boolean isMultiPartUpload(String request) { - return Regex.simpleMatch("POST /*/*?uploads", request) - || Regex.simpleMatch("POST /*/*?*uploadId=*", request) - || Regex.simpleMatch("PUT /*/*?*uploadId=*", request); + private boolean isMultiPartUpload(S3HttpHandler.S3Request s3Request) { + return s3Request.isInitiateMultipartUploadRequest() + || s3Request.isUploadPartRequest() + || s3Request.isCompleteMultipartUploadRequest(); } } } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 3ddb53988b3a6..a3f2ca10bfd06 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -257,6 +257,10 @@ protected long getRetryDelayInMillis() { }; } + private static S3HttpHandler.S3Request parseRequest(HttpExchange exchange) { + return new S3HttpHandler("bucket").parseRequest(exchange); + } + public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -265,10 +269,8 @@ public void testWriteBlobWithRetries() throws Exception { final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> { - final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents( - S3HttpHandler.getRawRequestString(exchange) - ); - if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) { + final S3HttpHandler.S3Request s3Request = parseRequest(exchange); + if (s3Request.isPutObjectRequest()) { if (countDown.countDown()) { final BytesReference body = Streams.readFully(exchange.getRequestBody()); if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { @@ -353,8 +355,8 @@ public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception { var uploadedBytes = new AtomicReference(); httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> { - var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange)); - if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) { + var requestComponents = parseRequest(exchange); + if (requestComponents.isPutObjectRequest()) { var body = Streams.readFully(exchange.getRequestBody()); if (uploadedBytes.compareAndSet(null, body)) { exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); @@ -408,12 +410,10 @@ public void testWriteLargeBlob() throws Exception { final CountDown countDownComplete = new CountDown(nbErrors); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob"), exchange -> { - final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents( - S3HttpHandler.getRawRequestString(exchange) - ); + final S3HttpHandler.S3Request s3Request = parseRequest(exchange); final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); - if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) { + if (s3Request.isInitiateMultipartUploadRequest()) { // initiate multipart upload request if (countDownInitiate.countDown()) { byte[] response = (""" @@ -429,39 +429,37 @@ public void testWriteLargeBlob() throws Exception { exchange.close(); return; } - } else if ("PUT".equals(requestComponents.method()) - && requestComponents.query().contains("uploadId=TEST") - && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); - assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - - if (countDownUploads.decrementAndGet() % 2 == 0) { - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + } else if (s3Request.isUploadPartRequest()) { + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + + if (countDownUploads.decrementAndGet() % 2 == 0) { + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" - - - bucket - write_large_blob - """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; - } + } else if (s3Request.isCompleteMultipartUploadRequest()) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" + + + bucket + write_large_blob + """).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; } + } // sends an error back or let the request time out if (useTimeout == false) { @@ -510,12 +508,10 @@ public void testWriteLargeBlobStreaming() throws Exception { final CountDown countDownComplete = new CountDown(nbErrors); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob_streaming"), exchange -> { - final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents( - S3HttpHandler.getRawRequestString(exchange) - ); + final S3HttpHandler.S3Request s3Request = parseRequest(exchange); final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); - if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) { + if (s3Request.isInitiateMultipartUploadRequest()) { // initiate multipart upload request if (countDownInitiate.countDown()) { byte[] response = (""" @@ -531,38 +527,36 @@ public void testWriteLargeBlobStreaming() throws Exception { exchange.close(); return; } - } else if ("PUT".equals(requestComponents.method()) - && requestComponents.query().contains("uploadId=TEST") - && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); - - if (counterUploads.incrementAndGet() % 2 == 0) { - bytesReceived.addAndGet(bytes.length()); - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + } else if (s3Request.isUploadPartRequest()) { + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + + if (counterUploads.incrementAndGet() % 2 == 0) { + bytesReceived.addAndGet(bytes.length()); + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" - - - bucket - write_large_blob_streaming - """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; - } + } else if (s3Request.isCompleteMultipartUploadRequest()) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" + + + bucket + write_large_blob_streaming + """).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; } + } // sends an error back or let the request time out if (useTimeout == false) { diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 754069178f496..5c090358e8882 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -19,14 +19,12 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; import org.elasticsearch.test.fixture.HttpHeaderParser; import java.io.IOException; @@ -34,7 +32,6 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -42,6 +39,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -51,6 +49,8 @@ import javax.xml.parsers.DocumentBuilderFactory; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.w3c.dom.Node.ELEMENT_NODE; /** @@ -62,8 +62,8 @@ public class S3HttpHandler implements HttpHandler { private static final Logger logger = LogManager.getLogger(S3HttpHandler.class); private final String bucket; - private final String path; - private final String basePrefix; + private final String basePath; + private final String bucketAndBasePath; private final ConcurrentMap blobs = new ConcurrentHashMap<>(); private final ConcurrentMap uploads = new ConcurrentHashMap<>(); @@ -74,38 +74,35 @@ public S3HttpHandler(final String bucket) { public S3HttpHandler(final String bucket, @Nullable final String basePath) { this.bucket = Objects.requireNonNull(bucket); - this.basePrefix = Objects.requireNonNullElse(basePath, ""); - this.path = bucket + (basePath != null && basePath.isEmpty() == false ? "/" + basePath : ""); + this.basePath = Objects.requireNonNullElse(basePath, ""); + this.bucketAndBasePath = bucket + (Strings.hasText(basePath) ? "/" + basePath : ""); } + private static final Set NO_REQUEST_BODY_METHODS = Set.of("GET", "HEAD", "DELETE"); + @Override public void handle(final HttpExchange exchange) throws IOException { // Remove custom query parameters before processing the request. This simulates how S3 ignores them. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/LogFormat.html#LogFormatCustom - final RequestComponents requestComponents = parseRequestComponents( - exchange.getRequestMethod() + " " + exchange.getRequestURI().toString() - ); - final String request = requestComponents.request(); - onCustomQueryParameters(requestComponents.customQueryParameters); + final S3Request request = parseRequest(exchange); - if (request.startsWith("GET") || request.startsWith("HEAD") || request.startsWith("DELETE")) { + if (NO_REQUEST_BODY_METHODS.contains(request.method())) { int read = exchange.getRequestBody().read(); assert read == -1 : "Request body should have been empty but saw [" + read + "]"; } - try { - if (Regex.simpleMatch("HEAD /" + path + "/*", request)) { - final BytesReference blob = blobs.get(requestComponents.path); + + try (exchange) { + if (request.isHeadObjectRequest()) { + final BytesReference blob = blobs.get(request.path()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); } else { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } - } else if (isListMultipartUploadsRequest(request)) { - assert request.contains("prefix=" + basePrefix) : basePrefix + " vs " + request; + } else if (request.isListMultipartUploadsRequest()) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(request, request.indexOf('?') + 1, params); - final var prefix = params.get("prefix"); + final var prefix = request.getQueryParamOnce("prefix"); + assert Objects.requireNonNullElse(prefix, "").contains(basePath) : basePath + " vs " + request; final var uploadsList = new StringBuilder(); uploadsList.append(""); @@ -133,8 +130,8 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); - } else if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) { - final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), requestComponents.path.substring(bucket.length() + 2)); + } else if (request.isInitiateMultipartUploadRequest()) { + final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), request.path().substring(bucket.length() + 2)); uploads.put(upload.getUploadId(), upload); final var uploadResult = new StringBuilder(); @@ -150,11 +147,8 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); - } else if (isUploadPartRequest(request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(request, request.indexOf('?') + 1, params); - - final var upload = uploads.get(params.get("uploadId")); + } else if (request.isUploadPartRequest()) { + final var upload = uploads.get(request.getQueryParamOnce("uploadId")); if (upload == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); } else { @@ -164,10 +158,8 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } - } else if (Regex.simpleMatch("POST /" + path + "/*?uploadId=*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(request, request.indexOf('?') + 1, params); - final var upload = uploads.remove(params.get("uploadId")); + } else if (request.isCompleteMultipartUploadRequest()) { + final var upload = uploads.remove(request.getQueryParamOnce("uploadId")); if (upload == null) { if (Randomness.get().nextBoolean()) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); @@ -186,7 +178,7 @@ public void handle(final HttpExchange exchange) throws IOException { } } else { final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody()))); - blobs.put(requestComponents.path, blobContents); + blobs.put(request.path(), blobContents); byte[] response = ("\n" + "\n" @@ -194,41 +186,37 @@ public void handle(final HttpExchange exchange) throws IOException { + bucket + "\n" + "" - + requestComponents.path + + request.path() + "\n" + "").getBytes(StandardCharsets.UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/xml"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); } - } else if (Regex.simpleMatch("DELETE /" + path + "/*?uploadId=*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(request, request.indexOf('?') + 1, params); - final var upload = uploads.remove(params.get("uploadId")); + } else if (request.isAbortMultipartUploadRequest()) { + final var upload = uploads.remove(request.getQueryParamOnce("uploadId")); exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1); - } else if (Regex.simpleMatch("PUT /" + path + "/*", request)) { + } else if (request.isPutObjectRequest()) { final Tuple blob = parseRequestBody(exchange); - blobs.put(requestComponents.uri(), blob.v2()); + blobs.put(request.path(), blob.v2()); exchange.getResponseHeaders().add("ETag", blob.v1()); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - } else if (isListObjectsRequest(request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(request, request.indexOf('?') + 1, params); - if (params.get("list-type") != null) { + } else if (request.isListObjectsRequest()) { + if (request.queryParameters().containsKey("list-type")) { throw new AssertionError("Test must be adapted for GET Bucket (List Objects) Version 2"); } final StringBuilder list = new StringBuilder(); list.append(""); list.append(""); - final String prefix = params.get("prefix"); + final String prefix = request.getOptionalQueryParam("prefix").orElse(null); if (prefix != null) { list.append("").append(prefix).append(""); } final Set commonPrefixes = new HashSet<>(); - final String delimiter = params.get("delimiter"); + final String delimiter = request.getOptionalQueryParam("delimiter").orElse(null); if (delimiter != null) { list.append("").append(delimiter).append(""); } @@ -263,8 +251,8 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); - } else if (Regex.simpleMatch("GET /" + path + "/*", request)) { - final BytesReference blob = blobs.get(requestComponents.uri()); + } else if (request.isGetObjectRequest()) { + final BytesReference blob = blobs.get(request.path()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; @@ -304,18 +292,18 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.PARTIAL_CONTENT.getStatus(), responseBlob.length()); responseBlob.writeTo(exchange.getResponseBody()); - } else if (Regex.simpleMatch("DELETE /" + path + "/*", request)) { + } else if (request.isDeleteObjectRequest()) { int deletions = 0; for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext();) { Map.Entry blob = iterator.next(); - if (blob.getKey().startsWith(requestComponents.uri())) { + if (blob.getKey().startsWith(request.path())) { iterator.remove(); deletions++; } } exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1); - } else if (isMultiObjectDeleteRequest(request)) { + } else if (request.isMultiObjectDeleteRequest()) { final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8)); final StringBuilder deletes = new StringBuilder(); @@ -343,72 +331,13 @@ public void handle(final HttpExchange exchange) throws IOException { } catch (Exception e) { logger.error("exception in request " + request, e); throw e; - } finally { - exchange.close(); } } - private boolean isUploadPartRequest(String request) { - return Regex.simpleMatch("PUT /" + path + "/*?uploadId=*&partNumber=*", request) - || Regex.simpleMatch("PUT /" + path + "/*?partNumber=*&uploadId=*", request); - } - - private boolean isListMultipartUploadsRequest(String request) { - return Regex.simpleMatch("GET /" + bucket + "/?uploads&prefix=*", request) - || Regex.simpleMatch("GET /" + bucket + "/?uploads&max-uploads=*&prefix=*", request) - || Regex.simpleMatch("GET /" + bucket + "?uploads&prefix=*", request) - || Regex.simpleMatch("GET /" + bucket + "?uploads&max-uploads=*&prefix=*", request); - } - - private boolean isListObjectsRequest(String request) { - return Regex.simpleMatch("GET /" + bucket + "/?prefix=*", request) - || Regex.simpleMatch("GET /" + bucket + "?list-type=2&*prefix=*", request); - } - - private boolean isMultiObjectDeleteRequest(String request) { - return request.equals("POST /" + bucket + "/?delete") || request.equals("POST /" + bucket + "?delete"); - } - public Map blobs() { return blobs; } - protected void onCustomQueryParameters(final Map> params) {} - - public static RequestComponents parseRequestComponents(final String request) { - final int spacePos = request.indexOf(' '); - final String method = request.substring(0, spacePos); - final String uriString = request.substring(spacePos + 1); - final int questsionMarkPos = uriString.indexOf('?'); - // AWS s3 allows the same custom query parameter to be specified multiple times - final Map> customQueryParameters = new HashMap<>(); - if (questsionMarkPos == -1) { - return new RequestComponents(method, uriString, "", customQueryParameters); - } else { - final String queryString = uriString.substring(questsionMarkPos + 1); - final ArrayList queryParameters = new ArrayList<>(); - Arrays.stream(Strings.tokenizeToStringArray(queryString, "&")).forEach(param -> { - if (param.startsWith("x-")) { - final int equalPos = param.indexOf('='); - customQueryParameters.computeIfAbsent(param.substring(0, equalPos), k -> new ArrayList<>()) - .add(param.substring(equalPos + 1)); - } else { - queryParameters.add(param); - } - }); - return new RequestComponents( - method, - uriString.substring(0, questsionMarkPos), - Strings.collectionToDelimitedString(queryParameters, "&"), - customQueryParameters - ); - } - } - - public static String getRawRequestString(final HttpExchange exchange) { - return exchange.getRequestMethod() + " " + exchange.getRequestURI(); - } - private static final Pattern chunkSignaturePattern = Pattern.compile("^([0-9a-z]+);chunk-signature=([^\\r\\n]*)$"); private static Tuple parseRequestBody(final HttpExchange exchange) throws IOException { @@ -543,18 +472,162 @@ MultipartUpload getUpload(String uploadId) { return uploads.get(uploadId); } - public record RequestComponents(String method, String path, String query, Map> customQueryParameters) { + public S3Request parseRequest(HttpExchange exchange) { + final String queryString = exchange.getRequestURI().getQuery(); + final Map> queryParameters; + if (Strings.hasText(queryString)) { + queryParameters = new HashMap<>(); + for (final String queryPart : queryString.split("&")) { + final String paramName, paramValue; + final int equalsPos = queryPart.indexOf('='); + if (equalsPos == -1) { + paramName = queryPart; + paramValue = null; + } else { + paramName = queryPart.substring(0, equalsPos); + paramValue = queryPart.substring(equalsPos + 1); + } + queryParameters.computeIfAbsent(paramName, ignored -> new ArrayList<>()).add(paramValue); + } + } else { + queryParameters = Map.of(); + } + + return new S3Request(exchange.getRequestMethod(), exchange.getRequestURI().getPath(), queryParameters); + } + + public class S3Request { + private final String method; + private final String path; + private final Map> queryParameters; - public String request() { - return method + " " + uri(); + public S3Request(String method, String path, Map> queryParameters) { + this.method = method; + this.path = path; + this.queryParameters = queryParameters; } - public String uri() { - if (query.isEmpty()) { - return path; - } else { - return path + "?" + query; + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map> queryParameters() { + return queryParameters; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (S3Request) obj; + return Objects.equals(this.method, that.method) + && Objects.equals(this.path, that.path) + && Objects.equals(this.queryParameters, that.queryParameters); + } + + @Override + public int hashCode() { + return Objects.hash(method, path, queryParameters); + } + + @Override + public String toString() { + return Strings.format("RequestComponents[method=%s, path=%s, queryParameters=%s]", method, path, queryParameters); + } + + public boolean hasQueryParamOnce(String name) { + final var values = queryParameters.get(name); + return values != null && values.size() == 1; + } + + public String getQueryParamOnce(String name) { + final var values = queryParameters.get(name); + assertNotNull(name, values); + assertEquals(name + "=" + values, 1, values.size()); + return values.get(0); + } + + public Optional getOptionalQueryParam(String name) { + final var values = queryParameters.get(name); + if (values == null) { + return Optional.empty(); } + assertEquals(name + "=" + values, 1, values.size()); + return Optional.of(values.get(0)); + } + + private boolean isBucketRootPath() { + return path.equals("/" + bucket) || path.equals("/" + bucket + "/"); + } + + private boolean isUnderBucketRootAndBasePath() { + return path.startsWith("/" + bucketAndBasePath + "/"); + } + + public boolean isHeadObjectRequest() { + return "HEAD".equals(method) && path.startsWith("/" + S3HttpHandler.this.bucketAndBasePath + "/"); + } + + public boolean isListMultipartUploadsRequest() { + return "GET".equals(method) + && isBucketRootPath() + && hasQueryParamOnce("uploads") + && getQueryParamOnce("uploads") == null + && hasQueryParamOnce("prefix"); + } + + public boolean isInitiateMultipartUploadRequest() { + return "POST".equals(method) + && isUnderBucketRootAndBasePath() + && hasQueryParamOnce("uploads") + && getQueryParamOnce("uploads") == null; + } + + public boolean isUploadPartRequest() { + return "PUT".equals(method) + && isUnderBucketRootAndBasePath() + && hasQueryParamOnce("uploadId") + && getQueryParamOnce("uploadId") != null + && hasQueryParamOnce("partNumber"); + } + + public boolean isCompleteMultipartUploadRequest() { + return "POST".equals(method) + && isUnderBucketRootAndBasePath() + && hasQueryParamOnce("uploadId") + && getQueryParamOnce("uploadId") != null; + } + + public boolean isAbortMultipartUploadRequest() { + return "DELETE".equals(method) + && isUnderBucketRootAndBasePath() + && hasQueryParamOnce("uploadId") + && getQueryParamOnce("uploadId") != null; + } + + public boolean isPutObjectRequest() { + return "PUT".equals(method) && isUnderBucketRootAndBasePath() && queryParameters.containsKey("uploadId") == false; } + + public boolean isGetObjectRequest() { + return "GET".equals(method) && isUnderBucketRootAndBasePath(); + } + + public boolean isDeleteObjectRequest() { + return "DELETE".equals(method) && isUnderBucketRootAndBasePath(); + } + + public boolean isListObjectsRequest() { + return "GET".equals(method) && isBucketRootPath() && hasQueryParamOnce("prefix") && hasQueryParamOnce("uploads") == false; + } + + public boolean isMultiObjectDeleteRequest() { + return "POST".equals(method) && isBucketRootPath() && hasQueryParamOnce("delete") && getQueryParamOnce("delete") == null; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 372f61698ed8a..d289533e840a8 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -8,7 +8,6 @@ */ package org.elasticsearch.repositories.blobstore; -import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; @@ -347,7 +346,7 @@ public interface DelegatingHttpHandler extends HttpHandler { /** * HTTP handler that allows collect request stats per request type. * - * Implementors should keep track of the desired requests on {@link #maybeTrack(String, Headers)}. + * Implementors should keep track of the desired requests on {@link #maybeTrack(HttpExchange)}. */ @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") public abstract static class HttpStatsCollectorHandler implements DelegatingHttpHandler { @@ -375,10 +374,7 @@ protected synchronized void trackRequest(final String requestType) { @Override public void handle(HttpExchange exchange) throws IOException { - final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - - maybeTrack(request, exchange.getRequestHeaders()); - + maybeTrack(exchange); delegate.handle(exchange); } @@ -388,10 +384,9 @@ public void handle(HttpExchange exchange) throws IOException { * The request is represented as: * Request = Method SP Request-URI * - * @param request the request to be tracked if it matches the criteria - * @param requestHeaders the http request headers + * @param exchange the exchange to possibly track */ - protected abstract void maybeTrack(String request, Headers requestHeaders); + protected abstract void maybeTrack(HttpExchange exchange); } /** From dc6107655a8e10a9f3e040d5a855db49b1872600 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 1 Apr 2025 18:46:43 +0100 Subject: [PATCH 2/2] Review --- .../src/main/java/fixture/s3/S3HttpHandler.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 5c090358e8882..444c4627ffa8a 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -78,7 +78,10 @@ public S3HttpHandler(final String bucket, @Nullable final String basePath) { this.bucketAndBasePath = bucket + (Strings.hasText(basePath) ? "/" + basePath : ""); } - private static final Set NO_REQUEST_BODY_METHODS = Set.of("GET", "HEAD", "DELETE"); + /** + * Requests using these HTTP methods never have a request body (this is checked in the handler). + */ + private static final Set METHODS_HAVING_NO_REQUEST_BODY = Set.of("GET", "HEAD", "DELETE"); @Override public void handle(final HttpExchange exchange) throws IOException { @@ -86,7 +89,7 @@ public void handle(final HttpExchange exchange) throws IOException { // https://docs.aws.amazon.com/AmazonS3/latest/userguide/LogFormat.html#LogFormatCustom final S3Request request = parseRequest(exchange); - if (NO_REQUEST_BODY_METHODS.contains(request.method())) { + if (METHODS_HAVING_NO_REQUEST_BODY.contains(request.method())) { int read = exchange.getRequestBody().read(); assert read == -1 : "Request body should have been empty but saw [" + read + "]"; } @@ -569,7 +572,7 @@ private boolean isUnderBucketRootAndBasePath() { } public boolean isHeadObjectRequest() { - return "HEAD".equals(method) && path.startsWith("/" + S3HttpHandler.this.bucketAndBasePath + "/"); + return "HEAD".equals(method) && isUnderBucketRootAndBasePath(); } public boolean isListMultipartUploadsRequest() {