From 2fb738dad3ac3c0dfd4a26c2d6b5ed9187093331 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 28 Mar 2025 00:29:39 -0700 Subject: [PATCH 01/11] refactor gcp-fixture multipart parser --- .../gcs/GoogleCloudStorageHttpHandler.java | 28 +-- .../java/fixture/gcs/MultipartUpload.java | 177 ++++++++++++++++++ 2 files changed, 185 insertions(+), 20 deletions(-) create mode 100644 test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 0c6aae605c8a7..0c35081dca703 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -186,26 +186,14 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.getResponseBody().write(response); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { - // Multipart upload - Optional> content = parseMultipartRequestBody(requestBody.streamInput()); - if (content.isPresent()) { - final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); - final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( - content.get().v1(), - ifGenerationMatch, - content.get().v2() - ); - writeBlobVersionAsJson(exchange, newBlobVersion); - } else { - throw new AssertionError( - "Could not read multi-part request to [" - + request - + "] with headers [" - + new HashMap<>(exchange.getRequestHeaders()) - + "]" - ); - } - + final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput()); + final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); + final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( 
+ multipartUpload.name(), + ifGenerationMatch, + multipartUpload.content() + ); + writeBlobVersionAsJson(exchange, newBlobVersion); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) { // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload final Map params = new HashMap<>(); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java new file mode 100644 index 0000000000000..bfab6c26328fe --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package fixture.gcs; + +import com.sun.net.httpserver.HttpExchange; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) { + + static final byte[] CRLFCRLF = new byte[] { '\r', '\n', '\r', '\n' }; + static final Pattern METADATA_PATTERN = Pattern.compile("\"(bucket|name|generation|crc32c|md5Hash)\":\"([^\"]*)\""); + static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?"); + + /** + * Reads HTTP content of MultipartUpload. First part is always json metadata, followed by binary parts. + * Every part has own headers and content. Parts are separated by dash-boundary(--boundary) delimiter, + * and boundary is defined in the HTTP header Content-Type, + * like this {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}. + * Last part, close-delimiter, is dashed from both sides {@code --boundary--}. + * Part headers are separated from the body by double CRLF. + * + * More details here rfc2046. + * + *
+     *     {@code
+     * --boundary CRLF
+     * headers CRLF
+     * CRLF
+     * content CRLF
+     * --boundary CRLF
+     * headers CRLF
+     * CRLF
+     * content CRLF
+     * --boundary--
+     *     }
+     * 
+ */ + static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException { + var boundary = getBoundaryHeader(exchange); + try (var input = new GZIPInputStream(gzipInput)) { + var dashBoundary = ("--" + boundary).getBytes(); + var bodyPartDelimiter = ("\r\n--" + boundary).getBytes(); + + // https://www.rfc-editor.org/rfc/rfc2046.html#page-22 + // multipart-body := [preamble CRLF] + // dash-boundary transport-padding CRLF + // body-part *encapsulation + // close-delimiter transport-padding + // [CRLF epilogue] + // + // there is no transport-padding and epilogue in our case + skipDashBoundary(input, dashBoundary); + + // read first body-part - blob metadata json + readByDelimiter(input, CRLFCRLF); // ignore body-part headers + var metadataBytes = readByDelimiter(input, bodyPartDelimiter); + var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString()); + String bucket = "", name = "", gen = "", crc = "", md5 = ""; + while (match.find()) { + switch (match.group(1)) { + case "bucket" -> bucket = match.group(2); + case "name" -> name = match.group(2); + case "generation" -> gen = match.group(2); + case "crc32c" -> crc = match.group(2); + case "md5Hash" -> md5 = match.group(2); + } + } + var blobParts = new ArrayList(); + + // read content from remaining parts, can be 0..n + while (readCloseDelimiterOrCRLF(input) == false) { + readByDelimiter(input, CRLFCRLF); // ignore headers + var content = readByDelimiter(input, bodyPartDelimiter); + blobParts.add(content); + } + + var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0])); + return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf); + } + } + + static String getBoundaryHeader(HttpExchange exchange) { + var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type")); + if (m.matches() == false) { + throw new IllegalArgumentException("boundary header is not present"); + } + return m.group(1); + } + + /** + * 
Read and discard the very first delimiter: the dash-boundary and the CRLF that follows it. + */ + static void skipDashBoundary(InputStream is, byte[] dashBoundary) throws IOException { + var b = is.readNBytes(dashBoundary.length); + if (Arrays.equals(b, dashBoundary) == false) { + throw new IllegalStateException( + "cannot read dash-boundary, expect=" + Arrays.toString(dashBoundary) + " got=" + Arrays.toString(b) + ); + } + skipCRLF(is); + } + + static void skipCRLF(InputStream is) throws IOException { + var cr = is.read(); + var lf = is.read(); + if (cr != '\r' && lf != '\n') { + throw new IllegalStateException("cannot read CRLF got " + cr + " " + lf); + } + } + + /** + * Must be called after reading a body-part delimiter to see if there are more parts. + * If there are no more parts, a closing double dash is expected, otherwise CRLF. + */ + static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { + var d1 = is.read(); + var d2 = is.read(); + if (d1 == '-' && d2 == '-') { + return true; + } else if (d1 == '\r' && d2 == '\n') { + return false; + } else { + throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2); + } + } + + /** + * Read bytes from stream into buffer until reach given delimiter. The delimiter is consumed too. 
+ */ + static BytesReference readByDelimiter(InputStream is, byte[] delimiter) throws IOException { + var out = new ByteArrayOutputStream(1024); + var delimiterCheckRing = new byte[delimiter.length]; // a ring-buffer that tracks last N bytes + var ringOffset = 0; + while (true) { + var c = is.read(); + if (c == -1) { + throw new IllegalStateException("expected delimiter, but reached end of stream "); + } + var b = (byte) c; + delimiterCheckRing[ringOffset] = b; + out.write(b); + ringOffset = (ringOffset + 1) % delimiter.length; + if (c == delimiter[delimiter.length - 1]) { // try to compare ring buffer with delimiter when last char matches + var isMatch = true; + for (int i = 0; i < delimiter.length; i++) { + var ri = (i + ringOffset) % delimiter.length; + if (delimiterCheckRing[ri] != delimiter[i]) { + isMatch = false; + break; + } + } + if (isMatch) { + var bytes = out.toByteArray(); + return new BytesArray(bytes, 0, bytes.length - delimiter.length); + } + } + } + } +} From 28d4481930dc1d4a436804ca86035b52c396a031 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 28 Mar 2025 00:57:44 -0700 Subject: [PATCH 02/11] remove ring-buffer --- .../java/fixture/gcs/MultipartUpload.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index bfab6c26328fe..85eaccf5ce3ef 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -147,30 +147,26 @@ static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { */ static BytesReference readByDelimiter(InputStream is, byte[] delimiter) throws IOException { var out = new ByteArrayOutputStream(1024); - var delimiterCheckRing = new byte[delimiter.length]; // a ring-buffer that tracks last N bytes - var ringOffset = 
0; + var delimiterMatchLen = 0; while (true) { var c = is.read(); if (c == -1) { throw new IllegalStateException("expected delimiter, but reached end of stream "); } var b = (byte) c; - delimiterCheckRing[ringOffset] = b; out.write(b); - ringOffset = (ringOffset + 1) % delimiter.length; - if (c == delimiter[delimiter.length - 1]) { // try to compare ring buffer with delimiter when last char matches - var isMatch = true; - for (int i = 0; i < delimiter.length; i++) { - var ri = (i + ringOffset) % delimiter.length; - if (delimiterCheckRing[ri] != delimiter[i]) { - isMatch = false; - break; - } - } - if (isMatch) { + if (delimiter[delimiterMatchLen] == c) { + delimiterMatchLen++; + if (delimiterMatchLen >= delimiter.length) { var bytes = out.toByteArray(); return new BytesArray(bytes, 0, bytes.length - delimiter.length); } + } else { + if (delimiter[0] == c) { + delimiterMatchLen = 1; + } else { + delimiterMatchLen = 0; + } } } } From 4ef1236233cb8de02cde120bc64b9475b89da42d Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 28 Mar 2025 00:58:52 -0700 Subject: [PATCH 03/11] naming Co-authored-by: David Turner --- .../gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index 85eaccf5ce3ef..34e735a9026ea 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -145,7 +145,7 @@ static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { /** * Read bytes from stream into buffer until reach given delimiter. The delimiter is consumed too. 
*/ - static BytesReference readByDelimiter(InputStream is, byte[] delimiter) throws IOException { + static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { var out = new ByteArrayOutputStream(1024); var delimiterMatchLen = 0; while (true) { From 0dfddd641de92a1ac73dc4a23cec7544aa744ab2 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Sat, 29 Mar 2025 17:08:14 -0700 Subject: [PATCH 04/11] add tests --- ...CloudStorageBlobContainerRetriesTests.java | 24 +++--- .../gcs/GoogleCloudStorageHttpHandler.java | 85 ------------------- .../java/fixture/gcs/MultipartUpload.java | 68 +++++++++------ .../GoogleCloudStorageHttpHandlerTests.java | 8 +- .../fixture/gcs/MultipartUploadTests.java | 59 +++++++++++++ 5 files changed, 119 insertions(+), 125 deletions(-) create mode 100644 test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 0d8933b44b7ab..40aabf0d5ccdf 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -10,6 +10,7 @@ import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; +import fixture.gcs.MultipartUpload; import com.google.api.client.http.HttpExecuteInterceptor; import com.google.api.client.http.HttpRequestInitializer; @@ -42,7 +43,6 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; import org.elasticsearch.http.ResponseInjectingHttpHandler; import 
org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; @@ -60,8 +60,6 @@ import java.util.Iterator; import java.util.Locale; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,7 +67,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody; import static fixture.gcs.TestUtils.createServiceAccount; import static java.nio.charset.StandardCharsets.UTF_8; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; @@ -79,7 +76,6 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; -import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -261,17 +257,17 @@ public void testWriteBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries); final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null); - final byte[] bytes = randomBlobContent(); + final byte[] blobContent = randomBlobContent(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); if (countDown.countDown()) { - Optional> content = parseMultipartRequestBody(exchange.getRequestBody()); - assertThat(content, 
isPresent()); - assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries")); - if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) { + MultipartUpload multipartUpload = MultipartUpload.parseBody(exchange, exchange.getRequestBody()); + assertTrue(multipartUpload.content().length() > 0); + assertEquals(multipartUpload.name(), blobContainer.path().buildAsString() + "write_blob_max_retries"); + if (multipartUpload.content().equals(new BytesArray(blobContent))) { byte[] response = Strings.format(""" {"bucket":"bucket","name":"%s"} - """, content.get().v1()).getBytes(UTF_8); + """, multipartUpload.name()).getBytes(UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); @@ -284,7 +280,7 @@ public void testWriteBlobWithRetries() throws Exception { if (randomBoolean()) { org.elasticsearch.core.Streams.readFully( exchange.getRequestBody(), - new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))] + new byte[randomIntBetween(1, Math.max(1, blobContent.length - 1))] ); } else { Streams.readFully(exchange.getRequestBody()); @@ -293,8 +289,8 @@ public void testWriteBlobWithRetries() throws Exception { } })); - try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) { - blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, bytes.length, false); + try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", blobContent), blobContent.length)) { + blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, blobContent.length, false); } assertThat(countDown.isCountedDown(), is(true)); } diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java 
b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 0c35081dca703..02f41cff5af73 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.Tuple; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.test.fixture.HttpHeaderParser; @@ -27,26 +26,17 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.URLDecoder; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; import static fixture.gcs.MockGcsBlobStore.failAndThrow; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.joining; -import static org.elasticsearch.core.Strings.format; /** * Minimal HTTP handler that acts as a Google Cloud Storage compliant server @@ -319,81 +309,6 @@ private static String httpServerUrl(final HttpExchange exchange) { return "http://" + exchange.getRequestHeaders().get("HOST").get(0); } - private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\""); - - public static Optional> parseMultipartRequestBody(final InputStream requestBody) throws IOException { - Tuple content = null; - final BytesReference fullRequestBody; - try (InputStream in = new 
GZIPInputStream(requestBody)) { - fullRequestBody = Streams.readFully(in); - } - String name = null; - boolean skippedEmptyLine = false; - int startPos = 0; - int endPos = 0; - while (startPos < fullRequestBody.length()) { - do { - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); - } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n'); - boolean markAndContinue = false; - final String bucketPrefix = "{\"bucket\":"; - if (startPos > 0) { - startPos += 2; - } - if (name == null || skippedEmptyLine == false) { - if ((skippedEmptyLine == false && endPos == startPos) - || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) { - markAndContinue = true; - } else { - final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString(); - if (start.toLowerCase(Locale.ROOT).startsWith("content")) { - markAndContinue = true; - } else if (start.startsWith(bucketPrefix)) { - markAndContinue = true; - final String line = fullRequestBody.slice( - startPos + bucketPrefix.length(), - endPos - startPos - bucketPrefix.length() - ).utf8ToString(); - Matcher matcher = NAME_PATTERN.matcher(line); - if (matcher.find()) { - name = matcher.group(1); - } - } - } - skippedEmptyLine = markAndContinue && endPos == startPos; - startPos = endPos; - } else { - while (isEndOfPart(fullRequestBody, endPos) == false) { - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); - } - content = Tuple.tuple(name, fullRequestBody.slice(startPos, endPos - startPos)); - break; - } - } - if (content == null) { - final InputStream stream = fullRequestBody.streamInput(); - logger.warn( - () -> format( - "Failed to find multi-part upload in [%s]", - new BufferedReader(new InputStreamReader(stream)).lines().collect(joining("\n")) - ) - ); - } - return Optional.ofNullable(content); - } - - private static final byte[] END_OF_PARTS_MARKER = "\r\n--__END_OF_PART__".getBytes(UTF_8); - - private static boolean 
isEndOfPart(BytesReference fullRequestBody, int endPos) { - for (int i = 0; i < END_OF_PARTS_MARKER.length; i++) { - final byte b = END_OF_PARTS_MARKER[i]; - if (fullRequestBody.get(endPos + i) != b) { - return false; - } - } - return true; - } - private static String requireHeader(HttpExchange exchange, String headerName) { final String headerValue = exchange.getRequestHeaders().getFirst(headerName); if (headerValue != null) { diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index 34e735a9026ea..d5fd3b392ac9e 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -23,9 +23,9 @@ import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; -record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) { +public record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) { - static final byte[] CRLFCRLF = new byte[] { '\r', '\n', '\r', '\n' }; + static final byte[] BODY_PART_HEADERS_DELIMITER = new byte[] { '\r', '\n', '\r', '\n' }; static final Pattern METADATA_PATTERN = Pattern.compile("\"(bucket|name|generation|crc32c|md5Hash)\":\"([^\"]*)\""); static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?"); @@ -35,7 +35,7 @@ record MultipartUpload(String bucket, String name, String generation, String crc * and boundary is defined in the HTTP header Content-Type, * like this {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}. * Last part, close-delimiter, is dashed from both sides {@code --boundary--}. - * Part headers are separated from the body by double CRLF. + * Part headers are separated from the content by double CRLF. 
* * More details here rfc2046. * @@ -53,25 +53,17 @@ record MultipartUpload(String bucket, String name, String generation, String crc * } * */ - static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException { + public static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException { var boundary = getBoundaryHeader(exchange); try (var input = new GZIPInputStream(gzipInput)) { var dashBoundary = ("--" + boundary).getBytes(); var bodyPartDelimiter = ("\r\n--" + boundary).getBytes(); - // https://www.rfc-editor.org/rfc/rfc2046.html#page-22 - // multipart-body := [preamble CRLF] - // dash-boundary transport-padding CRLF - // body-part *encapsulation - // close-delimiter transport-padding - // [CRLF epilogue] - // - // there is no transport-padding and epilogue in our case skipDashBoundary(input, dashBoundary); // read first body-part - blob metadata json - readByDelimiter(input, CRLFCRLF); // ignore body-part headers - var metadataBytes = readByDelimiter(input, bodyPartDelimiter); + skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); + var metadataBytes = readUntilDelimiter(input, bodyPartDelimiter); var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString()); String bucket = "", name = "", gen = "", crc = "", md5 = ""; while (match.find()) { @@ -87,8 +79,8 @@ static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) t // read content from remaining parts, can be 0..n while (readCloseDelimiterOrCRLF(input) == false) { - readByDelimiter(input, CRLFCRLF); // ignore headers - var content = readByDelimiter(input, bodyPartDelimiter); + skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); + var content = readUntilDelimiter(input, bodyPartDelimiter); blobParts.add(content); } @@ -100,7 +92,7 @@ static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) t static String getBoundaryHeader(HttpExchange exchange) { var m = 
BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type")); if (m.matches() == false) { - throw new IllegalArgumentException("boundary header is not present"); + throw new AssertionError("boundary header is not present"); } return m.group(1); } @@ -111,9 +103,7 @@ static String getBoundaryHeader(HttpExchange exchange) { static void skipDashBoundary(InputStream is, byte[] dashBoundary) throws IOException { var b = is.readNBytes(dashBoundary.length); if (Arrays.equals(b, dashBoundary) == false) { - throw new IllegalStateException( - "cannot read dash-boundary, expect=" + Arrays.toString(dashBoundary) + " got=" + Arrays.toString(b) - ); + throw new AssertionError("cannot read dash-boundary, expect=" + Arrays.toString(dashBoundary) + " got=" + Arrays.toString(b)); } skipCRLF(is); } @@ -122,7 +112,7 @@ static void skipCRLF(InputStream is) throws IOException { var cr = is.read(); var lf = is.read(); if (cr != '\r' && lf != '\n') { - throw new IllegalStateException("cannot read CRLF got " + cr + " " + lf); + throw new AssertionError("cannot read CRLF got " + cr + " " + lf); } } @@ -138,7 +128,7 @@ static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { } else if (d1 == '\r' && d2 == '\n') { return false; } else { - throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2); + throw new AssertionError("expect '--' or CRLF, got " + d1 + " " + d2); } } @@ -146,23 +136,51 @@ static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { * Read bytes from stream into buffer until reach given delimiter. The delimiter is consumed too. 
*/ static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { + assert delimiter.length > 0; var out = new ByteArrayOutputStream(1024); var delimiterMatchLen = 0; while (true) { var c = is.read(); if (c == -1) { - throw new IllegalStateException("expected delimiter, but reached end of stream "); + throw new AssertionError("expected delimiter, but reached end of stream "); } var b = (byte) c; out.write(b); - if (delimiter[delimiterMatchLen] == c) { + if (delimiter[delimiterMatchLen] == b) { delimiterMatchLen++; if (delimiterMatchLen >= delimiter.length) { var bytes = out.toByteArray(); return new BytesArray(bytes, 0, bytes.length - delimiter.length); } } else { - if (delimiter[0] == c) { + if (delimiter[0] == b) { + delimiterMatchLen = 1; + } else { + delimiterMatchLen = 0; + } + } + } + } + + /** + * Discard bytes from stream until reach given delimiter. The delimiter is consumed too. + */ + static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { + assert delimiter.length > 0; + var delimiterMatchLen = 0; + while (true) { + var c = is.read(); + if (c == -1) { + throw new AssertionError("expected delimiter, but reached end of stream "); + } + var b = (byte) c; + if (delimiter[delimiterMatchLen] == b) { + delimiterMatchLen++; + if (delimiterMatchLen >= delimiter.length) { + return; + } + } else { + if (delimiter[0] == b) { delimiterMatchLen = 1; } else { delimiterMatchLen = 0; diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java index 61d20295b1fd9..1dc3818519ece 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java @@ -588,11 +588,15 @@ private static TestHttpResponse executeMultipartUpload( BytesReference 
bytes, Long ifGenerationMatch ) { + var headers = new Headers(); + // multipart upload is required to provide boundary header + headers.put("Content-Type", List.of("multipart/related; boundary=" + MULTIPART_BOUNDARY)); return handleRequest( handler, "POST", "/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch), - createGzipCompressedMultipartUploadBody(bucket, blobName, bytes) + createGzipCompressedMultipartUploadBody(bucket, blobName, bytes), + headers ); } @@ -758,6 +762,8 @@ private static Headers rangeHeader(long start, long end) { return Headers.of("Range", Strings.format("bytes=%d-%d", start, end)); } + private static final String MULTIPART_BOUNDARY = "__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__"; + private static BytesReference createGzipCompressedMultipartUploadBody(String bucketName, String path, BytesReference content) { final String metadataString = Strings.format("{\"bucket\":\"%s\", \"name\":\"%s\"}", bucketName, path); final BytesReference header = new BytesArray(Strings.format(""" diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java new file mode 100644 index 0000000000000..77f0dc779242b --- /dev/null +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package fixture.gcs; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class MultipartUploadTests extends ESTestCase { + + public void testReadUntilDelimiter() throws IOException { + for (int run = 0; run < 100; run++) { + var delimitedContent = DelimitedContent.randomContent(); + var inputStream = delimitedContent.toBytesReference().streamInput(); + var readBytes = MultipartUpload.readUntilDelimiter(inputStream, delimitedContent.delimiter); + assertEquals(new BytesArray(delimitedContent.before), readBytes); + var readRemaining = inputStream.readAllBytes(); + assertArrayEquals(delimitedContent.after, readRemaining); + } + } + + public void testSkipUntilDelimiter() throws IOException { + for (int run = 0; run < 100; run++) { + var delimitedContent = DelimitedContent.randomContent(); + var inputStream = delimitedContent.toBytesReference().streamInput(); + MultipartUpload.skipUntilDelimiter(inputStream, delimitedContent.delimiter); + var readRemaining = inputStream.readAllBytes(); + assertArrayEquals(delimitedContent.after, readRemaining); + } + } + + record DelimitedContent(byte[] before, byte[] delimiter, byte[] after) { + + static DelimitedContent randomContent() { + var before = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8); + var delimiter = randomByteArrayOfLength(between(1, 70)); + delimiter[0] = '\r'; // make it distinguishable from the initial bytes + var after = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8); + return new DelimitedContent(before, delimiter, after); + } + + BytesReference toBytesReference() { + return CompositeBytesReference.of(new BytesArray(before), new BytesArray(delimiter), new BytesArray(after)); + } + } + +} From 
3381864b66d6a52189686aabb45791764b83fe58 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 10 Apr 2025 14:45:16 -0700 Subject: [PATCH 05/11] generic multipart parser --- .../gcs/GoogleCloudStorageHttpHandler.java | 20 +-- .../java/fixture/gcs/MultipartUpload.java | 119 ++++++++++-------- .../fixture/gcs/MultipartUploadTests.java | 45 +++++++ 3 files changed, 121 insertions(+), 63 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 1f2432fd44491..b83d3381b6f96 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -173,14 +173,18 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.getResponseBody().write(response); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { - final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput()); - final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); - final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( - multipartUpload.name(), - ifGenerationMatch, - multipartUpload.content() - ); - writeBlobVersionAsJson(exchange, newBlobVersion); + try { + final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput()); + final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); + final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( + multipartUpload.name(), + ifGenerationMatch, + multipartUpload.content() + ); + writeBlobVersionAsJson(exchange, newBlobVersion); + } catch (IllegalArgumentException e) { + throw new AssertionError(e); + } } else if (Regex.simpleMatch("POST 
/upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) { // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload final Map params = new HashMap<>(); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index d5fd3b392ac9e..216db0c880cdb 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Iterator; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; @@ -36,7 +36,6 @@ public record MultipartUpload(String bucket, String name, String generation, Str * like this {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}. * Last part, close-delimiter, is dashed from both sides {@code --boundary--}. * Part headers are separated from the content by double CRLF. - * * More details here rfc2046. * *
@@ -54,66 +53,42 @@ public record MultipartUpload(String bucket, String name, String generation, Str
      * 
*/ public static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException { - var boundary = getBoundaryHeader(exchange); - try (var input = new GZIPInputStream(gzipInput)) { - var dashBoundary = ("--" + boundary).getBytes(); - var bodyPartDelimiter = ("\r\n--" + boundary).getBytes(); - - skipDashBoundary(input, dashBoundary); - - // read first body-part - blob metadata json - skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); - var metadataBytes = readUntilDelimiter(input, bodyPartDelimiter); - var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString()); - String bucket = "", name = "", gen = "", crc = "", md5 = ""; - while (match.find()) { - switch (match.group(1)) { - case "bucket" -> bucket = match.group(2); - case "name" -> name = match.group(2); - case "generation" -> gen = match.group(2); - case "crc32c" -> crc = match.group(2); - case "md5Hash" -> md5 = match.group(2); - } - } - var blobParts = new ArrayList(); - - // read content from remaining parts, can be 0..n - while (readCloseDelimiterOrCRLF(input) == false) { - skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); - var content = readUntilDelimiter(input, bodyPartDelimiter); - blobParts.add(content); - } - - var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0])); - return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf); - } - } - - static String getBoundaryHeader(HttpExchange exchange) { var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type")); if (m.matches() == false) { - throw new AssertionError("boundary header is not present"); + throw new IllegalStateException("boundary header is not present"); + } + var boundary = m.group(1); + try (var input = new GZIPInputStream(gzipInput)) { + return parseBody(boundary, input); } - return m.group(1); } - /** - * read and discard very first delimiter - */ - static void skipDashBoundary(InputStream is, byte[] dashBoundary) 
throws IOException { - var b = is.readNBytes(dashBoundary.length); - if (Arrays.equals(b, dashBoundary) == false) { - throw new AssertionError("cannot read dash-boundary, expect=" + Arrays.toString(dashBoundary) + " got=" + Arrays.toString(b)); + // for tests + static MultipartUpload parseBody(String boundary, InputStream input) throws IOException { + var reader = new MultipartContentReader(boundary, input); + + // read first body-part - blob metadata json + var metadataBytes = reader.next(); + var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString()); + String bucket = "", name = "", gen = "", crc = "", md5 = ""; + while (match.find()) { + switch (match.group(1)) { + case "bucket" -> bucket = match.group(2); + case "name" -> name = match.group(2); + case "generation" -> gen = match.group(2); + case "crc32c" -> crc = match.group(2); + case "md5Hash" -> md5 = match.group(2); + } } - skipCRLF(is); - } - static void skipCRLF(InputStream is) throws IOException { - var cr = is.read(); - var lf = is.read(); - if (cr != '\r' && lf != '\n') { - throw new AssertionError("cannot read CRLF got " + cr + " " + lf); + // read and combine remaining parts + var blobParts = new ArrayList(); + while (reader.hasNext()) { + blobParts.add(reader.next()); } + var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0])); + + return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf); } /** @@ -128,7 +103,7 @@ static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { } else if (d1 == '\r' && d2 == '\n') { return false; } else { - throw new AssertionError("expect '--' or CRLF, got " + d1 + " " + d2); + throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2); } } @@ -188,4 +163,38 @@ static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOExcept } } } + + /** + * Multipart content iterator. 
+ */ + static class MultipartContentReader implements Iterator { + private final InputStream input; + private final byte[] bodyPartDelimiter; + private boolean done; + + MultipartContentReader(String boundary, InputStream input) throws IOException { + this.input = input; + this.bodyPartDelimiter = ("\r\n--" + boundary).getBytes(); + byte[] dashBoundary = ("--" + boundary).getBytes(); + skipUntilDelimiter(input, dashBoundary); + readCloseDelimiterOrCRLF(input); + } + + @Override + public boolean hasNext() { + return done == false; + } + + @Override + public BytesReference next() { + try { + skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); + BytesReference buf = readUntilDelimiter(input, bodyPartDelimiter); + done = readCloseDelimiterOrCRLF(input); + return buf; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } } diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java index 77f0dc779242b..e8af1077820b1 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -20,6 +21,50 @@ public class MultipartUploadTests extends ESTestCase { + // produces content that does not contain boundary + static String randomPartContent(int len, String boundary) { + assert len > 0 && boundary.isEmpty() == false; + var n = 100; + for (var i = 0; i < 100; i++) { + var content = randomAlphanumericOfLength(len); + if (content.contains(boundary) == false) { + return content; + } + } + throw new 
IllegalStateException("cannot generate part content for len=" + len + " boundary=" + boundary); + } + + public void testGenericMultipart() throws IOException { + var boundary = randomAlphanumericOfLength(between(1, 70)); + var part1 = "plain text\nwith line break"; + var part2 = ""; + var part3 = randomPartContent(between(1, 1024), boundary); + var strInput = """ + --$boundary\r + \r + \r + $part1\r + --$boundary\r + X-Header: x-man\r + \r + $part2\r + --$boundary\r + Content-Type: application/octet-stream\r + \r + $part3\r + --$boundary--""" + .replace("$boundary", boundary) + .replace("$part1", part1) + .replace("$part2", part2) + .replace("$part3", part3); + + var reader = new MultipartUpload.MultipartContentReader(boundary, new ByteArrayStreamInput(strInput.getBytes())); + assertEquals(part1, reader.next().utf8ToString()); + assertEquals(part2, reader.next().utf8ToString()); + assertEquals(part3, reader.next().utf8ToString()); + assertFalse(reader.hasNext()); + } + public void testReadUntilDelimiter() throws IOException { for (int run = 0; run < 100; run++) { var delimitedContent = DelimitedContent.randomContent(); From f3210e4c1dfcc0c1949e6997aa49a6a11b5d21a2 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 10 Apr 2025 21:53:07 +0000 Subject: [PATCH 06/11] [CI] Auto commit changes from spotless --- .../src/test/java/fixture/gcs/MultipartUploadTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java index e8af1077820b1..726044973cf5a 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java @@ -52,11 +52,7 @@ public void testGenericMultipart() throws IOException { Content-Type: application/octet-stream\r \r $part3\r - --$boundary--""" - 
.replace("$boundary", boundary) - .replace("$part1", part1) - .replace("$part2", part2) - .replace("$part3", part3); + --$boundary--""".replace("$boundary", boundary).replace("$part1", part1).replace("$part2", part2).replace("$part3", part3); var reader = new MultipartUpload.MultipartContentReader(boundary, new ByteArrayStreamInput(strInput.getBytes())); assertEquals(part1, reader.next().utf8ToString()); From 9714b329fac5487444d7c947cce72e6c5c83935f Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 10 Apr 2025 15:04:34 -0700 Subject: [PATCH 07/11] exceptions --- .../src/main/java/fixture/gcs/MultipartUpload.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index 216db0c880cdb..9dba250a207b5 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -117,7 +117,7 @@ static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throw while (true) { var c = is.read(); if (c == -1) { - throw new AssertionError("expected delimiter, but reached end of stream "); + throw new IllegalStateException("expected delimiter, but reached end of stream "); } var b = (byte) c; out.write(b); @@ -146,7 +146,7 @@ static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOExcept while (true) { var c = is.read(); if (c == -1) { - throw new AssertionError("expected delimiter, but reached end of stream "); + throw new IllegalStateException("expected delimiter, but reached end of stream "); } var b = (byte) c; if (delimiter[delimiterMatchLen] == b) { @@ -193,7 +193,7 @@ public BytesReference next() { done = readCloseDelimiterOrCRLF(input); return buf; } catch (IOException e) { - throw new IllegalStateException(e); + throw new RuntimeException(e); } } } From 
b4869f56706c028e0158eac0a656aa07e3ad5bcc Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 10 Apr 2025 19:24:14 -0700 Subject: [PATCH 08/11] test zero-sized blob --- .../gcs/GoogleCloudStorageBlobContainerRetriesTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index c4cc32ab10ab3..2d6590a17f1db 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -265,12 +265,11 @@ public void testWriteBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries); final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); - final byte[] bytes = randomBlobContent(); + final byte[] bytes = randomBlobContent(0); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); if (countDown.countDown()) { MultipartUpload multipartUpload = MultipartUpload.parseBody(exchange, exchange.getRequestBody()); - assertTrue(multipartUpload.content().length() > 0); assertEquals(multipartUpload.name(), blobContainer.path().buildAsString() + "write_blob_max_retries"); if (multipartUpload.content().equals(new BytesArray(bytes))) { byte[] response = Strings.format(""" From 47912f562e92415107b9b68c76d234a3f8da2da9 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 10 Apr 2025 19:45:41 -0700 Subject: [PATCH 09/11] sanitize content --- .../test/java/fixture/gcs/MultipartUploadTests.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 
deletions(-) diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java index 726044973cf5a..6b9b8a201bfb7 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.Arrays; import static java.nio.charset.StandardCharsets.UTF_8; @@ -24,14 +25,10 @@ public class MultipartUploadTests extends ESTestCase { // produces content that does not contain boundary static String randomPartContent(int len, String boundary) { assert len > 0 && boundary.isEmpty() == false; - var n = 100; - for (var i = 0; i < 100; i++) { - var content = randomAlphanumericOfLength(len); - if (content.contains(boundary) == false) { - return content; - } - } - throw new IllegalStateException("cannot generate part content for len=" + len + " boundary=" + boundary); + var content = randomAlphanumericOfLength(len); + var replacement = boundary.getBytes(UTF_8); + replacement[0]++; // change single char to make it different from original + return content.replace(boundary, Arrays.toString(replacement)); } public void testGenericMultipart() throws IOException { From 905620d88923eef9ed3de2fb4df63ececdb27576 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 14 Apr 2025 15:18:07 -0700 Subject: [PATCH 10/11] random multipart boundary --- .../GoogleCloudStorageHttpHandlerTests.java | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java index d1f003b2097b3..21f9407360b87 100644 --- 
a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java @@ -102,9 +102,10 @@ public void testSimpleObjectOperations() { assertEquals(new TestHttpResponse(RestStatus.OK, """ {"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null)); + var boundary = newMultipartBoundary(); assertEquals( new TestHttpResponse(RestStatus.OK, """ - --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__ + --$boundary Content-Length: 168 Content-Type: application/http content-id: 1 @@ -115,13 +116,13 @@ public void testSimpleObjectOperations() { - --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__ - """.replaceAll("\n", "\r\n")), + --$boundary-- + """.replace("\n", "\r\n").replace("$boundary", boundary)), handleRequest( handler, "POST", "/batch/storage/v1", - createBatchDeleteRequest(bucket, blobName), + createBatchDeleteRequest(bucket, boundary, blobName), Headers.of("Content-Type", "mixed/multipart") ) ); @@ -131,7 +132,7 @@ public void testSimpleObjectOperations() { handler, "POST", "/batch/storage/v1", - createBatchDeleteRequest(bucket, blobName), + createBatchDeleteRequest(bucket, boundary, blobName), Headers.of("Content-Type", "mixed/multipart") ).restStatus() ); @@ -617,12 +618,13 @@ private static TestHttpResponse executeMultipartUpload( ) { var headers = new Headers(); // multipart upload is required to provide boundary header - headers.put("Content-Type", List.of("multipart/related; boundary=" + MULTIPART_BOUNDARY)); + var boundary = newMultipartBoundary(); + headers.put("Content-Type", List.of("multipart/related; boundary=" + boundary)); return handleRequest( handler, "POST", "/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch), - createGzipCompressedMultipartUploadBody(bucket, blobName, bytes), + 
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes, boundary), headers ); } @@ -791,25 +793,37 @@ private static Headers rangeHeader(long start, long end) { private static final String MULTIPART_BOUNDARY = "__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__"; - private static BytesReference createGzipCompressedMultipartUploadBody(String bucketName, String path, BytesReference content) { + private static String newMultipartBoundary() { + return "__END_OF_PART__" + randomUUID(); + } + + private static BytesReference createGzipCompressedMultipartUploadBody( + String bucketName, + String path, + BytesReference content, + String boundary + ) { final String metadataString = Strings.format("{\"bucket\":\"%s\", \"name\":\"%s\"}", bucketName, path); - final BytesReference header = new BytesArray(Strings.format(""" - --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__ - Content-Length: %d + final String headerStr = """ + --$boundary + Content-Length: $metadata-length Content-Type: application/json; charset=UTF-8 content-transfer-encoding: binary - %s - --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__ + $metadata + --$boundary Content-Type: application/octet-stream content-transfer-encoding: binary - """.replaceAll("\n", "\r\n"), metadataString.length(), metadataString).getBytes(StandardCharsets.UTF_8)); - + """.replace("\n", "\r\n") + .replace("$boundary", boundary) + .replace("$metadata-length", Integer.toString(metadataString.length())) + .replace("$metadata", metadataString); + final BytesReference header = new BytesArray(headerStr.getBytes(StandardCharsets.UTF_8)); final BytesReference footer = new BytesArray(""" - --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__-- - """.replaceAll("\n", "\r\n")); + --$boundary-- + """.replace("\n", "\r\n").replace("$boundary", boundary)); final ByteArrayOutputStream out = new ByteArrayOutputStream(); try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) { 
gzipOutputStream.write(BytesReference.toBytes(CompositeBytesReference.of(header, content, footer))); @@ -819,7 +833,7 @@ private static BytesReference createGzipCompressedMultipartUploadBody(String buc return new BytesArray(out.toByteArray()); } - private static String createBatchDeleteRequest(String bucketName, String... paths) { + private static String createBatchDeleteRequest(String bucketName, String boundary, String... paths) { final String deleteRequestTemplate = """ DELETE %s/storage/v1/b/%s/o/%s HTTP/1.1 Authorization: Bearer foo @@ -828,14 +842,14 @@ private static String createBatchDeleteRequest(String bucketName, String... path """; final String partTemplate = """ - --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__ + --$boundary Content-Length: %d Content-Type: application/http content-id: %d content-transfer-encoding: binary %s - """; + """.replace("$boundary", boundary); StringBuilder builder = new StringBuilder(); AtomicInteger contentId = new AtomicInteger(); Arrays.stream(paths).forEach(p -> { @@ -843,7 +857,7 @@ private static String createBatchDeleteRequest(String bucketName, String... 
path final String part = Strings.format(partTemplate, deleteRequest.length(), contentId.incrementAndGet(), deleteRequest); builder.append(part); }); - builder.append("--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__"); + builder.append("--").append(boundary).append("--"); return builder.toString(); } From efda0e73f861f09d92576265dc97cb45cf7438fb Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 14 Apr 2025 15:19:14 -0700 Subject: [PATCH 11/11] cleanup --- .../java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java index 21f9407360b87..2185f495ea81f 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java @@ -791,8 +791,6 @@ private static Headers rangeHeader(long start, long end) { return Headers.of("Range", Strings.format("bytes=%d-%d", start, end)); } - private static final String MULTIPART_BOUNDARY = "__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__"; - private static String newMultipartBoundary() { return "__END_OF_PART__" + randomUUID(); }